Line data Source code
1 : /*
2 : * Copyright (C) 2004-2026 Savoir-faire Linux Inc.
3 : *
4 : * This program is free software: you can redistribute it and/or modify
5 : * it under the terms of the GNU General Public License as published by
6 : * the Free Software Foundation, either version 3 of the License, or
7 : * (at your option) any later version.
8 : *
9 : * This program is distributed in the hope that it will be useful,
10 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 : * GNU General Public License for more details.
13 : *
14 : * You should have received a copy of the GNU General Public License
15 : * along with this program. If not, see <https://www.gnu.org/licenses/>.
16 : */
17 :
18 : #include "conversation.h"
19 :
20 : #include "account_const.h"
21 : #include "jamiaccount.h"
22 : #include "client/jami_signal.h"
23 : #include "swarm/swarm_manager.h"
24 : #include "conversationrepository.h"
25 : #ifdef ENABLE_PLUGIN
26 : #include "manager.h"
27 : #include "plugin/jamipluginmanager.h"
28 : #include "plugin/streamdata.h"
29 : #endif
30 : #include "jami/conversation_interface.h"
31 : #include "jami/configurationmanager_interface.h"
32 :
33 : #include "fileutils.h"
34 : #include "json_utils.h"
35 : #include "logger.h"
36 : #include "presence_manager.h"
37 :
38 : #include <opendht/thread_pool.h>
39 : #include <opendht/infohash.h>
40 : #include <fmt/compile.h>
41 : #include <asio/error_code.hpp>
42 :
43 : #include <algorithm>
44 : #include <memory>
45 : #include <charconv>
46 : #include <string_view>
47 : #include <tuple>
48 : #include <optional>
49 : #include <utility>
50 : #include <deque>
51 : #include <chrono>
52 : #include <ctime>
53 : #include <functional>
54 : #include <atomic>
55 : #include <regex>
56 :
57 : namespace jami {
58 :
59 : static const char* const LAST_MODIFIED = "lastModified";
60 :
61 13 : ConvInfo::ConvInfo(const Json::Value& json)
62 : {
63 13 : id = json[ConversationMapKeys::ID].asString();
64 13 : created = json[ConversationMapKeys::CREATED].asLargestUInt();
65 13 : removed = json[ConversationMapKeys::REMOVED].asLargestUInt();
66 13 : erased = json[ConversationMapKeys::ERASED].asLargestUInt();
67 33 : for (const auto& v : json[ConversationMapKeys::MEMBERS]) {
68 20 : members.emplace(v["uri"].asString());
69 : }
70 13 : lastDisplayed = json[ConversationMapKeys::LAST_DISPLAYED].asString();
71 13 : }
72 :
73 : Json::Value
74 25 : ConvInfo::toJson() const
75 : {
76 25 : Json::Value json;
77 25 : json[ConversationMapKeys::ID] = id;
78 25 : json[ConversationMapKeys::CREATED] = Json::Int64(created);
79 25 : if (removed) {
80 1 : json[ConversationMapKeys::REMOVED] = Json::Int64(removed);
81 : }
82 25 : if (erased) {
83 1 : json[ConversationMapKeys::ERASED] = Json::Int64(erased);
84 : }
85 64 : for (const auto& m : members) {
86 39 : Json::Value member;
87 39 : member["uri"] = m;
88 39 : json[ConversationMapKeys::MEMBERS].append(member);
89 39 : }
90 25 : json[ConversationMapKeys::LAST_DISPLAYED] = lastDisplayed;
91 25 : return json;
92 0 : }
93 :
94 : // ConversationRequest
95 265 : ConversationRequest::ConversationRequest(const Json::Value& json)
96 : {
97 265 : received = json[ConversationMapKeys::RECEIVED].asLargestUInt();
98 265 : declined = json[ConversationMapKeys::DECLINED].asLargestUInt();
99 265 : from = json[ConversationMapKeys::FROM].asString();
100 265 : conversationId = json[ConversationMapKeys::CONVERSATIONID].asString();
101 265 : auto& md = json[ConversationMapKeys::METADATAS];
102 532 : for (const auto& member : md.getMemberNames()) {
103 267 : metadatas.emplace(member, md[member].asString());
104 265 : }
105 265 : }
106 :
107 : Json::Value
108 2 : ConversationRequest::toJson() const
109 : {
110 2 : Json::Value json;
111 2 : json[ConversationMapKeys::CONVERSATIONID] = conversationId;
112 2 : json[ConversationMapKeys::FROM] = from;
113 2 : json[ConversationMapKeys::RECEIVED] = static_cast<uint32_t>(received);
114 2 : if (declined)
115 0 : json[ConversationMapKeys::DECLINED] = static_cast<uint32_t>(declined);
116 4 : for (const auto& [key, value] : metadatas) {
117 2 : json[ConversationMapKeys::METADATAS][key] = value;
118 : }
119 2 : return json;
120 0 : }
121 :
122 : std::map<std::string, std::string>
123 241 : ConversationRequest::toMap() const
124 : {
125 241 : auto result = metadatas;
126 241 : result[ConversationMapKeys::ID] = conversationId;
127 241 : result[ConversationMapKeys::FROM] = from;
128 241 : if (declined)
129 0 : result[ConversationMapKeys::DECLINED] = std::to_string(declined);
130 241 : result[ConversationMapKeys::RECEIVED] = std::to_string(received);
131 241 : return result;
132 0 : }
133 :
134 : using MessageList = std::list<std::shared_ptr<libjami::SwarmMessage>>;
135 :
136 : struct History
137 : {
138 : // While loading the history, we need to avoid:
139 : // - reloading history (can just be ignored)
140 : // - adding new commits (should wait for history to be loaded)
141 : std::mutex mutex {};
142 : std::condition_variable cv {};
143 : bool loading {false};
144 : MessageList messageList {};
145 : std::map<std::string, std::shared_ptr<libjami::SwarmMessage>> quickAccess {};
146 : std::map<std::string, std::list<std::shared_ptr<libjami::SwarmMessage>>> pendingEditions {};
147 : std::map<std::string, std::list<std::map<std::string, std::string>>> pendingReactions {};
148 : };
149 :
150 : class Conversation::Impl
151 : {
152 : private:
153 396 : Impl(std::unique_ptr<ConversationRepository>&& repository,
154 : const std::shared_ptr<JamiAccount>& account,
155 : std::vector<ConversationCommit>&& commits = {})
156 396 : : repository_(std::move(repository ? repository : throw std::logic_error("Invalid repository")))
157 391 : , account_(account)
158 391 : , accountId_(account->getAccountID())
159 391 : , userId_(account->getUsername())
160 391 : , deviceId_(account->currentDeviceId())
161 391 : , swarmManager_(std::make_shared<SwarmManager>(NodeId(deviceId_),
162 782 : Manager::instance().getSeededRandomEngine(),
163 391 : [account = account_](const DeviceId& deviceId) {
164 351 : if (auto acc = account.lock()) {
165 352 : return acc->isConnectedWith(deviceId);
166 352 : }
167 0 : return false;
168 : }))
169 391 : , transferManager_(std::make_shared<TransferManager>(accountId_,
170 : "",
171 391 : repository_->id(),
172 391 : Manager::instance().getSeededRandomEngine()))
173 391 : , repoPath_(fileutils::get_data_dir() / accountId_ / "conversations" / repository_->id())
174 391 : , conversationDataPath_(fileutils::get_data_dir() / accountId_ / "conversation_data" / repository_->id())
175 391 : , fetchedPath_(conversationDataPath_ / ConversationDirectories::FETCHED)
176 391 : , sendingPath_(conversationDataPath_ / ConversationDirectories::SENDING)
177 391 : , preferencesPath_(conversationDataPath_ / ConversationDirectories::PREFERENCES)
178 391 : , statusPath_(conversationDataPath_ / ConversationDirectories::STATUS)
179 391 : , hostedCallsPath_(conversationDataPath_ / ConversationDirectories::HOSTED_CALLS)
180 391 : , activeCallsPath_(conversationDataPath_ / ConversationDirectories::ACTIVE_CALLS)
181 391 : , ioContext_(Manager::instance().ioContext())
182 2351 : , typers_(std::make_shared<Typers>(account, repository_->id()))
183 : {
184 391 : if (!commits.empty())
185 187 : initActiveCalls(repository_->convCommitsToMap(commits));
186 391 : loadStatus();
187 391 : setupMemberCallback();
188 406 : }
189 :
190 192 : Impl(std::pair<std::unique_ptr<ConversationRepository>, std::vector<ConversationCommit>>&& repoAndCommits,
191 : const std::shared_ptr<JamiAccount>& account)
192 192 : : Impl(std::move(repoAndCommits.first), account, std::move(repoAndCommits.second))
193 187 : {}
194 :
195 : public:
196 186 : Impl(const std::shared_ptr<JamiAccount>& account, ConversationMode mode, const std::string& otherMember = "")
197 186 : : Impl(ConversationRepository::createConversation(account, mode, otherMember), account)
198 186 : {}
199 :
200 18 : Impl(const std::shared_ptr<JamiAccount>& account, const std::string& conversationId)
201 18 : : Impl(std::make_unique<ConversationRepository>(account, conversationId), account)
202 18 : {}
203 :
204 193 : Impl(const std::shared_ptr<JamiAccount>& account, const std::string& remoteDevice, const std::string& conversationId)
205 193 : : Impl(ConversationRepository::cloneConversation(account, remoteDevice, conversationId), account)
206 187 : {}
207 :
208 2906 : std::string toString() const
209 : {
210 11626 : return fmt::format(FMT_COMPILE("[Account {}] [Conversation {}]"), accountId_, repository_->id());
211 : }
212 :
213 0 : std::vector<std::map<std::string, std::string>> getConnectivity() const
214 : {
215 0 : return swarmManager_->getRoutingTableInfo();
216 : }
217 :
218 : mutable std::string fmtStr_;
219 :
220 391 : ~Impl() { stopTracking(); }
221 :
/**
 * If the daemon is stopped for any reason while hosting a conference,
 * we need to announce the end of that call when restarting, so that
 * active calls are not kept forever.
 */
227 : std::vector<std::string> commitsEndedCalls();
228 : bool isAdmin() const;
229 :
230 284 : void announce(const std::string& commitId, bool commitFromSelf = false)
231 : {
232 284 : std::vector<std::string> vec;
233 284 : if (!commitId.empty())
234 282 : vec.emplace_back(commitId);
235 284 : announce(vec, commitFromSelf);
236 284 : }
237 :
238 299 : void announce(const std::vector<std::string>& commits, bool commitFromSelf = false)
239 : {
240 299 : std::vector<ConversationCommit> convcommits;
241 299 : convcommits.reserve(commits.size());
242 611 : for (const auto& cid : commits) {
243 312 : if (auto commit = repository_->getCommit(cid)) {
244 312 : convcommits.emplace_back(*commit);
245 312 : }
246 : }
247 299 : announce(repository_->convCommitsToMap(convcommits), commitFromSelf);
248 299 : }
249 :
250 : /**
251 : * Initialize activeCalls_ from the list of commits in the repository
252 : * @param commits Commits in reverse chronological order (i.e. from newest to oldest)
253 : */
254 187 : void initActiveCalls(const std::vector<std::map<std::string, std::string>>& commits) const
255 : {
256 187 : std::unordered_set<std::string> invalidHostUris;
257 187 : std::unordered_set<std::string> invalidCallIds;
258 :
259 187 : std::lock_guard lk(activeCallsMtx_);
260 1178 : for (const auto& commit : commits) {
261 991 : if (commit.at("type") == "member") {
262 : // Each commit of type "member" has an "action" field whose value can be one
263 : // of the following: "add", "join", "remove", "ban", "unban"
264 : // In the case of "remove" and "ban", we need to add the member's URI to
265 : // invalidHostUris to ensure that any call they may have started in the past
266 : // is no longer considered active.
267 : // For the other actions, there's no harm in adding the member's URI anyway,
268 : // since it's not possible to start hosting a call before joining the swarm (or
269 : // before getting unbanned in the case of previously banned members).
270 779 : invalidHostUris.emplace(commit.at("uri"));
271 425 : } else if (commit.find("confId") != commit.end() && commit.find("uri") != commit.end()
272 425 : && commit.find("device") != commit.end()) {
273 : // The commit indicates either the end or the beginning of a call
274 : // (depending on whether there is a "duration" field or not).
275 1 : auto convId = repository_->id();
276 2 : auto confId = commit.at("confId");
277 2 : auto uri = commit.at("uri");
278 2 : auto device = commit.at("device");
279 :
280 3 : if (commit.find("duration") == commit.end() && invalidCallIds.find(confId) == invalidCallIds.end()
281 3 : && invalidHostUris.find(uri) == invalidHostUris.end()) {
282 1 : std::map<std::string, std::string> activeCall;
283 1 : activeCall["id"] = confId;
284 1 : activeCall["uri"] = uri;
285 1 : activeCall["device"] = device;
286 1 : activeCalls_.emplace_back(activeCall);
287 1 : fmt::print("swarm:{} new active call detected: {} (on device {}, account {})\n",
288 : convId,
289 : confId,
290 : device,
291 : uri);
292 1 : }
293 : // Even if the call was active, we still add its ID to invalidCallIds to make sure it
294 : // doesn't get added a second time. (This shouldn't happen normally, but in practice
295 : // there are sometimes multiple commits indicating the beginning of the same call.)
296 1 : invalidCallIds.emplace(confId);
297 1 : }
298 : }
299 187 : saveActiveCalls();
300 187 : emitSignal<libjami::ConfigurationSignal::ActiveCallsChanged>(accountId_, repository_->id(), activeCalls_);
301 187 : }
302 :
303 : /**
304 : * Update activeCalls_ via announced commits (in load or via new commits)
305 : * @param commit Commit to check
306 : * @param eraseOnly If we want to ignore added commits
307 : * @param emitSig If we want to emit to client
308 : * @note eraseOnly is used by loadMessages. This is a fail-safe, this SHOULD NOT happen
309 : */
310 78 : void updateActiveCalls(const std::map<std::string, std::string>& commit,
311 : bool eraseOnly = false,
312 : bool emitSig = true) const
313 : {
314 78 : if (!repository_)
315 0 : return;
316 78 : if (commit.at("type") == "member") {
317 : // In this case, we need to check if we are not removing a hosting member or device
318 19 : std::lock_guard lk(activeCallsMtx_);
319 19 : auto it = activeCalls_.begin();
320 19 : auto updateActives = false;
321 20 : while (it != activeCalls_.end()) {
322 1 : if (it->at("uri") == commit.at("uri") || it->at("device") == commit.at("uri")) {
323 4 : JAMI_DEBUG("Removing {:s} from the active calls, because {:s} left", it->at("id"), commit.at("uri"));
324 1 : it = activeCalls_.erase(it);
325 1 : updateActives = true;
326 : } else {
327 0 : ++it;
328 : }
329 : }
330 19 : if (updateActives) {
331 1 : saveActiveCalls();
332 1 : if (emitSig)
333 1 : emitSignal<libjami::ConfigurationSignal::ActiveCallsChanged>(accountId_,
334 1 : repository_->id(),
335 1 : activeCalls_);
336 : }
337 19 : return;
338 19 : }
339 : // Else, it's a call information
340 174 : if (commit.find("confId") != commit.end() && commit.find("uri") != commit.end()
341 174 : && commit.find("device") != commit.end()) {
342 56 : auto convId = repository_->id();
343 112 : auto confId = commit.at("confId");
344 112 : auto uri = commit.at("uri");
345 112 : auto device = commit.at("device");
346 56 : std::lock_guard lk(activeCallsMtx_);
347 56 : auto itActive = std::find_if(activeCalls_.begin(), activeCalls_.end(), [&](const auto& value) {
348 25 : return value.at("id") == confId && value.at("uri") == uri && value.at("device") == device;
349 : });
350 56 : if (commit.find("duration") == commit.end()) {
351 31 : if (itActive == activeCalls_.end() && !eraseOnly) {
352 124 : JAMI_DEBUG("swarm:{:s} new current call detected: {:s} on device {:s}, account {:s}",
353 : convId,
354 : confId,
355 : device,
356 : uri);
357 155 : activeCalls_.emplace_back(std::map<std::string, std::string> {
358 : {"id", confId},
359 : {"uri", uri},
360 : {"device", device},
361 124 : });
362 31 : saveActiveCalls();
363 31 : if (emitSig)
364 31 : emitSignal<libjami::ConfigurationSignal::ActiveCallsChanged>(accountId_,
365 31 : repository_->id(),
366 31 : activeCalls_);
367 : }
368 : } else {
369 25 : if (itActive != activeCalls_.end()) {
370 25 : itActive = activeCalls_.erase(itActive);
371 : // Unlikely, but we must ensure that no duplicate exists
372 25 : while (itActive != activeCalls_.end()) {
373 0 : itActive = std::find_if(itActive, activeCalls_.end(), [&](const auto& value) {
374 0 : return value.at("id") == confId && value.at("uri") == uri && value.at("device") == device;
375 : });
376 0 : if (itActive != activeCalls_.end()) {
377 0 : JAMI_ERROR("Duplicate call found. (This is a bug)");
378 0 : itActive = activeCalls_.erase(itActive);
379 : }
380 : }
381 :
382 25 : if (eraseOnly) {
383 0 : JAMI_WARNING("previous swarm:{:s} call finished detected: {:s} on device "
384 : "{:s}, account {:s}",
385 : convId,
386 : confId,
387 : device,
388 : uri);
389 : } else {
390 100 : JAMI_DEBUG("swarm:{:s} call finished: {:s} on device {:s}, account {:s}",
391 : convId,
392 : confId,
393 : device,
394 : uri);
395 : }
396 : }
397 25 : saveActiveCalls();
398 25 : if (emitSig)
399 25 : emitSignal<libjami::ConfigurationSignal::ActiveCallsChanged>(accountId_,
400 25 : repository_->id(),
401 25 : activeCalls_);
402 : }
403 56 : }
404 : }
405 :
406 1205 : void announce(const std::vector<std::map<std::string, std::string>>& commits, bool commitFromSelf = false)
407 : {
408 1205 : if (!repository_)
409 0 : return;
410 1204 : auto convId = repository_->id();
411 1203 : auto ok = !commits.empty();
412 3610 : auto lastId = ok ? commits.rbegin()->at(ConversationMapKeys::ID) : "";
413 1204 : addToHistory(loadedHistory_, commits, true, commitFromSelf);
414 1205 : if (ok) {
415 1203 : bool announceMember = false;
416 2443 : for (const auto& c : commits) {
417 : // Announce member events
418 1240 : if (c.at("type") == "member") {
419 921 : if (c.find("uri") != c.end() && c.find("action") != c.end()) {
420 921 : const auto& uri = c.at("uri");
421 921 : const auto& actionStr = c.at("action");
422 921 : auto action = -1;
423 921 : if (actionStr == "add")
424 442 : action = 0;
425 479 : else if (actionStr == "join")
426 458 : action = 1;
427 21 : else if (actionStr == "remove")
428 1 : action = 2;
429 20 : else if (actionStr == "ban")
430 18 : action = 3;
431 2 : else if (actionStr == "unban")
432 2 : action = 4;
433 921 : if (actionStr == "ban" || actionStr == "remove") {
434 : // In this case, a potential host was removed during a call.
435 19 : updateActiveCalls(c);
436 19 : typers_->removeTyper(uri);
437 : }
438 921 : if (action != -1) {
439 921 : announceMember = true;
440 921 : emitSignal<libjami::ConversationSignal::ConversationMemberEvent>(accountId_,
441 : convId,
442 : uri,
443 : action);
444 : }
445 : }
446 319 : } else if (c.at("type") == "application/call-history+json") {
447 59 : updateActiveCalls(c);
448 : }
449 : #ifdef ENABLE_PLUGIN
450 1239 : auto& pluginChatManager = Manager::instance().getJamiPluginManager().getChatServicesManager();
451 1240 : if (pluginChatManager.hasHandlers()) {
452 0 : auto cm = std::make_shared<JamiMessage>(accountId_, convId, c.at("author") != userId_, c, false);
453 0 : cm->isSwarm = true;
454 0 : pluginChatManager.publishMessage(std::move(cm));
455 0 : }
456 : #endif
457 : }
458 :
459 1203 : if (announceMember && onMembersChanged_) {
460 921 : onMembersChanged_(repository_->memberUris("", {}));
461 : }
462 : }
463 1205 : }
464 :
465 391 : void loadStatus()
466 : {
467 : try {
468 : // read file
469 780 : auto file = fileutils::loadFile(statusPath_);
470 : // load values
471 2 : msgpack::object_handle oh = msgpack::unpack((const char*) file.data(), file.size());
472 2 : std::lock_guard lk {messageStatusMtx_};
473 2 : oh.get().convert(messagesStatus_);
474 391 : } catch (const std::exception& e) {
475 389 : }
476 391 : }
477 1114 : void saveStatus()
478 : {
479 1114 : std::ofstream file(statusPath_, std::ios::trunc | std::ios::binary);
480 1114 : msgpack::pack(file, messagesStatus_);
481 1114 : }
482 :
483 18 : void loadActiveCalls() const
484 : {
485 : try {
486 : // read file
487 29 : auto file = fileutils::loadFile(activeCallsPath_);
488 : // load values
489 7 : msgpack::object_handle oh = msgpack::unpack((const char*) file.data(), file.size());
490 7 : std::lock_guard lk {activeCallsMtx_};
491 7 : oh.get().convert(activeCalls_);
492 18 : } catch (const std::exception& e) {
493 11 : return;
494 11 : }
495 : }
496 :
497 262 : void saveActiveCalls() const
498 : {
499 262 : std::ofstream file(activeCallsPath_, std::ios::trunc | std::ios::binary);
500 262 : msgpack::pack(file, activeCalls_);
501 262 : }
502 :
503 18 : void loadHostedCalls() const
504 : {
505 : try {
506 : // read file
507 30 : auto file = fileutils::loadFile(hostedCallsPath_);
508 : // load values
509 6 : msgpack::object_handle oh = msgpack::unpack((const char*) file.data(), file.size());
510 6 : std::lock_guard lk {activeCallsMtx_};
511 6 : oh.get().convert(hostedCalls_);
512 18 : } catch (const std::exception& e) {
513 12 : return;
514 12 : }
515 : }
516 :
517 42 : void saveHostedCalls() const
518 : {
519 42 : std::ofstream file(hostedCallsPath_, std::ios::trunc | std::ios::binary);
520 42 : msgpack::pack(file, hostedCalls_);
521 42 : }
522 :
523 : void voteUnban(const std::string& contactUri, const std::string_view type, const OnDoneCb& cb);
524 :
525 : std::vector<std::map<std::string, std::string>> getMembers(bool includeInvited,
526 : bool includeLeft,
527 : bool includeBanned) const;
528 :
529 : std::vector<std::map<std::string, std::string>> getTrackedMembers() const;
530 :
531 5101 : std::string_view bannedType(const std::string& uri) const
532 : {
533 5101 : auto crt = fmt::format("{}.crt", uri);
534 10203 : auto bannedMember = repoPath_ / MemberPath::BANNED / MemberPath::MEMBERS / crt;
535 5102 : if (std::filesystem::is_regular_file(bannedMember))
536 11 : return "members"sv;
537 10180 : auto bannedAdmin = repoPath_ / MemberPath::BANNED / MemberPath::ADMINS / crt;
538 5088 : if (std::filesystem::is_regular_file(bannedAdmin))
539 0 : return "admins"sv;
540 10182 : auto bannedInvited = repoPath_ / MemberPath::BANNED / MemberPath::INVITED / uri;
541 5091 : if (std::filesystem::is_regular_file(bannedInvited))
542 2 : return "invited"sv;
543 10175 : auto bannedDevice = repoPath_ / MemberPath::BANNED / MemberPath::DEVICES / crt;
544 5085 : if (std::filesystem::is_regular_file(bannedDevice))
545 0 : return "devices"sv;
546 5089 : return {};
547 5102 : }
548 :
549 4462 : std::shared_ptr<dhtnet::ChannelSocket> gitSocket(const DeviceId& deviceId) const
550 : {
551 4462 : auto deviceSockets = gitSocketList_.find(deviceId);
552 8926 : return (deviceSockets != gitSocketList_.end()) ? deviceSockets->second : nullptr;
553 : }
554 :
555 2037 : void addGitSocket(const DeviceId& deviceId, const std::shared_ptr<dhtnet::ChannelSocket>& socket)
556 : {
557 2037 : gitSocketList_[deviceId] = socket;
558 2036 : }
559 813 : void removeGitSocket(const DeviceId& deviceId)
560 : {
561 813 : auto deviceSockets = gitSocketList_.find(deviceId);
562 813 : if (deviceSockets != gitSocketList_.end())
563 410 : gitSocketList_.erase(deviceSockets);
564 813 : }
565 :
566 : /**
567 : * Remove all git sockets and all DRT nodes associated with the given peer.
568 : * This is used when a swarm member is banned to ensure that we stop syncing
569 : * with them or sending them message notifications.
570 : */
571 : void disconnectFromPeer(const std::string& peerUri);
572 :
573 : std::vector<std::map<std::string, std::string>> getMembers(bool includeInvited, bool includeLeft) const;
574 :
575 : std::function<void()> bootstrapCb_;
576 : std::mutex bootstrapMtx_ {};
577 : #ifdef LIBJAMI_TEST
578 : std::function<void(std::string, BootstrapStatus)> bootstrapCbTest_;
579 : #endif
580 :
581 : std::mutex writeMtx_ {};
582 : const std::unique_ptr<ConversationRepository> repository_;
583 : const std::weak_ptr<JamiAccount> account_;
584 : const std::string accountId_;
585 : const std::string userId_;
586 : const std::string deviceId_;
587 : const std::shared_ptr<SwarmManager> swarmManager_;
588 : std::atomic_bool isRemoving_ {false};
589 : std::vector<libjami::SwarmMessage> loadMessages(const LogOptions& options, History* optHistory = nullptr);
590 : void pull(const std::string& deviceId);
591 :
592 : // Avoid multiple fetch/merges at the same time.
593 : std::mutex pullcbsMtx_ {};
594 : // store current remote in fetch
595 : std::map<std::string, std::deque<std::pair<std::string, OnPullCb>>> fetchingRemotes_ {};
596 : const std::shared_ptr<TransferManager> transferManager_ {};
597 : const std::filesystem::path repoPath_ {};
598 : const std::filesystem::path conversationDataPath_ {};
599 : const std::filesystem::path fetchedPath_ {};
600 :
601 : // Manage last message displayed and status
602 : const std::filesystem::path sendingPath_ {};
603 : const std::filesystem::path preferencesPath_ {};
604 : const std::filesystem::path statusPath_ {};
605 :
606 : OnMembersChanged onMembersChanged_ {};
607 : struct TrackedMember
608 : {
609 : std::set<DeviceId> devices;
610 : std::set<DeviceId> failedDevices;
611 : };
612 : std::map<std::string, TrackedMember> trackedMembers_;
613 : mutable std::mutex trackedMembersMtx_;
614 : bool isTracking_ {false};
615 : void setupMemberCallback();
616 : void startTracking(std::weak_ptr<Conversation> w);
617 : void stopTracking();
618 : void rotateTrackedMembers(const std::string& memberUri = "", const DeviceId& deviceId = {});
619 : void monitorConnection(std::weak_ptr<Conversation> w);
620 : void onConnectionFailed(const DeviceId& deviceId, const std::string& memberUri = "");
621 :
622 : uint64_t presenceDeviceListenerToken_ {0};
623 :
624 : // Manage hosted calls on this device
625 : std::filesystem::path hostedCallsPath_ {};
626 : mutable std::map<std::string, uint64_t /* start time */> hostedCalls_ {};
627 : // Manage active calls for this conversation (can be hosted by other devices)
628 : std::filesystem::path activeCallsPath_ {};
629 : mutable std::mutex activeCallsMtx_ {};
630 : mutable std::vector<std::map<std::string, std::string>> activeCalls_ {};
631 :
632 : GitSocketList gitSocketList_ {};
633 :
634 : // Bootstrap
635 : const std::shared_ptr<asio::io_context> ioContext_;
636 :
637 : /**
638 : * Loaded history represents the linearized history to show for clients
639 : */
640 : History loadedHistory_ {};
641 : std::vector<std::shared_ptr<libjami::SwarmMessage>> addToHistory(
642 : History& history,
643 : const std::vector<std::map<std::string, std::string>>& commits,
644 : bool messageReceived = false,
645 : bool commitFromSelf = false);
646 :
647 : void handleReaction(History& history, const std::shared_ptr<libjami::SwarmMessage>& sharedCommit) const;
648 : void handleEdition(History& history,
649 : const std::shared_ptr<libjami::SwarmMessage>& sharedCommit,
650 : bool messageReceived) const;
651 : bool handleMessage(History& history,
652 : const std::shared_ptr<libjami::SwarmMessage>& sharedCommit,
653 : bool messageReceived) const;
654 : void rectifyStatus(const std::shared_ptr<libjami::SwarmMessage>& message, History& history) const;
655 : /**
656 : * {uri, {
657 : * {"fetch", "commitId"},
658 : * {"fetched_ts", "timestamp"},
659 : * {"read", "commitId"},
660 : * {"read_ts", "timestamp"}
661 : * }
662 : * }
663 : */
664 : mutable std::mutex messageStatusMtx_;
665 : std::function<void(const std::map<std::string, std::map<std::string, std::string>>&)> messageStatusCb_ {};
666 : std::map<std::string, std::map<std::string, std::string>> messagesStatus_ {};
/**
 * Status: 0 = committed, 1 = fetched, 2 = read
 * This caches the current status to add to the messages
 */
671 : // Note: only store int32_t cause it's easy to pass to dbus this way
672 : // memberToStatus serves as a cache for loading messages
673 : std::map<std::string, int32_t> memberToStatus;
674 :
675 : // futureStatus is used to store the status for receiving messages
676 : // (because we're not sure to fetch the commit before receiving a status change for this)
677 : std::map<std::string, std::map<std::string, int32_t>> futureStatus;
678 : // Update internal structures regarding status
679 : void updateStatus(const std::string& uri,
680 : libjami::Account::MessageStates status,
681 : const std::string& commitId,
682 : const std::string& ts,
683 : bool emit = false);
684 :
685 : std::shared_ptr<Typers> typers_;
686 : };
687 :
688 : void
689 391 : Conversation::Impl::setupMemberCallback()
690 : {
691 391 : repository_->onMembersChanged([this](const std::set<std::string>& memberUris) {
692 : {
693 1083 : std::lock_guard lk(trackedMembersMtx_);
694 1083 : if (isTracking_) {
695 239 : if (auto acc = account_.lock()) {
696 521 : for (auto it = trackedMembers_.begin(); it != trackedMembers_.end();) {
697 282 : if (memberUris.find(it->first) == memberUris.end()) {
698 6 : acc->presenceManager()->untrackBuddy(it->first);
699 6 : it = trackedMembers_.erase(it);
700 : } else {
701 276 : ++it;
702 : }
703 : }
704 239 : }
705 : }
706 1083 : }
707 :
708 1083 : if (onMembersChanged_)
709 1083 : onMembersChanged_(memberUris);
710 1083 : });
711 391 : }
712 :
713 : void
714 551 : Conversation::Impl::startTracking(std::weak_ptr<Conversation> w)
715 : {
716 551 : auto acc = account_.lock();
717 551 : if (!acc)
718 0 : return;
719 :
720 : {
721 551 : std::lock_guard lk(trackedMembersMtx_);
722 551 : if (isTracking_)
723 150 : return;
724 401 : isTracking_ = true;
725 401 : presenceDeviceListenerToken_ = acc->presenceManager()->addDeviceListener(
726 722 : [w](const std::string& uri, const DeviceId& deviceId, bool online) {
727 722 : if (auto sthis = w.lock()) {
728 722 : if (online && sthis->isMember(uri)) {
729 569 : sthis->addKnownDevices({deviceId}, uri);
730 : }
731 722 : }
732 722 : });
733 551 : }
734 :
735 401 : rotateTrackedMembers();
736 551 : }
737 :
738 : void
739 611 : Conversation::Impl::stopTracking()
740 : {
741 : // Collect data under trackedMembersMtx_, then release it before calling
742 : // into PresenceManager to avoid lock-ordering inversion with the
743 : // PresenceManager mutex (which is held when notifyListeners calls
744 : // addKnownDevices, which in turn acquires trackedMembersMtx_).
745 611 : uint64_t token = 0;
746 611 : std::vector<std::string> urisToUntrack;
747 : {
748 611 : std::lock_guard lk(trackedMembersMtx_);
749 611 : if (!isTracking_)
750 210 : return;
751 401 : isTracking_ = false;
752 401 : token = presenceDeviceListenerToken_;
753 401 : presenceDeviceListenerToken_ = 0;
754 1161 : for (const auto& [uri, _] : trackedMembers_) {
755 760 : urisToUntrack.push_back(uri);
756 : }
757 401 : trackedMembers_.clear();
758 611 : }
759 :
760 401 : auto acc = account_.lock();
761 401 : if (!acc)
762 210 : return;
763 :
764 191 : if (token) {
765 191 : acc->presenceManager()->removeDeviceListener(token);
766 : }
767 :
768 635 : for (const auto& uri : urisToUntrack) {
769 444 : acc->presenceManager()->untrackBuddy(uri);
770 : }
771 821 : }
772 :
773 : void
774 678 : Conversation::Impl::rotateTrackedMembers(const std::string& memberUri, const DeviceId& deviceId)
775 : {
776 678 : auto acc = account_.lock();
777 678 : if (!acc)
778 0 : return;
779 :
780 678 : std::lock_guard lk(trackedMembersMtx_);
781 678 : if (!isTracking_)
782 206 : return;
783 :
784 472 : if (!memberUri.empty()) {
785 71 : if (auto it = trackedMembers_.find(memberUri); it != trackedMembers_.end()) {
786 284 : JAMI_WARNING("{} [device {}] Rotating tracked members after connection failure", toString(), deviceId);
787 71 : auto& info = it->second;
788 71 : info.failedDevices.insert(deviceId);
789 71 : if (std::includes(info.failedDevices.begin(),
790 : info.failedDevices.end(),
791 : info.devices.begin(),
792 : info.devices.end())) {
793 32 : acc->presenceManager()->untrackBuddy(it->first);
794 32 : trackedMembers_.erase(it);
795 : }
796 : } else {
797 0 : return;
798 : }
799 : }
800 :
801 472 : auto members = repository_->members();
802 472 : size_t N = members.size();
803 472 : size_t K = std::min(N, 3 + (size_t) std::log2(N));
804 :
805 : // If we are already trying enough devices, don't rotate
806 472 : auto activeDevices = swarmManager_->getActiveNodesCount();
807 472 : if (activeDevices >= 2 * K)
808 0 : return;
809 :
810 1888 : JAMI_WARNING("{} Refreshing tracked members: {}/{} active ({}/{} devices)",
811 : toString(),
812 : trackedMembers_.size(),
813 : K,
814 : activeDevices,
815 : 2 * K);
816 :
817 : // Add new members if we have space
818 472 : if (trackedMembers_.size() < K) {
819 434 : std::vector<std::string> candidates;
820 434 : candidates.reserve(N - trackedMembers_.size());
821 1499 : for (const auto& m : members) {
822 1065 : if (m.uri != memberUri && trackedMembers_.find(m.uri) == trackedMembers_.end()) {
823 999 : candidates.push_back(m.uri);
824 : }
825 : }
826 434 : if (!candidates.empty()) {
827 405 : std::vector<std::string> chosen;
828 405 : std::sample(candidates.begin(),
829 : candidates.end(),
830 : std::back_inserter(chosen),
831 405 : K - trackedMembers_.size(),
832 405 : Manager::instance().getSeededRandomEngine());
833 1203 : for (const auto& uri : chosen) {
834 798 : acc->presenceManager()->trackBuddy(uri);
835 798 : trackedMembers_.emplace(uri, TrackedMember {});
836 : }
837 405 : }
838 434 : }
839 884 : }
840 :
841 : void
842 277 : Conversation::Impl::onConnectionFailed(const DeviceId& deviceId, const std::string& memberUri)
843 : {
844 277 : rotateTrackedMembers(memberUri, deviceId);
845 277 : }
846 :
847 : void
848 514 : Conversation::Impl::monitorConnection(std::weak_ptr<Conversation> w)
849 : {
850 514 : if (!swarmManager_->isConnected()) {
851 505 : startTracking(w);
852 : }
853 :
854 514 : swarmManager_->onConnectionChanged([w](bool ok) {
855 266 : dht::ThreadPool::io().run([w, ok] {
856 266 : if (auto sthis = w.lock()) {
857 : // Check the current connection state rather than relying on the
858 : // captured `ok` value, because the thread pool does not guarantee
859 : // execution order. Two rapid state changes (connected then
860 : // disconnected) could otherwise cause stopTracking to run after
861 : // startTracking, leaving the conversation untracked while the DRT
862 : // has no connected nodes.
863 266 : if (sthis->pimpl_->swarmManager_->isConnected()) {
864 220 : sthis->pimpl_->stopTracking();
865 : } else {
866 46 : sthis->pimpl_->startTracking(w);
867 : }
868 266 : if (ok) {
869 219 : if (sthis->pimpl_->bootstrapCb_)
870 219 : sthis->pimpl_->bootstrapCb_();
871 : }
872 : #ifdef LIBJAMI_TEST
873 266 : if (sthis->pimpl_->bootstrapCbTest_)
874 13 : sthis->pimpl_->bootstrapCbTest_(sthis->id(),
875 : ok ? BootstrapStatus::SUCCESS : BootstrapStatus::FAILED);
876 : #endif
877 266 : }
878 266 : });
879 266 : });
880 514 : }
881 :
882 : bool
883 20 : Conversation::Impl::isAdmin() const
884 : {
885 20 : auto adminsPath = repoPath_ / MemberPath::ADMINS;
886 40 : return std::filesystem::is_regular_file(fileutils::getFullPath(adminsPath, userId_ + ".crt"));
887 20 : }
888 :
889 : void
890 18 : Conversation::Impl::disconnectFromPeer(const std::string& peerUri)
891 : {
892 : // Remove nodes from swarmManager
893 18 : const auto nodes = swarmManager_->getAllNodes();
894 18 : std::vector<NodeId> toRemove;
895 22 : for (const auto node : nodes)
896 4 : if (peerUri == repository_->uriFromDevice(node.toString()))
897 2 : toRemove.emplace_back(node);
898 18 : swarmManager_->deleteNode(toRemove);
899 :
900 : // Remove git sockets with this member
901 39 : for (auto it = gitSocketList_.begin(); it != gitSocketList_.end();) {
902 21 : if (peerUri == repository_->uriFromDevice(it->first.toString()))
903 11 : it = gitSocketList_.erase(it);
904 : else
905 10 : ++it;
906 : }
907 18 : }
908 :
909 : std::vector<std::map<std::string, std::string>>
910 101 : Conversation::Impl::getMembers(bool includeInvited, bool includeLeft, bool includeBanned) const
911 : {
912 101 : std::vector<std::map<std::string, std::string>> result;
913 101 : auto members = repository_->members();
914 101 : std::lock_guard lk(messageStatusMtx_);
915 304 : for (const auto& member : members) {
916 203 : if (member.role == MemberRole::BANNED && !includeBanned) {
917 0 : continue;
918 : }
919 203 : if (member.role == MemberRole::INVITED && !includeInvited)
920 0 : continue;
921 203 : if (member.role == MemberRole::LEFT && !includeLeft)
922 0 : continue;
923 203 : auto mm = member.map();
924 203 : auto it = messagesStatus_.find(member.uri);
925 203 : if (it != messagesStatus_.end()) {
926 139 : auto readIt = it->second.find("read");
927 139 : if (readIt != it->second.end())
928 89 : mm[ConversationMapKeys::LAST_DISPLAYED] = readIt->second;
929 : }
930 203 : result.emplace_back(std::move(mm));
931 203 : }
932 202 : return result;
933 101 : }
934 :
935 : std::vector<std::map<std::string, std::string>>
936 0 : Conversation::Impl::getTrackedMembers() const
937 : {
938 0 : std::vector<std::map<std::string, std::string>> result;
939 0 : std::lock_guard lk(trackedMembersMtx_);
940 0 : for (const auto& [uri, member] : trackedMembers_) {
941 0 : std::map<std::string, std::string> map;
942 0 : map["uri"] = uri;
943 0 : std::string devicesStr;
944 0 : for (const auto& dev : member.devices) {
945 0 : if (!devicesStr.empty())
946 0 : devicesStr += ";";
947 0 : devicesStr += dev.toString();
948 : }
949 0 : map["devices"] = devicesStr;
950 0 : result.emplace_back(std::move(map));
951 0 : }
952 0 : return result;
953 0 : }
954 :
955 : std::vector<std::string>
956 18 : Conversation::Impl::commitsEndedCalls()
957 : {
958 : // Handle current calls
959 18 : std::vector<std::string> commits {};
960 18 : std::unique_lock lk(writeMtx_);
961 18 : std::unique_lock lkA(activeCallsMtx_);
962 18 : for (const auto& hostedCall : hostedCalls_) {
963 : // In this case, this means that we left
964 : // the conference while still hosting it, so activeCalls
965 : // will not be correctly updated
966 : // We don't need to send notifications there, as peers will sync with presence
967 0 : Json::Value value;
968 0 : value["uri"] = userId_;
969 0 : value["device"] = deviceId_;
970 0 : value["confId"] = hostedCall.first;
971 0 : value["type"] = "application/call-history+json";
972 0 : auto now = std::chrono::system_clock::now();
973 0 : auto nowConverted = std::chrono::duration_cast<std::chrono::seconds>(now.time_since_epoch()).count();
974 0 : value["duration"] = std::to_string((nowConverted - hostedCall.second) * 1000);
975 0 : auto itActive = std::find_if(activeCalls_.begin(),
976 : activeCalls_.end(),
977 0 : [this, confId = hostedCall.first](const auto& value) {
978 0 : return value.at("id") == confId && value.at("uri") == userId_
979 0 : && value.at("device") == deviceId_;
980 : });
981 0 : if (itActive != activeCalls_.end())
982 0 : activeCalls_.erase(itActive);
983 0 : commits.emplace_back(repository_->commitMessage(json::toString(value)));
984 :
985 0 : JAMI_DEBUG("Removing hosted conference... {:s}", hostedCall.first);
986 0 : }
987 18 : hostedCalls_.clear();
988 18 : saveActiveCalls();
989 18 : saveHostedCalls();
990 36 : return commits;
991 18 : }
992 :
993 : std::vector<libjami::SwarmMessage>
994 1499 : Conversation::Impl::loadMessages(const LogOptions& options, History* optHistory)
995 : {
996 1499 : auto history = optHistory ? optHistory : &loadedHistory_;
997 :
998 : // history->mutex is locked by the caller
999 1499 : if (!repository_ || history->loading) {
1000 0 : return {};
1001 : }
1002 1499 : history->loading = true;
1003 :
1004 : // By convention, if options.nbOfCommits is zero, then we
1005 : // don't impose a limit on the number of commits returned.
1006 1499 : bool limitNbOfCommits = options.nbOfCommits > 0;
1007 :
1008 1499 : auto startLogging = options.from == "";
1009 1499 : auto breakLogging = false;
1010 1499 : auto currentHistorySize = loadedHistory_.messageList.size();
1011 1499 : std::vector<std::string> replies;
1012 1499 : std::vector<std::shared_ptr<libjami::SwarmMessage>> msgList;
1013 2998 : repository_->log(
1014 : /* preCondition */
1015 8515 : [&](const auto& id, const auto& author, const auto& commit) {
1016 8515 : if (options.skipMerge && git_commit_parentcount(commit.get()) > 1) {
1017 1 : return CallbackResult::Skip;
1018 : }
1019 8514 : if (id == options.to) {
1020 703 : if (options.includeTo)
1021 0 : breakLogging = true; // For the next commit
1022 : }
1023 8508 : if (replies.empty()) { // This avoid load until
1024 : // NOTE: in the future, we may want to add "Reply-Body" in commit to avoid to load
1025 : // until this commit
1026 17020 : if ((limitNbOfCommits
1027 8510 : && (loadedHistory_.messageList.size() - currentHistorySize) == options.nbOfCommits))
1028 1 : return CallbackResult::Break; // Stop logging
1029 8509 : if (breakLogging)
1030 0 : return CallbackResult::Break; // Stop logging
1031 8509 : if (id == options.to && !options.includeTo) {
1032 702 : return CallbackResult::Break; // Stop logging
1033 : }
1034 : }
1035 :
1036 7800 : if (!startLogging && options.from != "" && options.from == id)
1037 1110 : startLogging = true;
1038 7806 : if (!startLogging)
1039 24 : return CallbackResult::Skip; // Start logging after this one
1040 :
1041 7782 : if (options.fastLog) {
1042 7106 : if (options.authorUri != "") {
1043 1 : if (options.authorUri == repository_->uriFromDevice(author.email)) {
1044 1 : return CallbackResult::Break; // Found author, stop
1045 : }
1046 : }
1047 : }
1048 :
1049 7781 : return CallbackResult::Ok; // Continue
1050 : },
1051 : /* emplaceCb */
1052 7787 : [&](auto&& cc) {
1053 7787 : if (limitNbOfCommits && (msgList.size() == options.nbOfCommits))
1054 292 : return;
1055 7496 : auto optMessage = repository_->convCommitToMap(cc);
1056 7494 : if (!optMessage.has_value())
1057 1 : return;
1058 7492 : auto message = optMessage.value();
1059 7494 : if (message.find("reply-to") != message.end()) {
1060 1 : auto it = std::find(replies.begin(), replies.end(), message.at("reply-to"));
1061 1 : if (it == replies.end()) {
1062 1 : replies.emplace_back(message.at("reply-to"));
1063 : }
1064 : }
1065 7489 : auto it = std::find(replies.begin(), replies.end(), message.at("id"));
1066 7494 : if (it != replies.end()) {
1067 1 : replies.erase(it);
1068 : }
1069 7497 : std::shared_ptr<libjami::SwarmMessage> firstMsg;
1070 7497 : if ((history == &loadedHistory_) && msgList.empty() && !loadedHistory_.messageList.empty()) {
1071 0 : firstMsg = *loadedHistory_.messageList.rbegin();
1072 : }
1073 22493 : auto added = addToHistory(*history, {message}, false, false);
1074 7496 : if (!added.empty() && firstMsg) {
1075 0 : emitSignal<libjami::ConversationSignal::SwarmMessageUpdated>(accountId_, repository_->id(), *firstMsg);
1076 : }
1077 7498 : msgList.insert(msgList.end(), added.begin(), added.end());
1078 7498 : },
1079 : /* postCondition */
1080 7777 : [&](auto, auto, auto) {
1081 : // Stop logging if there was a limit set on the number of commits
1082 : // to return and we reached it. This isn't strictly necessary since
1083 : // the check at the beginning of `emplaceCb` ensures that we won't
1084 : // return too many messages, but it prevents us from needlessly
1085 : // iterating over a (potentially) large number of commits.
1086 7777 : return limitNbOfCommits && (msgList.size() == options.nbOfCommits);
1087 : },
1088 1499 : options.from,
1089 1499 : options.logIfNotFound);
1090 :
1091 1499 : history->loading = false;
1092 1499 : history->cv.notify_all();
1093 :
1094 : // Convert for client (remove ptr)
1095 1499 : std::vector<libjami::SwarmMessage> ret;
1096 1499 : ret.reserve(msgList.size());
1097 8989 : for (const auto& msg : msgList) {
1098 7489 : ret.emplace_back(*msg);
1099 : }
1100 1500 : return ret;
1101 1499 : }
1102 :
1103 : void
1104 3 : Conversation::Impl::handleReaction(History& history, const std::shared_ptr<libjami::SwarmMessage>& sharedCommit) const
1105 : {
1106 3 : auto it = history.quickAccess.find(sharedCommit->body.at("react-to"));
1107 3 : auto peditIt = history.pendingEditions.find(sharedCommit->id);
1108 3 : if (peditIt != history.pendingEditions.end()) {
1109 0 : auto oldBody = sharedCommit->body;
1110 0 : sharedCommit->body["body"] = peditIt->second.front()->body["body"];
1111 0 : if (sharedCommit->body.at("body").empty())
1112 0 : return;
1113 0 : history.pendingEditions.erase(peditIt);
1114 0 : }
1115 3 : if (it != history.quickAccess.end()) {
1116 3 : it->second->reactions.emplace_back(sharedCommit->body);
1117 3 : emitSignal<libjami::ConversationSignal::ReactionAdded>(accountId_,
1118 3 : repository_->id(),
1119 3 : it->second->id,
1120 3 : sharedCommit->body);
1121 : } else {
1122 0 : history.pendingReactions[sharedCommit->body.at("react-to")].emplace_back(sharedCommit->body);
1123 : }
1124 : }
1125 :
1126 : void
1127 9 : Conversation::Impl::handleEdition(History& history,
1128 : const std::shared_ptr<libjami::SwarmMessage>& sharedCommit,
1129 : bool messageReceived) const
1130 : {
1131 18 : auto editId = sharedCommit->body.at("edit");
1132 9 : auto it = history.quickAccess.find(editId);
1133 9 : if (it != history.quickAccess.end()) {
1134 7 : auto baseCommit = it->second;
1135 7 : if (baseCommit) {
1136 7 : auto itReact = baseCommit->body.find("react-to");
1137 7 : std::string toReplace = (baseCommit->type == "application/data-transfer+json") ? "tid" : "body";
1138 7 : auto body = sharedCommit->body.at(toReplace);
1139 : // Edit reaction
1140 7 : if (itReact != baseCommit->body.end()) {
1141 1 : baseCommit->body[toReplace] = body; // Replace body if pending
1142 1 : it = history.quickAccess.find(itReact->second);
1143 1 : auto itPending = history.pendingReactions.find(itReact->second);
1144 1 : if (it != history.quickAccess.end()) {
1145 1 : baseCommit = it->second; // Base commit
1146 1 : auto itPreviousReact = std::find_if(baseCommit->reactions.begin(),
1147 1 : baseCommit->reactions.end(),
1148 1 : [&](const auto& reaction) {
1149 1 : return reaction.at("id") == editId;
1150 : });
1151 1 : if (itPreviousReact != baseCommit->reactions.end()) {
1152 1 : (*itPreviousReact)[toReplace] = body;
1153 1 : if (body.empty()) {
1154 1 : baseCommit->reactions.erase(itPreviousReact);
1155 2 : emitSignal<libjami::ConversationSignal::ReactionRemoved>(accountId_,
1156 1 : repository_->id(),
1157 1 : baseCommit->id,
1158 : editId);
1159 : }
1160 : }
1161 0 : } else if (itPending != history.pendingReactions.end()) {
1162 : // Else edit if pending
1163 0 : auto itReaction = std::find_if(itPending->second.begin(),
1164 0 : itPending->second.end(),
1165 0 : [&](const auto& reaction) { return reaction.at("id") == editId; });
1166 0 : if (itReaction != itPending->second.end()) {
1167 0 : (*itReaction)[toReplace] = body;
1168 0 : if (body.empty())
1169 0 : itPending->second.erase(itReaction);
1170 : }
1171 : } else {
1172 : // Add to pending edtions
1173 0 : messageReceived ? history.pendingEditions[editId].emplace_front(sharedCommit)
1174 0 : : history.pendingEditions[editId].emplace_back(sharedCommit);
1175 : }
1176 : } else {
1177 : // Normal message
1178 6 : it->second->editions.emplace(it->second->editions.begin(), it->second->body);
1179 6 : it->second->body[toReplace] = sharedCommit->body[toReplace];
1180 6 : if (toReplace == "tid") {
1181 : // Avoid to replace fileId in client
1182 2 : it->second->body["fileId"] = "";
1183 : }
1184 : // Remove reactions
1185 6 : if (sharedCommit->body.at(toReplace).empty())
1186 3 : it->second->reactions.clear();
1187 6 : emitSignal<libjami::ConversationSignal::SwarmMessageUpdated>(accountId_, repository_->id(), *it->second);
1188 : }
1189 7 : }
1190 7 : } else {
1191 2 : messageReceived ? history.pendingEditions[editId].emplace_front(sharedCommit)
1192 2 : : history.pendingEditions[editId].emplace_back(sharedCommit);
1193 : }
1194 9 : }
1195 :
1196 : bool
1197 8722 : Conversation::Impl::handleMessage(History& history,
1198 : const std::shared_ptr<libjami::SwarmMessage>& sharedCommit,
1199 : bool messageReceived) const
1200 : {
1201 8722 : if (messageReceived) {
1202 : // For a received message, we place it at the beginning of the list
1203 1213 : if (!history.messageList.empty())
1204 968 : sharedCommit->linearizedParent = (*history.messageList.begin())->id;
1205 1213 : history.messageList.emplace_front(sharedCommit);
1206 : } else {
1207 : // For a loaded message, we load from newest to oldest
1208 : // So we change the parent of the last message.
1209 7509 : if (!history.messageList.empty())
1210 6017 : (*history.messageList.rbegin())->linearizedParent = sharedCommit->id;
1211 7502 : history.messageList.emplace_back(sharedCommit);
1212 : }
1213 : // Handle pending reactions/editions
1214 8719 : auto reactIt = history.pendingReactions.find(sharedCommit->id);
1215 8719 : if (reactIt != history.pendingReactions.end()) {
1216 0 : for (const auto& commitBody : reactIt->second)
1217 0 : sharedCommit->reactions.emplace_back(commitBody);
1218 0 : history.pendingReactions.erase(reactIt);
1219 : }
1220 8720 : auto peditIt = history.pendingEditions.find(sharedCommit->id);
1221 8715 : if (peditIt != history.pendingEditions.end()) {
1222 0 : auto oldBody = sharedCommit->body;
1223 0 : if (sharedCommit->type == "application/data-transfer+json") {
1224 0 : sharedCommit->body["tid"] = peditIt->second.front()->body["tid"];
1225 0 : sharedCommit->body["fileId"] = "";
1226 : } else {
1227 0 : sharedCommit->body["body"] = peditIt->second.front()->body["body"];
1228 : }
1229 0 : peditIt->second.pop_front();
1230 0 : for (const auto& commit : peditIt->second) {
1231 0 : sharedCommit->editions.emplace_back(commit->body);
1232 : }
1233 0 : sharedCommit->editions.emplace_back(oldBody);
1234 0 : history.pendingEditions.erase(peditIt);
1235 0 : }
1236 : // Announce to client
1237 8716 : if (messageReceived)
1238 1213 : emitSignal<libjami::ConversationSignal::SwarmMessageReceived>(accountId_, repository_->id(), *sharedCommit);
1239 8720 : return !messageReceived;
1240 : }
1241 :
1242 : void
1243 8730 : Conversation::Impl::rectifyStatus(const std::shared_ptr<libjami::SwarmMessage>& message, History& history) const
1244 : {
1245 8730 : auto parentIt = history.quickAccess.find(message->linearizedParent);
1246 8732 : auto currentMessage = message;
1247 :
1248 20758 : while (parentIt != history.quickAccess.end()) {
1249 12033 : const auto& parent = parentIt->second;
1250 14355 : for (const auto& [peer, value] : message->status) {
1251 13246 : auto parentStatusIt = parent->status.find(peer);
1252 13217 : if (parentStatusIt == parent->status.end() || parentStatusIt->second < value) {
1253 2320 : parent->status[peer] = value;
1254 4634 : emitSignal<libjami::ConfigurationSignal::AccountMessageStatusChanged>(accountId_,
1255 2318 : repository_->id(),
1256 : peer,
1257 2320 : parent->id,
1258 : value);
1259 10923 : } else if (parentStatusIt->second >= value) {
1260 10924 : break;
1261 : }
1262 : }
1263 12037 : currentMessage = parent;
1264 12037 : parentIt = history.quickAccess.find(parent->linearizedParent);
1265 : }
1266 8729 : }
1267 :
1268 : std::vector<std::shared_ptr<libjami::SwarmMessage>>
1269 8719 : Conversation::Impl::addToHistory(History& history,
1270 : const std::vector<std::map<std::string, std::string>>& commits,
1271 : bool messageReceived,
1272 : bool commitFromSelf)
1273 : {
1274 : //
1275 : // NOTE: This function makes the following assumptions on its arguments:
1276 : // - The messages in "history" are in reverse chronological order (newest message
1277 : // first, oldest message last).
1278 : // - If messageReceived is true, then the commits in "commits" are assumed to be in
1279 : // chronological order (oldest to newest) and to be newer than the ones in "history".
1280 : // They are therefore inserted at the beginning of the message list.
1281 : // - If messageReceived is false, then the commits in "commits" are assumed to be in
1282 : // reverse chronological order (newest to oldest) and to be older than the ones in
1283 : // "history". They are therefore appended at the end of the message list.
1284 : //
1285 8719 : auto acc = account_.lock();
1286 8712 : if (!acc)
1287 0 : return {};
1288 8711 : auto username = acc->getUsername();
1289 8711 : if (messageReceived && (&history == &loadedHistory_ && history.loading)) {
1290 0 : std::unique_lock lk(history.mutex);
1291 0 : history.cv.wait(lk, [&] { return !history.loading; });
1292 0 : }
1293 :
1294 : // Only set messages' status on history for client
1295 8711 : bool needToSetMessageStatus = !commitFromSelf && &history == &loadedHistory_;
1296 :
1297 8711 : std::vector<std::shared_ptr<libjami::SwarmMessage>> sharedCommits;
1298 17465 : for (const auto& commit : commits) {
1299 17498 : auto commitId = commit.at("id");
1300 8745 : if (history.quickAccess.find(commitId) != history.quickAccess.end())
1301 1 : continue; // Already present
1302 8750 : auto typeIt = commit.find("type");
1303 : // Nothing to show for the client, skip
1304 8744 : if (typeIt != commit.end() && typeIt->second == "merge")
1305 20 : continue;
1306 :
1307 8731 : auto sharedCommit = std::make_shared<libjami::SwarmMessage>();
1308 8732 : sharedCommit->fromMapStringString(commit);
1309 :
1310 8733 : if (needToSetMessageStatus) {
1311 918 : std::lock_guard lk(messageStatusMtx_);
1312 : // Check if we already have status information for the commit.
1313 919 : auto itFuture = futureStatus.find(sharedCommit->id);
1314 919 : if (itFuture != futureStatus.end()) {
1315 9 : sharedCommit->status = std::move(itFuture->second);
1316 9 : futureStatus.erase(itFuture);
1317 : }
1318 919 : }
1319 8734 : sharedCommits.emplace_back(sharedCommit);
1320 8747 : }
1321 :
1322 8717 : if (needToSetMessageStatus) {
1323 914 : constexpr int32_t SENDING = static_cast<int32_t>(libjami::Account::MessageStates::SENDING);
1324 914 : constexpr int32_t SENT = static_cast<int32_t>(libjami::Account::MessageStates::SENT);
1325 914 : constexpr int32_t DISPLAYED = static_cast<int32_t>(libjami::Account::MessageStates::DISPLAYED);
1326 :
1327 914 : std::lock_guard lk(messageStatusMtx_);
1328 13514 : for (const auto& member : repository_->members()) {
1329 : // For each member, we iterate over the commits to add in reverse chronological
1330 : // order (i.e. from newest to oldest) and set their status from the point of view
1331 : // of that member (as best we can given the information we have).
1332 : //
1333 : // The key assumption made in order to compute the status is that it can never decrease
1334 : // (with respect to the ordering SENDING < SENT < DISPLAYED) as we go back in time. We
1335 : // therefore start by setting the "status" variable below to the lowest possible value,
1336 : // and increase it when we encounter a commit for which it is justified to do so.
1337 : //
1338 : // If messageReceived is true, then the commits we are adding are the most recent in the
1339 : // conversation history, so the lowest possible value is SENDING.
1340 : //
1341 : // If messageReceived is false, then the commits we are adding are older than the ones
1342 : // that are already in the history, so the lowest possible value is the status of the
1343 : // oldest message in the history so far, which is stored in memberToStatus.
1344 12604 : auto status = SENDING;
1345 12604 : if (!messageReceived) {
1346 12 : auto cache = memberToStatus[member.uri];
1347 12 : if (cache > status)
1348 9 : status = cache;
1349 : }
1350 12604 : auto& messagesStatus = messagesStatus_[member.uri];
1351 :
1352 25214 : for (auto it = sharedCommits.rbegin(); it != sharedCommits.rend(); it++) {
1353 12620 : auto sharedCommit = *it;
1354 12619 : auto previousStatus = status;
1355 12619 : auto& commitStatus = sharedCommit->status[member.uri];
1356 :
1357 : // Compute status for the current commit.
1358 12619 : if (status < SENT && messagesStatus["fetched"] == sharedCommit->id) {
1359 9 : status = SENT;
1360 : }
1361 12606 : if (messagesStatus["read"] == sharedCommit->id) {
1362 1 : status = DISPLAYED;
1363 : }
1364 12603 : if (member.uri == sharedCommit->body.at("author")) {
1365 917 : status = DISPLAYED;
1366 : }
1367 12603 : if (status < commitStatus) {
1368 0 : status = commitStatus;
1369 : }
1370 :
1371 : // Store computed value.
1372 12603 : commitStatus = status;
1373 :
1374 : // Update messagesStatus_ if needed.
1375 12603 : if (previousStatus == SENDING && status >= SENT) {
1376 913 : messagesStatus["fetched"] = sharedCommit->id;
1377 : }
1378 12604 : if (previousStatus <= SENT && status == DISPLAYED) {
1379 906 : messagesStatus["read"] = sharedCommit->id;
1380 : }
1381 12604 : }
1382 :
1383 12600 : if (!messageReceived) {
1384 : // Update memberToStatus with the status of the last (i.e. oldest) added commit.
1385 12 : memberToStatus[member.uri] = status;
1386 : }
1387 910 : }
1388 914 : }
1389 :
1390 8717 : std::vector<std::shared_ptr<libjami::SwarmMessage>> messages;
1391 17448 : for (const auto& sharedCommit : sharedCommits) {
1392 8730 : history.quickAccess[sharedCommit->id] = sharedCommit;
1393 :
1394 8731 : auto reactToIt = sharedCommit->body.find("react-to");
1395 8724 : auto editIt = sharedCommit->body.find("edit");
1396 8726 : if (reactToIt != sharedCommit->body.end() && !reactToIt->second.empty()) {
1397 3 : handleReaction(history, sharedCommit);
1398 8728 : } else if (editIt != sharedCommit->body.end() && !editIt->second.empty()) {
1399 9 : handleEdition(history, sharedCommit, messageReceived);
1400 8721 : } else if (handleMessage(history, sharedCommit, messageReceived)) {
1401 7508 : messages.emplace_back(sharedCommit);
1402 : }
1403 8728 : rectifyStatus(sharedCommit, history);
1404 : }
1405 :
1406 8720 : return messages;
1407 8720 : }
1408 :
1409 186 : Conversation::Conversation(const std::shared_ptr<JamiAccount>& account,
1410 : ConversationMode mode,
1411 186 : const std::string& otherMember)
1412 186 : : pimpl_ {new Impl {account, mode, otherMember}}
1413 186 : {}
1414 :
1415 18 : Conversation::Conversation(const std::shared_ptr<JamiAccount>& account, const std::string& conversationId)
1416 18 : : pimpl_ {new Impl {account, conversationId}}
1417 18 : {}
1418 :
1419 193 : Conversation::Conversation(const std::shared_ptr<JamiAccount>& account,
1420 : const std::string& remoteDevice,
1421 193 : const std::string& conversationId)
1422 193 : : pimpl_ {new Impl {account, remoteDevice, conversationId}}
1423 193 : {}
1424 :
1425 391 : Conversation::~Conversation() {}
1426 :
1427 : std::string
1428 4915 : Conversation::id() const
1429 : {
1430 4915 : return pimpl_->repository_ ? pimpl_->repository_->id() : "";
1431 : }
1432 :
1433 : void
1434 139 : Conversation::addMember(const std::string& contactUri, const OnDoneCb& cb)
1435 : {
1436 : try {
1437 139 : if (mode() == ConversationMode::ONE_TO_ONE) {
1438 : // Only authorize to add left members
1439 1 : auto initialMembers = getInitialMembers();
1440 1 : auto it = std::find(initialMembers.begin(), initialMembers.end(), contactUri);
1441 1 : if (it == initialMembers.end()) {
1442 1 : JAMI_WARN("Unable to add new member in one to one conversation");
1443 1 : cb(false, "");
1444 1 : return;
1445 : }
1446 1 : }
1447 0 : } catch (const std::exception& e) {
1448 0 : JAMI_WARN("Unable to get mode: %s", e.what());
1449 0 : cb(false, "");
1450 0 : return;
1451 0 : }
1452 138 : if (isMember(contactUri, true)) {
1453 0 : JAMI_WARN("Unable to add member %s because it's already a member", contactUri.c_str());
1454 0 : cb(false, "");
1455 0 : return;
1456 : }
1457 138 : if (isBanned(contactUri)) {
1458 3 : if (pimpl_->isAdmin()) {
1459 2 : dht::ThreadPool::io().run([w = weak(), contactUri = std::move(contactUri), cb = std::move(cb)] {
1460 2 : if (auto sthis = w.lock()) {
1461 2 : auto members = sthis->pimpl_->repository_->members();
1462 2 : auto type = sthis->pimpl_->bannedType(contactUri);
1463 2 : if (type.empty()) {
1464 0 : cb(false, {});
1465 0 : return;
1466 : }
1467 2 : sthis->pimpl_->voteUnban(contactUri, type, cb);
1468 4 : }
1469 : });
1470 : } else {
1471 1 : JAMI_WARN("Unable to add member %s because this member is blocked", contactUri.c_str());
1472 1 : cb(false, "");
1473 : }
1474 3 : return;
1475 : }
1476 :
1477 135 : dht::ThreadPool::io().run([w = weak(), contactUri = std::move(contactUri), cb = std::move(cb)] {
1478 135 : if (auto sthis = w.lock()) {
1479 : // Add member files and commit
1480 135 : std::unique_lock lk(sthis->pimpl_->writeMtx_);
1481 135 : auto commit = sthis->pimpl_->repository_->addMember(contactUri);
1482 135 : if (not commit.empty())
1483 135 : sthis->pimpl_->announce(commit, true);
1484 135 : lk.unlock();
1485 135 : if (cb)
1486 135 : cb(!commit.empty(), commit);
1487 270 : }
1488 135 : });
1489 : }
1490 :
1491 : std::shared_ptr<dhtnet::ChannelSocket>
1492 4462 : Conversation::gitSocket(const DeviceId& deviceId) const
1493 : {
1494 4462 : return pimpl_->gitSocket(deviceId);
1495 : }
1496 :
1497 : void
1498 2037 : Conversation::addGitSocket(const DeviceId& deviceId, const std::shared_ptr<dhtnet::ChannelSocket>& socket)
1499 : {
1500 2037 : pimpl_->addGitSocket(deviceId, socket);
1501 2036 : }
1502 :
1503 : void
1504 813 : Conversation::removeGitSocket(const DeviceId& deviceId)
1505 : {
1506 813 : pimpl_->removeGitSocket(deviceId);
1507 813 : }
1508 :
1509 : void
1510 382 : Conversation::shutdownConnections()
1511 : {
1512 382 : pimpl_->gitSocketList_.clear();
1513 382 : if (pimpl_->swarmManager_)
1514 382 : pimpl_->swarmManager_->shutdown();
1515 382 : }
1516 :
1517 : void
1518 0 : Conversation::connectivityChanged()
1519 : {
1520 0 : if (pimpl_->swarmManager_)
1521 0 : pimpl_->swarmManager_->maintainBuckets();
1522 0 : }
1523 :
1524 : std::vector<jami::DeviceId>
1525 0 : Conversation::getDeviceIdList() const
1526 : {
1527 0 : return pimpl_->swarmManager_->getAllNodes();
1528 : }
1529 :
1530 : std::shared_ptr<Typers>
1531 9 : Conversation::typers() const
1532 : {
1533 9 : return pimpl_->typers_;
1534 : }
1535 :
1536 : std::vector<std::map<std::string, std::string>>
1537 0 : Conversation::getConnectivity() const
1538 : {
1539 0 : return pimpl_->getConnectivity();
1540 : }
1541 :
1542 : bool
1543 0 : Conversation::hasSwarmChannel(const std::string& deviceId)
1544 : {
1545 0 : if (!pimpl_->swarmManager_)
1546 0 : return false;
1547 0 : return pimpl_->swarmManager_->isConnectedWith(DeviceId(deviceId));
1548 : }
1549 :
1550 : void
1551 2 : Conversation::Impl::voteUnban(const std::string& contactUri, const std::string_view type, const OnDoneCb& cb)
1552 : {
1553 : // Check if admin
1554 2 : if (!isAdmin()) {
1555 0 : JAMI_WARN("You're not an admin of this repo. Unable to unblock %s", contactUri.c_str());
1556 0 : cb(false, {});
1557 0 : return;
1558 : }
1559 :
1560 : // Vote for removal
1561 2 : std::unique_lock lk(writeMtx_);
1562 2 : auto voteCommit = repository_->voteUnban(contactUri, type);
1563 2 : if (voteCommit.empty()) {
1564 0 : JAMI_WARN("Unbanning %s failed", contactUri.c_str());
1565 0 : cb(false, "");
1566 0 : return;
1567 : }
1568 :
1569 2 : auto lastId = voteCommit;
1570 2 : std::vector<std::string> commits;
1571 2 : commits.emplace_back(voteCommit);
1572 :
1573 : // If admin, check vote
1574 4 : auto resolveCommit = repository_->resolveVote(contactUri, type, "unban");
1575 2 : if (!resolveCommit.empty()) {
1576 2 : commits.emplace_back(resolveCommit);
1577 2 : lastId = resolveCommit;
1578 2 : JAMI_WARN("Vote solved for unbanning %s.", contactUri.c_str());
1579 : }
1580 2 : announce(commits, true);
1581 2 : lk.unlock();
1582 2 : if (cb)
1583 2 : cb(!lastId.empty(), lastId);
1584 2 : }
1585 :
1586 : void
1587 15 : Conversation::removeMember(const std::string& contactUri, bool isDevice, const OnDoneCb& cb)
1588 : {
1589 45 : dht::ThreadPool::io().run(
1590 30 : [w = weak(), contactUri = std::move(contactUri), isDevice = std::move(isDevice), cb = std::move(cb)] {
1591 15 : if (auto sthis = w.lock()) {
1592 : // Check if admin
1593 15 : if (!sthis->pimpl_->isAdmin()) {
1594 1 : JAMI_WARN("You're not an admin of this repo. Unable to block %s", contactUri.c_str());
1595 1 : cb(false, {});
1596 2 : return;
1597 : }
1598 :
1599 : // Get current user type
1600 14 : std::string type;
1601 14 : if (isDevice) {
1602 0 : type = "devices";
1603 : } else {
1604 14 : auto members = sthis->pimpl_->repository_->members();
1605 28 : for (const auto& member : members) {
1606 28 : if (member.uri == contactUri) {
1607 14 : if (member.role == MemberRole::INVITED) {
1608 3 : type = "invited";
1609 11 : } else if (member.role == MemberRole::ADMIN) {
1610 1 : type = "admins";
1611 10 : } else if (member.role == MemberRole::MEMBER) {
1612 10 : type = "members";
1613 : }
1614 14 : break;
1615 : }
1616 : }
1617 14 : if (type.empty()) {
1618 0 : cb(false, {});
1619 0 : return;
1620 : }
1621 14 : }
1622 :
1623 : // Vote for removal
1624 14 : std::unique_lock lk(sthis->pimpl_->writeMtx_);
1625 14 : auto voteCommit = sthis->pimpl_->repository_->voteKick(contactUri, type);
1626 14 : if (voteCommit.empty()) {
1627 1 : JAMI_WARN("Kicking %s failed", contactUri.c_str());
1628 1 : cb(false, "");
1629 1 : return;
1630 : }
1631 :
1632 13 : auto lastId = voteCommit;
1633 13 : std::vector<std::string> commits;
1634 13 : commits.emplace_back(voteCommit);
1635 :
1636 : // If admin, check vote
1637 26 : auto resolveCommit = sthis->pimpl_->repository_->resolveVote(contactUri, type, "ban");
1638 13 : if (!resolveCommit.empty()) {
1639 13 : commits.emplace_back(resolveCommit);
1640 13 : lastId = resolveCommit;
1641 13 : JAMI_WARN("Vote solved for %s. %s banned", contactUri.c_str(), isDevice ? "Device" : "Member");
1642 13 : sthis->pimpl_->disconnectFromPeer(contactUri);
1643 : }
1644 :
1645 13 : sthis->pimpl_->announce(commits, true);
1646 13 : lk.unlock();
1647 13 : cb(!lastId.empty(), lastId);
1648 31 : }
1649 : });
1650 15 : }
1651 :
1652 : std::vector<std::map<std::string, std::string>>
1653 101 : Conversation::getMembers(bool includeInvited, bool includeLeft, bool includeBanned) const
1654 : {
1655 101 : return pimpl_->getMembers(includeInvited, includeLeft, includeBanned);
1656 : }
1657 :
1658 : std::vector<std::map<std::string, std::string>>
1659 0 : Conversation::getTrackedMembers() const
1660 : {
1661 0 : return pimpl_->getTrackedMembers();
1662 : }
1663 :
1664 : std::set<std::string>
1665 2485 : Conversation::memberUris(std::string_view filter, const std::set<MemberRole>& filteredRoles) const
1666 : {
1667 2485 : return pimpl_->repository_->memberUris(filter, filteredRoles);
1668 : }
1669 :
1670 : std::vector<NodeId>
1671 1692 : Conversation::peersToSyncWith() const
1672 : {
1673 1692 : auto s = pimpl_->swarmManager_->getConnectedNodes();
1674 13576 : for (const auto& [deviceId, _] : pimpl_->gitSocketList_)
1675 11885 : if (std::find(s.cbegin(), s.cend(), deviceId) == s.cend())
1676 5601 : s.emplace_back(deviceId);
1677 1692 : return s;
1678 0 : }
1679 :
1680 : bool
1681 1691 : Conversation::isBootstrapped() const
1682 : {
1683 1691 : return pimpl_->swarmManager_->isConnected();
1684 : }
1685 :
1686 : std::string
1687 13970 : Conversation::uriFromDevice(const std::string& deviceId) const
1688 : {
1689 13970 : return pimpl_->repository_->uriFromDevice(deviceId);
1690 : }
1691 :
1692 : void
1693 0 : Conversation::monitor()
1694 : {
1695 0 : pimpl_->swarmManager_->getRoutingTable().printRoutingTable();
1696 0 : }
1697 :
1698 : std::string
1699 187 : Conversation::join()
1700 : {
1701 187 : return pimpl_->repository_->join();
1702 : }
1703 :
1704 : bool
1705 3909 : Conversation::isMember(const std::string& uri, bool includeInvited) const
1706 : {
1707 3909 : auto uriCrt = uri + ".crt"sv;
1708 7814 : if (std::filesystem::is_regular_file(pimpl_->repoPath_ / MemberPath::ADMINS / uriCrt)
1709 7817 : || std::filesystem::is_regular_file(pimpl_->repoPath_ / MemberPath::MEMBERS / uriCrt)) {
1710 2870 : return true;
1711 : }
1712 1036 : if (includeInvited) {
1713 765 : if (std::filesystem::is_regular_file(pimpl_->repoPath_ / MemberPath::INVITED / uri)) {
1714 486 : return true;
1715 : }
1716 279 : if (mode() == ConversationMode::ONE_TO_ONE) {
1717 3 : for (const auto& member : getInitialMembers()) {
1718 2 : if (member == uri)
1719 0 : return true;
1720 1 : }
1721 : }
1722 : }
1723 550 : return false;
1724 3906 : }
1725 :
1726 : bool
1727 5099 : Conversation::isBanned(const std::string& uri) const
1728 : {
1729 5099 : return !pimpl_->bannedType(uri).empty();
1730 : }
1731 :
1732 : void
1733 141 : Conversation::sendMessage(Json::Value&& value, const std::string& replyTo, OnCommitCb&& onCommit, OnDoneCb&& cb)
1734 : {
1735 141 : if (!replyTo.empty()) {
1736 2 : if (!pimpl_->repository_->hasCommit(replyTo)) {
1737 1 : JAMI_ERR("Replying to invalid commit %s", replyTo.c_str());
1738 1 : return;
1739 : }
1740 1 : value["reply-to"] = replyTo;
1741 : }
1742 560 : dht::ThreadPool::io().run(
1743 420 : [w = weak(), value = std::move(value), onCommit = std::move(onCommit), cb = std::move(cb)] {
1744 140 : if (auto sthis = w.lock()) {
1745 140 : std::unique_lock lk(sthis->pimpl_->writeMtx_);
1746 140 : auto commit = sthis->pimpl_->repository_->commitMessage(json::toString(value));
1747 140 : lk.unlock();
1748 140 : if (onCommit)
1749 14 : onCommit(commit);
1750 140 : sthis->pimpl_->announce(commit, true);
1751 140 : if (cb)
1752 140 : cb(!commit.empty(), commit);
1753 280 : }
1754 140 : });
1755 : }
1756 :
1757 : bool
1758 12617 : Conversation::hasCommit(const std::string& commitId) const
1759 : {
1760 12617 : return pimpl_->repository_->hasCommit(commitId);
1761 : }
1762 :
1763 : std::optional<std::map<std::string, std::string>>
1764 35 : Conversation::getCommit(const std::string& commitId) const
1765 : {
1766 35 : auto commit = pimpl_->repository_->getCommit(commitId);
1767 35 : if (commit == std::nullopt)
1768 1 : return std::nullopt;
1769 34 : return pimpl_->repository_->convCommitToMap(*commit);
1770 35 : }
1771 :
1772 : void
1773 2 : Conversation::loadMessages(const OnLoadMessages& cb, const LogOptions& options)
1774 : {
1775 2 : if (!cb)
1776 0 : return;
1777 2 : dht::ThreadPool::io().run([w = weak(), cb = std::move(cb), options] {
1778 2 : if (auto sthis = w.lock()) {
1779 2 : std::unique_lock lk(sthis->pimpl_->loadedHistory_.mutex);
1780 2 : auto result = sthis->pimpl_->loadMessages(options);
1781 2 : lk.unlock();
1782 2 : cb(std::move(result));
1783 4 : }
1784 2 : });
1785 : }
1786 :
1787 : void
1788 0 : Conversation::clearCache()
1789 : {
1790 0 : std::lock_guard lk(pimpl_->loadedHistory_.mutex);
1791 0 : pimpl_->loadedHistory_.messageList.clear();
1792 0 : pimpl_->loadedHistory_.quickAccess.clear();
1793 0 : pimpl_->loadedHistory_.pendingEditions.clear();
1794 0 : pimpl_->loadedHistory_.pendingReactions.clear();
1795 : {
1796 0 : std::lock_guard lk(pimpl_->messageStatusMtx_);
1797 0 : pimpl_->memberToStatus.clear();
1798 0 : }
1799 0 : }
1800 :
1801 : std::string
1802 2078 : Conversation::lastCommitId() const
1803 : {
1804 : {
1805 2078 : std::lock_guard lk(pimpl_->loadedHistory_.mutex);
1806 2079 : if (!pimpl_->loadedHistory_.messageList.empty())
1807 3399 : return (*pimpl_->loadedHistory_.messageList.begin())->id;
1808 2078 : }
1809 379 : LogOptions options;
1810 379 : options.nbOfCommits = 1;
1811 379 : options.skipMerge = true;
1812 379 : History optHistory;
1813 379 : std::scoped_lock lock(pimpl_->writeMtx_, optHistory.mutex);
1814 379 : auto res = pimpl_->loadMessages(options, &optHistory);
1815 379 : if (res.empty())
1816 1 : return {};
1817 756 : return (*optHistory.messageList.begin())->id;
1818 379 : }
1819 :
1820 : bool
1821 1848 : Conversation::pull(const std::string& deviceId, OnPullCb&& cb, std::string commitId)
1822 : {
1823 1848 : std::lock_guard lk(pimpl_->pullcbsMtx_);
1824 5549 : auto [it, notInProgress] = pimpl_->fetchingRemotes_.emplace(deviceId,
1825 3700 : std::deque<std::pair<std::string, OnPullCb>>());
1826 1849 : auto& pullcbs = it->second;
1827 1850 : auto itPull = std::find_if(pullcbs.begin(), pullcbs.end(), [&](const auto& elem) {
1828 0 : return std::get<0>(elem) == commitId;
1829 3700 : });
1830 1850 : if (itPull != pullcbs.end()) {
1831 0 : JAMI_DEBUG("{} Ignoring request to pull from {:s} with commit {:s}: pull already in progress",
1832 : pimpl_->toString(),
1833 : deviceId,
1834 : commitId);
1835 0 : cb(false);
1836 0 : return false;
1837 : }
1838 7393 : JAMI_DEBUG("{} [device {}] Pulling '{:s}'", pimpl_->toString(), deviceId, commitId);
1839 1850 : pullcbs.emplace_back(std::move(commitId), std::move(cb));
1840 1850 : if (notInProgress)
1841 1821 : dht::ThreadPool::io().run([w = weak(), deviceId] {
1842 1821 : if (auto sthis_ = w.lock())
1843 1821 : sthis_->pimpl_->pull(deviceId);
1844 1821 : });
1845 1850 : return true;
1846 1850 : }
1847 :
1848 : void
1849 1821 : Conversation::Impl::pull(const std::string& deviceId)
1850 : {
1851 1821 : auto& repo = repository_;
1852 :
1853 1821 : std::string commitId;
1854 1821 : OnPullCb cb;
1855 : while (true) {
1856 : {
1857 3670 : std::lock_guard lk(pullcbsMtx_);
1858 3671 : auto it = fetchingRemotes_.find(deviceId);
1859 3671 : if (it == fetchingRemotes_.end()) {
1860 0 : JAMI_ERROR("Could not find device {:s} in fetchingRemotes", deviceId);
1861 0 : break;
1862 : }
1863 3671 : auto& pullcbs = it->second;
1864 3671 : if (pullcbs.empty()) {
1865 1821 : fetchingRemotes_.erase(it);
1866 1821 : break;
1867 : }
1868 1850 : auto& elem = pullcbs.front();
1869 1850 : commitId = std::move(std::get<0>(elem));
1870 1849 : cb = std::move(std::get<1>(elem));
1871 1849 : pullcbs.pop_front();
1872 3670 : }
1873 : // If recently fetched, the commit can already be there, so no need to do complex operations
1874 1850 : if (commitId != "" && repo->hasCommit(commitId)) {
1875 115 : cb(true);
1876 137 : continue;
1877 : }
1878 : // Pull from remote
1879 1735 : auto fetched = repo->fetch(deviceId);
1880 1735 : if (!fetched) {
1881 22 : cb(false);
1882 22 : continue;
1883 : }
1884 :
1885 1713 : auto oldHead = repo->getHead();
1886 :
1887 1713 : std::unique_lock lk(writeMtx_);
1888 : auto commits = repo->mergeHistory(deviceId,
1889 1718 : [this](const std::string& peerUri) { this->disconnectFromPeer(peerUri); });
1890 1712 : if (!commits.empty()) {
1891 905 : announce(commits);
1892 : }
1893 1713 : lk.unlock();
1894 :
1895 1713 : bool commitFound = false;
1896 1713 : if (commitId != "") {
1897 : // If `commitId` is non-empty, then we were attempting to pull a specific commit.
1898 : // We need to check if we actually got it; the fact that the fetch above was
1899 : // successful doesn't guarantee that we did.
1900 1350 : for (const auto& commit : commits) {
1901 911 : if (commit.at("id") == commitId) {
1902 902 : commitFound = true;
1903 902 : break;
1904 : }
1905 : }
1906 : } else {
1907 371 : commitFound = true;
1908 : }
1909 1712 : if (!commitFound)
1910 1757 : JAMI_WARNING("Successfully fetched from device {} but didn't receive expected commit {}",
1911 : deviceId,
1912 : commitId);
1913 : // WARNING: If its argument is `true`, this callback will attempt to send a message notification
1914 : // for commit `commitId` to other members of the swarm. It's important that we only
1915 : // send these notifications if we actually have the commit. Otherwise, we can end up
1916 : // in a situation where the members of the swarm keep sending notifications to each
1917 : // other for a commit that none of them have (note that we are unable to rule this out, as
1918 : // nothing prevents a malicious user from intentionally sending a notification with
1919 : // a fake commit ID).
1920 1713 : if (cb)
1921 1713 : cb(commitFound);
1922 :
1923 : // Announce if profile changed
1924 1712 : if (!commits.empty()) {
1925 1812 : auto diffStats = repo->diffStats("HEAD", oldHead);
1926 906 : auto changedFiles = repo->changedFiles(diffStats);
1927 906 : if (find(changedFiles.begin(), changedFiles.end(), "profile.vcf") != changedFiles.end()) {
1928 5 : emitSignal<libjami::ConversationSignal::ConversationProfileUpdated>(accountId_,
1929 5 : repo->id(),
1930 10 : repo->infos());
1931 : }
1932 906 : }
1933 3560 : }
1934 1821 : }
1935 :
1936 : void
1937 1848 : Conversation::sync(const std::string& member, const std::string& deviceId, OnPullCb&& cb, std::string commitId)
1938 : {
1939 1848 : pull(deviceId, std::move(cb), commitId);
1940 1850 : dht::ThreadPool::io().run([member, deviceId, w = weak_from_this()] {
1941 1850 : auto sthis = w.lock();
1942 : // For waiting request, downloadFile
1943 1851 : for (const auto& wr : sthis->dataTransfer()->waitingRequests()) {
1944 1 : sthis->downloadFile(wr.interactionId, wr.fileId, wr.path, member, deviceId);
1945 1850 : }
1946 1849 : });
1947 1850 : }
1948 :
1949 : std::map<std::string, std::string>
1950 269 : Conversation::generateInvitation() const
1951 : {
1952 : // Invite the new member to the conversation
1953 269 : Json::Value root;
1954 269 : auto& metadata = root[ConversationMapKeys::METADATAS];
1955 544 : for (const auto& [k, v] : infos()) {
1956 275 : if (v.size() >= 64000) {
1957 0 : JAMI_WARNING("Cutting invite because the SIP message will be too long");
1958 0 : continue;
1959 0 : }
1960 275 : metadata[k] = v;
1961 269 : }
1962 269 : root[ConversationMapKeys::CONVERSATIONID] = id();
1963 807 : return {{"application/invite+json", json::toString(root)}};
1964 269 : }
1965 :
1966 : std::string
1967 7 : Conversation::leave()
1968 : {
1969 7 : setRemovingFlag();
1970 7 : std::lock_guard lk(pimpl_->writeMtx_);
1971 14 : return pimpl_->repository_->leave();
1972 7 : }
1973 :
1974 : void
1975 10 : Conversation::setRemovingFlag()
1976 : {
1977 10 : pimpl_->isRemoving_ = true;
1978 10 : }
1979 :
1980 : bool
1981 3767 : Conversation::isRemoving()
1982 : {
1983 3767 : return pimpl_->isRemoving_;
1984 : }
1985 :
1986 : void
1987 21 : Conversation::erase()
1988 : {
1989 21 : if (pimpl_->conversationDataPath_ != "")
1990 21 : dhtnet::fileutils::removeAll(pimpl_->conversationDataPath_, true);
1991 21 : if (!pimpl_->repository_)
1992 0 : return;
1993 21 : std::lock_guard lk(pimpl_->writeMtx_);
1994 21 : pimpl_->repository_->erase();
1995 21 : }
1996 :
1997 : ConversationMode
1998 4420 : Conversation::mode() const
1999 : {
2000 4420 : return pimpl_->repository_->mode();
2001 : }
2002 :
2003 : std::vector<std::string>
2004 30 : Conversation::getInitialMembers() const
2005 : {
2006 30 : return pimpl_->repository_->getInitialMembers();
2007 : }
2008 :
2009 : bool
2010 0 : Conversation::isInitialMember(const std::string& uri) const
2011 : {
2012 0 : auto members = getInitialMembers();
2013 0 : return std::find(members.begin(), members.end(), uri) != members.end();
2014 0 : }
2015 :
2016 : void
2017 9 : Conversation::updateInfos(const std::map<std::string, std::string>& map, const OnDoneCb& cb)
2018 : {
2019 9 : dht::ThreadPool::io().run([w = weak(), map = std::move(map), cb = std::move(cb)] {
2020 9 : if (auto sthis = w.lock()) {
2021 9 : auto& repo = sthis->pimpl_->repository_;
2022 9 : std::unique_lock lk(sthis->pimpl_->writeMtx_);
2023 9 : auto commit = repo->updateInfos(map);
2024 9 : sthis->pimpl_->announce(commit, true);
2025 9 : lk.unlock();
2026 9 : if (cb)
2027 9 : cb(!commit.empty(), commit);
2028 9 : emitSignal<libjami::ConversationSignal::ConversationProfileUpdated>(sthis->pimpl_->accountId_,
2029 9 : repo->id(),
2030 18 : repo->infos());
2031 18 : }
2032 9 : });
2033 9 : }
2034 :
2035 : std::map<std::string, std::string>
2036 316 : Conversation::infos() const
2037 : {
2038 316 : return pimpl_->repository_->infos();
2039 : }
2040 :
2041 : void
2042 8 : Conversation::updatePreferences(const std::map<std::string, std::string>& map)
2043 : {
2044 8 : const auto& filePath = pimpl_->preferencesPath_;
2045 8 : auto prefs = map;
2046 8 : auto itLast = prefs.find(LAST_MODIFIED);
2047 8 : if (itLast != prefs.end()) {
2048 3 : std::error_code ec;
2049 3 : if (std::filesystem::is_regular_file(filePath, ec)) {
2050 1 : auto lastModified = fileutils::lastWriteTimeInSeconds(filePath);
2051 : try {
2052 1 : if (lastModified >= to_int<uint64_t>(itLast->second))
2053 0 : return;
2054 0 : } catch (...) {
2055 0 : return;
2056 0 : }
2057 : }
2058 3 : prefs.erase(itLast);
2059 : }
2060 :
2061 8 : std::ofstream file(filePath, std::ios::trunc | std::ios::binary);
2062 8 : msgpack::pack(file, prefs);
2063 8 : emitSignal<libjami::ConversationSignal::ConversationPreferencesUpdated>(pimpl_->accountId_, id(), std::move(prefs));
2064 8 : }
2065 :
2066 : std::map<std::string, std::string>
2067 106 : Conversation::preferences(bool includeLastModified) const
2068 : {
2069 : try {
2070 106 : std::map<std::string, std::string> preferences;
2071 106 : const auto& filePath = pimpl_->preferencesPath_;
2072 200 : auto file = fileutils::loadFile(filePath);
2073 11 : msgpack::object_handle oh = msgpack::unpack((const char*) file.data(), file.size());
2074 11 : oh.get().convert(preferences);
2075 10 : if (includeLastModified)
2076 8 : preferences[LAST_MODIFIED] = std::to_string(fileutils::lastWriteTimeInSeconds(filePath));
2077 10 : return preferences;
2078 198 : } catch (const std::exception& e) {
2079 94 : }
2080 95 : return {};
2081 : }
2082 :
2083 : std::vector<uint8_t>
2084 0 : Conversation::vCard() const
2085 : {
2086 : try {
2087 0 : return fileutils::loadFile(pimpl_->repoPath_ / "profile.vcf");
2088 0 : } catch (...) {
2089 0 : }
2090 0 : return {};
2091 : }
2092 :
2093 : std::shared_ptr<TransferManager>
2094 1960 : Conversation::dataTransfer() const
2095 : {
2096 1960 : return pimpl_->transferManager_;
2097 : }
2098 :
2099 : bool
2100 13 : Conversation::onFileChannelRequest(const std::string& member,
2101 : const std::string& fileId,
2102 : std::filesystem::path& path,
2103 : std::string& sha3sum) const
2104 : {
2105 13 : if (!isMember(member))
2106 0 : return false;
2107 :
2108 13 : auto sep = fileId.find('_');
2109 13 : if (sep == std::string::npos)
2110 0 : return false;
2111 :
2112 13 : auto interactionId = fileId.substr(0, sep);
2113 13 : auto commit = getCommit(interactionId);
2114 26 : if (commit == std::nullopt || commit->find("type") == commit->end() || commit->find("tid") == commit->end()
2115 26 : || commit->find("sha3sum") == commit->end() || commit->at("type") != "application/data-transfer+json") {
2116 0 : JAMI_WARNING("[Account {:s}] {} requested invalid file transfer commit {}",
2117 : pimpl_->accountId_,
2118 : member,
2119 : interactionId);
2120 0 : return false;
2121 : }
2122 :
2123 13 : path = dataTransfer()->path(fileId);
2124 13 : sha3sum = commit->at("sha3sum");
2125 :
2126 13 : return true;
2127 13 : }
2128 :
2129 : bool
2130 14 : Conversation::downloadFile(const std::string& interactionId,
2131 : const std::string& fileId,
2132 : const std::string& path,
2133 : const std::string&,
2134 : const std::string& deviceId)
2135 : {
2136 14 : auto commit = getCommit(interactionId);
2137 14 : if (commit == std::nullopt || commit->at("type") != "application/data-transfer+json") {
2138 0 : JAMI_ERROR("Commit doesn't exists or is not a file transfer {} (Conversation: {}) ", interactionId, id());
2139 0 : return false;
2140 : }
2141 14 : auto tid = commit->find("tid");
2142 14 : auto sha3sum = commit->find("sha3sum");
2143 14 : auto size_str = commit->find("totalSize");
2144 :
2145 14 : if (tid == commit->end() || sha3sum == commit->end() || size_str == commit->end()) {
2146 0 : JAMI_ERROR("Invalid file transfer commit (missing tid, size or sha3)");
2147 0 : return false;
2148 : }
2149 :
2150 14 : auto totalSize = to_int<ssize_t>(size_str->second, (ssize_t) -1);
2151 14 : if (totalSize < 0) {
2152 0 : JAMI_ERROR("Invalid file size {}", totalSize);
2153 0 : return false;
2154 : }
2155 :
2156 : // Be sure to not lock conversation
2157 28 : dht::ThreadPool().io().run(
2158 14 : [w = weak(), deviceId, fileId, interactionId, sha3sum = sha3sum->second, path, totalSize] {
2159 14 : if (auto shared = w.lock()) {
2160 14 : std::filesystem::path filePath(path);
2161 14 : if (filePath.empty()) {
2162 0 : filePath = shared->dataTransfer()->path(fileId);
2163 : }
2164 :
2165 14 : std::error_code ec;
2166 14 : if (std::filesystem::file_size(filePath, ec) == static_cast<size_t>(totalSize)) {
2167 1 : if (fileutils::sha3File(filePath) == sha3sum) {
2168 4 : JAMI_WARNING("Ignoring request to download existing file: {}", filePath);
2169 1 : return;
2170 : }
2171 : }
2172 :
2173 13 : std::filesystem::path tempFilePath(filePath);
2174 13 : tempFilePath += ".tmp";
2175 13 : auto start = std::filesystem::file_size(tempFilePath, ec);
2176 13 : if (ec || start == static_cast<decltype(start)>(-1)) {
2177 11 : start = 0;
2178 : }
2179 13 : size_t end = 0;
2180 :
2181 13 : auto acc = shared->pimpl_->account_.lock();
2182 13 : if (!acc)
2183 0 : return;
2184 13 : shared->dataTransfer()->waitForTransfer(fileId, interactionId, sha3sum, path, totalSize);
2185 13 : acc->askForFileChannel(shared->id(), deviceId, interactionId, fileId, start, end);
2186 28 : }
2187 : });
2188 14 : return true;
2189 14 : }
2190 :
2191 : void
2192 1115 : Conversation::hasFetched(const std::string& deviceId, const std::string& commitId)
2193 : {
2194 1115 : dht::ThreadPool::io().run([w = weak(), deviceId, commitId]() {
2195 1115 : auto sthis = w.lock();
2196 1115 : if (!sthis)
2197 0 : return;
2198 : // Update fetched for Uri
2199 1115 : auto uri = sthis->uriFromDevice(deviceId);
2200 1115 : if (uri.empty() || uri == sthis->pimpl_->userId_)
2201 46 : return;
2202 : // When a user fetches a commit, the message is sent for this person
2203 1069 : sthis->pimpl_->updateStatus(uri,
2204 : libjami::Account::MessageStates::SENT,
2205 1069 : commitId,
2206 2138 : std::to_string(std::time(nullptr)),
2207 : true);
2208 1161 : });
2209 1115 : }
2210 :
2211 : void
2212 1115 : Conversation::Impl::updateStatus(const std::string& uri,
2213 : libjami::Account::MessageStates st,
2214 : const std::string& commitId,
2215 : const std::string& ts,
2216 : bool emit)
2217 : {
2218 : // This method can be called if peer send us a status or if another device sync. Emit will be true if a peer
2219 : // send us a status and will emit to other connected devices.
2220 1115 : LogOptions options;
2221 1115 : std::map<std::string, std::map<std::string, std::string>> newStatus;
2222 : {
2223 : // Update internal structures.
2224 1115 : std::lock_guard lk(messageStatusMtx_);
2225 1115 : auto& status = messagesStatus_[uri];
2226 1115 : auto& oldStatus = status[st == libjami::Account::MessageStates::SENT ? "fetched" : "read"];
2227 1115 : if (oldStatus == commitId)
2228 1 : return; // Nothing to do
2229 1114 : options.to = oldStatus;
2230 1114 : options.from = commitId;
2231 1114 : oldStatus = commitId;
2232 1114 : status[st == libjami::Account::MessageStates::SENT ? "fetched_ts" : "read_ts"] = ts;
2233 1114 : saveStatus();
2234 1114 : if (emit)
2235 1084 : newStatus[uri].insert(status.begin(), status.end());
2236 1115 : }
2237 1114 : if (emit && messageStatusCb_) {
2238 1084 : messageStatusCb_(newStatus);
2239 : }
2240 : // Update messages status for all commit between the old and new one
2241 1114 : options.logIfNotFound = false;
2242 1114 : options.fastLog = true;
2243 1114 : History optHistory;
2244 1114 : std::unique_lock lk(optHistory.mutex); // Avoid to announce messages while updating status.
2245 1114 : auto res = loadMessages(options, &optHistory);
2246 1114 : std::unique_lock mlk(messageStatusMtx_);
2247 1114 : std::vector<std::pair<std::string, int32_t>> statusToUpdate;
2248 1114 : if (res.size() == 0) {
2249 : // In this case, commit is not received yet, so we cache it
2250 6 : futureStatus[commitId][uri] = static_cast<int32_t>(st);
2251 : }
2252 8217 : for (const auto& [cid, _] : optHistory.quickAccess) {
2253 7103 : auto message = loadedHistory_.quickAccess.find(cid);
2254 7103 : if (message != loadedHistory_.quickAccess.end()) {
2255 : // Update message and emit to client,
2256 3886 : if (static_cast<int32_t>(st) > message->second->status[uri]) {
2257 3883 : message->second->status[uri] = static_cast<int32_t>(st);
2258 3883 : statusToUpdate.emplace_back(cid, static_cast<int32_t>(st));
2259 : }
2260 : } else {
2261 : // In this case, commit is not loaded by client, so we cache it
2262 : // No need to emit to client, they will get a correct status on load.
2263 3217 : futureStatus[cid][uri] = static_cast<int32_t>(st);
2264 : }
2265 : }
2266 1114 : mlk.unlock();
2267 1114 : lk.unlock();
2268 4997 : for (const auto& [cid, status] : statusToUpdate)
2269 7764 : emitSignal<libjami::ConfigurationSignal::AccountMessageStatusChanged>(accountId_,
2270 3883 : repository_->id(),
2271 : uri,
2272 : cid,
2273 : static_cast<int>(status));
2274 1116 : }
2275 :
2276 : bool
2277 18 : Conversation::setMessageDisplayed(const std::string& uri, const std::string& interactionId)
2278 : {
2279 18 : std::lock_guard lk(pimpl_->messageStatusMtx_);
2280 18 : if (pimpl_->messagesStatus_[uri]["read"] == interactionId)
2281 2 : return false; // Nothing to do
2282 16 : dht::ThreadPool::io().run([w = weak(), uri, interactionId]() {
2283 16 : auto sthis = w.lock();
2284 16 : if (!sthis)
2285 0 : return;
2286 16 : sthis->pimpl_->updateStatus(uri,
2287 : libjami::Account::MessageStates::DISPLAYED,
2288 16 : interactionId,
2289 32 : std::to_string(std::time(nullptr)),
2290 : true);
2291 16 : });
2292 16 : return true;
2293 18 : }
2294 :
2295 : std::map<std::string, std::map<std::string, std::string>>
2296 88 : Conversation::messageStatus() const
2297 : {
2298 88 : std::lock_guard lk(pimpl_->messageStatusMtx_);
2299 176 : return pimpl_->messagesStatus_;
2300 88 : }
2301 :
2302 : void
2303 46 : Conversation::updateMessageStatus(const std::map<std::string, std::map<std::string, std::string>>& messageStatus)
2304 : {
2305 46 : std::unique_lock lk(pimpl_->messageStatusMtx_);
2306 46 : std::vector<std::tuple<libjami::Account::MessageStates, std::string, std::string, std::string>> stVec;
2307 98 : for (const auto& [uri, status] : messageStatus) {
2308 52 : auto& oldMs = pimpl_->messagesStatus_[uri];
2309 52 : if (status.find("fetched_ts") != status.end() && status.at("fetched") != oldMs["fetched"]) {
2310 26 : if (oldMs["fetched_ts"].empty() || std::stol(oldMs["fetched_ts"]) <= std::stol(status.at("fetched_ts"))) {
2311 0 : stVec.emplace_back(libjami::Account::MessageStates::SENT,
2312 : uri,
2313 26 : status.at("fetched"),
2314 52 : status.at("fetched_ts"));
2315 : }
2316 : }
2317 52 : if (status.find("read_ts") != status.end() && status.at("read") != oldMs["read"]) {
2318 4 : if (oldMs["read_ts"].empty() || std::stol(oldMs["read_ts"]) <= std::stol(status.at("read_ts"))) {
2319 0 : stVec.emplace_back(libjami::Account::MessageStates::DISPLAYED,
2320 : uri,
2321 4 : status.at("read"),
2322 8 : status.at("read_ts"));
2323 : }
2324 : }
2325 : }
2326 46 : lk.unlock();
2327 :
2328 76 : for (const auto& [status, uri, commitId, ts] : stVec) {
2329 30 : pimpl_->updateStatus(uri, status, commitId, ts);
2330 : }
2331 46 : }
2332 :
2333 : void
2334 391 : Conversation::onMessageStatusChanged(
2335 : const std::function<void(const std::map<std::string, std::map<std::string, std::string>>&)>& cb)
2336 : {
2337 391 : std::unique_lock lk(pimpl_->messageStatusMtx_);
2338 391 : pimpl_->messageStatusCb_ = cb;
2339 391 : }
2340 :
2341 : #ifdef LIBJAMI_TEST
2342 : void
2343 515 : Conversation::onBootstrapStatus(const std::function<void(std::string, BootstrapStatus)>& cb)
2344 : {
2345 515 : std::lock_guard lock(pimpl_->bootstrapMtx_);
2346 515 : pimpl_->bootstrapCbTest_ = cb;
2347 515 : }
2348 :
2349 : std::vector<libjami::SwarmMessage>
2350 0 : Conversation::loadMessagesSync(const LogOptions& options)
2351 : {
2352 0 : std::lock_guard lk(pimpl_->loadedHistory_.mutex);
2353 0 : auto result = pimpl_->loadMessages(options);
2354 0 : return result;
2355 0 : }
2356 :
2357 : void
2358 0 : Conversation::announce(const std::vector<std::map<std::string, std::string>>& commits, bool commitFromSelf)
2359 : {
2360 0 : pimpl_->announce(commits, commitFromSelf);
2361 0 : }
2362 :
2363 : void
2364 0 : Conversation::announce(const std::string& commitId, bool commitFromSelf)
2365 : {
2366 0 : pimpl_->announce(commitId, commitFromSelf);
2367 0 : }
2368 : #endif
2369 :
2370 : void
2371 514 : Conversation::bootstrap(std::function<void()> onBootstrapped, const std::vector<DeviceId>& knownDevices)
2372 : {
2373 514 : std::lock_guard lock(pimpl_->bootstrapMtx_);
2374 514 : if (!pimpl_ || !pimpl_->repository_ || !pimpl_->swarmManager_)
2375 0 : return;
2376 : // Here, we bootstrap the DRT with devices who already wrote in the conversation
2377 : // If it works, the callback onConnectionChanged will be called with ok=true
2378 514 : pimpl_->bootstrapCb_ = std::move(onBootstrapped);
2379 514 : std::vector<DeviceId> devices = knownDevices;
2380 2056 : JAMI_DEBUG("{} Bootstrap with {} device(s)", pimpl_->toString(), devices.size());
2381 :
2382 514 : if (!devices.empty()) {
2383 0 : pimpl_->swarmManager_->setKnownNodes(devices);
2384 : }
2385 :
2386 514 : pimpl_->monitorConnection(weak_from_this());
2387 :
2388 : // If is shutdown, the conversation was re-added, causing no new nodes to be connected, but just a classic
2389 : // connectivity change
2390 514 : if (pimpl_->swarmManager_->isShutdown()) {
2391 22 : pimpl_->swarmManager_->restart();
2392 22 : pimpl_->swarmManager_->maintainBuckets();
2393 : }
2394 514 : }
2395 :
2396 : void
2397 1599 : Conversation::addKnownDevices(const std::vector<DeviceId>& devices, const std::string& memberUri)
2398 : {
2399 1599 : if (devices.empty())
2400 890 : return;
2401 709 : if (!memberUri.empty()) {
2402 : // JAMI_WARNING("{} Adding {} known devices for member {}", pimpl_->toString(), devices.size(), memberUri);
2403 709 : std::lock_guard lk(pimpl_->trackedMembersMtx_);
2404 709 : auto it = pimpl_->trackedMembers_.find(memberUri);
2405 709 : if (it != pimpl_->trackedMembers_.end()) {
2406 706 : it->second.devices.insert(devices.begin(), devices.end());
2407 : }
2408 709 : } else {
2409 0 : JAMI_ERROR("{} Adding {} known devices without member URI", pimpl_->toString(), devices.size());
2410 : }
2411 709 : pimpl_->swarmManager_->setKnownNodes(devices);
2412 : }
2413 :
2414 : std::vector<std::string>
2415 18 : Conversation::commitsEndedCalls()
2416 : {
2417 18 : pimpl_->loadActiveCalls();
2418 18 : pimpl_->loadHostedCalls();
2419 18 : auto commits = pimpl_->commitsEndedCalls();
2420 18 : if (!commits.empty()) {
2421 : // Announce to client
2422 0 : dht::ThreadPool::io().run([w = weak(), commits] {
2423 0 : if (auto sthis = w.lock())
2424 0 : sthis->pimpl_->announce(commits, true);
2425 0 : });
2426 : }
2427 18 : return commits;
2428 0 : }
2429 :
2430 : void
2431 391 : Conversation::onMembersChanged(OnMembersChanged&& cb)
2432 : {
2433 391 : pimpl_->onMembersChanged_ = std::move(cb);
2434 391 : }
2435 :
2436 : void
2437 391 : Conversation::onNeedSocket(NeedSocketCb needSocket)
2438 : {
2439 782 : pimpl_->swarmManager_->needSocketCb_ = [needSocket = std::move(needSocket), w = weak()](const std::string& deviceId,
2440 : ChannelCb&& cb) {
2441 546 : if (auto sthis = w.lock()) {
2442 546 : auto wrappedCb = [cb = std::move(cb), w, deviceId](const std::shared_ptr<dhtnet::ChannelSocket>& socket) {
2443 525 : if (auto sthis = w.lock()) {
2444 520 : if (!socket) {
2445 275 : if (auto acc = sthis->pimpl_->account_.lock()) {
2446 276 : if (auto cert = acc->certStore().getCertificate(deviceId)) {
2447 277 : sthis->pimpl_->onConnectionFailed(DeviceId(deviceId), cert->issuer->getId().toString());
2448 : } else {
2449 0 : JAMI_WARNING("{} Unable to get member URI from device ID {}",
2450 : sthis->pimpl_->toString(),
2451 : deviceId);
2452 277 : }
2453 : } else {
2454 0 : return false;
2455 277 : }
2456 : }
2457 525 : }
2458 525 : return cb(socket);
2459 545 : };
2460 545 : needSocket(sthis->id(), deviceId, std::move(wrappedCb), "application/im-gitmessage-id");
2461 1092 : }
2462 1328 : };
2463 391 : }
2464 :
2465 : void
2466 487 : Conversation::addSwarmChannel(std::shared_ptr<dhtnet::ChannelSocket> channel)
2467 : {
2468 487 : auto deviceId = channel->deviceId();
2469 : // Transmit avatar if necessary
2470 : // We do this here, because at this point we know both sides are connected and in
2471 : // the same conversation
2472 : // addSwarmChannel is a bit more complex, but it should be the best moment to do this.
2473 487 : auto cert = channel->peerCertificate();
2474 487 : if (!cert || !cert->issuer)
2475 0 : return;
2476 487 : auto member = cert->issuer->getId().toString();
2477 487 : pimpl_->swarmManager_->addChannel(std::move(channel));
2478 487 : dht::ThreadPool::io().run([member, deviceId, a = pimpl_->account_, w = weak_from_this()] {
2479 487 : auto sthis = w.lock();
2480 486 : if (auto account = a.lock()) {
2481 487 : account->sendProfile(sthis->id(), member, deviceId.toString());
2482 487 : }
2483 487 : });
2484 487 : }
2485 :
2486 : uint32_t
2487 4 : Conversation::countInteractions(const std::string& toId, const std::string& fromId, const std::string& authorUri) const
2488 : {
2489 4 : LogOptions options;
2490 4 : options.to = toId;
2491 4 : options.from = fromId;
2492 4 : options.authorUri = authorUri;
2493 4 : options.logIfNotFound = false;
2494 4 : options.fastLog = true;
2495 4 : History history;
2496 4 : std::lock_guard lk(history.mutex);
2497 4 : auto res = pimpl_->loadMessages(options, &history);
2498 8 : return res.size();
2499 4 : }
2500 :
2501 : void
2502 4 : Conversation::search(uint32_t req, const Filter& filter, const std::shared_ptr<std::atomic_int>& flag) const
2503 : {
2504 : // Because logging a conversation can take quite some time,
2505 : // do it asynchronously
2506 4 : dht::ThreadPool::io().run([w = weak(), req, filter, flag] {
2507 4 : if (auto sthis = w.lock()) {
2508 4 : History history;
2509 4 : std::vector<std::map<std::string, std::string>> commits {};
2510 : // std::regex_constants::ECMAScript is the default flag.
2511 4 : auto re = std::regex(filter.regexSearch,
2512 4 : filter.caseSensitive ? std::regex_constants::ECMAScript : std::regex_constants::icase);
2513 4 : sthis->pimpl_->repository_->log(
2514 20 : [&](const std::string& /*id*/, const GitAuthor& author, const GitCommit& commit) {
2515 20 : if (!filter.author.empty() && filter.author != sthis->uriFromDevice(author.email)) {
2516 : // Filter author
2517 0 : return CallbackResult::Skip;
2518 : }
2519 20 : auto commitTime = git_commit_time(commit.get());
2520 20 : if (filter.before && filter.before < commitTime) {
2521 : // Only get commits before this date
2522 0 : return CallbackResult::Skip;
2523 : }
2524 20 : if (filter.after && filter.after > commitTime) {
2525 : // Only get commits before this date
2526 0 : if (git_commit_parentcount(commit.get()) <= 1)
2527 0 : return CallbackResult::Break;
2528 : else
2529 0 : return CallbackResult::Skip; // Because we are sorting it with
2530 : // GIT_SORT_TOPOLOGICAL | GIT_SORT_TIME
2531 : }
2532 :
2533 20 : return CallbackResult::Ok; // Continue
2534 : },
2535 20 : [&](ConversationCommit&& cc) {
2536 20 : if (auto optMessage = sthis->pimpl_->repository_->convCommitToMap(cc))
2537 40 : sthis->pimpl_->addToHistory(history, {optMessage.value()}, false, false);
2538 20 : },
2539 20 : [&](const std::string& id, const GitAuthor&, ConversationCommit&) {
2540 20 : if (id == filter.lastId)
2541 0 : return true;
2542 20 : return false;
2543 : },
2544 : "",
2545 : false);
2546 : // Search on generated history
2547 24 : for (auto& message : history.messageList) {
2548 20 : auto contentType = message->type;
2549 20 : auto isSearchable = contentType == "text/plain" || contentType == "application/data-transfer+json";
2550 20 : if (filter.type.empty() && !isSearchable) {
2551 : // Not searchable, at least for now
2552 8 : continue;
2553 12 : } else if (contentType == filter.type || filter.type.empty()) {
2554 12 : if (isSearchable) {
2555 : // If it's a text match the body, else the display name
2556 48 : auto body = contentType == "text/plain" ? message->body.at("body")
2557 36 : : message->body.at("displayName");
2558 12 : std::smatch body_match;
2559 12 : if (std::regex_search(body, body_match, re)) {
2560 5 : auto commit = message->body;
2561 5 : commit["id"] = message->id;
2562 5 : commit["type"] = message->type;
2563 5 : commits.emplace_back(commit);
2564 5 : }
2565 12 : } else {
2566 : // Matching type, just add it to the results
2567 0 : commits.emplace_back(message->body);
2568 : }
2569 :
2570 12 : if (filter.maxResult != 0 && commits.size() == filter.maxResult)
2571 0 : break;
2572 : }
2573 20 : }
2574 :
2575 4 : if (commits.size() > 0)
2576 9 : emitSignal<libjami::ConversationSignal::MessagesFound>(req,
2577 3 : sthis->pimpl_->accountId_,
2578 6 : sthis->id(),
2579 3 : std::move(commits));
2580 : // If we're the latest thread, inform client that the search is finished
2581 4 : if ((*flag)-- == 1 /* decrement return the old value */) {
2582 4 : emitSignal<libjami::ConversationSignal::MessagesFound>(
2583 8 : req, sthis->pimpl_->accountId_, std::string {}, std::vector<std::map<std::string, std::string>> {});
2584 : }
2585 8 : }
2586 4 : });
2587 4 : }
2588 :
2589 : void
2590 14 : Conversation::hostConference(Json::Value&& message, OnDoneCb&& cb)
2591 : {
2592 14 : if (!message.isMember("confId")) {
2593 0 : JAMI_ERROR("{}Malformed commit: no confId", pimpl_->toString());
2594 0 : return;
2595 : }
2596 :
2597 14 : auto now = std::chrono::system_clock::now();
2598 14 : auto nowSecs = std::chrono::duration_cast<std::chrono::seconds>(now.time_since_epoch()).count();
2599 : {
2600 14 : std::lock_guard lk(pimpl_->activeCallsMtx_);
2601 14 : pimpl_->hostedCalls_[message["confId"].asString()] = nowSecs;
2602 14 : pimpl_->saveHostedCalls();
2603 14 : }
2604 :
2605 14 : sendMessage(std::move(message), "", {}, std::move(cb));
2606 : }
2607 :
2608 : bool
2609 20 : Conversation::isHosting(const std::string& confId) const
2610 : {
2611 20 : auto info = infos();
2612 20 : if (info["rdvDevice"] == pimpl_->deviceId_ && info["rdvHost"] == pimpl_->userId_)
2613 0 : return true; // We are the current device Host
2614 20 : std::lock_guard lk(pimpl_->activeCallsMtx_);
2615 20 : return pimpl_->hostedCalls_.find(confId) != pimpl_->hostedCalls_.end();
2616 20 : }
2617 :
2618 : void
2619 10 : Conversation::removeActiveConference(Json::Value&& message, OnDoneCb&& cb)
2620 : {
2621 10 : if (!message.isMember("confId")) {
2622 0 : JAMI_ERROR("{}Malformed commit: no confId", pimpl_->toString());
2623 0 : return;
2624 : }
2625 :
2626 10 : auto erased = false;
2627 : {
2628 10 : std::lock_guard lk(pimpl_->activeCallsMtx_);
2629 10 : erased = pimpl_->hostedCalls_.erase(message["confId"].asString());
2630 10 : }
2631 10 : if (erased) {
2632 10 : pimpl_->saveHostedCalls();
2633 10 : sendMessage(std::move(message), "", {}, std::move(cb));
2634 : } else
2635 0 : cb(false, "");
2636 : }
2637 :
2638 : std::vector<std::map<std::string, std::string>>
2639 38 : Conversation::currentCalls() const
2640 : {
2641 38 : std::lock_guard lk(pimpl_->activeCallsMtx_);
2642 76 : return pimpl_->activeCalls_;
2643 38 : }
2644 : } // namespace jami
|