Line data Source code
1 : /*
2 : * Copyright (C) 2004-2025 Savoir-faire Linux Inc.
3 : *
4 : * This program is free software: you can redistribute it and/or modify
5 : * it under the terms of the GNU General Public License as published by
6 : * the Free Software Foundation, either version 3 of the License, or
7 : * (at your option) any later version.
8 : *
9 : * This program is distributed in the hope that it will be useful,
10 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 : * GNU General Public License for more details.
13 : *
14 : * You should have received a copy of the GNU General Public License
15 : * along with this program. If not, see <https://www.gnu.org/licenses/>.
16 : */
17 :
18 : #include "conversation.h"
19 :
20 : #include "account_const.h"
21 : #include "jamiaccount.h"
22 : #include "client/ring_signal.h"
23 : #include "swarm/swarm_manager.h"
24 : #ifdef ENABLE_PLUGIN
25 : #include "manager.h"
26 : #include "plugin/jamipluginmanager.h"
27 : #include "plugin/streamdata.h"
28 : #endif
29 : #include "jami/conversation_interface.h"
30 :
31 : #include "fileutils.h"
32 : #include "json_utils.h"
33 :
34 : #include <opendht/thread_pool.h>
35 : #include <fmt/compile.h>
36 :
37 : #include <charconv>
38 : #include <string_view>
39 : #include <tuple>
40 : #include <optional>
41 :
42 : namespace jami {
43 :
44 : static const char* const LAST_MODIFIED = "lastModified";
45 :
46 13 : ConvInfo::ConvInfo(const Json::Value& json)
47 : {
48 13 : id = json[ConversationMapKeys::ID].asString();
49 13 : created = json[ConversationMapKeys::CREATED].asLargestUInt();
50 13 : removed = json[ConversationMapKeys::REMOVED].asLargestUInt();
51 13 : erased = json[ConversationMapKeys::ERASED].asLargestUInt();
52 33 : for (const auto& v : json[ConversationMapKeys::MEMBERS]) {
53 20 : members.emplace(v["uri"].asString());
54 : }
55 13 : lastDisplayed = json[ConversationMapKeys::LAST_DISPLAYED].asString();
56 13 : }
57 :
58 : Json::Value
59 25 : ConvInfo::toJson() const
60 : {
61 25 : Json::Value json;
62 25 : json[ConversationMapKeys::ID] = id;
63 25 : json[ConversationMapKeys::CREATED] = Json::Int64(created);
64 25 : if (removed) {
65 1 : json[ConversationMapKeys::REMOVED] = Json::Int64(removed);
66 : }
67 25 : if (erased) {
68 1 : json[ConversationMapKeys::ERASED] = Json::Int64(erased);
69 : }
70 64 : for (const auto& m : members) {
71 39 : Json::Value member;
72 39 : member["uri"] = m;
73 39 : json[ConversationMapKeys::MEMBERS].append(member);
74 39 : }
75 25 : json[ConversationMapKeys::LAST_DISPLAYED] = lastDisplayed;
76 25 : return json;
77 0 : }
78 :
79 : // ConversationRequest
80 259 : ConversationRequest::ConversationRequest(const Json::Value& json)
81 : {
82 259 : received = json[ConversationMapKeys::RECEIVED].asLargestUInt();
83 259 : declined = json[ConversationMapKeys::DECLINED].asLargestUInt();
84 259 : from = json[ConversationMapKeys::FROM].asString();
85 259 : conversationId = json[ConversationMapKeys::CONVERSATIONID].asString();
86 259 : auto& md = json[ConversationMapKeys::METADATAS];
87 521 : for (const auto& member : md.getMemberNames()) {
88 262 : metadatas.emplace(member, md[member].asString());
89 259 : }
90 259 : }
91 :
92 : Json::Value
93 2 : ConversationRequest::toJson() const
94 : {
95 2 : Json::Value json;
96 2 : json[ConversationMapKeys::CONVERSATIONID] = conversationId;
97 2 : json[ConversationMapKeys::FROM] = from;
98 2 : json[ConversationMapKeys::RECEIVED] = static_cast<uint32_t>(received);
99 2 : if (declined)
100 0 : json[ConversationMapKeys::DECLINED] = static_cast<uint32_t>(declined);
101 4 : for (const auto& [key, value] : metadatas) {
102 2 : json[ConversationMapKeys::METADATAS][key] = value;
103 : }
104 2 : return json;
105 0 : }
106 :
107 : std::map<std::string, std::string>
108 220 : ConversationRequest::toMap() const
109 : {
110 220 : auto result = metadatas;
111 220 : result[ConversationMapKeys::ID] = conversationId;
112 220 : result[ConversationMapKeys::FROM] = from;
113 220 : if (declined)
114 0 : result[ConversationMapKeys::DECLINED] = std::to_string(declined);
115 220 : result[ConversationMapKeys::RECEIVED] = std::to_string(received);
116 220 : return result;
117 0 : }
118 :
119 : using MessageList = std::list<std::shared_ptr<libjami::SwarmMessage>>;
120 :
121 : struct History
122 : {
123 : // While loading the history, we need to avoid:
124 : // - reloading history (can just be ignored)
125 : // - adding new commits (should wait for history to be loaded)
126 : std::mutex mutex {};
127 : std::condition_variable cv {};
128 : bool loading {false};
129 : MessageList messageList {};
130 : std::map<std::string, std::shared_ptr<libjami::SwarmMessage>> quickAccess {};
131 : std::map<std::string, std::list<std::shared_ptr<libjami::SwarmMessage>>> pendingEditions {};
132 : std::map<std::string, std::list<std::map<std::string, std::string>>> pendingReactions {};
133 : };
134 :
135 : class Conversation::Impl
136 : {
137 : public:
138 183 : Impl(const std::shared_ptr<JamiAccount>& account,
139 : ConversationMode mode,
140 : const std::string& otherMember = "")
141 183 : : repository_(ConversationRepository::createConversation(account, mode, otherMember))
142 183 : , account_(account)
143 183 : , accountId_(account->getAccountID())
144 183 : , userId_(account->getUsername())
145 549 : , deviceId_(account->currentDeviceId())
146 : {
147 183 : if (!repository_) {
148 0 : throw std::logic_error("Unable to create repository");
149 : }
150 183 : init(account);
151 183 : }
152 :
153 18 : Impl(const std::shared_ptr<JamiAccount>& account, const std::string& conversationId)
154 18 : : account_(account)
155 18 : , accountId_(account->getAccountID())
156 18 : , userId_(account->getUsername())
157 36 : , deviceId_(account->currentDeviceId())
158 : {
159 18 : repository_ = std::make_unique<ConversationRepository>(account, conversationId);
160 18 : if (!repository_) {
161 0 : throw std::logic_error("Unable to create repository");
162 : }
163 18 : init(account);
164 18 : }
165 :
166 189 : Impl(const std::shared_ptr<JamiAccount>& account,
167 : const std::string& remoteDevice,
168 : const std::string& conversationId)
169 189 : : account_(account)
170 189 : , accountId_(account->getAccountID())
171 189 : , userId_(account->getUsername())
172 378 : , deviceId_(account->currentDeviceId())
173 : {
174 189 : std::vector<ConversationCommit> commits;
175 378 : repository_ = ConversationRepository::cloneConversation(account,
176 : remoteDevice,
177 : conversationId,
178 184 : [&](auto c) {
179 184 : commits = std::move(c);
180 188 : });
181 188 : if (!repository_) {
182 8 : emitSignal<libjami::ConversationSignal::OnConversationError>(
183 4 : accountId_, conversationId, EFETCH, "Unable to clone repository");
184 4 : throw std::logic_error("Unable to clone repository");
185 : }
186 : // To detect current active calls, we need to check history
187 368 : conversationDataPath_ = fileutils::get_data_dir() / accountId_
188 552 : / "conversation_data" / conversationId;
189 184 : activeCallsPath_ = conversationDataPath_ / ConversationMapKeys::ACTIVE_CALLS;
190 184 : initActiveCalls(repository_->convCommitsToMap(commits));
191 184 : init(account);
192 344 : }
193 :
194 385 : void init(const std::shared_ptr<JamiAccount>& account) {
195 385 : ioContext_ = Manager::instance().ioContext();
196 385 : fallbackTimer_ = std::make_unique<asio::steady_timer>(*ioContext_);
197 : swarmManager_
198 770 : = std::make_shared<SwarmManager>(NodeId(deviceId_),
199 770 : Manager::instance().getSeededRandomEngine(),
200 385 : [account = account_](const DeviceId& deviceId) {
201 2626 : if (auto acc = account.lock()) {
202 2626 : return acc->isConnectedWith(deviceId);
203 2626 : }
204 0 : return false;
205 385 : });
206 385 : swarmManager_->setMobility(account->isMobile());
207 : transferManager_
208 385 : = std::make_shared<TransferManager>(accountId_,
209 : "",
210 385 : repository_->id(),
211 770 : Manager::instance().getSeededRandomEngine());
212 770 : conversationDataPath_ = fileutils::get_data_dir() / accountId_
213 1155 : / "conversation_data" / repository_->id();
214 385 : fetchedPath_ = conversationDataPath_ / "fetched";
215 385 : statusPath_ = conversationDataPath_ / "status";
216 385 : sendingPath_ = conversationDataPath_ / "sending";
217 385 : preferencesPath_ = conversationDataPath_ / ConversationMapKeys::PREFERENCES;
218 385 : activeCallsPath_ = conversationDataPath_ / ConversationMapKeys::ACTIVE_CALLS;
219 385 : hostedCallsPath_ = conversationDataPath_ / ConversationMapKeys::HOSTED_CALLS;
220 385 : loadActiveCalls();
221 385 : loadStatus();
222 385 : typers_ = std::make_shared<Typers>(account, repository_->id());
223 385 : }
224 :
225 3791 : std::string toString() const {
226 11373 : return fmt::format(FMT_COMPILE("[Account {}] [Conversation {}]"), accountId_, repository_->id());
227 : }
228 : mutable std::string fmtStr_;
229 :
230 385 : ~Impl()
231 : {
232 : try {
233 385 : if (fallbackTimer_)
234 385 : fallbackTimer_->cancel();
235 0 : } catch (const std::exception& e) {
236 0 : JAMI_ERROR("{:s} {:s}", toString(), e.what());
237 0 : }
238 385 : }
239 :
240 : /**
241 : * If, for whatever reason, the daemon is stopped while hosting a conference,
242 : * we need to announce the end of this call when restarting.
243 : * To avoid to keep active calls forever.
244 : */
245 : std::vector<std::string> commitsEndedCalls();
246 : bool isAdmin() const;
247 : std::filesystem::path repoPath() const;
248 :
249 282 : void announce(const std::string& commitId, bool commitFromSelf = false)
250 : {
251 282 : std::vector<std::string> vec;
252 282 : if (!commitId.empty())
253 280 : vec.emplace_back(commitId);
254 282 : announce(vec, commitFromSelf);
255 282 : }
256 :
257 295 : void announce(const std::vector<std::string>& commits, bool commitFromSelf = false)
258 : {
259 295 : std::vector<ConversationCommit> convcommits;
260 295 : convcommits.reserve(commits.size());
261 601 : for (const auto& cid : commits) {
262 306 : auto commit = repository_->getCommit(cid);
263 306 : if (commit != std::nullopt) {
264 306 : convcommits.emplace_back(*commit);
265 : }
266 306 : }
267 295 : announce(repository_->convCommitsToMap(convcommits), commitFromSelf);
268 295 : }
269 :
270 : /**
271 : * Initialize activeCalls_ from the list of commits in the repository
272 : * @param commits Commits in reverse chronological order (i.e. from newest to oldest)
273 : */
274 184 : void initActiveCalls(const std::vector<std::map<std::string, std::string>>& commits) const
275 : {
276 184 : std::unordered_set<std::string> invalidHostUris;
277 184 : std::unordered_set<std::string> invalidCallIds;
278 :
279 184 : std::lock_guard lk(activeCallsMtx_);
280 1160 : for (const auto& commit : commits) {
281 976 : if (commit.at("type") == "member") {
282 : // Each commit of type "member" has an "action" field whose value can be one
283 : // of the following: "add", "join", "remove", "ban", "unban"
284 : // In the case of "remove" and "ban", we need to add the member's URI to
285 : // invalidHostUris to ensure that any call they may have started in the past
286 : // is no longer considered active.
287 : // For the other actions, there's no harm in adding the member's URI anyway,
288 : // since it's not possible to start hosting a call before joining the swarm (or
289 : // before getting unbanned in the case of previously banned members).
290 771 : invalidHostUris.emplace(commit.at("uri"));
291 411 : } else if (commit.find("confId") != commit.end() && commit.find("uri") != commit.end()
292 411 : && commit.find("device") != commit.end()) {
293 : // The commit indicates either the end or the beginning of a call
294 : // (depending on whether there is a "duration" field or not).
295 1 : auto convId = repository_->id();
296 2 : auto confId = commit.at("confId");
297 2 : auto uri = commit.at("uri");
298 2 : auto device = commit.at("device");
299 :
300 2 : if (commit.find("duration") == commit.end()
301 1 : && invalidCallIds.find(confId) == invalidCallIds.end()
302 3 : && invalidHostUris.find(uri) == invalidHostUris.end()) {
303 1 : std::map<std::string, std::string> activeCall;
304 1 : activeCall["id"] = confId;
305 1 : activeCall["uri"] = uri;
306 1 : activeCall["device"] = device;
307 1 : activeCalls_.emplace_back(activeCall);
308 0 : fmt::print("swarm:{} new active call detected: {} (on device {}, account {})\n",
309 : convId,
310 : confId,
311 : device,
312 : uri);
313 1 : }
314 : // Even if the call was active, we still add its ID to invalidCallIds to make sure it
315 : // doesn't get added a second time. (This shouldn't happen normally, but in practice
316 : // there are sometimes multiple commits indicating the beginning of the same call.)
317 1 : invalidCallIds.emplace(confId);
318 1 : }
319 : }
320 184 : saveActiveCalls();
321 184 : emitSignal<libjami::ConfigurationSignal::ActiveCallsChanged>(accountId_,
322 184 : repository_->id(),
323 184 : activeCalls_);
324 184 : }
325 :
326 : /**
327 : * Update activeCalls_ via announced commits (in load or via new commits)
328 : * @param commit Commit to check
329 : * @param eraseOnly If we want to ignore added commits
330 : * @param emitSig If we want to emit to client
331 : * @note eraseOnly is used by loadMessages. This is a fail-safe, this SHOULD NOT happen
332 : */
333 79 : void updateActiveCalls(const std::map<std::string, std::string>& commit,
334 : bool eraseOnly = false,
335 : bool emitSig = true) const
336 : {
337 79 : if (!repository_)
338 0 : return;
339 79 : if (commit.at("type") == "member") {
340 : // In this case, we need to check if we are not removing a hosting member or device
341 19 : std::lock_guard lk(activeCallsMtx_);
342 19 : auto it = activeCalls_.begin();
343 19 : auto updateActives = false;
344 20 : while (it != activeCalls_.end()) {
345 1 : if (it->at("uri") == commit.at("uri") || it->at("device") == commit.at("uri")) {
346 3 : JAMI_DEBUG("Removing {:s} from the active calls, because {:s} left",
347 : it->at("id"),
348 : commit.at("uri"));
349 1 : it = activeCalls_.erase(it);
350 1 : updateActives = true;
351 : } else {
352 0 : ++it;
353 : }
354 : }
355 19 : if (updateActives) {
356 1 : saveActiveCalls();
357 1 : if (emitSig)
358 1 : emitSignal<libjami::ConfigurationSignal::ActiveCallsChanged>(accountId_,
359 1 : repository_->id(),
360 1 : activeCalls_);
361 : }
362 19 : return;
363 19 : }
364 : // Else, it's a call information
365 177 : if (commit.find("confId") != commit.end() && commit.find("uri") != commit.end()
366 177 : && commit.find("device") != commit.end()) {
367 57 : auto convId = repository_->id();
368 114 : auto confId = commit.at("confId");
369 114 : auto uri = commit.at("uri");
370 114 : auto device = commit.at("device");
371 57 : std::lock_guard lk(activeCallsMtx_);
372 57 : auto itActive = std::find_if(activeCalls_.begin(),
373 : activeCalls_.end(),
374 26 : [&](const auto& value) {
375 78 : return value.at("id") == confId
376 52 : && value.at("uri") == uri
377 130 : && value.at("device") == device;
378 : });
379 57 : if (commit.find("duration") == commit.end()) {
380 31 : if (itActive == activeCalls_.end() && !eraseOnly) {
381 93 : JAMI_DEBUG(
382 : "swarm:{:s} new current call detected: {:s} on device {:s}, account {:s}",
383 : convId,
384 : confId,
385 : device,
386 : uri);
387 31 : std::map<std::string, std::string> activeCall;
388 31 : activeCall["id"] = confId;
389 31 : activeCall["uri"] = uri;
390 31 : activeCall["device"] = device;
391 31 : activeCalls_.emplace_back(activeCall);
392 31 : saveActiveCalls();
393 31 : if (emitSig)
394 31 : emitSignal<libjami::ConfigurationSignal::ActiveCallsChanged>(accountId_,
395 : repository_
396 31 : ->id(),
397 31 : activeCalls_);
398 31 : }
399 : } else {
400 26 : if (itActive != activeCalls_.end()) {
401 26 : itActive = activeCalls_.erase(itActive);
402 : // Unlikely, but we must ensure that no duplicate exists
403 26 : while (itActive != activeCalls_.end()) {
404 0 : itActive = std::find_if(itActive, activeCalls_.end(), [&](const auto& value) {
405 0 : return value.at("id") == confId && value.at("uri") == uri
406 0 : && value.at("device") == device;
407 : });
408 0 : if (itActive != activeCalls_.end()) {
409 0 : JAMI_ERROR("Duplicate call found. (This is a bug)");
410 0 : itActive = activeCalls_.erase(itActive);
411 : }
412 : }
413 :
414 26 : if (eraseOnly) {
415 0 : JAMI_WARNING("previous swarm:{:s} call finished detected: {:s} on device "
416 : "{:s}, account {:s}",
417 : convId,
418 : confId,
419 : device,
420 : uri);
421 : } else {
422 78 : JAMI_DEBUG("swarm:{:s} call finished: {:s} on device {:s}, account {:s}",
423 : convId,
424 : confId,
425 : device,
426 : uri);
427 : }
428 : }
429 26 : saveActiveCalls();
430 26 : if (emitSig)
431 26 : emitSignal<libjami::ConfigurationSignal::ActiveCallsChanged>(accountId_,
432 26 : repository_->id(),
433 26 : activeCalls_);
434 : }
435 57 : }
436 : }
437 :
438 1194 : void announce(const std::vector<std::map<std::string, std::string>>& commits, bool commitFromSelf = false)
439 : {
440 1194 : if (!repository_)
441 0 : return;
442 1194 : auto convId = repository_->id();
443 1194 : auto ok = !commits.empty();
444 3580 : auto lastId = ok ? commits.rbegin()->at(ConversationMapKeys::ID) : "";
445 1194 : addToHistory(loadedHistory_, commits, true, commitFromSelf);
446 1194 : if (ok) {
447 1192 : bool announceMember = false;
448 2431 : for (const auto& c : commits) {
449 : // Announce member events
450 1239 : if (c.at("type") == "member") {
451 926 : if (c.find("uri") != c.end() && c.find("action") != c.end()) {
452 926 : const auto& uri = c.at("uri");
453 926 : const auto& actionStr = c.at("action");
454 926 : auto action = -1;
455 926 : if (actionStr == "add")
456 441 : action = 0;
457 485 : else if (actionStr == "join")
458 465 : action = 1;
459 20 : else if (actionStr == "remove")
460 1 : action = 2;
461 19 : else if (actionStr == "ban")
462 18 : action = 3;
463 1 : else if (actionStr == "unban")
464 1 : action = 4;
465 926 : if (actionStr == "ban" || actionStr == "remove") {
466 : // In this case, a potential host was removed during a call.
467 19 : updateActiveCalls(c);
468 19 : typers_->removeTyper(uri);
469 : }
470 926 : if (action != -1) {
471 926 : announceMember = true;
472 1852 : emitSignal<libjami::ConversationSignal::ConversationMemberEvent>(
473 926 : accountId_, convId, uri, action);
474 : }
475 : }
476 313 : } else if (c.at("type") == "application/call-history+json") {
477 60 : updateActiveCalls(c);
478 : }
479 : #ifdef ENABLE_PLUGIN
480 : auto& pluginChatManager
481 1239 : = Manager::instance().getJamiPluginManager().getChatServicesManager();
482 1239 : if (pluginChatManager.hasHandlers()) {
483 4 : auto cm = std::make_shared<JamiMessage>(accountId_,
484 : convId,
485 8 : c.at("author") != userId_,
486 : c,
487 8 : false);
488 4 : cm->isSwarm = true;
489 4 : pluginChatManager.publishMessage(std::move(cm));
490 4 : }
491 : #endif
492 : // announce message
493 1239 : emitSignal<libjami::ConversationSignal::MessageReceived>(accountId_, convId, c);
494 : }
495 :
496 1192 : if (announceMember && onMembersChanged_) {
497 917 : onMembersChanged_(repository_->memberUris("", {}));
498 : }
499 : }
500 1194 : }
501 :
502 385 : void loadStatus()
503 : {
504 : try {
505 : // read file
506 768 : auto file = fileutils::loadFile(statusPath_);
507 : // load values
508 2 : msgpack::object_handle oh = msgpack::unpack((const char*) file.data(), file.size());
509 2 : std::lock_guard lk {messageStatusMtx_};
510 2 : oh.get().convert(messagesStatus_);
511 385 : } catch (const std::exception& e) {
512 383 : }
513 385 : }
514 1110 : void saveStatus()
515 : {
516 1110 : std::ofstream file(statusPath_, std::ios::trunc | std::ios::binary);
517 1110 : msgpack::pack(file, messagesStatus_);
518 1110 : }
519 :
520 403 : void loadActiveCalls() const
521 : {
522 : try {
523 : // read file
524 794 : auto file = fileutils::loadFile(activeCallsPath_);
525 : // load values
526 12 : msgpack::object_handle oh = msgpack::unpack((const char*) file.data(), file.size());
527 12 : std::lock_guard lk {activeCallsMtx_};
528 12 : oh.get().convert(activeCalls_);
529 403 : } catch (const std::exception& e) {
530 391 : return;
531 391 : }
532 : }
533 :
534 260 : void saveActiveCalls() const
535 : {
536 260 : std::ofstream file(activeCallsPath_, std::ios::trunc | std::ios::binary);
537 260 : msgpack::pack(file, activeCalls_);
538 260 : }
539 :
540 18 : void loadHostedCalls() const
541 : {
542 : try {
543 : // read file
544 30 : auto file = fileutils::loadFile(hostedCallsPath_);
545 : // load values
546 6 : msgpack::object_handle oh = msgpack::unpack((const char*) file.data(), file.size());
547 6 : std::lock_guard lk {activeCallsMtx_};
548 6 : oh.get().convert(hostedCalls_);
549 18 : } catch (const std::exception& e) {
550 12 : return;
551 12 : }
552 : }
553 :
554 43 : void saveHostedCalls() const
555 : {
556 43 : std::ofstream file(hostedCallsPath_, std::ios::trunc | std::ios::binary);
557 43 : msgpack::pack(file, hostedCalls_);
558 43 : }
559 :
560 : void voteUnban(const std::string& contactUri, const std::string_view type, const OnDoneCb& cb);
561 :
562 : std::vector<std::map<std::string, std::string>> getMembers(bool includeInvited,
563 : bool includeLeft,
564 : bool includeBanned) const;
565 :
566 6435 : std::string_view bannedType(const std::string& uri) const
567 : {
568 6435 : auto repo = repoPath();
569 0 : auto crt = fmt::format("{}.crt", uri);
570 12870 : auto bannedMember = repo / "banned" / "members" / crt;
571 6435 : if (std::filesystem::is_regular_file(bannedMember))
572 14 : return "members"sv;
573 12842 : auto bannedAdmin = repo / "banned" / "admins" / crt;
574 6421 : if (std::filesystem::is_regular_file(bannedAdmin))
575 0 : return "admins"sv;
576 12842 : auto bannedInvited = repo / "banned" / "invited" / uri;
577 6421 : if (std::filesystem::is_regular_file(bannedInvited))
578 0 : return "invited"sv;
579 12842 : auto bannedDevice = repo / "banned" / "devices" / crt;
580 6421 : if (std::filesystem::is_regular_file(bannedDevice))
581 0 : return "devices"sv;
582 6421 : return {};
583 6435 : }
584 :
585 4347 : std::shared_ptr<dhtnet::ChannelSocket> gitSocket(const DeviceId& deviceId) const
586 : {
587 4347 : auto deviceSockets = gitSocketList_.find(deviceId);
588 8694 : return (deviceSockets != gitSocketList_.end()) ? deviceSockets->second : nullptr;
589 : }
590 :
591 1954 : void addGitSocket(const DeviceId& deviceId, const std::shared_ptr<dhtnet::ChannelSocket>& socket)
592 : {
593 1954 : gitSocketList_[deviceId] = socket;
594 1954 : }
595 795 : void removeGitSocket(const DeviceId& deviceId)
596 : {
597 795 : auto deviceSockets = gitSocketList_.find(deviceId);
598 795 : if (deviceSockets != gitSocketList_.end())
599 402 : gitSocketList_.erase(deviceSockets);
600 795 : }
601 :
602 : /**
603 : * Remove all git sockets and all DRT nodes associated with the given peer.
604 : * This is used when a swarm member is banned to ensure that we stop syncing
605 : * with them or sending them message notifications.
606 : */
607 : void disconnectFromPeer(const std::string& peerUri);
608 :
609 : std::vector<std::map<std::string, std::string>> getMembers(bool includeInvited,
610 : bool includeLeft) const;
611 :
612 : std::mutex membersMtx_ {};
613 : std::set<std::string> checkedMembers_; // Store members we tried
614 : std::function<void()> bootstrapCb_;
615 : std::mutex bootstrapMtx_ {};
616 : #ifdef LIBJAMI_TEST
617 : std::function<void(std::string, BootstrapStatus)> bootstrapCbTest_;
618 : #endif
619 :
620 : std::mutex writeMtx_ {};
621 : std::unique_ptr<ConversationRepository> repository_;
622 : std::shared_ptr<SwarmManager> swarmManager_;
623 : std::weak_ptr<JamiAccount> account_;
624 : std::string accountId_ {};
625 : std::string userId_;
626 : std::string deviceId_;
627 : std::atomic_bool isRemoving_ {false};
628 : std::vector<std::map<std::string, std::string>> loadMessages(const LogOptions& options);
629 : std::vector<libjami::SwarmMessage> loadMessages2(const LogOptions& options,
630 : History* optHistory = nullptr);
631 : void pull(const std::string& deviceId);
632 : std::vector<std::map<std::string, std::string>> mergeHistory(const std::string& uri);
633 :
634 : // Avoid multiple fetch/merges at the same time.
635 : std::mutex pullcbsMtx_ {};
636 : std::map<std::string, std::deque<std::pair<std::string, OnPullCb>>> fetchingRemotes_ {}; // store current remote in fetch
637 : std::shared_ptr<TransferManager> transferManager_ {};
638 : std::filesystem::path conversationDataPath_ {};
639 : std::filesystem::path fetchedPath_ {};
640 :
641 : // Manage last message displayed and status
642 : std::filesystem::path sendingPath_ {};
643 : std::filesystem::path preferencesPath_ {};
644 : OnMembersChanged onMembersChanged_ {};
645 :
646 : // Manage hosted calls on this device
647 : std::filesystem::path hostedCallsPath_ {};
648 : mutable std::map<std::string, uint64_t /* start time */> hostedCalls_ {};
649 : // Manage active calls for this conversation (can be hosted by other devices)
650 : std::filesystem::path activeCallsPath_ {};
651 : mutable std::mutex activeCallsMtx_ {};
652 : mutable std::vector<std::map<std::string, std::string>> activeCalls_ {};
653 :
654 : GitSocketList gitSocketList_ {};
655 :
656 : // Bootstrap
657 : std::shared_ptr<asio::io_context> ioContext_;
658 : std::unique_ptr<asio::steady_timer> fallbackTimer_;
659 :
660 :
661 : /**
662 : * Loaded history represents the linearized history to show for clients
663 : */
664 : History loadedHistory_ {};
665 : std::vector<std::shared_ptr<libjami::SwarmMessage>> addToHistory(
666 : History& history,
667 : const std::vector<std::map<std::string, std::string>>& commits,
668 : bool messageReceived = false,
669 : bool commitFromSelf = false);
670 :
671 : void handleReaction(History& history,
672 : const std::shared_ptr<libjami::SwarmMessage>& sharedCommit) const;
673 : void handleEdition(History& history,
674 : const std::shared_ptr<libjami::SwarmMessage>& sharedCommit,
675 : bool messageReceived) const;
676 : bool handleMessage(History& history,
677 : const std::shared_ptr<libjami::SwarmMessage>& sharedCommit,
678 : bool messageReceived) const;
679 : void rectifyStatus(const std::shared_ptr<libjami::SwarmMessage>& message,
680 : History& history) const;
681 : /**
682 : * {uri, {
683 : * {"fetch", "commitId"},
684 : * {"fetched_ts", "timestamp"},
685 : * {"read", "commitId"},
686 : * {"read_ts", "timestamp"}
687 : * }
688 : * }
689 : */
690 : mutable std::mutex messageStatusMtx_;
691 : std::function<void(const std::map<std::string, std::map<std::string, std::string>>&)> messageStatusCb_ {};
692 : std::filesystem::path statusPath_ {};
693 : mutable std::map<std::string, std::map<std::string, std::string>> messagesStatus_ {};
694 : /**
695 : * Status: 0 = commited, 1 = fetched, 2 = read
696 : * This cache the curent status to add in the messages
697 : */
698 : // Note: only store int32_t cause it's easy to pass to dbus this way
699 : // memberToStatus serves as a cache for loading messages
700 : mutable std::map<std::string, int32_t> memberToStatus;
701 :
702 :
703 : // futureStatus is used to store the status for receiving messages
704 : // (because we're not sure to fetch the commit before receiving a status change for this)
705 : mutable std::map<std::string, std::map<std::string, int32_t>> futureStatus;
706 : // Update internal structures regarding status
707 : void updateStatus(const std::string& uri,
708 : libjami::Account::MessageStates status,
709 : const std::string& commitId,
710 : const std::string& ts,
711 : bool emit = false);
712 :
713 :
714 : std::shared_ptr<Typers> typers_;
715 : };
716 :
717 : bool
718 17 : Conversation::Impl::isAdmin() const
719 : {
720 34 : auto adminsPath = repoPath() / "admins";
721 34 : return std::filesystem::is_regular_file(fileutils::getFullPath(adminsPath, userId_ + ".crt"));
722 17 : }
723 :
724 : void
725 16 : Conversation::Impl::disconnectFromPeer(const std::string& peerUri)
726 : {
727 : // Remove nodes from swarmManager
728 16 : const auto nodes = swarmManager_->getRoutingTable().getAllNodes();
729 16 : std::vector<NodeId> toRemove;
730 38 : for (const auto node : nodes)
731 22 : if (peerUri == repository_->uriFromDevice(node.toString()))
732 11 : toRemove.emplace_back(node);
733 16 : swarmManager_->deleteNode(toRemove);
734 :
735 : // Remove git sockets with this member
736 37 : for (auto it = gitSocketList_.begin(); it != gitSocketList_.end();) {
737 21 : if (peerUri == repository_->uriFromDevice(it->first.toString()))
738 11 : it = gitSocketList_.erase(it);
739 : else
740 10 : ++it;
741 : }
742 16 : }
743 :
744 : std::vector<std::map<std::string, std::string>>
745 416 : Conversation::Impl::getMembers(bool includeInvited, bool includeLeft, bool includeBanned) const
746 : {
747 416 : std::vector<std::map<std::string, std::string>> result;
748 416 : auto members = repository_->members();
749 416 : std::lock_guard lk(messageStatusMtx_);
750 1132 : for (const auto& member : members) {
751 716 : if (member.role == MemberRole::BANNED && !includeBanned) {
752 170 : continue;
753 : }
754 716 : if (member.role == MemberRole::INVITED && !includeInvited)
755 170 : continue;
756 546 : if (member.role == MemberRole::LEFT && !includeLeft)
757 0 : continue;
758 546 : auto mm = member.map();
759 546 : mm[ConversationMapKeys::LAST_DISPLAYED] = messagesStatus_[member.uri]["read"];
760 546 : result.emplace_back(std::move(mm));
761 546 : }
762 832 : return result;
763 416 : }
764 :
765 : std::vector<std::string>
766 18 : Conversation::Impl::commitsEndedCalls()
767 : {
768 : // Handle current calls
769 18 : std::vector<std::string> commits {};
770 18 : std::unique_lock lk(writeMtx_);
771 18 : std::unique_lock lkA(activeCallsMtx_);
772 18 : for (const auto& hostedCall : hostedCalls_) {
773 : // In this case, this means that we left
774 : // the conference while still hosting it, so activeCalls
775 : // will not be correctly updated
776 : // We don't need to send notifications there, as peers will sync with presence
777 0 : Json::Value value;
778 0 : value["uri"] = userId_;
779 0 : value["device"] = deviceId_;
780 0 : value["confId"] = hostedCall.first;
781 0 : value["type"] = "application/call-history+json";
782 0 : auto now = std::chrono::system_clock::now();
783 0 : auto nowConverted = std::chrono::duration_cast<std::chrono::seconds>(now.time_since_epoch())
784 0 : .count();
785 0 : value["duration"] = std::to_string((nowConverted - hostedCall.second) * 1000);
786 0 : auto itActive = std::find_if(activeCalls_.begin(),
787 : activeCalls_.end(),
788 0 : [this, confId = hostedCall.first](const auto& value) {
789 0 : return value.at("id") == confId && value.at("uri") == userId_
790 0 : && value.at("device") == deviceId_;
791 : });
792 0 : if (itActive != activeCalls_.end())
793 0 : activeCalls_.erase(itActive);
794 0 : commits.emplace_back(repository_->commitMessage(json::toString(value)));
795 :
796 0 : JAMI_DEBUG("Removing hosted conference... {:s}", hostedCall.first);
797 0 : }
798 18 : hostedCalls_.clear();
799 18 : saveActiveCalls();
800 18 : saveHostedCalls();
801 36 : return commits;
802 18 : }
803 :
804 : std::filesystem::path
805 9689 : Conversation::Impl::repoPath() const
806 : {
807 19378 : return fileutils::get_data_dir() / accountId_ / "conversations" / repository_->id();
808 : }
809 :
810 : std::vector<std::map<std::string, std::string>>
811 13 : Conversation::Impl::loadMessages(const LogOptions& options)
812 : {
813 13 : if (!repository_)
814 0 : return {};
815 13 : std::vector<ConversationCommit> commits;
816 13 : auto startLogging = options.from == "";
817 13 : auto breakLogging = false;
818 26 : repository_->log(
819 57 : [&](const auto& id, const auto& author, const auto& commit) {
820 57 : if (!commits.empty()) {
821 : // Set linearized parent
822 44 : commits.rbegin()->linearized_parent = id;
823 : }
824 57 : if (options.skipMerge && git_commit_parentcount(commit.get()) > 1) {
825 0 : return CallbackResult::Skip;
826 : }
827 57 : if ((options.nbOfCommits != 0 && commits.size() == options.nbOfCommits))
828 0 : return CallbackResult::Break; // Stop logging
829 57 : if (breakLogging)
830 0 : return CallbackResult::Break; // Stop logging
831 57 : if (id == options.to) {
832 13 : if (options.includeTo)
833 0 : breakLogging = true; // For the next commit
834 : else
835 13 : return CallbackResult::Break; // Stop logging
836 : }
837 :
838 44 : if (!startLogging && options.from != "" && options.from == id)
839 0 : startLogging = true;
840 44 : if (!startLogging)
841 0 : return CallbackResult::Skip; // Start logging after this one
842 :
843 44 : if (options.fastLog) {
844 0 : if (options.authorUri != "") {
845 0 : if (options.authorUri == repository_->uriFromDevice(author.email)) {
846 0 : return CallbackResult::Break; // Found author, stop
847 : }
848 : }
849 : // Used to only count commit
850 0 : commits.emplace(commits.end(), ConversationCommit {});
851 0 : return CallbackResult::Skip;
852 : }
853 :
854 44 : return CallbackResult::Ok; // Continue
855 : },
856 44 : [&](auto&& cc) { commits.emplace(commits.end(), std::forward<decltype(cc)>(cc)); },
857 44 : [](auto, auto, auto) { return false; },
858 13 : options.from,
859 13 : options.logIfNotFound);
860 13 : return repository_->convCommitsToMap(commits);
861 13 : }
862 :
863 : std::vector<libjami::SwarmMessage>
864 1525 : Conversation::Impl::loadMessages2(const LogOptions& options, History* optHistory)
865 : {
866 1525 : auto history = optHistory ? optHistory : &loadedHistory_;
867 :
868 : // history->mutex is locked by the caller
869 1525 : if (!repository_ || history->loading) {
870 0 : return {};
871 : }
872 1525 : history->loading = true;
873 :
874 : // By convention, if options.nbOfCommits is zero, then we
875 : // don't impose a limit on the number of commits returned.
876 1525 : bool limitNbOfCommits = options.nbOfCommits > 0;
877 :
878 1525 : auto startLogging = options.from == "";
879 1525 : auto breakLogging = false;
880 1525 : auto currentHistorySize = loadedHistory_.messageList.size();
881 1525 : std::vector<std::string> replies;
882 1525 : std::vector<std::shared_ptr<libjami::SwarmMessage>> msgList;
883 3050 : repository_->log(
884 : /* preCondition */
885 6764 : [&](const auto& id, const auto& author, const auto& commit) {
886 6764 : if (options.skipMerge && git_commit_parentcount(commit.get()) > 1) {
887 1 : return CallbackResult::Skip;
888 : }
889 6763 : if (id == options.to) {
890 731 : if (options.includeTo)
891 0 : breakLogging = true; // For the next commit
892 : }
893 6763 : if (replies.empty()) { // This avoid load until
894 : // NOTE: in the future, we may want to add "Reply-Body" in commit to avoid to load
895 : // until this commit
896 13524 : if ((limitNbOfCommits
897 7460 : && (loadedHistory_.messageList.size() - currentHistorySize)
898 698 : == options.nbOfCommits))
899 5 : return CallbackResult::Break; // Stop logging
900 6757 : if (breakLogging)
901 0 : return CallbackResult::Break; // Stop logging
902 6757 : if (id == options.to && !options.includeTo) {
903 730 : return CallbackResult::Break; // Stop logging
904 : }
905 : }
906 :
907 6028 : if (!startLogging && options.from != "" && options.from == id)
908 1108 : startLogging = true;
909 6028 : if (!startLogging)
910 25 : return CallbackResult::Skip; // Start logging after this one
911 :
912 6003 : if (options.fastLog) {
913 5302 : if (options.authorUri != "") {
914 1 : if (options.authorUri == repository_->uriFromDevice(author.email)) {
915 1 : return CallbackResult::Break; // Found author, stop
916 : }
917 : }
918 : }
919 :
920 6002 : return CallbackResult::Ok; // Continue
921 : },
922 : /* emplaceCb */
923 6002 : [&](auto&& cc) {
924 6002 : if(limitNbOfCommits && (msgList.size() == options.nbOfCommits))
925 285 : return;
926 5718 : auto optMessage = repository_->convCommitToMap(cc);
927 5718 : if (!optMessage.has_value())
928 1 : return;
929 5717 : auto message = optMessage.value();
930 5717 : if (message.find("reply-to") != message.end()) {
931 1 : auto it = std::find(replies.begin(), replies.end(), message.at("reply-to"));
932 1 : if(it == replies.end()) {
933 1 : replies.emplace_back(message.at("reply-to"));
934 : }
935 : }
936 5717 : auto it = std::find(replies.begin(), replies.end(), message.at("id"));
937 5717 : if (it != replies.end()) {
938 1 : replies.erase(it);
939 : }
940 5717 : std::shared_ptr<libjami::SwarmMessage> firstMsg;
941 5717 : if ((history == &loadedHistory_) && msgList.empty() && !loadedHistory_.messageList.empty()) {
942 0 : firstMsg = *loadedHistory_.messageList.rbegin();
943 : }
944 17151 : auto added = addToHistory(*history, {message}, false, false);
945 5717 : if (!added.empty() && firstMsg) {
946 0 : emitSignal<libjami::ConversationSignal::SwarmMessageUpdated>(accountId_,
947 0 : repository_->id(),
948 0 : *firstMsg);
949 : }
950 5717 : msgList.insert(msgList.end(), added.begin(), added.end());
951 5718 : },
952 : /* postCondition */
953 6002 : [&](auto, auto, auto) {
954 : // Stop logging if there was a limit set on the number of commits
955 : // to return and we reached it. This isn't strictly necessary since
956 : // the check at the beginning of `emplaceCb` ensures that we won't
957 : // return too many messages, but it prevents us from needlessly
958 : // iterating over a (potentially) large number of commits.
959 6002 : return limitNbOfCommits && (msgList.size() == options.nbOfCommits);
960 : },
961 1525 : options.from,
962 1525 : options.logIfNotFound);
963 :
964 1525 : history->loading = false;
965 1525 : history->cv.notify_all();
966 :
967 : // Convert for client (remove ptr)
968 1525 : std::vector<libjami::SwarmMessage> ret;
969 1525 : ret.reserve(msgList.size());
970 7236 : for (const auto& msg: msgList) {
971 5711 : ret.emplace_back(*msg);
972 : }
973 1525 : return ret;
974 1525 : }
975 :
976 : void
977 3 : Conversation::Impl::handleReaction(History& history,
978 : const std::shared_ptr<libjami::SwarmMessage>& sharedCommit) const
979 : {
980 3 : auto it = history.quickAccess.find(sharedCommit->body.at("react-to"));
981 3 : auto peditIt = history.pendingEditions.find(sharedCommit->id);
982 3 : if (peditIt != history.pendingEditions.end()) {
983 0 : auto oldBody = sharedCommit->body;
984 0 : sharedCommit->body["body"] = peditIt->second.front()->body["body"];
985 0 : if (sharedCommit->body.at("body").empty())
986 0 : return;
987 0 : history.pendingEditions.erase(peditIt);
988 0 : }
989 3 : if (it != history.quickAccess.end()) {
990 3 : it->second->reactions.emplace_back(sharedCommit->body);
991 3 : emitSignal<libjami::ConversationSignal::ReactionAdded>(accountId_,
992 3 : repository_->id(),
993 3 : it->second->id,
994 3 : sharedCommit->body);
995 : } else {
996 0 : history.pendingReactions[sharedCommit->body.at("react-to")].emplace_back(sharedCommit->body);
997 : }
998 : }
999 :
1000 : void
1001 8 : Conversation::Impl::handleEdition(History& history,
1002 : const std::shared_ptr<libjami::SwarmMessage>& sharedCommit,
1003 : bool messageReceived) const
1004 : {
1005 16 : auto editId = sharedCommit->body.at("edit");
1006 8 : auto it = history.quickAccess.find(editId);
1007 8 : if (it != history.quickAccess.end()) {
1008 6 : auto baseCommit = it->second;
1009 6 : if (baseCommit) {
1010 6 : auto itReact = baseCommit->body.find("react-to");
1011 6 : std::string toReplace = (baseCommit->type == "application/data-transfer+json") ?
1012 12 : "tid" : "body";
1013 6 : auto body = sharedCommit->body.at(toReplace);
1014 : // Edit reaction
1015 6 : if (itReact != baseCommit->body.end()) {
1016 1 : baseCommit->body[toReplace] = body; // Replace body if pending
1017 1 : it = history.quickAccess.find(itReact->second);
1018 1 : auto itPending = history.pendingReactions.find(itReact->second);
1019 1 : if (it != history.quickAccess.end()) {
1020 1 : baseCommit = it->second; // Base commit
1021 1 : auto itPreviousReact = std::find_if(baseCommit->reactions.begin(),
1022 1 : baseCommit->reactions.end(),
1023 1 : [&](const auto& reaction) {
1024 1 : return reaction.at("id") == editId;
1025 : });
1026 1 : if (itPreviousReact != baseCommit->reactions.end()) {
1027 1 : (*itPreviousReact)[toReplace] = body;
1028 1 : if (body.empty()) {
1029 1 : baseCommit->reactions.erase(itPreviousReact);
1030 2 : emitSignal<libjami::ConversationSignal::ReactionRemoved>(accountId_,
1031 : repository_
1032 1 : ->id(),
1033 1 : baseCommit->id,
1034 : editId);
1035 : }
1036 : }
1037 0 : } else if (itPending != history.pendingReactions.end()) {
1038 : // Else edit if pending
1039 0 : auto itReaction = std::find_if(itPending->second.begin(),
1040 0 : itPending->second.end(),
1041 0 : [&](const auto& reaction) {
1042 0 : return reaction.at("id") == editId;
1043 : });
1044 0 : if (itReaction != itPending->second.end()) {
1045 0 : (*itReaction)[toReplace] = body;
1046 0 : if (body.empty())
1047 0 : itPending->second.erase(itReaction);
1048 : }
1049 : } else {
1050 : // Add to pending edtions
1051 0 : messageReceived ? history.pendingEditions[editId].emplace_front(sharedCommit)
1052 0 : : history.pendingEditions[editId].emplace_back(sharedCommit);
1053 : }
1054 : } else {
1055 : // Normal message
1056 5 : it->second->editions.emplace(it->second->editions.begin(), it->second->body);
1057 5 : it->second->body[toReplace] = sharedCommit->body[toReplace];
1058 5 : if (toReplace == "tid") {
1059 : // Avoid to replace fileId in client
1060 1 : it->second->body["fileId"] = "";
1061 : }
1062 : // Remove reactions
1063 5 : if (sharedCommit->body.at(toReplace).empty())
1064 2 : it->second->reactions.clear();
1065 5 : emitSignal<libjami::ConversationSignal::SwarmMessageUpdated>(accountId_, repository_->id(), *it->second);
1066 : }
1067 6 : }
1068 6 : } else {
1069 2 : messageReceived ? history.pendingEditions[editId].emplace_front(sharedCommit)
1070 2 : : history.pendingEditions[editId].emplace_back(sharedCommit);
1071 : }
1072 8 : }
1073 :
1074 : bool
1075 6940 : Conversation::Impl::handleMessage(History& history,
1076 : const std::shared_ptr<libjami::SwarmMessage>& sharedCommit,
1077 : bool messageReceived) const
1078 : {
1079 6940 : if (messageReceived) {
1080 : // For a received message, we place it at the beginning of the list
1081 1209 : if (!history.messageList.empty())
1082 967 : sharedCommit->linearizedParent = (*history.messageList.begin())->id;
1083 1209 : history.messageList.emplace_front(sharedCommit);
1084 : } else {
1085 : // For a loaded message, we load from newest to oldest
1086 : // So we change the parent of the last message.
1087 5731 : if (!history.messageList.empty())
1088 4209 : (*history.messageList.rbegin())->linearizedParent = sharedCommit->id;
1089 5731 : history.messageList.emplace_back(sharedCommit);
1090 : }
1091 : // Handle pending reactions/editions
1092 6940 : auto reactIt = history.pendingReactions.find(sharedCommit->id);
1093 6940 : if (reactIt != history.pendingReactions.end()) {
1094 0 : for (const auto& commitBody : reactIt->second)
1095 0 : sharedCommit->reactions.emplace_back(commitBody);
1096 0 : history.pendingReactions.erase(reactIt);
1097 : }
1098 6940 : auto peditIt = history.pendingEditions.find(sharedCommit->id);
1099 6940 : if (peditIt != history.pendingEditions.end()) {
1100 0 : auto oldBody = sharedCommit->body;
1101 0 : if (sharedCommit->type == "application/data-transfer+json") {
1102 0 : sharedCommit->body["tid"] = peditIt->second.front()->body["tid"];
1103 0 : sharedCommit->body["fileId"] = "";
1104 : } else {
1105 0 : sharedCommit->body["body"] = peditIt->second.front()->body["body"];
1106 : }
1107 0 : peditIt->second.pop_front();
1108 0 : for (const auto& commit : peditIt->second) {
1109 0 : sharedCommit->editions.emplace_back(commit->body);
1110 : }
1111 0 : sharedCommit->editions.emplace_back(oldBody);
1112 0 : history.pendingEditions.erase(peditIt);
1113 0 : }
1114 : // Announce to client
1115 6940 : if (messageReceived)
1116 2418 : emitSignal<libjami::ConversationSignal::SwarmMessageReceived>(accountId_,
1117 1209 : repository_->id(),
1118 1209 : *sharedCommit);
1119 6940 : return !messageReceived;
1120 : }
1121 :
1122 6951 : void Conversation::Impl::rectifyStatus(const std::shared_ptr<libjami::SwarmMessage>& message,
1123 : History& history) const
1124 : {
1125 :
1126 6951 : auto parentIt = history.quickAccess.find(message->linearizedParent);
1127 6951 : auto currentMessage = message;
1128 :
1129 18961 : while(parentIt != history.quickAccess.end()){
1130 12010 : const auto& parent = parentIt->second;
1131 12675 : for (const auto& [peer, value] : message->status) {
1132 11581 : auto parentStatusIt = parent->status.find(peer);
1133 11581 : if (parentStatusIt == parent->status.end() || parentStatusIt->second < value) {
1134 665 : parent->status[peer] = value;
1135 1330 : emitSignal<libjami::ConfigurationSignal::AccountMessageStatusChanged>(
1136 665 : accountId_,
1137 665 : repository_->id(),
1138 : peer,
1139 665 : parent->id,
1140 : value);
1141 : }
1142 10916 : else if(parentStatusIt->second >= value){
1143 10916 : break;
1144 : }
1145 : }
1146 12010 : currentMessage = parent;
1147 12010 : parentIt = history.quickAccess.find(parent->linearizedParent);
1148 : }
1149 6951 : }
1150 :
1151 : std::vector<std::shared_ptr<libjami::SwarmMessage>>
1152 6931 : Conversation::Impl::addToHistory(History& history,
1153 : const std::vector<std::map<std::string, std::string>>& commits,
1154 : bool messageReceived,
1155 : bool commitFromSelf)
1156 : {
1157 : //
1158 : // NOTE: This function makes the following assumptions on its arguments:
1159 : // - The messages in "history" are in reverse chronological order (newest message
1160 : // first, oldest message last).
1161 : // - If messageReceived is true, then the commits in "commits" are assumed to be in
1162 : // chronological order (oldest to newest) and to be newer than the ones in "history".
1163 : // They are therefore inserted at the beginning of the message list.
1164 : // - If messageReceived is false, then the commits in "commits" are assumed to be in
1165 : // reverse chronological order (newest to oldest) and to be older than the ones in
1166 : // "history". They are therefore appended at the end of the message list.
1167 : //
1168 6931 : auto acc = account_.lock();
1169 6931 : if (!acc)
1170 0 : return {};
1171 6931 : auto username = acc->getUsername();
1172 6931 : if (messageReceived && (&history == &loadedHistory_ && history.loading)) {
1173 0 : std::unique_lock lk(history.mutex);
1174 0 : history.cv.wait(lk, [&] { return !history.loading; });
1175 0 : }
1176 :
1177 : // Only set messages' status on history for client
1178 6931 : bool needToSetMessageStatus = !commitFromSelf && &history == &loadedHistory_;
1179 :
1180 6931 : std::vector<std::shared_ptr<libjami::SwarmMessage>> sharedCommits;
1181 13907 : for (const auto& commit : commits) {
1182 13952 : auto commitId = commit.at("id");
1183 6976 : if (history.quickAccess.find(commitId) != history.quickAccess.end())
1184 8 : continue; // Already present
1185 6968 : auto typeIt = commit.find("type");
1186 : // Nothing to show for the client, skip
1187 6968 : if (typeIt != commit.end() && typeIt->second == "merge")
1188 17 : continue;
1189 :
1190 6951 : auto sharedCommit = std::make_shared<libjami::SwarmMessage>();
1191 6951 : sharedCommit->fromMapStringString(commit);
1192 :
1193 6951 : if (needToSetMessageStatus) {
1194 : // Check if we already have status information for the commit.
1195 920 : auto itFuture = futureStatus.find(sharedCommit->id);
1196 920 : if (itFuture != futureStatus.end()) {
1197 14 : sharedCommit->status = std::move(itFuture->second);
1198 14 : futureStatus.erase(itFuture);
1199 : }
1200 : }
1201 :
1202 6951 : sharedCommits.emplace_back(sharedCommit);
1203 6976 : }
1204 :
1205 6931 : if (needToSetMessageStatus) {
1206 907 : constexpr int32_t SENDING = static_cast<int32_t>(libjami::Account::MessageStates::SENDING);
1207 907 : constexpr int32_t SENT = static_cast<int32_t>(libjami::Account::MessageStates::SENT);
1208 907 : constexpr int32_t DISPLAYED = static_cast<int32_t>(libjami::Account::MessageStates::DISPLAYED);
1209 :
1210 907 : std::lock_guard lk(messageStatusMtx_);
1211 13475 : for (const auto& member: repository_->members()) {
1212 : // For each member, we iterate over the commits to add in reverse chronological
1213 : // order (i.e. from newest to oldest) and set their status from the point of view
1214 : // of that member (as best we can given the information we have).
1215 : //
1216 : // The key assumption made in order to compute the status is that it can never decrease
1217 : // (with respect to the ordering SENDING < SENT < DISPLAYED) as we go back in time. We
1218 : // therefore start by setting the "status" variable below to the lowest possible value,
1219 : // and increase it when we encounter a commit for which it is justified to do so.
1220 : //
1221 : // If messageReceived is true, then the commits we are adding are the most recent in the
1222 : // conversation history, so the lowest possible value is SENDING.
1223 : //
1224 : // If messageReceived is false, then the commits we are adding are older than the ones
1225 : // that are already in the history, so the lowest possible value is the status of the
1226 : // oldest message in the history so far, which is stored in memberToStatus.
1227 12568 : auto status = SENDING;
1228 12568 : if (!messageReceived) {
1229 12 : auto cache = memberToStatus[member.uri];
1230 12 : if (cache > status)
1231 9 : status = cache;
1232 : }
1233 :
1234 25172 : for (auto it = sharedCommits.rbegin(); it != sharedCommits.rend(); it++) {
1235 12604 : auto sharedCommit = *it;
1236 12604 : auto previousStatus = status;
1237 :
1238 : // Compute status for the current commit.
1239 12604 : if (status < SENT && messagesStatus_[member.uri]["fetched"] == sharedCommit->id) {
1240 14 : status = SENT;
1241 : }
1242 12604 : if (messagesStatus_[member.uri]["read"] == sharedCommit->id) {
1243 1 : status = DISPLAYED;
1244 : }
1245 12604 : if (member.uri == sharedCommit->body.at("author")) {
1246 919 : status = DISPLAYED;
1247 : }
1248 12604 : if(status < sharedCommit->status[member.uri]){
1249 1 : status = sharedCommit->status[member.uri];
1250 : }
1251 :
1252 : // Store computed value.
1253 12604 : sharedCommit->status[member.uri] = status;
1254 :
1255 : // Update messagesStatus_ if needed.
1256 12604 : if (previousStatus == SENDING && status >= SENT) {
1257 921 : messagesStatus_[member.uri]["fetched"] = sharedCommit->id;
1258 : }
1259 12604 : if (previousStatus <= SENT && status == DISPLAYED) {
1260 908 : messagesStatus_[member.uri]["read"] = sharedCommit->id;
1261 : }
1262 12604 : }
1263 :
1264 12568 : if (!messageReceived) {
1265 : // Update memberToStatus with the status of the last (i.e. oldest) added commit.
1266 12 : memberToStatus[member.uri] = status;
1267 : }
1268 907 : }
1269 907 : }
1270 :
1271 6931 : std::vector<std::shared_ptr<libjami::SwarmMessage>> messages;
1272 13882 : for (const auto& sharedCommit : sharedCommits) {
1273 6951 : history.quickAccess[sharedCommit->id] = sharedCommit;
1274 :
1275 6951 : auto reactToIt = sharedCommit->body.find("react-to");
1276 6951 : auto editIt = sharedCommit->body.find("edit");
1277 6951 : if (reactToIt != sharedCommit->body.end() && !reactToIt->second.empty()) {
1278 3 : handleReaction(history, sharedCommit);
1279 6948 : } else if (editIt != sharedCommit->body.end() && !editIt->second.empty()) {
1280 8 : handleEdition(history, sharedCommit, messageReceived);
1281 6940 : } else if (handleMessage(history, sharedCommit, messageReceived)) {
1282 5731 : messages.emplace_back(sharedCommit);
1283 : }
1284 6951 : rectifyStatus(sharedCommit, history);
1285 : }
1286 :
1287 6931 : return messages;
1288 6931 : }
1289 :
1290 183 : Conversation::Conversation(const std::shared_ptr<JamiAccount>& account,
1291 : ConversationMode mode,
1292 183 : const std::string& otherMember)
1293 183 : : pimpl_ {new Impl {account, mode, otherMember}}
1294 183 : {}
1295 :
1296 18 : Conversation::Conversation(const std::shared_ptr<JamiAccount>& account,
1297 18 : const std::string& conversationId)
1298 18 : : pimpl_ {new Impl {account, conversationId}}
1299 18 : {}
1300 :
1301 189 : Conversation::Conversation(const std::shared_ptr<JamiAccount>& account,
1302 : const std::string& remoteDevice,
1303 189 : const std::string& conversationId)
1304 189 : : pimpl_ {new Impl {account, remoteDevice, conversationId}}
1305 189 : {}
1306 :
1307 385 : Conversation::~Conversation() {}
1308 :
1309 : std::string
1310 4879 : Conversation::id() const
1311 : {
1312 4879 : return pimpl_->repository_ ? pimpl_->repository_->id() : "";
1313 : }
1314 :
1315 : void
1316 137 : Conversation::addMember(const std::string& contactUri, const OnDoneCb& cb)
1317 : {
1318 : try {
1319 137 : if (mode() == ConversationMode::ONE_TO_ONE) {
1320 : // Only authorize to add left members
1321 1 : auto initialMembers = getInitialMembers();
1322 1 : auto it = std::find(initialMembers.begin(), initialMembers.end(), contactUri);
1323 1 : if (it == initialMembers.end()) {
1324 1 : JAMI_WARN("Unable to add new member in one to one conversation");
1325 1 : cb(false, "");
1326 1 : return;
1327 : }
1328 1 : }
1329 0 : } catch (const std::exception& e) {
1330 0 : JAMI_WARN("Unable to get mode: %s", e.what());
1331 0 : cb(false, "");
1332 0 : return;
1333 0 : }
1334 136 : if (isMember(contactUri, true)) {
1335 0 : JAMI_WARN("Unable to add member %s because it's already a member", contactUri.c_str());
1336 0 : cb(false, "");
1337 0 : return;
1338 : }
1339 136 : if (isBanned(contactUri)) {
1340 2 : if (pimpl_->isAdmin()) {
1341 3 : dht::ThreadPool::io().run(
1342 2 : [w = weak(), contactUri = std::move(contactUri), cb = std::move(cb)] {
1343 1 : if (auto sthis = w.lock()) {
1344 1 : auto members = sthis->pimpl_->repository_->members();
1345 1 : auto type = sthis->pimpl_->bannedType(contactUri);
1346 1 : if (type.empty()) {
1347 0 : cb(false, {});
1348 0 : return;
1349 : }
1350 1 : sthis->pimpl_->voteUnban(contactUri, type, cb);
1351 2 : }
1352 : });
1353 : } else {
1354 1 : JAMI_WARN("Unable to add member %s because this member is blocked", contactUri.c_str());
1355 1 : cb(false, "");
1356 : }
1357 2 : return;
1358 : }
1359 :
1360 134 : dht::ThreadPool::io().run([w = weak(), contactUri = std::move(contactUri), cb = std::move(cb)] {
1361 134 : if (auto sthis = w.lock()) {
1362 : // Add member files and commit
1363 134 : std::unique_lock lk(sthis->pimpl_->writeMtx_);
1364 134 : auto commit = sthis->pimpl_->repository_->addMember(contactUri);
1365 134 : sthis->pimpl_->announce(commit, true);
1366 134 : lk.unlock();
1367 134 : if (cb)
1368 134 : cb(!commit.empty(), commit);
1369 268 : }
1370 134 : });
1371 : }
1372 :
1373 : std::shared_ptr<dhtnet::ChannelSocket>
1374 4347 : Conversation::gitSocket(const DeviceId& deviceId) const
1375 : {
1376 4347 : return pimpl_->gitSocket(deviceId);
1377 : }
1378 :
1379 : void
1380 1954 : Conversation::addGitSocket(const DeviceId& deviceId,
1381 : const std::shared_ptr<dhtnet::ChannelSocket>& socket)
1382 : {
1383 1954 : pimpl_->addGitSocket(deviceId, socket);
1384 1954 : }
1385 :
1386 : void
1387 795 : Conversation::removeGitSocket(const DeviceId& deviceId)
1388 : {
1389 795 : pimpl_->removeGitSocket(deviceId);
1390 795 : }
1391 :
1392 : void
1393 381 : Conversation::shutdownConnections()
1394 : {
1395 381 : pimpl_->fallbackTimer_->cancel();
1396 381 : pimpl_->gitSocketList_.clear();
1397 381 : if (pimpl_->swarmManager_)
1398 381 : pimpl_->swarmManager_->shutdown();
1399 381 : std::lock_guard lk(pimpl_->membersMtx_);
1400 381 : pimpl_->checkedMembers_.clear();
1401 381 : }
1402 :
1403 : void
1404 0 : Conversation::connectivityChanged()
1405 : {
1406 0 : if (pimpl_->swarmManager_)
1407 0 : pimpl_->swarmManager_->maintainBuckets();
1408 0 : }
1409 :
1410 : std::vector<jami::DeviceId>
1411 0 : Conversation::getDeviceIdList() const
1412 : {
1413 0 : return pimpl_->swarmManager_->getRoutingTable().getAllNodes();
1414 : }
1415 :
1416 : std::shared_ptr<Typers>
1417 9 : Conversation::typers() const
1418 : {
1419 9 : return pimpl_->typers_;
1420 : }
1421 :
1422 : bool
1423 0 : Conversation::hasSwarmChannel(const std::string& deviceId)
1424 : {
1425 0 : if (!pimpl_->swarmManager_)
1426 0 : return false;
1427 0 : return pimpl_->swarmManager_->isConnectedWith(DeviceId(deviceId));
1428 : }
1429 :
1430 : void
1431 1 : Conversation::Impl::voteUnban(const std::string& contactUri,
1432 : const std::string_view type,
1433 : const OnDoneCb& cb)
1434 : {
1435 : // Check if admin
1436 1 : if (!isAdmin()) {
1437 0 : JAMI_WARN("You're not an admin of this repo. Unable to unblock %s", contactUri.c_str());
1438 0 : cb(false, {});
1439 0 : return;
1440 : }
1441 :
1442 : // Vote for removal
1443 1 : std::unique_lock lk(writeMtx_);
1444 1 : auto voteCommit = repository_->voteUnban(contactUri, type);
1445 1 : if (voteCommit.empty()) {
1446 0 : JAMI_WARN("Unbanning %s failed", contactUri.c_str());
1447 0 : cb(false, "");
1448 0 : return;
1449 : }
1450 :
1451 1 : auto lastId = voteCommit;
1452 1 : std::vector<std::string> commits;
1453 1 : commits.emplace_back(voteCommit);
1454 :
1455 : // If admin, check vote
1456 2 : auto resolveCommit = repository_->resolveVote(contactUri, type, "unban");
1457 1 : if (!resolveCommit.empty()) {
1458 1 : commits.emplace_back(resolveCommit);
1459 1 : lastId = resolveCommit;
1460 1 : JAMI_WARN("Vote solved for unbanning %s.", contactUri.c_str());
1461 : }
1462 1 : announce(commits, true);
1463 1 : lk.unlock();
1464 1 : if (cb)
1465 1 : cb(!lastId.empty(), lastId);
1466 1 : }
1467 :
1468 : void
1469 14 : Conversation::removeMember(const std::string& contactUri, bool isDevice, const OnDoneCb& cb)
1470 : {
1471 42 : dht::ThreadPool::io().run([w = weak(),
1472 14 : contactUri = std::move(contactUri),
1473 14 : isDevice = std::move(isDevice),
1474 14 : cb = std::move(cb)] {
1475 14 : if (auto sthis = w.lock()) {
1476 : // Check if admin
1477 14 : if (!sthis->pimpl_->isAdmin()) {
1478 1 : JAMI_WARN("You're not an admin of this repo. Unable to block %s", contactUri.c_str());
1479 1 : cb(false, {});
1480 2 : return;
1481 : }
1482 :
1483 : // Get current user type
1484 13 : std::string type;
1485 13 : if (isDevice) {
1486 0 : type = "devices";
1487 : } else {
1488 13 : auto members = sthis->pimpl_->repository_->members();
1489 29 : for (const auto& member : members) {
1490 29 : if (member.uri == contactUri) {
1491 13 : if (member.role == MemberRole::INVITED) {
1492 2 : type = "invited";
1493 11 : } else if (member.role == MemberRole::ADMIN) {
1494 1 : type = "admins";
1495 10 : } else if (member.role == MemberRole::MEMBER) {
1496 10 : type = "members";
1497 : }
1498 13 : break;
1499 : }
1500 : }
1501 13 : if (type.empty()) {
1502 0 : cb(false, {});
1503 0 : return;
1504 : }
1505 13 : }
1506 :
1507 : // Vote for removal
1508 13 : std::unique_lock lk(sthis->pimpl_->writeMtx_);
1509 13 : auto voteCommit = sthis->pimpl_->repository_->voteKick(contactUri, type);
1510 13 : if (voteCommit.empty()) {
1511 1 : JAMI_WARN("Kicking %s failed", contactUri.c_str());
1512 1 : cb(false, "");
1513 1 : return;
1514 : }
1515 :
1516 12 : auto lastId = voteCommit;
1517 12 : std::vector<std::string> commits;
1518 12 : commits.emplace_back(voteCommit);
1519 :
1520 : // If admin, check vote
1521 24 : auto resolveCommit = sthis->pimpl_->repository_->resolveVote(contactUri, type, "ban");
1522 12 : if (!resolveCommit.empty()) {
1523 12 : commits.emplace_back(resolveCommit);
1524 12 : lastId = resolveCommit;
1525 12 : JAMI_WARN("Vote solved for %s. %s banned",
1526 : contactUri.c_str(),
1527 : isDevice ? "Device" : "Member");
1528 12 : sthis->pimpl_->disconnectFromPeer(contactUri);
1529 : }
1530 :
1531 12 : sthis->pimpl_->announce(commits, true);
1532 12 : lk.unlock();
1533 12 : cb(!lastId.empty(), lastId);
1534 29 : }
1535 : });
1536 14 : }
1537 :
1538 : std::vector<std::map<std::string, std::string>>
1539 416 : Conversation::getMembers(bool includeInvited, bool includeLeft, bool includeBanned) const
1540 : {
1541 416 : return pimpl_->getMembers(includeInvited, includeLeft, includeBanned);
1542 : }
1543 :
1544 : std::set<std::string>
1545 2026 : Conversation::memberUris(std::string_view filter, const std::set<MemberRole>& filteredRoles) const
1546 : {
1547 2026 : return pimpl_->repository_->memberUris(filter, filteredRoles);
1548 : }
1549 :
1550 : std::vector<NodeId>
1551 1753 : Conversation::peersToSyncWith() const
1552 : {
1553 1753 : const auto& routingTable = pimpl_->swarmManager_->getRoutingTable();
1554 1753 : const auto& nodes = routingTable.getNodes();
1555 1753 : const auto& mobiles = routingTable.getMobileNodes();
1556 1753 : std::vector<NodeId> s;
1557 1753 : s.reserve(nodes.size() + mobiles.size());
1558 1753 : s.insert(s.end(), nodes.begin(), nodes.end());
1559 1753 : s.insert(s.end(), mobiles.begin(), mobiles.end());
1560 12983 : for (const auto& [deviceId, _] : pimpl_->gitSocketList_)
1561 11230 : if (std::find(s.cbegin(), s.cend(), deviceId) == s.cend())
1562 5385 : s.emplace_back(deviceId);
1563 3506 : return s;
1564 1753 : }
1565 :
1566 : bool
1567 1753 : Conversation::isBootstrapped() const
1568 : {
1569 1753 : const auto& routingTable = pimpl_->swarmManager_->getRoutingTable();
1570 1753 : return !routingTable.getNodes().empty();
1571 : }
1572 :
1573 : std::string
1574 13020 : Conversation::uriFromDevice(const std::string& deviceId) const
1575 : {
1576 13020 : return pimpl_->repository_->uriFromDevice(deviceId);
1577 : }
1578 :
1579 : void
1580 0 : Conversation::monitor()
1581 : {
1582 0 : pimpl_->swarmManager_->getRoutingTable().printRoutingTable();
1583 0 : }
1584 :
1585 : std::string
1586 184 : Conversation::join()
1587 : {
1588 184 : return pimpl_->repository_->join();
1589 : }
1590 :
1591 : bool
1592 3237 : Conversation::isMember(const std::string& uri, bool includeInvited) const
1593 : {
1594 3237 : auto repoPath = pimpl_->repoPath();
1595 3237 : auto invitedPath = repoPath / "invited";
1596 3237 : auto adminsPath = repoPath / "admins";
1597 3237 : auto membersPath = repoPath / "members";
1598 12948 : std::vector<std::filesystem::path> pathsToCheck = {adminsPath, membersPath};
1599 3237 : if (includeInvited)
1600 2584 : pathsToCheck.emplace_back(invitedPath);
1601 7125 : for (const auto& path : pathsToCheck) {
1602 25193 : for (const auto& certificate : dhtnet::fileutils::readDirectory(path)) {
1603 21305 : std::string_view crtUri = certificate;
1604 21305 : auto crtIt = crtUri.find(".crt");
1605 21305 : if (path != invitedPath && crtIt == std::string_view::npos) {
1606 0 : JAMI_WARNING("Incorrect file found: {}/{}", path, certificate);
1607 0 : continue;
1608 0 : }
1609 21305 : if (crtIt != std::string_view::npos)
1610 20682 : crtUri = crtUri.substr(0, crtIt);
1611 21305 : if (crtUri == uri)
1612 2783 : return true;
1613 6671 : }
1614 : }
1615 :
1616 454 : if (includeInvited && mode() == ConversationMode::ONE_TO_ONE) {
1617 3 : for (const auto& member : getInitialMembers()) {
1618 2 : if (member == uri)
1619 0 : return true;
1620 1 : }
1621 : }
1622 :
1623 454 : return false;
1624 3237 : }
1625 :
1626 : bool
1627 6434 : Conversation::isBanned(const std::string& uri) const
1628 : {
1629 6434 : return !pimpl_->bannedType(uri).empty();
1630 : }
1631 :
1632 : void
1633 0 : Conversation::sendMessage(std::string&& message,
1634 : const std::string& type,
1635 : const std::string& replyTo,
1636 : OnCommitCb&& onCommit,
1637 : OnDoneCb&& cb)
1638 : {
1639 0 : Json::Value json;
1640 0 : json["body"] = std::move(message);
1641 0 : json["type"] = type;
1642 0 : sendMessage(std::move(json), replyTo, std::move(onCommit), std::move(cb));
1643 0 : }
1644 :
1645 : void
1646 140 : Conversation::sendMessage(Json::Value&& value,
1647 : const std::string& replyTo,
1648 : OnCommitCb&& onCommit,
1649 : OnDoneCb&& cb)
1650 : {
1651 140 : if (!replyTo.empty()) {
1652 2 : auto commit = pimpl_->repository_->getCommit(replyTo);
1653 2 : if (commit == std::nullopt) {
1654 1 : JAMI_ERR("Replying to invalid commit %s", replyTo.c_str());
1655 1 : return;
1656 : }
1657 1 : value["reply-to"] = replyTo;
1658 2 : }
1659 556 : dht::ThreadPool::io().run(
1660 417 : [w = weak(), value = std::move(value), onCommit = std::move(onCommit), cb = std::move(cb)] {
1661 139 : if (auto sthis = w.lock()) {
1662 139 : std::unique_lock lk(sthis->pimpl_->writeMtx_);
1663 139 : auto commit = sthis->pimpl_->repository_->commitMessage(json::toString(value));
1664 139 : lk.unlock();
1665 139 : if (onCommit)
1666 13 : onCommit(commit);
1667 139 : sthis->pimpl_->announce(commit, true);
1668 139 : if (cb)
1669 139 : cb(!commit.empty(), commit);
1670 278 : }
1671 139 : });
1672 : }
1673 :
1674 : void
1675 0 : Conversation::sendMessages(std::vector<Json::Value>&& messages, OnMultiDoneCb&& cb)
1676 : {
1677 0 : dht::ThreadPool::io().run([w = weak(), messages = std::move(messages), cb = std::move(cb)] {
1678 0 : if (auto sthis = w.lock()) {
1679 0 : std::vector<std::string> commits;
1680 0 : commits.reserve(messages.size());
1681 0 : std::unique_lock lk(sthis->pimpl_->writeMtx_);
1682 0 : for (const auto& message : messages) {
1683 0 : auto commit = sthis->pimpl_->repository_->commitMessage(json::toString(message));
1684 0 : commits.emplace_back(std::move(commit));
1685 0 : }
1686 0 : lk.unlock();
1687 0 : sthis->pimpl_->announce(commits, true);
1688 0 : if (cb)
1689 0 : cb(commits);
1690 0 : }
1691 0 : });
1692 0 : }
1693 :
1694 : std::optional<std::map<std::string, std::string>>
1695 11963 : Conversation::getCommit(const std::string& commitId) const
1696 : {
1697 11963 : auto commit = pimpl_->repository_->getCommit(commitId);
1698 11963 : if (commit == std::nullopt)
1699 1534 : return std::nullopt;
1700 10429 : return pimpl_->repository_->convCommitToMap(*commit);
1701 11963 : }
1702 :
1703 : void
1704 0 : Conversation::loadMessages(OnLoadMessages cb, const LogOptions& options)
1705 : {
1706 0 : if (!cb)
1707 0 : return;
1708 0 : dht::ThreadPool::io().run([w = weak(), cb = std::move(cb), options] {
1709 0 : if (auto sthis = w.lock()) {
1710 0 : cb(sthis->pimpl_->loadMessages(options));
1711 0 : }
1712 0 : });
1713 : }
1714 :
1715 : void
1716 2 : Conversation::loadMessages2(const OnLoadMessages2& cb, const LogOptions& options)
1717 : {
1718 2 : if (!cb)
1719 0 : return;
1720 2 : dht::ThreadPool::io().run([w = weak(), cb = std::move(cb), options] {
1721 2 : if (auto sthis = w.lock()) {
1722 2 : std::unique_lock lk(sthis->pimpl_->loadedHistory_.mutex);
1723 2 : auto result = sthis->pimpl_->loadMessages2(options);
1724 2 : lk.unlock();
1725 2 : cb(std::move(result));
1726 4 : }
1727 2 : });
1728 : }
1729 :
1730 : void
1731 0 : Conversation::clearCache()
1732 : {
1733 0 : std::lock_guard lk(pimpl_->loadedHistory_.mutex);
1734 0 : pimpl_->loadedHistory_.messageList.clear();
1735 0 : pimpl_->loadedHistory_.quickAccess.clear();
1736 0 : pimpl_->loadedHistory_.pendingEditions.clear();
1737 0 : pimpl_->loadedHistory_.pendingReactions.clear();
1738 : {
1739 0 : std::lock_guard lk(pimpl_->messageStatusMtx_);
1740 0 : pimpl_->memberToStatus.clear();
1741 0 : }
1742 0 : }
1743 :
1744 : std::string
1745 2109 : Conversation::lastCommitId() const
1746 : {
1747 : {
1748 2109 : std::lock_guard lk(pimpl_->loadedHistory_.mutex);
1749 2109 : if (!pimpl_->loadedHistory_.messageList.empty())
1750 3400 : return (*pimpl_->loadedHistory_.messageList.begin())->id;
1751 2109 : }
1752 409 : LogOptions options;
1753 409 : options.nbOfCommits = 1;
1754 409 : options.skipMerge = true;
1755 409 : History optHistory;
1756 409 : std::scoped_lock lock(pimpl_->writeMtx_, optHistory.mutex);
1757 409 : auto res = pimpl_->loadMessages2(options, &optHistory);
1758 409 : if (res.empty())
1759 0 : return {};
1760 818 : return (*optHistory.messageList.begin())->id;
1761 409 : }
1762 :
1763 : std::vector<std::map<std::string, std::string>>
1764 1692 : Conversation::Impl::mergeHistory(const std::string& uri)
1765 : {
1766 1692 : if (not repository_) {
1767 0 : JAMI_WARNING("{} Invalid repo. Abort merge", toString());
1768 0 : return {};
1769 : }
1770 3384 : auto remoteHead = repository_->remoteHead(uri);
1771 1692 : if (remoteHead.empty()) {
1772 0 : JAMI_WARNING("{} Unable to get HEAD of {}", toString(), uri);
1773 0 : return {};
1774 : }
1775 :
1776 : // Validate commit
1777 1692 : auto [newCommits, err] = repository_->validFetch(uri);
1778 1692 : if (newCommits.empty()) {
1779 793 : if (err)
1780 69 : JAMI_ERROR("{} Unable to validate history with {}", toString(), uri);
1781 793 : repository_->removeBranchWith(uri);
1782 793 : return {};
1783 : }
1784 :
1785 : // If validated, merge
1786 899 : auto [ok, cid] = repository_->merge(remoteHead);
1787 899 : if (!ok) {
1788 0 : JAMI_ERROR("{} Unable to merge history with {}", toString(), uri);
1789 0 : repository_->removeBranchWith(uri);
1790 0 : return {};
1791 : }
1792 899 : if (!cid.empty()) {
1793 : // A merge commit was generated, should be added in new commits
1794 13 : auto commit = repository_->getCommit(cid);
1795 13 : if (commit != std::nullopt)
1796 13 : newCommits.emplace_back(*commit);
1797 13 : }
1798 :
1799 2697 : JAMI_LOG("{} Successfully merged history with {:s}", toString(), uri);
1800 899 : auto result = repository_->convCommitsToMap(newCommits);
1801 1817 : for (auto& commit : result) {
1802 918 : auto it = commit.find("type");
1803 918 : if (it != commit.end() && it->second == "member") {
1804 768 : repository_->refreshMembers();
1805 :
1806 768 : if (commit["action"] == "ban")
1807 4 : disconnectFromPeer(commit["uri"]);
1808 : }
1809 : }
1810 899 : return result;
1811 1692 : }
1812 :
1813 : bool
1814 1770 : Conversation::pull(const std::string& deviceId, OnPullCb&& cb, std::string commitId)
1815 : {
1816 1770 : std::lock_guard lk(pimpl_->pullcbsMtx_);
1817 1770 : auto [it, notInProgress] = pimpl_->fetchingRemotes_.emplace(deviceId, std::deque<std::pair<std::string, OnPullCb>>());
1818 1770 : auto& pullcbs = it->second;
1819 1770 : auto itPull = std::find_if(pullcbs.begin(),
1820 0 : pullcbs.end(),
1821 3541 : [&](const auto& elem) { return std::get<0>(elem) == commitId; });
1822 1770 : if (itPull != pullcbs.end()) {
1823 0 : JAMI_DEBUG("{} Ignoring request to pull from {:s} with commit {:s}: pull already in progress", pimpl_->toString(), deviceId, commitId);
1824 0 : cb(false);
1825 0 : return false;
1826 : }
1827 5310 : JAMI_DEBUG("{} [device {}] Pulling '{:s}'", pimpl_->toString(), deviceId, commitId);
1828 1770 : pullcbs.emplace_back(std::move(commitId), std::move(cb));
1829 1770 : if (notInProgress)
1830 1758 : dht::ThreadPool::io().run([w = weak(), deviceId] {
1831 1758 : if (auto sthis_ = w.lock())
1832 1758 : sthis_->pimpl_->pull(deviceId);
1833 1758 : });
1834 1770 : return true;
1835 1770 : }
1836 :
1837 : void
1838 1758 : Conversation::Impl::pull(const std::string& deviceId)
1839 : {
1840 1758 : auto& repo = repository_;
1841 :
1842 1758 : std::string commitId;
1843 1758 : OnPullCb cb;
1844 : while (true) {
1845 : {
1846 3528 : std::lock_guard lk(pullcbsMtx_);
1847 3528 : auto it = fetchingRemotes_.find(deviceId);
1848 3528 : if (it == fetchingRemotes_.end()) {
1849 0 : JAMI_ERROR("Could not find device {:s} in fetchingRemotes", deviceId);
1850 0 : break;
1851 : }
1852 3528 : auto& pullcbs = it->second;
1853 3528 : if (pullcbs.empty()) {
1854 1758 : fetchingRemotes_.erase(it);
1855 1758 : break;
1856 : }
1857 1770 : auto& elem = pullcbs.front();
1858 1770 : commitId = std::move(std::get<0>(elem));
1859 1770 : cb = std::move(std::get<1>(elem));
1860 1770 : pullcbs.pop_front();
1861 3528 : }
1862 : // If recently fetched, the commit can already be there, so no need to do complex operations
1863 1770 : if (commitId != "" && repo->getCommit(commitId, false) != std::nullopt) {
1864 70 : cb(true);
1865 78 : continue;
1866 : }
1867 : // Pull from remote
1868 1700 : auto fetched = repo->fetch(deviceId);
1869 1700 : if (!fetched) {
1870 8 : cb(false);
1871 8 : continue;
1872 : }
1873 1692 : auto oldHead = repo->getHead();
1874 1692 : std::string newHead = oldHead;
1875 1692 : std::unique_lock lk(writeMtx_);
1876 1692 : auto commits = mergeHistory(deviceId);
1877 1692 : if (!commits.empty()) {
1878 899 : newHead = commits.rbegin()->at("id");
1879 : // Note: Because clients needs to linearize the history, they need to know all commits
1880 : // that can be updated.
1881 : // In this case, all commits until the common merge base should be announced.
1882 : // The client ill need to update it's model after this.
1883 899 : std::string mergeBase = oldHead; // If fast-forward, the merge base is the previous head
1884 899 : auto newHeadCommit = repo->getCommit(newHead);
1885 899 : if (newHeadCommit != std::nullopt && newHeadCommit->parents.size() > 1) {
1886 13 : mergeBase = repo->mergeBase(newHeadCommit->parents[0], newHeadCommit->parents[1]);
1887 13 : LogOptions options;
1888 13 : options.to = mergeBase;
1889 13 : auto updatedCommits = loadMessages(options);
1890 : // We announce commits from oldest to update to newest. This generally avoid
1891 : // to get detached commits until they are all announced.
1892 13 : std::reverse(std::begin(updatedCommits), std::end(updatedCommits));
1893 13 : announce(updatedCommits);
1894 13 : } else {
1895 886 : announce(commits);
1896 : }
1897 899 : }
1898 1692 : lk.unlock();
1899 :
1900 1692 : bool commitFound = false;
1901 1692 : if (commitId != "") {
1902 : // If `commitId` is non-empty, then we were attempting to pull a specific commit.
1903 : // We need to check if we actually got it; the fact that the fetch above was
1904 : // successful doesn't guarantee that we did.
1905 1291 : for (const auto& commit : commits) {
1906 904 : if (commit.at("id") == commitId) {
1907 898 : commitFound = true;
1908 898 : break;
1909 : }
1910 : }
1911 : } else {
1912 407 : commitFound = true;
1913 : }
1914 1692 : if (!commitFound)
1915 1161 : JAMI_WARNING("Successfully fetched from device {} but didn't receive expected commit {}",
1916 : deviceId, commitId);
1917 : // WARNING: If its argument is `true`, this callback will attempt to send a message notification
1918 : // for commit `commitId` to other members of the swarm. It's important that we only
1919 : // send these notifications if we actually have the commit. Otherwise, we can end up
1920 : // in a situation where the members of the swarm keep sending notifications to each
1921 : // other for a commit that none of them have (note that we are unable to rule this out, as
1922 : // nothing prevents a malicious user from intentionally sending a notification with
1923 : // a fake commit ID).
1924 1692 : if (cb)
1925 1692 : cb(commitFound);
1926 : // Announce if profile changed
1927 1692 : if (oldHead != newHead) {
1928 899 : auto diffStats = repo->diffStats(newHead, oldHead);
1929 899 : auto changedFiles = repo->changedFiles(diffStats);
1930 899 : if (find(changedFiles.begin(), changedFiles.end(), "profile.vcf")
1931 1798 : != changedFiles.end()) {
1932 5 : emitSignal<libjami::ConversationSignal::ConversationProfileUpdated>(
1933 10 : accountId_, repo->id(), repo->infos());
1934 : }
1935 899 : }
1936 3462 : }
1937 1758 : }
1938 :
1939 : void
1940 1770 : Conversation::sync(const std::string& member,
1941 : const std::string& deviceId,
1942 : OnPullCb&& cb,
1943 : std::string commitId)
1944 : {
1945 1770 : pull(deviceId, std::move(cb), commitId);
1946 1770 : dht::ThreadPool::io().run([member, deviceId, w = weak_from_this()] {
1947 1770 : auto sthis = w.lock();
1948 : // For waiting request, downloadFile
1949 1771 : for (const auto& wr : sthis->dataTransfer()->waitingRequests()) {
1950 1 : sthis->downloadFile(wr.interactionId, wr.fileId, wr.path, member, deviceId);
1951 1770 : }
1952 1770 : });
1953 1770 : }
1954 :
1955 : std::map<std::string, std::string>
1956 263 : Conversation::generateInvitation() const
1957 : {
1958 : // Invite the new member to the conversation
1959 263 : Json::Value root;
1960 263 : auto& metadata = root[ConversationMapKeys::METADATAS];
1961 532 : for (const auto& [k, v] : infos()) {
1962 269 : if (v.size() >= 64000) {
1963 0 : JAMI_WARNING("Cutting invite because the SIP message will be too long");
1964 0 : continue;
1965 0 : }
1966 269 : metadata[k] = v;
1967 263 : }
1968 263 : root[ConversationMapKeys::CONVERSATIONID] = id();
1969 789 : return {{"application/invite+json", json::toString(root)}};
1970 263 : }
1971 :
1972 : std::string
1973 7 : Conversation::leave()
1974 : {
1975 7 : setRemovingFlag();
1976 7 : std::lock_guard lk(pimpl_->writeMtx_);
1977 14 : return pimpl_->repository_->leave();
1978 7 : }
1979 :
1980 : void
1981 10 : Conversation::setRemovingFlag()
1982 : {
1983 10 : pimpl_->isRemoving_ = true;
1984 10 : }
1985 :
1986 : bool
1987 3756 : Conversation::isRemoving()
1988 : {
1989 3756 : return pimpl_->isRemoving_;
1990 : }
1991 :
1992 : void
1993 21 : Conversation::erase()
1994 : {
1995 21 : if (pimpl_->conversationDataPath_ != "")
1996 21 : dhtnet::fileutils::removeAll(pimpl_->conversationDataPath_, true);
1997 21 : if (!pimpl_->repository_)
1998 0 : return;
1999 21 : std::lock_guard lk(pimpl_->writeMtx_);
2000 21 : pimpl_->repository_->erase();
2001 21 : }
2002 :
2003 : ConversationMode
2004 4535 : Conversation::mode() const
2005 : {
2006 4535 : return pimpl_->repository_->mode();
2007 : }
2008 :
2009 : std::vector<std::string>
2010 30 : Conversation::getInitialMembers() const
2011 : {
2012 30 : return pimpl_->repository_->getInitialMembers();
2013 : }
2014 :
2015 : bool
2016 0 : Conversation::isInitialMember(const std::string& uri) const
2017 : {
2018 0 : auto members = getInitialMembers();
2019 0 : return std::find(members.begin(), members.end(), uri) != members.end();
2020 0 : }
2021 :
2022 : void
2023 9 : Conversation::updateInfos(const std::map<std::string, std::string>& map, const OnDoneCb& cb)
2024 : {
2025 9 : dht::ThreadPool::io().run([w = weak(), map = std::move(map), cb = std::move(cb)] {
2026 9 : if (auto sthis = w.lock()) {
2027 9 : auto& repo = sthis->pimpl_->repository_;
2028 9 : std::unique_lock lk(sthis->pimpl_->writeMtx_);
2029 9 : auto commit = repo->updateInfos(map);
2030 9 : sthis->pimpl_->announce(commit, true);
2031 9 : lk.unlock();
2032 9 : if (cb)
2033 9 : cb(!commit.empty(), commit);
2034 9 : emitSignal<libjami::ConversationSignal::ConversationProfileUpdated>(
2035 18 : sthis->pimpl_->accountId_, repo->id(), repo->infos());
2036 18 : }
2037 9 : });
2038 9 : }
2039 :
2040 : std::map<std::string, std::string>
2041 310 : Conversation::infos() const
2042 : {
2043 310 : return pimpl_->repository_->infos();
2044 : }
2045 :
2046 : void
2047 8 : Conversation::updatePreferences(const std::map<std::string, std::string>& map)
2048 : {
2049 8 : auto filePath = pimpl_->conversationDataPath_ / "preferences";
2050 8 : auto prefs = map;
2051 8 : auto itLast = prefs.find(LAST_MODIFIED);
2052 8 : if (itLast != prefs.end()) {
2053 3 : if (std::filesystem::is_regular_file(filePath)) {
2054 1 : auto lastModified = fileutils::lastWriteTimeInSeconds(filePath);
2055 : try {
2056 1 : if (lastModified >= std::stoul(itLast->second))
2057 0 : return;
2058 0 : } catch (...) {
2059 0 : return;
2060 0 : }
2061 : }
2062 3 : prefs.erase(itLast);
2063 : }
2064 :
2065 8 : std::ofstream file(filePath, std::ios::trunc | std::ios::binary);
2066 8 : msgpack::pack(file, prefs);
2067 16 : emitSignal<libjami::ConversationSignal::ConversationPreferencesUpdated>(pimpl_->accountId_,
2068 16 : id(),
2069 8 : std::move(prefs));
2070 8 : }
2071 :
2072 : std::map<std::string, std::string>
2073 101 : Conversation::preferences(bool includeLastModified) const
2074 : {
2075 : try {
2076 101 : std::map<std::string, std::string> preferences;
2077 101 : auto filePath = pimpl_->conversationDataPath_ / "preferences";
2078 191 : auto file = fileutils::loadFile(filePath);
2079 11 : msgpack::object_handle oh = msgpack::unpack((const char*) file.data(), file.size());
2080 11 : oh.get().convert(preferences);
2081 11 : if (includeLastModified)
2082 8 : preferences[LAST_MODIFIED] = std::to_string(fileutils::lastWriteTimeInSeconds(filePath));
2083 11 : return preferences;
2084 281 : } catch (const std::exception& e) {
2085 90 : }
2086 90 : return {};
2087 : }
2088 :
2089 : std::vector<uint8_t>
2090 0 : Conversation::vCard() const
2091 : {
2092 : try {
2093 0 : return fileutils::loadFile(pimpl_->repoPath() / "profile.vcf");
2094 0 : } catch (...) {
2095 0 : }
2096 0 : return {};
2097 : }
2098 :
2099 : std::shared_ptr<TransferManager>
2100 1876 : Conversation::dataTransfer() const
2101 : {
2102 1876 : return pimpl_->transferManager_;
2103 : }
2104 :
2105 : bool
2106 13 : Conversation::onFileChannelRequest(const std::string& member,
2107 : const std::string& fileId,
2108 : std::filesystem::path& path,
2109 : std::string& sha3sum) const
2110 : {
2111 13 : if (!isMember(member))
2112 0 : return false;
2113 :
2114 13 : auto sep = fileId.find('_');
2115 13 : if (sep == std::string::npos)
2116 0 : return false;
2117 :
2118 13 : auto interactionId = fileId.substr(0, sep);
2119 13 : auto commit = getCommit(interactionId);
2120 26 : if (commit == std::nullopt || commit->find("type") == commit->end()
2121 26 : || commit->find("tid") == commit->end() || commit->find("sha3sum") == commit->end()
2122 26 : || commit->at("type") != "application/data-transfer+json") {
2123 0 : JAMI_WARNING("[Account {:s}] {} requested invalid file transfer commit {}",
2124 : pimpl_->accountId_,
2125 : member,
2126 : interactionId);
2127 0 : return false;
2128 : }
2129 :
2130 13 : path = dataTransfer()->path(fileId);
2131 13 : sha3sum = commit->at("sha3sum");
2132 :
2133 13 : return true;
2134 13 : }
2135 :
2136 : bool
2137 14 : Conversation::downloadFile(const std::string& interactionId,
2138 : const std::string& fileId,
2139 : const std::string& path,
2140 : const std::string&,
2141 : const std::string& deviceId)
2142 : {
2143 14 : auto commit = getCommit(interactionId);
2144 14 : if (commit == std::nullopt || commit->at("type") != "application/data-transfer+json") {
2145 0 : JAMI_ERROR("Commit doesn't exists or is not a file transfer {} (Conversation: {}) ", interactionId, id());
2146 0 : return false;
2147 : }
2148 14 : auto tid = commit->find("tid");
2149 14 : auto sha3sum = commit->find("sha3sum");
2150 14 : auto size_str = commit->find("totalSize");
2151 :
2152 14 : if (tid == commit->end() || sha3sum == commit->end() || size_str == commit->end()) {
2153 0 : JAMI_ERROR("Invalid file transfer commit (missing tid, size or sha3)");
2154 0 : return false;
2155 : }
2156 :
2157 14 : auto totalSize = to_int<ssize_t>(size_str->second, (ssize_t) -1);
2158 14 : if (totalSize < 0) {
2159 0 : JAMI_ERROR("Invalid file size {}", totalSize);
2160 0 : return false;
2161 : }
2162 :
2163 : // Be sure to not lock conversation
2164 28 : dht::ThreadPool().io().run([w = weak(),
2165 : deviceId,
2166 : fileId,
2167 : interactionId,
2168 14 : sha3sum = sha3sum->second,
2169 : path,
2170 27 : totalSize] {
2171 14 : if (auto shared = w.lock()) {
2172 14 : std::filesystem::path filePath(path);
2173 14 : if (filePath.empty()) {
2174 0 : filePath = shared->dataTransfer()->path(fileId);
2175 : }
2176 :
2177 14 : if (fileutils::size(filePath) == totalSize) {
2178 1 : if (fileutils::sha3File(filePath) == sha3sum) {
2179 3 : JAMI_WARNING("Ignoring request to download existing file: {}", filePath);
2180 1 : return;
2181 : }
2182 : }
2183 :
2184 13 : std::filesystem::path tempFilePath(filePath);
2185 13 : tempFilePath += ".tmp";
2186 13 : auto start = fileutils::size(tempFilePath);
2187 13 : if (start < 0)
2188 11 : start = 0;
2189 13 : size_t end = 0;
2190 :
2191 13 : auto acc = shared->pimpl_->account_.lock();
2192 13 : if (!acc)
2193 0 : return;
2194 13 : shared->dataTransfer()->waitForTransfer(fileId, interactionId, sha3sum, path, totalSize);
2195 13 : acc->askForFileChannel(shared->id(), deviceId, interactionId, fileId, start, end);
2196 28 : }
2197 : });
2198 14 : return true;
2199 14 : }
2200 :
2201 : void
2202 1111 : Conversation::hasFetched(const std::string& deviceId, const std::string& commitId)
2203 : {
2204 1111 : dht::ThreadPool::io().run([w = weak(), deviceId, commitId]() {
2205 1111 : auto sthis = w.lock();
2206 1111 : if (!sthis)
2207 0 : return;
2208 : // Update fetched for Uri
2209 1111 : auto uri = sthis->uriFromDevice(deviceId);
2210 1111 : if (uri.empty() || uri == sthis->pimpl_->userId_)
2211 42 : return;
2212 : // When a user fetches a commit, the message is sent for this person
2213 1069 : sthis->pimpl_->updateStatus(uri, libjami::Account::MessageStates::SENT, commitId, std::to_string(std::time(nullptr)), true);
2214 1153 : });
2215 1111 : }
2216 :
2217 :
2218 : void
2219 1113 : Conversation::Impl::updateStatus(const std::string& uri,
2220 : libjami::Account::MessageStates st,
2221 : const std::string& commitId,
2222 : const std::string& ts,
2223 : bool emit)
2224 : {
2225 : // This method can be called if peer send us a status or if another device sync. Emit will be true if a peer send us a status and will emit to other connected devices.
2226 1113 : LogOptions options;
2227 1113 : std::map<std::string, std::map<std::string, std::string>> newStatus;
2228 : {
2229 : // Update internal structures.
2230 1113 : std::lock_guard lk(messageStatusMtx_);
2231 1113 : auto& status = messagesStatus_[uri];
2232 1113 : auto& oldStatus = status[st == libjami::Account::MessageStates::SENT ? "fetched" : "read"];
2233 1113 : if (oldStatus == commitId)
2234 3 : return; // Nothing to do
2235 1110 : options.to = oldStatus;
2236 1110 : options.from = commitId;
2237 1110 : oldStatus = commitId;
2238 1110 : status[st == libjami::Account::MessageStates::SENT ? "fetched_ts" : "read_ts"] = ts;
2239 1110 : saveStatus();
2240 1110 : if (emit)
2241 1083 : newStatus[uri].insert(status.begin(), status.end());
2242 1113 : }
2243 1110 : if (emit && messageStatusCb_) {
2244 1083 : messageStatusCb_(newStatus);
2245 : }
2246 : // Update messages status for all commit between the old and new one
2247 1110 : options.logIfNotFound = false;
2248 1110 : options.fastLog = true;
2249 1110 : History optHistory;
2250 1110 : std::lock_guard lk(optHistory.mutex); // Avoid to announce messages while updating status.
2251 1110 : auto res = loadMessages2(options, &optHistory);
2252 1110 : if (res.size() == 0) {
2253 : // In this case, commit is not received yet, so we cache it
2254 5 : futureStatus[commitId][uri] = static_cast<int32_t>(st);
2255 : }
2256 6401 : for (const auto& [cid, _]: optHistory.quickAccess) {
2257 5291 : auto message = loadedHistory_.quickAccess.find(cid);
2258 5291 : if (message != loadedHistory_.quickAccess.end()) {
2259 : // Update message and emit to client,
2260 2817 : if(static_cast<int32_t>(st) > message->second->status[uri]){
2261 2813 : message->second->status[uri] = static_cast<int32_t>(st);
2262 5626 : emitSignal<libjami::ConfigurationSignal::AccountMessageStatusChanged>(
2263 2813 : accountId_,
2264 2813 : repository_->id(),
2265 : uri,
2266 : cid,
2267 : static_cast<int>(st));
2268 : }
2269 : } else {
2270 : // In this case, commit is not loaded by client, so we cache it
2271 : // No need to emit to client, they will get a correct status on load.
2272 2474 : futureStatus[cid][uri] = static_cast<int32_t>(st);
2273 : }
2274 : }
2275 1116 : }
2276 :
2277 : bool
2278 18 : Conversation::setMessageDisplayed(const std::string& uri, const std::string& interactionId)
2279 : {
2280 18 : std::lock_guard lk(pimpl_->messageStatusMtx_);
2281 18 : if (pimpl_->messagesStatus_[uri]["read"] == interactionId)
2282 1 : return false; // Nothing to do
2283 17 : dht::ThreadPool::io().run([w = weak(), uri, interactionId]() {
2284 17 : auto sthis = w.lock();
2285 17 : if (!sthis)
2286 0 : return;
2287 17 : sthis->pimpl_->updateStatus(uri, libjami::Account::MessageStates::DISPLAYED, interactionId, std::to_string(std::time(nullptr)), true);
2288 17 : });
2289 17 : return true;
2290 18 : }
2291 :
2292 : std::map<std::string, std::map<std::string, std::string>>
2293 82 : Conversation::messageStatus() const
2294 : {
2295 82 : std::lock_guard lk(pimpl_->messageStatusMtx_);
2296 164 : return pimpl_->messagesStatus_;
2297 82 : }
2298 :
2299 : void
2300 55 : Conversation::updateMessageStatus(const std::map<std::string, std::map<std::string, std::string>>& messageStatus)
2301 : {
2302 55 : std::unique_lock lk(pimpl_->messageStatusMtx_);
2303 55 : std::vector<std::tuple<libjami::Account::MessageStates, std::string, std::string, std::string>> stVec;
2304 118 : for (const auto& [uri, status] : messageStatus) {
2305 63 : auto& oldMs = pimpl_->messagesStatus_[uri];
2306 63 : if (status.find("fetched_ts") != status.end() && status.at("fetched") != oldMs["fetched"]) {
2307 24 : if (oldMs["fetched_ts"].empty() || std::stol(oldMs["fetched_ts"]) <= std::stol(status.at("fetched_ts"))) {
2308 24 : stVec.emplace_back(libjami::Account::MessageStates::SENT, uri, status.at("fetched"), status.at("fetched_ts"));
2309 : }
2310 : }
2311 63 : if (status.find("read_ts") != status.end() && status.at("read") != oldMs["read"]) {
2312 3 : if (oldMs["read_ts"].empty() || std::stol(oldMs["read_ts"]) <= std::stol(status.at("read_ts"))) {
2313 3 : stVec.emplace_back(libjami::Account::MessageStates::DISPLAYED, uri, status.at("read"), status.at("read_ts"));
2314 : }
2315 : }
2316 : }
2317 55 : lk.unlock();
2318 :
2319 82 : for (const auto& [status, uri, commitId, ts] : stVec) {
2320 27 : pimpl_->updateStatus(uri, status, commitId, ts);
2321 : }
2322 55 : }
2323 :
2324 : void
2325 385 : Conversation::onMessageStatusChanged(const std::function<void(const std::map<std::string, std::map<std::string, std::string>>&)>& cb)
2326 : {
2327 385 : std::unique_lock lk(pimpl_->messageStatusMtx_);
2328 385 : pimpl_->messageStatusCb_ = cb;
2329 385 : }
2330 :
2331 : #ifdef LIBJAMI_TEST
2332 : void
2333 512 : Conversation::onBootstrapStatus(const std::function<void(std::string, BootstrapStatus)>& cb)
2334 : {
2335 512 : pimpl_->bootstrapCbTest_ = cb;
2336 512 : }
2337 : #endif
2338 :
2339 : void
2340 602 : Conversation::checkBootstrapMember(const asio::error_code& ec,
2341 : std::vector<std::map<std::string, std::string>> members)
2342 : {
2343 602 : if (ec == asio::error::operation_aborted)
2344 317 : return;
2345 580 : auto acc = pimpl_->account_.lock();
2346 580 : if (pimpl_->swarmManager_->getRoutingTable().getNodes().size() > 0 or not acc)
2347 12 : return;
2348 : // We bootstrap the DRT with devices who already wrote in the repository.
2349 : // However, in a conversation, a large number of devices may just watch
2350 : // the conversation, but never write any message.
2351 568 : std::unique_lock lock(pimpl_->membersMtx_);
2352 :
2353 568 : std::string uri;
2354 853 : while (!members.empty()) {
2355 291 : auto member = std::move(members.back());
2356 291 : members.pop_back();
2357 291 : uri = std::move(member.at("uri"));
2358 291 : if (uri != pimpl_->userId_
2359 291 : && pimpl_->checkedMembers_.find(uri) == pimpl_->checkedMembers_.end())
2360 6 : break;
2361 291 : }
2362 283 : auto fallbackFailed = [](auto sthis) {
2363 849 : JAMI_LOG("{} Bootstrap: Fallback failed. Wait for remote connections.",
2364 : sthis->pimpl_->toString());
2365 : #ifdef LIBJAMI_TEST
2366 283 : if (sthis->pimpl_->bootstrapCbTest_)
2367 8 : sthis->pimpl_->bootstrapCbTest_(sthis->id(), BootstrapStatus::FAILED);
2368 : #endif
2369 283 : };
2370 : // If members is empty, we finished the fallback un-successfully
2371 568 : if (members.empty() && uri.empty()) {
2372 283 : lock.unlock();
2373 283 : fallbackFailed(this);
2374 283 : return;
2375 : }
2376 :
2377 : // Fallback, check devices of a member (we didn't check yet) in the conversation
2378 285 : pimpl_->checkedMembers_.emplace(uri);
2379 285 : auto devices = std::make_shared<std::vector<NodeId>>();
2380 1140 : acc->forEachDevice(
2381 570 : dht::InfoHash(uri),
2382 286 : [w = weak(), devices](const std::shared_ptr<dht::crypto::PublicKey>& dev) {
2383 : // Test if already sent
2384 286 : if (auto sthis = w.lock()) {
2385 285 : if (!sthis->pimpl_->swarmManager_->getRoutingTable().hasKnownNode(dev->getLongId()))
2386 276 : devices->emplace_back(dev->getLongId());
2387 286 : }
2388 286 : },
2389 285 : [w = weak(), devices, members = std::move(members), uri, fallbackFailed=std::move(fallbackFailed)](bool ok) {
2390 285 : auto sthis = w.lock();
2391 285 : if (!sthis)
2392 1 : return;
2393 284 : auto checkNext = true;
2394 284 : if (ok && devices->size() != 0) {
2395 : #ifdef LIBJAMI_TEST
2396 276 : if (sthis->pimpl_->bootstrapCbTest_)
2397 4 : sthis->pimpl_->bootstrapCbTest_(sthis->id(), BootstrapStatus::FALLBACK);
2398 : #endif
2399 828 : JAMI_LOG("{} Bootstrap: Fallback with member: {}",
2400 : sthis->pimpl_->toString(),
2401 : uri);
2402 276 : if (sthis->pimpl_->swarmManager_->setKnownNodes(*devices))
2403 0 : checkNext = false;
2404 : }
2405 284 : if (checkNext) {
2406 : // Check next member
2407 284 : sthis->pimpl_->fallbackTimer_->expires_at(std::chrono::steady_clock::now());
2408 568 : sthis->pimpl_->fallbackTimer_->async_wait(
2409 568 : std::bind(&Conversation::checkBootstrapMember,
2410 : sthis,
2411 : std::placeholders::_1,
2412 284 : std::move(members)));
2413 : } else {
2414 : // In this case, all members are checked. Fallback failed
2415 0 : fallbackFailed(sthis);
2416 : }
2417 285 : });
2418 1146 : }
2419 :
2420 : void
2421 511 : Conversation::bootstrap(std::function<void()> onBootstrapped,
2422 : const std::vector<DeviceId>& knownDevices)
2423 : {
2424 511 : std::lock_guard lock(pimpl_->bootstrapMtx_);
2425 511 : if (!pimpl_ || !pimpl_->repository_ || !pimpl_->swarmManager_)
2426 0 : return;
2427 : // Here, we bootstrap the DRT with devices who already wrote in the conversation
2428 : // If this doesn't work, it will attempt to fallback with checkBootstrapMember
2429 : // If it works, the callback onConnectionChanged will be called with ok=true
2430 511 : pimpl_->bootstrapCb_ = std::move(onBootstrapped);
2431 511 : std::vector<DeviceId> devices = knownDevices;
2432 1526 : for (const auto& [member, memberDevices] : pimpl_->repository_->devices()) {
2433 1015 : if (!isBanned(member))
2434 1015 : devices.insert(devices.end(), memberDevices.begin(), memberDevices.end());
2435 511 : }
2436 1533 : JAMI_DEBUG("{} Bootstrap with {} device(s)",
2437 : pimpl_->toString(),
2438 : devices.size());
2439 : // set callback
2440 318 : auto fallback = [](std::shared_ptr<Conversation> sthis, bool now = false) {
2441 : // Fallback
2442 318 : auto acc = sthis->pimpl_->account_.lock();
2443 318 : if (!acc)
2444 0 : return;
2445 318 : auto members = sthis->getMembers(false, false);
2446 318 : std::shuffle(members.begin(), members.end(), acc->rand);
2447 318 : if (now) {
2448 289 : sthis->pimpl_->fallbackTimer_->expires_at(std::chrono::steady_clock::now());
2449 : } else {
2450 29 : auto timeForBootstrap = std::min(static_cast<size_t>(8), members.size());
2451 87 : sthis->pimpl_->fallbackTimer_->expires_at(std::chrono::steady_clock::now() + 20s
2452 58 : - std::chrono::seconds(timeForBootstrap));
2453 87 : JAMI_DEBUG("{} Fallback in {} seconds",
2454 : sthis->pimpl_->toString(),
2455 : (20 - timeForBootstrap));
2456 : }
2457 636 : sthis->pimpl_->fallbackTimer_->async_wait(std::bind(&Conversation::checkBootstrapMember,
2458 : sthis,
2459 : std::placeholders::_1,
2460 318 : std::move(members)));
2461 318 : };
2462 :
2463 511 : pimpl_->swarmManager_->onConnectionChanged([w = weak(), fallback](bool ok) {
2464 : // This will call methods from accounts, so trigger on another thread.
2465 365 : dht::ThreadPool::io().run([w, ok, fallback=std::move(fallback)] {
2466 365 : auto sthis = w.lock();
2467 365 : if (!sthis)
2468 0 : return;
2469 365 : if (ok) {
2470 : // Bootstrap succeeded!
2471 : {
2472 336 : std::lock_guard lock(sthis->pimpl_->membersMtx_);
2473 336 : sthis->pimpl_->checkedMembers_.clear();
2474 336 : }
2475 336 : if (sthis->pimpl_->bootstrapCb_)
2476 336 : sthis->pimpl_->bootstrapCb_();
2477 : #ifdef LIBJAMI_TEST
2478 336 : if (sthis->pimpl_->bootstrapCbTest_)
2479 9 : sthis->pimpl_->bootstrapCbTest_(sthis->id(), BootstrapStatus::SUCCESS);
2480 : #endif
2481 336 : return;
2482 : }
2483 29 : fallback(sthis);
2484 365 : });
2485 365 : });
2486 : {
2487 511 : std::lock_guard lock(pimpl_->membersMtx_);
2488 511 : pimpl_->checkedMembers_.clear();
2489 511 : }
2490 : // If is shutdown, the conversation was re-added, causing no new nodes to be connected, but just a classic connectivity change
2491 511 : if (pimpl_->swarmManager_->isShutdown()) {
2492 26 : pimpl_->swarmManager_->restart();
2493 26 : pimpl_->swarmManager_->maintainBuckets();
2494 485 : } else if (!pimpl_->swarmManager_->setKnownNodes(devices)) {
2495 289 : fallback(shared_from_this(), true);
2496 : }
2497 511 : }
2498 :
2499 : std::vector<std::string>
2500 18 : Conversation::commitsEndedCalls()
2501 : {
2502 18 : pimpl_->loadActiveCalls();
2503 18 : pimpl_->loadHostedCalls();
2504 18 : auto commits = pimpl_->commitsEndedCalls();
2505 18 : if (!commits.empty()) {
2506 : // Announce to client
2507 0 : dht::ThreadPool::io().run([w = weak(), commits] {
2508 0 : if (auto sthis = w.lock())
2509 0 : sthis->pimpl_->announce(commits, true);
2510 0 : });
2511 : }
2512 18 : return commits;
2513 0 : }
2514 :
2515 : void
2516 385 : Conversation::onMembersChanged(OnMembersChanged&& cb)
2517 : {
2518 385 : pimpl_->onMembersChanged_ = std::move(cb);
2519 385 : pimpl_->repository_->onMembersChanged([w=weak()] (const std::set<std::string>& memberUris) {
2520 1076 : if (auto sthis = w.lock())
2521 1076 : sthis->pimpl_->onMembersChanged_(memberUris);
2522 1076 : });
2523 385 : }
2524 :
2525 : void
2526 385 : Conversation::onNeedSocket(NeedSocketCb needSocket)
2527 : {
2528 770 : pimpl_->swarmManager_->needSocketCb_ = [needSocket = std::move(needSocket),
2529 : w=weak()](const std::string& deviceId, ChannelCb&& cb) {
2530 700 : if (auto sthis = w.lock())
2531 700 : needSocket(sthis->id(), deviceId, std::move(cb), "application/im-gitmessage-id");
2532 1470 : };
2533 385 : }
2534 :
2535 : void
2536 695 : Conversation::addSwarmChannel(std::shared_ptr<dhtnet::ChannelSocket> channel)
2537 : {
2538 695 : auto deviceId = channel->deviceId();
2539 : // Transmit avatar if necessary
2540 : // We do this here, because at this point we know both sides are connected and in
2541 : // the same conversation
2542 : // addSwarmChannel is a bit more complex, but it should be the best moment to do this.
2543 695 : auto cert = channel->peerCertificate();
2544 695 : if (!cert || !cert->issuer)
2545 0 : return;
2546 695 : auto member = cert->issuer->getId().toString();
2547 695 : pimpl_->swarmManager_->addChannel(std::move(channel));
2548 695 : dht::ThreadPool::io().run([member, deviceId, a = pimpl_->account_, w = weak_from_this()] {
2549 695 : auto sthis = w.lock();
2550 695 : if (auto account = a.lock()) {
2551 695 : account->sendProfile(sthis->id(), member, deviceId.toString());
2552 695 : }
2553 695 : });
2554 695 : }
2555 :
2556 : uint32_t
2557 4 : Conversation::countInteractions(const std::string& toId,
2558 : const std::string& fromId,
2559 : const std::string& authorUri) const
2560 : {
2561 4 : LogOptions options;
2562 4 : options.to = toId;
2563 4 : options.from = fromId;
2564 4 : options.authorUri = authorUri;
2565 4 : options.logIfNotFound = false;
2566 4 : options.fastLog = true;
2567 4 : History history;
2568 4 : std::lock_guard lk(history.mutex);
2569 4 : auto res = pimpl_->loadMessages2(options, &history);
2570 8 : return res.size();
2571 4 : }
2572 :
2573 : void
2574 4 : Conversation::search(uint32_t req,
2575 : const Filter& filter,
2576 : const std::shared_ptr<std::atomic_int>& flag) const
2577 : {
2578 : // Because logging a conversation can take quite some time,
2579 : // do it asynchronously
2580 4 : dht::ThreadPool::io().run([w = weak(), req, filter, flag] {
2581 4 : if (auto sthis = w.lock()) {
2582 4 : History history;
2583 4 : std::vector<std::map<std::string, std::string>> commits {};
2584 : // std::regex_constants::ECMAScript is the default flag.
2585 4 : auto re = std::regex(filter.regexSearch,
2586 4 : filter.caseSensitive ? std::regex_constants::ECMAScript
2587 4 : : std::regex_constants::icase);
2588 4 : sthis->pimpl_->repository_->log(
2589 20 : [&](const auto& id, const auto& author, auto& commit) {
2590 100 : if (!filter.author.empty()
2591 20 : && filter.author != sthis->uriFromDevice(author.email)) {
2592 : // Filter author
2593 0 : return CallbackResult::Skip;
2594 : }
2595 20 : auto commitTime = git_commit_time(commit.get());
2596 20 : if (filter.before && filter.before < commitTime) {
2597 : // Only get commits before this date
2598 0 : return CallbackResult::Skip;
2599 : }
2600 20 : if (filter.after && filter.after > commitTime) {
2601 : // Only get commits before this date
2602 0 : if (git_commit_parentcount(commit.get()) <= 1)
2603 0 : return CallbackResult::Break;
2604 : else
2605 0 : return CallbackResult::Skip; // Because we are sorting it with
2606 : // GIT_SORT_TOPOLOGICAL | GIT_SORT_TIME
2607 : }
2608 :
2609 20 : return CallbackResult::Ok; // Continue
2610 : },
2611 20 : [&](auto&& cc) {
2612 40 : if (auto optMessage = sthis->pimpl_->repository_->convCommitToMap(cc))
2613 40 : sthis->pimpl_->addToHistory(history, {optMessage.value()}, false, false);
2614 20 : },
2615 20 : [&](auto id, auto, auto) {
2616 20 : if (id == filter.lastId)
2617 0 : return true;
2618 20 : return false;
2619 : },
2620 : "",
2621 : false);
2622 : // Search on generated history
2623 24 : for (auto& message : history.messageList) {
2624 20 : auto contentType = message->type;
2625 20 : auto isSearchable = contentType == "text/plain"
2626 20 : || contentType == "application/data-transfer+json";
2627 20 : if (filter.type.empty() && !isSearchable) {
2628 : // Not searchable, at least for now
2629 8 : continue;
2630 12 : } else if (contentType == filter.type || filter.type.empty()) {
2631 12 : if (isSearchable) {
2632 : // If it's a text match the body, else the display name
2633 48 : auto body = contentType == "text/plain" ? message->body.at("body")
2634 36 : : message->body.at("displayName");
2635 12 : std::smatch body_match;
2636 12 : if (std::regex_search(body, body_match, re)) {
2637 5 : auto commit = message->body;
2638 5 : commit["id"] = message->id;
2639 5 : commit["type"] = message->type;
2640 5 : commits.emplace_back(commit);
2641 5 : }
2642 12 : } else {
2643 : // Matching type, just add it to the results
2644 0 : commits.emplace_back(message->body);
2645 : }
2646 :
2647 12 : if (filter.maxResult != 0 && commits.size() == filter.maxResult)
2648 0 : break;
2649 : }
2650 20 : }
2651 :
2652 4 : if (commits.size() > 0)
2653 9 : emitSignal<libjami::ConversationSignal::MessagesFound>(req,
2654 3 : sthis->pimpl_->accountId_,
2655 6 : sthis->id(),
2656 3 : std::move(commits));
2657 : // If we're the latest thread, inform client that the search is finished
2658 4 : if ((*flag)-- == 1 /* decrement return the old value */) {
2659 4 : emitSignal<libjami::ConversationSignal::MessagesFound>(
2660 : req,
2661 4 : sthis->pimpl_->accountId_,
2662 8 : std::string {},
2663 8 : std::vector<std::map<std::string, std::string>> {});
2664 : }
2665 8 : }
2666 4 : });
2667 4 : }
2668 :
2669 : void
2670 14 : Conversation::hostConference(Json::Value&& message, OnDoneCb&& cb)
2671 : {
2672 14 : if (!message.isMember("confId")) {
2673 0 : JAMI_ERROR("{}Malformed commit: no confId", pimpl_->toString());
2674 0 : return;
2675 : }
2676 :
2677 14 : auto now = std::chrono::system_clock::now();
2678 14 : auto nowSecs = std::chrono::duration_cast<std::chrono::seconds>(now.time_since_epoch()).count();
2679 : {
2680 14 : std::lock_guard lk(pimpl_->activeCallsMtx_);
2681 14 : pimpl_->hostedCalls_[message["confId"].asString()] = nowSecs;
2682 14 : pimpl_->saveHostedCalls();
2683 14 : }
2684 :
2685 14 : sendMessage(std::move(message), "", {}, std::move(cb));
2686 : }
2687 :
2688 : bool
2689 20 : Conversation::isHosting(const std::string& confId) const
2690 : {
2691 20 : auto info = infos();
2692 20 : if (info["rdvDevice"] == pimpl_->deviceId_ && info["rdvHost"] == pimpl_->userId_)
2693 0 : return true; // We are the current device Host
2694 20 : std::lock_guard lk(pimpl_->activeCallsMtx_);
2695 20 : return pimpl_->hostedCalls_.find(confId) != pimpl_->hostedCalls_.end();
2696 20 : }
2697 :
2698 : void
2699 11 : Conversation::removeActiveConference(Json::Value&& message, OnDoneCb&& cb)
2700 : {
2701 11 : if (!message.isMember("confId")) {
2702 0 : JAMI_ERROR("{}Malformed commit: no confId", pimpl_->toString());
2703 0 : return;
2704 : }
2705 :
2706 11 : auto erased = false;
2707 : {
2708 11 : std::lock_guard lk(pimpl_->activeCallsMtx_);
2709 11 : erased = pimpl_->hostedCalls_.erase(message["confId"].asString());
2710 11 : }
2711 11 : if (erased) {
2712 11 : pimpl_->saveHostedCalls();
2713 11 : sendMessage(std::move(message), "", {}, std::move(cb));
2714 : } else
2715 0 : cb(false, "");
2716 : }
2717 :
2718 : std::vector<std::map<std::string, std::string>>
2719 39 : Conversation::currentCalls() const
2720 : {
2721 39 : std::lock_guard lk(pimpl_->activeCallsMtx_);
2722 78 : return pimpl_->activeCalls_;
2723 39 : }
2724 : } // namespace jami
|