Line data Source code
1 : /*
2 : * Copyright (C) 2004-2025 Savoir-faire Linux Inc.
3 : *
4 : * This program is free software: you can redistribute it and/or modify
5 : * it under the terms of the GNU General Public License as published by
6 : * the Free Software Foundation, either version 3 of the License, or
7 : * (at your option) any later version.
8 : *
9 : * This program is distributed in the hope that it will be useful,
10 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 : * GNU General Public License for more details.
13 : *
14 : * You should have received a copy of the GNU General Public License
15 : * along with this program. If not, see <https://www.gnu.org/licenses/>.
16 : */
17 : #include "conversation.h"
18 :
19 : #include "account_const.h"
20 : #include "jamiaccount.h"
21 : #include "client/ring_signal.h"
22 : #include "swarm/swarm_manager.h"
23 : #ifdef ENABLE_PLUGIN
24 : #include "manager.h"
25 : #include "plugin/jamipluginmanager.h"
26 : #include "plugin/streamdata.h"
27 : #endif
28 : #include "jami/conversation_interface.h"
29 :
30 : #include "fileutils.h"
31 : #include "json_utils.h"
32 :
33 : #include <opendht/thread_pool.h>
34 : #include <fmt/compile.h>
35 :
36 : #include <charconv>
37 : #include <string_view>
38 : #include <tuple>
39 : #include <optional>
40 :
41 : namespace jami {
42 :
43 : static const char* const LAST_MODIFIED = "lastModified";
44 :
45 11 : ConvInfo::ConvInfo(const Json::Value& json)
46 : {
47 11 : id = json[ConversationMapKeys::ID].asString();
48 11 : created = json[ConversationMapKeys::CREATED].asLargestUInt();
49 11 : removed = json[ConversationMapKeys::REMOVED].asLargestUInt();
50 11 : erased = json[ConversationMapKeys::ERASED].asLargestUInt();
51 28 : for (const auto& v : json[ConversationMapKeys::MEMBERS]) {
52 17 : members.emplace(v["uri"].asString());
53 : }
54 11 : lastDisplayed = json[ConversationMapKeys::LAST_DISPLAYED].asString();
55 11 : }
56 :
57 : Json::Value
58 22 : ConvInfo::toJson() const
59 : {
60 22 : Json::Value json;
61 22 : json[ConversationMapKeys::ID] = id;
62 22 : json[ConversationMapKeys::CREATED] = Json::Int64(created);
63 22 : if (removed) {
64 0 : json[ConversationMapKeys::REMOVED] = Json::Int64(removed);
65 : }
66 22 : if (erased) {
67 0 : json[ConversationMapKeys::ERASED] = Json::Int64(erased);
68 : }
69 56 : for (const auto& m : members) {
70 34 : Json::Value member;
71 34 : member["uri"] = m;
72 34 : json[ConversationMapKeys::MEMBERS].append(member);
73 34 : }
74 22 : json[ConversationMapKeys::LAST_DISPLAYED] = lastDisplayed;
75 22 : return json;
76 0 : }
77 :
78 : // ConversationRequest
79 208 : ConversationRequest::ConversationRequest(const Json::Value& json)
80 : {
81 208 : received = json[ConversationMapKeys::RECEIVED].asLargestUInt();
82 208 : declined = json[ConversationMapKeys::DECLINED].asLargestUInt();
83 208 : from = json[ConversationMapKeys::FROM].asString();
84 208 : conversationId = json[ConversationMapKeys::CONVERSATIONID].asString();
85 208 : auto& md = json[ConversationMapKeys::METADATAS];
86 416 : for (const auto& member : md.getMemberNames()) {
87 208 : metadatas.emplace(member, md[member].asString());
88 208 : }
89 208 : }
90 :
91 : Json::Value
92 2 : ConversationRequest::toJson() const
93 : {
94 2 : Json::Value json;
95 2 : json[ConversationMapKeys::CONVERSATIONID] = conversationId;
96 2 : json[ConversationMapKeys::FROM] = from;
97 2 : json[ConversationMapKeys::RECEIVED] = static_cast<uint32_t>(received);
98 2 : if (declined)
99 0 : json[ConversationMapKeys::DECLINED] = static_cast<uint32_t>(declined);
100 4 : for (const auto& [key, value] : metadatas) {
101 2 : json[ConversationMapKeys::METADATAS][key] = value;
102 : }
103 2 : return json;
104 0 : }
105 :
106 : std::map<std::string, std::string>
107 201 : ConversationRequest::toMap() const
108 : {
109 201 : auto result = metadatas;
110 201 : result[ConversationMapKeys::ID] = conversationId;
111 201 : result[ConversationMapKeys::FROM] = from;
112 201 : if (declined)
113 0 : result[ConversationMapKeys::DECLINED] = std::to_string(declined);
114 201 : result[ConversationMapKeys::RECEIVED] = std::to_string(received);
115 201 : return result;
116 0 : }
117 :
118 : using MessageList = std::list<std::shared_ptr<libjami::SwarmMessage>>;
119 :
120 : struct History
121 : {
122 : // While loading the history, we need to avoid:
123 : // - reloading history (can just be ignored)
124 : // - adding new commits (should wait for history to be loaded)
125 : std::mutex mutex {};
126 : std::condition_variable cv {};
127 : bool loading {false};
128 : MessageList messageList {};
129 : std::map<std::string, std::shared_ptr<libjami::SwarmMessage>> quickAccess {};
130 : std::map<std::string, std::list<std::shared_ptr<libjami::SwarmMessage>>> pendingEditions {};
131 : std::map<std::string, std::list<std::map<std::string, std::string>>> pendingReactions {};
132 : };
133 :
134 : class Conversation::Impl
135 : {
136 : public:
137 135 : Impl(const std::shared_ptr<JamiAccount>& account,
138 : ConversationMode mode,
139 : const std::string& otherMember = "")
140 135 : : repository_(ConversationRepository::createConversation(account, mode, otherMember))
141 135 : , account_(account)
142 135 : , accountId_(account->getAccountID())
143 135 : , userId_(account->getUsername())
144 405 : , deviceId_(account->currentDeviceId())
145 : {
146 135 : if (!repository_) {
147 0 : throw std::logic_error("Unable to create repository");
148 : }
149 135 : init(account);
150 135 : }
151 :
152 8 : Impl(const std::shared_ptr<JamiAccount>& account, const std::string& conversationId)
153 8 : : account_(account)
154 8 : , accountId_(account->getAccountID())
155 8 : , userId_(account->getUsername())
156 16 : , deviceId_(account->currentDeviceId())
157 : {
158 8 : repository_ = std::make_unique<ConversationRepository>(account, conversationId);
159 8 : if (!repository_) {
160 0 : throw std::logic_error("Unable to create repository");
161 : }
162 8 : init(account);
163 8 : }
164 :
165 144 : Impl(const std::shared_ptr<JamiAccount>& account,
166 : const std::string& remoteDevice,
167 : const std::string& conversationId)
168 144 : : account_(account)
169 144 : , accountId_(account->getAccountID())
170 144 : , userId_(account->getUsername())
171 288 : , deviceId_(account->currentDeviceId())
172 : {
173 144 : std::vector<ConversationCommit> commits;
174 288 : repository_ = ConversationRepository::cloneConversation(account,
175 : remoteDevice,
176 : conversationId,
177 143 : [&](auto c) {
178 143 : commits = std::move(c);
179 144 : });
180 144 : if (!repository_) {
181 2 : emitSignal<libjami::ConversationSignal::OnConversationError>(
182 1 : accountId_, conversationId, EFETCH, "Unable to clone repository");
183 1 : throw std::logic_error("Unable to clone repository");
184 : }
185 : // To detect current active calls, we need to check history
186 286 : conversationDataPath_ = fileutils::get_data_dir() / accountId_
187 429 : / "conversation_data" / conversationId;
188 143 : activeCallsPath_ = conversationDataPath_ / ConversationMapKeys::ACTIVE_CALLS;
189 143 : initActiveCalls(repository_->convCommitsToMap(commits));
190 143 : init(account);
191 175 : }
192 :
193 286 : void init(const std::shared_ptr<JamiAccount>& account) {
194 286 : ioContext_ = Manager::instance().ioContext();
195 286 : fallbackTimer_ = std::make_unique<asio::steady_timer>(*ioContext_);
196 : swarmManager_
197 572 : = std::make_shared<SwarmManager>(NodeId(deviceId_),
198 572 : Manager::instance().getSeededRandomEngine(),
199 286 : [account = account_](const DeviceId& deviceId) {
200 575 : if (auto acc = account.lock()) {
201 575 : return acc->isConnectedWith(deviceId);
202 575 : }
203 0 : return false;
204 286 : });
205 286 : swarmManager_->setMobility(account->isMobile());
206 : transferManager_
207 286 : = std::make_shared<TransferManager>(accountId_,
208 : "",
209 286 : repository_->id(),
210 572 : Manager::instance().getSeededRandomEngine());
211 572 : conversationDataPath_ = fileutils::get_data_dir() / accountId_
212 858 : / "conversation_data" / repository_->id();
213 286 : fetchedPath_ = conversationDataPath_ / "fetched";
214 286 : statusPath_ = conversationDataPath_ / "status";
215 286 : sendingPath_ = conversationDataPath_ / "sending";
216 286 : preferencesPath_ = conversationDataPath_ / ConversationMapKeys::PREFERENCES;
217 286 : activeCallsPath_ = conversationDataPath_ / ConversationMapKeys::ACTIVE_CALLS;
218 286 : hostedCallsPath_ = conversationDataPath_ / ConversationMapKeys::HOSTED_CALLS;
219 286 : loadActiveCalls();
220 286 : loadStatus();
221 286 : typers_ = std::make_shared<Typers>(account, repository_->id());
222 286 : }
223 :
224 3733 : std::string toString() const {
225 11199 : return fmt::format(FMT_COMPILE("[Account {}] [Conversation {}]"), accountId_, repository_->id());
226 : }
227 : mutable std::string fmtStr_;
228 :
229 284 : ~Impl()
230 : {
231 : try {
232 284 : if (fallbackTimer_)
233 284 : fallbackTimer_->cancel();
234 0 : } catch (const std::exception& e) {
235 0 : JAMI_ERROR("{:s} {:s}", toString(), e.what());
236 0 : }
237 284 : }
238 :
239 : /**
240 : * If, for whatever reason, the daemon is stopped while hosting a conference,
241 : * we need to announce the end of this call when restarting.
242 : * To avoid to keep active calls forever.
243 : */
244 : std::vector<std::string> commitsEndedCalls();
245 : bool isAdmin() const;
246 : std::filesystem::path repoPath() const;
247 :
248 209 : void announce(const std::string& commitId, bool commitFromSelf = false)
249 : {
250 209 : std::vector<std::string> vec;
251 209 : if (!commitId.empty())
252 208 : vec.emplace_back(commitId);
253 209 : announce(vec, commitFromSelf);
254 209 : }
255 :
256 221 : void announce(const std::vector<std::string>& commits, bool commitFromSelf = false)
257 : {
258 221 : std::vector<ConversationCommit> convcommits;
259 221 : convcommits.reserve(commits.size());
260 453 : for (const auto& cid : commits) {
261 232 : auto commit = repository_->getCommit(cid);
262 232 : if (commit != std::nullopt) {
263 232 : convcommits.emplace_back(*commit);
264 : }
265 232 : }
266 221 : announce(repository_->convCommitsToMap(convcommits), commitFromSelf);
267 221 : }
268 :
269 : /**
270 : * Initialize activeCalls_ from the list of commits in the repository
271 : * @param commits Commits in reverse chronological order (i.e. from newest to oldest)
272 : */
273 143 : void initActiveCalls(const std::vector<std::map<std::string, std::string>>& commits) const
274 : {
275 143 : std::unordered_set<std::string> invalidHostUris;
276 143 : std::unordered_set<std::string> invalidCallIds;
277 :
278 143 : std::lock_guard lk(activeCallsMtx_);
279 1043 : for (const auto& commit : commits) {
280 900 : if (commit.at("type") == "member") {
281 : // Each commit of type "member" has an "action" field whose value can be one
282 : // of the following: "add", "join", "remove", "ban", "unban"
283 : // In the case of "remove" and "ban", we need to add the member's URI to
284 : // invalidHostUris to ensure that any call they may have started in the past
285 : // is no longer considered active.
286 : // For the other actions, there's no harm in adding the member's URI anyway,
287 : // since it's not possible to start hosting a call before joining the swarm (or
288 : // before getting unbanned in the case of previously banned members).
289 737 : invalidHostUris.emplace(commit.at("uri"));
290 327 : } else if (commit.find("confId") != commit.end() && commit.find("uri") != commit.end()
291 327 : && commit.find("device") != commit.end()) {
292 : // The commit indicates either the end or the beginning of a call
293 : // (depending on whether there is a "duration" field or not).
294 1 : auto convId = repository_->id();
295 2 : auto confId = commit.at("confId");
296 2 : auto uri = commit.at("uri");
297 2 : auto device = commit.at("device");
298 :
299 2 : if (commit.find("duration") == commit.end()
300 1 : && invalidCallIds.find(confId) == invalidCallIds.end()
301 3 : && invalidHostUris.find(uri) == invalidHostUris.end()) {
302 1 : std::map<std::string, std::string> activeCall;
303 1 : activeCall["id"] = confId;
304 1 : activeCall["uri"] = uri;
305 1 : activeCall["device"] = device;
306 1 : activeCalls_.emplace_back(activeCall);
307 0 : fmt::print("swarm:{} new active call detected: {} (on device {}, account {})\n",
308 : convId,
309 : confId,
310 : device,
311 : uri);
312 1 : }
313 : // Even if the call was active, we still add its ID to invalidCallIds to make sure it
314 : // doesn't get added a second time. (This shouldn't happen normally, but in practice
315 : // there are sometimes multiple commits indicating the beginning of the same call.)
316 1 : invalidCallIds.emplace(confId);
317 1 : }
318 : }
319 143 : saveActiveCalls();
320 143 : emitSignal<libjami::ConfigurationSignal::ActiveCallsChanged>(accountId_,
321 143 : repository_->id(),
322 143 : activeCalls_);
323 143 : }
324 :
325 : /**
326 : * Update activeCalls_ via announced commits (in load or via new commits)
327 : * @param commit Commit to check
328 : * @param eraseOnly If we want to ignore added commits
329 : * @param emitSig If we want to emit to client
330 : * @note eraseOnly is used by loadMessages. This is a fail-safe, this SHOULD NOT happen
331 : */
332 79 : void updateActiveCalls(const std::map<std::string, std::string>& commit,
333 : bool eraseOnly = false,
334 : bool emitSig = true) const
335 : {
336 79 : if (!repository_)
337 0 : return;
338 79 : if (commit.at("type") == "member") {
339 : // In this case, we need to check if we are not removing a hosting member or device
340 19 : std::lock_guard lk(activeCallsMtx_);
341 19 : auto it = activeCalls_.begin();
342 19 : auto updateActives = false;
343 20 : while (it != activeCalls_.end()) {
344 1 : if (it->at("uri") == commit.at("uri") || it->at("device") == commit.at("uri")) {
345 3 : JAMI_DEBUG("Removing {:s} from the active calls, because {:s} left",
346 : it->at("id"),
347 : commit.at("uri"));
348 1 : it = activeCalls_.erase(it);
349 1 : updateActives = true;
350 : } else {
351 0 : ++it;
352 : }
353 : }
354 19 : if (updateActives) {
355 1 : saveActiveCalls();
356 1 : if (emitSig)
357 1 : emitSignal<libjami::ConfigurationSignal::ActiveCallsChanged>(accountId_,
358 1 : repository_->id(),
359 1 : activeCalls_);
360 : }
361 19 : return;
362 19 : }
363 : // Else, it's a call information
364 177 : if (commit.find("confId") != commit.end() && commit.find("uri") != commit.end()
365 177 : && commit.find("device") != commit.end()) {
366 57 : auto convId = repository_->id();
367 114 : auto confId = commit.at("confId");
368 114 : auto uri = commit.at("uri");
369 114 : auto device = commit.at("device");
370 57 : std::lock_guard lk(activeCallsMtx_);
371 57 : auto itActive = std::find_if(activeCalls_.begin(),
372 : activeCalls_.end(),
373 26 : [&](const auto& value) {
374 78 : return value.at("id") == confId
375 52 : && value.at("uri") == uri
376 130 : && value.at("device") == device;
377 : });
378 57 : if (commit.find("duration") == commit.end()) {
379 31 : if (itActive == activeCalls_.end() && !eraseOnly) {
380 93 : JAMI_DEBUG(
381 : "swarm:{:s} new current call detected: {:s} on device {:s}, account {:s}",
382 : convId,
383 : confId,
384 : device,
385 : uri);
386 31 : std::map<std::string, std::string> activeCall;
387 31 : activeCall["id"] = confId;
388 31 : activeCall["uri"] = uri;
389 31 : activeCall["device"] = device;
390 31 : activeCalls_.emplace_back(activeCall);
391 31 : saveActiveCalls();
392 31 : if (emitSig)
393 31 : emitSignal<libjami::ConfigurationSignal::ActiveCallsChanged>(accountId_,
394 : repository_
395 31 : ->id(),
396 31 : activeCalls_);
397 31 : }
398 : } else {
399 26 : if (itActive != activeCalls_.end()) {
400 26 : itActive = activeCalls_.erase(itActive);
401 : // Unlikely, but we must ensure that no duplicate exists
402 26 : while (itActive != activeCalls_.end()) {
403 0 : itActive = std::find_if(itActive, activeCalls_.end(), [&](const auto& value) {
404 0 : return value.at("id") == confId && value.at("uri") == uri
405 0 : && value.at("device") == device;
406 : });
407 0 : if (itActive != activeCalls_.end()) {
408 0 : JAMI_ERROR("Duplicate call found. (This is a bug)");
409 0 : itActive = activeCalls_.erase(itActive);
410 : }
411 : }
412 :
413 26 : if (eraseOnly) {
414 0 : JAMI_WARNING("previous swarm:{:s} call finished detected: {:s} on device "
415 : "{:s}, account {:s}",
416 : convId,
417 : confId,
418 : device,
419 : uri);
420 : } else {
421 78 : JAMI_DEBUG("swarm:{:s} call finished: {:s} on device {:s}, account {:s}",
422 : convId,
423 : confId,
424 : device,
425 : uri);
426 : }
427 : }
428 26 : saveActiveCalls();
429 26 : if (emitSig)
430 26 : emitSignal<libjami::ConfigurationSignal::ActiveCallsChanged>(accountId_,
431 26 : repository_->id(),
432 26 : activeCalls_);
433 : }
434 57 : }
435 : }
436 :
437 1072 : void announce(const std::vector<std::map<std::string, std::string>>& commits, bool commitFromSelf = false)
438 : {
439 1072 : if (!repository_)
440 0 : return;
441 1072 : auto convId = repository_->id();
442 1072 : auto ok = !commits.empty();
443 3215 : auto lastId = ok ? commits.rbegin()->at(ConversationMapKeys::ID) : "";
444 1072 : addToHistory(loadedHistory_, commits, true, commitFromSelf);
445 1072 : if (ok) {
446 1071 : bool announceMember = false;
447 2177 : for (const auto& c : commits) {
448 : // Announce member events
449 1106 : if (c.at("type") == "member") {
450 862 : if (c.find("uri") != c.end() && c.find("action") != c.end()) {
451 862 : const auto& uri = c.at("uri");
452 862 : const auto& actionStr = c.at("action");
453 862 : auto action = -1;
454 862 : if (actionStr == "add")
455 412 : action = 0;
456 450 : else if (actionStr == "join")
457 430 : action = 1;
458 20 : else if (actionStr == "remove")
459 1 : action = 2;
460 19 : else if (actionStr == "ban")
461 18 : action = 3;
462 1 : else if (actionStr == "unban")
463 1 : action = 4;
464 862 : if (actionStr == "ban" || actionStr == "remove") {
465 : // In this case, a potential host was removed during a call.
466 19 : updateActiveCalls(c);
467 19 : typers_->removeTyper(uri);
468 : }
469 862 : if (action != -1) {
470 862 : announceMember = true;
471 1724 : emitSignal<libjami::ConversationSignal::ConversationMemberEvent>(
472 862 : accountId_, convId, uri, action);
473 : }
474 : }
475 244 : } else if (c.at("type") == "application/call-history+json") {
476 60 : updateActiveCalls(c);
477 : }
478 : #ifdef ENABLE_PLUGIN
479 : auto& pluginChatManager
480 1106 : = Manager::instance().getJamiPluginManager().getChatServicesManager();
481 1106 : if (pluginChatManager.hasHandlers()) {
482 4 : auto cm = std::make_shared<JamiMessage>(accountId_,
483 : convId,
484 8 : c.at("author") != userId_,
485 : c,
486 8 : false);
487 4 : cm->isSwarm = true;
488 4 : pluginChatManager.publishMessage(std::move(cm));
489 4 : }
490 : #endif
491 : // announce message
492 1106 : emitSignal<libjami::ConversationSignal::MessageReceived>(accountId_, convId, c);
493 : }
494 :
495 1071 : if (announceMember && onMembersChanged_) {
496 857 : onMembersChanged_(repository_->memberUris("", {}));
497 : }
498 : }
499 1072 : }
500 :
501 286 : void loadStatus()
502 : {
503 : try {
504 : // read file
505 570 : auto file = fileutils::loadFile(statusPath_);
506 : // load values
507 2 : msgpack::object_handle oh = msgpack::unpack((const char*) file.data(), file.size());
508 2 : std::lock_guard lk {messageStatusMtx_};
509 2 : oh.get().convert(messagesStatus_);
510 286 : } catch (const std::exception& e) {
511 284 : }
512 286 : }
513 1000 : void saveStatus()
514 : {
515 1000 : std::ofstream file(statusPath_, std::ios::trunc | std::ios::binary);
516 1000 : msgpack::pack(file, messagesStatus_);
517 1000 : }
518 :
519 294 : void loadActiveCalls() const
520 : {
521 : try {
522 : // read file
523 582 : auto file = fileutils::loadFile(activeCallsPath_);
524 : // load values
525 6 : msgpack::object_handle oh = msgpack::unpack((const char*) file.data(), file.size());
526 6 : std::lock_guard lk {activeCallsMtx_};
527 6 : oh.get().convert(activeCalls_);
528 294 : } catch (const std::exception& e) {
529 288 : return;
530 288 : }
531 : }
532 :
533 209 : void saveActiveCalls() const
534 : {
535 209 : std::ofstream file(activeCallsPath_, std::ios::trunc | std::ios::binary);
536 209 : msgpack::pack(file, activeCalls_);
537 209 : }
538 :
539 8 : void loadHostedCalls() const
540 : {
541 : try {
542 : // read file
543 13 : auto file = fileutils::loadFile(hostedCallsPath_);
544 : // load values
545 3 : msgpack::object_handle oh = msgpack::unpack((const char*) file.data(), file.size());
546 3 : std::lock_guard lk {activeCallsMtx_};
547 3 : oh.get().convert(hostedCalls_);
548 8 : } catch (const std::exception& e) {
549 5 : return;
550 5 : }
551 : }
552 :
553 33 : void saveHostedCalls() const
554 : {
555 33 : std::ofstream file(hostedCallsPath_, std::ios::trunc | std::ios::binary);
556 33 : msgpack::pack(file, hostedCalls_);
557 33 : }
558 :
559 : void voteUnban(const std::string& contactUri, const std::string_view type, const OnDoneCb& cb);
560 :
561 : std::vector<std::map<std::string, std::string>> getMembers(bool includeInvited,
562 : bool includeLeft,
563 : bool includeBanned) const;
564 :
565 6157 : std::string_view bannedType(const std::string& uri) const
566 : {
567 6157 : auto repo = repoPath();
568 0 : auto crt = fmt::format("{}.crt", uri);
569 12314 : auto bannedMember = repo / "banned" / "members" / crt;
570 6157 : if (std::filesystem::is_regular_file(bannedMember))
571 15 : return "members"sv;
572 12284 : auto bannedAdmin = repo / "banned" / "admins" / crt;
573 6142 : if (std::filesystem::is_regular_file(bannedAdmin))
574 0 : return "admins"sv;
575 12284 : auto bannedInvited = repo / "banned" / "invited" / uri;
576 6142 : if (std::filesystem::is_regular_file(bannedInvited))
577 0 : return "invited"sv;
578 12284 : auto bannedDevice = repo / "banned" / "devices" / crt;
579 6142 : if (std::filesystem::is_regular_file(bannedDevice))
580 0 : return "devices"sv;
581 6142 : return {};
582 6157 : }
583 :
584 4664 : std::shared_ptr<dhtnet::ChannelSocket> gitSocket(const DeviceId& deviceId) const
585 : {
586 4664 : auto deviceSockets = gitSocketList_.find(deviceId);
587 9328 : return (deviceSockets != gitSocketList_.end()) ? deviceSockets->second : nullptr;
588 : }
589 :
590 2125 : void addGitSocket(const DeviceId& deviceId, const std::shared_ptr<dhtnet::ChannelSocket>& socket)
591 : {
592 2125 : gitSocketList_[deviceId] = socket;
593 2125 : }
594 750 : void removeGitSocket(const DeviceId& deviceId)
595 : {
596 750 : auto deviceSockets = gitSocketList_.find(deviceId);
597 750 : if (deviceSockets != gitSocketList_.end())
598 383 : gitSocketList_.erase(deviceSockets);
599 750 : }
600 :
601 : /**
602 : * Remove all git sockets and all DRT nodes associated with the given peer.
603 : * This is used when a swarm member is banned to ensure that we stop syncing
604 : * with them or sending them message notifications.
605 : */
606 : void disconnectFromPeer(const std::string& peerUri);
607 :
608 : std::vector<std::map<std::string, std::string>> getMembers(bool includeInvited,
609 : bool includeLeft) const;
610 :
611 : std::mutex membersMtx_ {};
612 : std::set<std::string> checkedMembers_; // Store members we tried
613 : std::function<void()> bootstrapCb_;
614 : #ifdef LIBJAMI_TEST
615 : std::function<void(std::string, BootstrapStatus)> bootstrapCbTest_;
616 : #endif
617 :
618 : std::mutex writeMtx_ {};
619 : std::unique_ptr<ConversationRepository> repository_;
620 : std::shared_ptr<SwarmManager> swarmManager_;
621 : std::weak_ptr<JamiAccount> account_;
622 : std::string accountId_ {};
623 : std::string userId_;
624 : std::string deviceId_;
625 : std::atomic_bool isRemoving_ {false};
626 : std::vector<std::map<std::string, std::string>> loadMessages(const LogOptions& options);
627 : std::vector<libjami::SwarmMessage> loadMessages2(const LogOptions& options,
628 : History* optHistory = nullptr);
629 : void pull(const std::string& deviceId);
630 : std::vector<std::map<std::string, std::string>> mergeHistory(const std::string& uri);
631 :
632 : // Avoid multiple fetch/merges at the same time.
633 : std::mutex pullcbsMtx_ {};
634 : std::map<std::string, std::deque<std::pair<std::string, OnPullCb>>> fetchingRemotes_ {}; // store current remote in fetch
635 : std::shared_ptr<TransferManager> transferManager_ {};
636 : std::filesystem::path conversationDataPath_ {};
637 : std::filesystem::path fetchedPath_ {};
638 :
639 : // Manage last message displayed and status
640 : std::filesystem::path sendingPath_ {};
641 : std::filesystem::path preferencesPath_ {};
642 : OnMembersChanged onMembersChanged_ {};
643 :
644 : // Manage hosted calls on this device
645 : std::filesystem::path hostedCallsPath_ {};
646 : mutable std::map<std::string, uint64_t /* start time */> hostedCalls_ {};
647 : // Manage active calls for this conversation (can be hosted by other devices)
648 : std::filesystem::path activeCallsPath_ {};
649 : mutable std::mutex activeCallsMtx_ {};
650 : mutable std::vector<std::map<std::string, std::string>> activeCalls_ {};
651 :
652 : GitSocketList gitSocketList_ {};
653 :
654 : // Bootstrap
655 : std::shared_ptr<asio::io_context> ioContext_;
656 : std::unique_ptr<asio::steady_timer> fallbackTimer_;
657 :
658 :
659 : /**
660 : * Loaded history represents the linearized history to show for clients
661 : */
662 : History loadedHistory_ {};
663 : std::vector<std::shared_ptr<libjami::SwarmMessage>> addToHistory(
664 : History& history,
665 : const std::vector<std::map<std::string, std::string>>& commits,
666 : bool messageReceived = false,
667 : bool commitFromSelf = false);
668 :
669 : void handleReaction(History& history,
670 : const std::shared_ptr<libjami::SwarmMessage>& sharedCommit) const;
671 : void handleEdition(History& history,
672 : const std::shared_ptr<libjami::SwarmMessage>& sharedCommit,
673 : bool messageReceived) const;
674 : bool handleMessage(History& history,
675 : const std::shared_ptr<libjami::SwarmMessage>& sharedCommit,
676 : bool messageReceived) const;
677 : void rectifyStatus(const std::shared_ptr<libjami::SwarmMessage>& message,
678 : History& history) const;
679 : /**
680 : * {uri, {
681 : * {"fetch", "commitId"},
682 : * {"fetched_ts", "timestamp"},
683 : * {"read", "commitId"},
684 : * {"read_ts", "timestamp"}
685 : * }
686 : * }
687 : */
688 : mutable std::mutex messageStatusMtx_;
689 : std::function<void(const std::map<std::string, std::map<std::string, std::string>>&)> messageStatusCb_ {};
690 : std::filesystem::path statusPath_ {};
691 : mutable std::map<std::string, std::map<std::string, std::string>> messagesStatus_ {};
692 : /**
693 : * Status: 0 = commited, 1 = fetched, 2 = read
694 : * This cache the curent status to add in the messages
695 : */
696 : // Note: only store int32_t cause it's easy to pass to dbus this way
697 : // memberToStatus serves as a cache for loading messages
698 : mutable std::map<std::string, int32_t> memberToStatus;
699 :
700 :
701 : // futureStatus is used to store the status for receiving messages
702 : // (because we're not sure to fetch the commit before receiving a status change for this)
703 : mutable std::map<std::string, std::map<std::string, int32_t>> futureStatus;
704 : // Update internal structures regarding status
705 : void updateStatus(const std::string& uri,
706 : libjami::Account::MessageStates status,
707 : const std::string& commitId,
708 : const std::string& ts,
709 : bool emit = false);
710 :
711 :
712 : std::shared_ptr<Typers> typers_;
713 : };
714 :
715 : bool
716 16 : Conversation::Impl::isAdmin() const
717 : {
718 32 : auto adminsPath = repoPath() / "admins";
719 32 : return std::filesystem::is_regular_file(fileutils::getFullPath(adminsPath, userId_ + ".crt"));
720 16 : }
721 :
722 : void
723 16 : Conversation::Impl::disconnectFromPeer(const std::string& peerUri)
724 : {
725 : // Remove nodes from swarmManager
726 16 : const auto nodes = swarmManager_->getRoutingTable().getAllNodes();
727 16 : std::vector<NodeId> toRemove;
728 38 : for (const auto node : nodes)
729 22 : if (peerUri == repository_->uriFromDevice(node.toString()))
730 11 : toRemove.emplace_back(node);
731 16 : swarmManager_->deleteNode(toRemove);
732 :
733 : // Remove git sockets with this member
734 37 : for (auto it = gitSocketList_.begin(); it != gitSocketList_.end();) {
735 21 : if (peerUri == repository_->uriFromDevice(it->first.toString()))
736 11 : it = gitSocketList_.erase(it);
737 : else
738 10 : ++it;
739 : }
740 16 : }
741 :
742 : std::vector<std::map<std::string, std::string>>
743 350 : Conversation::Impl::getMembers(bool includeInvited, bool includeLeft, bool includeBanned) const
744 : {
745 350 : std::vector<std::map<std::string, std::string>> result;
746 350 : auto members = repository_->members();
747 350 : std::lock_guard lk(messageStatusMtx_);
748 1018 : for (const auto& member : members) {
749 668 : if (member.role == MemberRole::BANNED && !includeBanned) {
750 144 : continue;
751 : }
752 668 : if (member.role == MemberRole::INVITED && !includeInvited)
753 144 : continue;
754 524 : if (member.role == MemberRole::LEFT && !includeLeft)
755 0 : continue;
756 524 : auto mm = member.map();
757 524 : mm[ConversationMapKeys::LAST_DISPLAYED] = messagesStatus_[member.uri]["read"];
758 524 : result.emplace_back(std::move(mm));
759 524 : }
760 700 : return result;
761 350 : }
762 :
763 : std::vector<std::string>
764 8 : Conversation::Impl::commitsEndedCalls()
765 : {
766 : // Handle current calls
767 8 : std::vector<std::string> commits {};
768 8 : std::unique_lock lk(writeMtx_);
769 8 : std::unique_lock lkA(activeCallsMtx_);
770 8 : for (const auto& hostedCall : hostedCalls_) {
771 : // In this case, this means that we left
772 : // the conference while still hosting it, so activeCalls
773 : // will not be correctly updated
774 : // We don't need to send notifications there, as peers will sync with presence
775 0 : Json::Value value;
776 0 : value["uri"] = userId_;
777 0 : value["device"] = deviceId_;
778 0 : value["confId"] = hostedCall.first;
779 0 : value["type"] = "application/call-history+json";
780 0 : auto now = std::chrono::system_clock::now();
781 0 : auto nowConverted = std::chrono::duration_cast<std::chrono::seconds>(now.time_since_epoch())
782 0 : .count();
783 0 : value["duration"] = std::to_string((nowConverted - hostedCall.second) * 1000);
784 0 : auto itActive = std::find_if(activeCalls_.begin(),
785 : activeCalls_.end(),
786 0 : [this, confId = hostedCall.first](const auto& value) {
787 0 : return value.at("id") == confId && value.at("uri") == userId_
788 0 : && value.at("device") == deviceId_;
789 : });
790 0 : if (itActive != activeCalls_.end())
791 0 : activeCalls_.erase(itActive);
792 0 : commits.emplace_back(repository_->commitMessage(json::toString(value)));
793 :
794 0 : JAMI_DEBUG("Removing hosted conference... {:s}", hostedCall.first);
795 0 : }
796 8 : hostedCalls_.clear();
797 8 : saveActiveCalls();
798 8 : saveHostedCalls();
799 16 : return commits;
800 8 : }
801 :
802 : std::filesystem::path
803 9419 : Conversation::Impl::repoPath() const
804 : {
805 18838 : return fileutils::get_data_dir() / accountId_ / "conversations" / repository_->id();
806 : }
807 :
808 : std::vector<std::map<std::string, std::string>>
809 8 : Conversation::Impl::loadMessages(const LogOptions& options)
810 : {
811 8 : if (!repository_)
812 0 : return {};
813 8 : std::vector<ConversationCommit> commits;
814 8 : auto startLogging = options.from == "";
815 8 : auto breakLogging = false;
816 16 : repository_->log(
817 35 : [&](const auto& id, const auto& author, const auto& commit) {
818 35 : if (!commits.empty()) {
819 : // Set linearized parent
820 27 : commits.rbegin()->linearized_parent = id;
821 : }
822 35 : if (options.skipMerge && git_commit_parentcount(commit.get()) > 1) {
823 0 : return CallbackResult::Skip;
824 : }
825 35 : if ((options.nbOfCommits != 0 && commits.size() == options.nbOfCommits))
826 0 : return CallbackResult::Break; // Stop logging
827 35 : if (breakLogging)
828 0 : return CallbackResult::Break; // Stop logging
829 35 : if (id == options.to) {
830 8 : if (options.includeTo)
831 0 : breakLogging = true; // For the next commit
832 : else
833 8 : return CallbackResult::Break; // Stop logging
834 : }
835 :
836 27 : if (!startLogging && options.from != "" && options.from == id)
837 0 : startLogging = true;
838 27 : if (!startLogging)
839 0 : return CallbackResult::Skip; // Start logging after this one
840 :
841 27 : if (options.fastLog) {
842 0 : if (options.authorUri != "") {
843 0 : if (options.authorUri == repository_->uriFromDevice(author.email)) {
844 0 : return CallbackResult::Break; // Found author, stop
845 : }
846 : }
847 : // Used to only count commit
848 0 : commits.emplace(commits.end(), ConversationCommit {});
849 0 : return CallbackResult::Skip;
850 : }
851 :
852 27 : return CallbackResult::Ok; // Continue
853 : },
854 27 : [&](auto&& cc) { commits.emplace(commits.end(), std::forward<decltype(cc)>(cc)); },
855 27 : [](auto, auto, auto) { return false; },
856 8 : options.from,
857 8 : options.logIfNotFound);
858 8 : return repository_->convCommitsToMap(commits);
859 8 : }
860 :
861 : std::vector<libjami::SwarmMessage>
862 1342 : Conversation::Impl::loadMessages2(const LogOptions& options, History* optHistory)
863 : {
864 1342 : auto history = optHistory ? optHistory : &loadedHistory_;
865 :
866 : // history->mutex is locked by the caller
867 1342 : if (!repository_ || history->loading) {
868 0 : return {};
869 : }
870 1342 : history->loading = true;
871 :
872 : // By convention, if options.nbOfCommits is zero, then we
873 : // don't impose a limit on the number of commits returned.
874 1342 : bool limitNbOfCommits = options.nbOfCommits > 0;
875 :
876 1342 : auto startLogging = options.from == "";
877 1342 : auto breakLogging = false;
878 1342 : auto currentHistorySize = loadedHistory_.messageList.size();
879 1342 : std::vector<std::string> replies;
880 1342 : std::vector<std::shared_ptr<libjami::SwarmMessage>> msgList;
881 2684 : repository_->log(
882 : /* preCondition */
883 7389 : [&](const auto& id, const auto& author, const auto& commit) {
884 7389 : if (options.skipMerge && git_commit_parentcount(commit.get()) > 1) {
885 0 : return CallbackResult::Skip;
886 : }
887 7389 : if (id == options.to) {
888 684 : if (options.includeTo)
889 0 : breakLogging = true; // For the next commit
890 : }
891 7389 : if (replies.empty()) { // This avoid load until
892 : // NOTE: in the future, we may want to add "Reply-Body" in commit to avoid to load
893 : // until this commit
894 14778 : if ((limitNbOfCommits
895 7978 : && (loadedHistory_.messageList.size() - currentHistorySize)
896 589 : == options.nbOfCommits))
897 3 : return CallbackResult::Break; // Stop logging
898 7386 : if (breakLogging)
899 0 : return CallbackResult::Break; // Stop logging
900 7386 : if (id == options.to && !options.includeTo) {
901 684 : return CallbackResult::Break; // Stop logging
902 : }
903 : }
904 :
905 6702 : if (!startLogging && options.from != "" && options.from == id)
906 999 : startLogging = true;
907 6702 : if (!startLogging)
908 12 : return CallbackResult::Skip; // Start logging after this one
909 :
910 6690 : if (options.fastLog) {
911 6096 : if (options.authorUri != "") {
912 0 : if (options.authorUri == repository_->uriFromDevice(author.email)) {
913 0 : return CallbackResult::Break; // Found author, stop
914 : }
915 : }
916 : }
917 :
918 6690 : return CallbackResult::Ok; // Continue
919 : },
920 : /* emplaceCb */
921 6690 : [&](auto&& cc) {
922 6690 : if(limitNbOfCommits && (msgList.size() == options.nbOfCommits))
923 247 : return;
924 6444 : auto optMessage = repository_->convCommitToMap(cc);
925 6444 : if (!optMessage.has_value())
926 1 : return;
927 6443 : auto message = optMessage.value();
928 6443 : if (message.find("reply-to") != message.end()) {
929 0 : auto it = std::find(replies.begin(), replies.end(), message.at("reply-to"));
930 0 : if(it == replies.end()) {
931 0 : replies.emplace_back(message.at("reply-to"));
932 : }
933 : }
934 6443 : auto it = std::find(replies.begin(), replies.end(), message.at("id"));
935 6443 : if (it != replies.end()) {
936 0 : replies.erase(it);
937 : }
938 6443 : std::shared_ptr<libjami::SwarmMessage> firstMsg;
939 6443 : if ((history == &loadedHistory_) && msgList.empty() && !loadedHistory_.messageList.empty()) {
940 0 : firstMsg = *loadedHistory_.messageList.rbegin();
941 : }
942 19329 : auto added = addToHistory(*history, {message}, false, false);
943 6443 : if (!added.empty() && firstMsg) {
944 0 : emitSignal<libjami::ConversationSignal::SwarmMessageUpdated>(accountId_,
945 0 : repository_->id(),
946 0 : *firstMsg);
947 : }
948 6443 : msgList.insert(msgList.end(), added.begin(), added.end());
949 6444 : },
950 : /* postCondition */
951 6690 : [&](auto, auto, auto) {
952 : // Stop logging if there was a limit set on the number of commits
953 : // to return and we reached it. This isn't strictly necessary since
954 : // the check at the beginning of `emplaceCb` ensures that we won't
955 : // return too many messages, but it prevents us from needlessly
956 : // iterating over a (potentially) large number of commits.
957 6690 : return limitNbOfCommits && (msgList.size() == options.nbOfCommits);
958 : },
959 1342 : options.from,
960 1342 : options.logIfNotFound);
961 :
962 1342 : history->loading = false;
963 1342 : history->cv.notify_all();
964 :
965 : // Convert for client (remove ptr)
966 1342 : std::vector<libjami::SwarmMessage> ret;
967 1342 : ret.reserve(msgList.size());
968 7782 : for (const auto& msg: msgList) {
969 6440 : ret.emplace_back(*msg);
970 : }
971 1342 : return ret;
972 1342 : }
973 :
974 : void
975 0 : Conversation::Impl::handleReaction(History& history,
976 : const std::shared_ptr<libjami::SwarmMessage>& sharedCommit) const
977 : {
978 0 : auto it = history.quickAccess.find(sharedCommit->body.at("react-to"));
979 0 : auto peditIt = history.pendingEditions.find(sharedCommit->id);
980 0 : if (peditIt != history.pendingEditions.end()) {
981 0 : auto oldBody = sharedCommit->body;
982 0 : sharedCommit->body["body"] = peditIt->second.front()->body["body"];
983 0 : if (sharedCommit->body.at("body").empty())
984 0 : return;
985 0 : history.pendingEditions.erase(peditIt);
986 0 : }
987 0 : if (it != history.quickAccess.end()) {
988 0 : it->second->reactions.emplace_back(sharedCommit->body);
989 0 : emitSignal<libjami::ConversationSignal::ReactionAdded>(accountId_,
990 0 : repository_->id(),
991 0 : it->second->id,
992 0 : sharedCommit->body);
993 : } else {
994 0 : history.pendingReactions[sharedCommit->body.at("react-to")].emplace_back(sharedCommit->body);
995 : }
996 : }
997 :
998 : void
999 1 : Conversation::Impl::handleEdition(History& history,
1000 : const std::shared_ptr<libjami::SwarmMessage>& sharedCommit,
1001 : bool messageReceived) const
1002 : {
1003 2 : auto editId = sharedCommit->body.at("edit");
1004 1 : auto it = history.quickAccess.find(editId);
1005 1 : if (it != history.quickAccess.end()) {
1006 1 : auto baseCommit = it->second;
1007 1 : if (baseCommit) {
1008 1 : auto itReact = baseCommit->body.find("react-to");
1009 1 : std::string toReplace = (baseCommit->type == "application/data-transfer+json") ?
1010 2 : "tid" : "body";
1011 1 : auto body = sharedCommit->body.at(toReplace);
1012 : // Edit reaction
1013 1 : if (itReact != baseCommit->body.end()) {
1014 0 : baseCommit->body[toReplace] = body; // Replace body if pending
1015 0 : it = history.quickAccess.find(itReact->second);
1016 0 : auto itPending = history.pendingReactions.find(itReact->second);
1017 0 : if (it != history.quickAccess.end()) {
1018 0 : baseCommit = it->second; // Base commit
1019 0 : auto itPreviousReact = std::find_if(baseCommit->reactions.begin(),
1020 0 : baseCommit->reactions.end(),
1021 0 : [&](const auto& reaction) {
1022 0 : return reaction.at("id") == editId;
1023 : });
1024 0 : if (itPreviousReact != baseCommit->reactions.end()) {
1025 0 : (*itPreviousReact)[toReplace] = body;
1026 0 : if (body.empty()) {
1027 0 : baseCommit->reactions.erase(itPreviousReact);
1028 0 : emitSignal<libjami::ConversationSignal::ReactionRemoved>(accountId_,
1029 : repository_
1030 0 : ->id(),
1031 0 : baseCommit->id,
1032 : editId);
1033 : }
1034 : }
1035 0 : } else if (itPending != history.pendingReactions.end()) {
1036 : // Else edit if pending
1037 0 : auto itReaction = std::find_if(itPending->second.begin(),
1038 0 : itPending->second.end(),
1039 0 : [&](const auto& reaction) {
1040 0 : return reaction.at("id") == editId;
1041 : });
1042 0 : if (itReaction != itPending->second.end()) {
1043 0 : (*itReaction)[toReplace] = body;
1044 0 : if (body.empty())
1045 0 : itPending->second.erase(itReaction);
1046 : }
1047 : } else {
1048 : // Add to pending edtions
1049 0 : messageReceived ? history.pendingEditions[editId].emplace_front(sharedCommit)
1050 0 : : history.pendingEditions[editId].emplace_back(sharedCommit);
1051 : }
1052 : } else {
1053 : // Normal message
1054 1 : it->second->editions.emplace(it->second->editions.begin(), it->second->body);
1055 1 : it->second->body[toReplace] = sharedCommit->body[toReplace];
1056 1 : if (toReplace == "tid") {
1057 : // Avoid to replace fileId in client
1058 1 : it->second->body["fileId"] = "";
1059 : }
1060 : // Remove reactions
1061 1 : if (sharedCommit->body.at(toReplace).empty())
1062 1 : it->second->reactions.clear();
1063 1 : emitSignal<libjami::ConversationSignal::SwarmMessageUpdated>(accountId_, repository_->id(), *it->second);
1064 : }
1065 1 : }
1066 1 : } else {
1067 0 : messageReceived ? history.pendingEditions[editId].emplace_front(sharedCommit)
1068 0 : : history.pendingEditions[editId].emplace_back(sharedCommit);
1069 : }
1070 1 : }
1071 :
1072 : bool
1073 7529 : Conversation::Impl::handleMessage(History& history,
1074 : const std::shared_ptr<libjami::SwarmMessage>& sharedCommit,
1075 : bool messageReceived) const
1076 : {
1077 7529 : if (messageReceived) {
1078 : // For a received message, we place it at the beginning of the list
1079 1089 : if (!history.messageList.empty())
1080 900 : sharedCommit->linearizedParent = (*history.messageList.begin())->id;
1081 1089 : history.messageList.emplace_front(sharedCommit);
1082 : } else {
1083 : // For a loaded message, we load from newest to oldest
1084 : // So we change the parent of the last message.
1085 6440 : if (!history.messageList.empty())
1086 5100 : (*history.messageList.rbegin())->linearizedParent = sharedCommit->id;
1087 6440 : history.messageList.emplace_back(sharedCommit);
1088 : }
1089 : // Handle pending reactions/editions
1090 7529 : auto reactIt = history.pendingReactions.find(sharedCommit->id);
1091 7529 : if (reactIt != history.pendingReactions.end()) {
1092 0 : for (const auto& commitBody : reactIt->second)
1093 0 : sharedCommit->reactions.emplace_back(commitBody);
1094 0 : history.pendingReactions.erase(reactIt);
1095 : }
1096 7529 : auto peditIt = history.pendingEditions.find(sharedCommit->id);
1097 7529 : if (peditIt != history.pendingEditions.end()) {
1098 0 : auto oldBody = sharedCommit->body;
1099 0 : if (sharedCommit->type == "application/data-transfer+json") {
1100 0 : sharedCommit->body["tid"] = peditIt->second.front()->body["tid"];
1101 0 : sharedCommit->body["fileId"] = "";
1102 : } else {
1103 0 : sharedCommit->body["body"] = peditIt->second.front()->body["body"];
1104 : }
1105 0 : peditIt->second.pop_front();
1106 0 : for (const auto& commit : peditIt->second) {
1107 0 : sharedCommit->editions.emplace_back(commit->body);
1108 : }
1109 0 : sharedCommit->editions.emplace_back(oldBody);
1110 0 : history.pendingEditions.erase(peditIt);
1111 0 : }
1112 : // Announce to client
1113 7529 : if (messageReceived)
1114 2178 : emitSignal<libjami::ConversationSignal::SwarmMessageReceived>(accountId_,
1115 1089 : repository_->id(),
1116 1089 : *sharedCommit);
1117 7529 : return !messageReceived;
1118 : }
1119 :
1120 7530 : void Conversation::Impl::rectifyStatus(const std::shared_ptr<libjami::SwarmMessage>& message,
1121 : History& history) const
1122 : {
1123 :
1124 7530 : auto parentIt = history.quickAccess.find(message->linearizedParent);
1125 7530 : auto currentMessage = message;
1126 :
1127 19398 : while(parentIt != history.quickAccess.end()){
1128 11868 : const auto& parent = parentIt->second;
1129 12064 : for (const auto& [peer, value] : message->status) {
1130 11062 : auto parentStatusIt = parent->status.find(peer);
1131 11062 : if (parentStatusIt == parent->status.end() || parentStatusIt->second < value) {
1132 196 : parent->status[peer] = value;
1133 392 : emitSignal<libjami::ConfigurationSignal::AccountMessageStatusChanged>(
1134 196 : accountId_,
1135 196 : repository_->id(),
1136 : peer,
1137 196 : parent->id,
1138 : value);
1139 : }
1140 10866 : else if(parentStatusIt->second >= value){
1141 10866 : break;
1142 : }
1143 : }
1144 11868 : currentMessage = parent;
1145 11868 : parentIt = history.quickAccess.find(parent->linearizedParent);
1146 : }
1147 7530 : }
1148 :
1149 : std::vector<std::shared_ptr<libjami::SwarmMessage>>
1150 7515 : Conversation::Impl::addToHistory(History& history,
1151 : const std::vector<std::map<std::string, std::string>>& commits,
1152 : bool messageReceived,
1153 : bool commitFromSelf)
1154 : {
1155 : //
1156 : // NOTE: This function makes the following assumptions on its arguments:
1157 : // - The messages in "history" are in reverse chronological order (newest message
1158 : // first, oldest message last).
1159 : // - If messageReceived is true, then the commits in "commits" are assumed to be in
1160 : // chronological order (oldest to newest) and to be newer than the ones in "history".
1161 : // They are therefore inserted at the beginning of the message list.
1162 : // - If messageReceived is false, then the commits in "commits" are assumed to be in
1163 : // reverse chronological order (newest to oldest) and to be older than the ones in
1164 : // "history". They are therefore appended at the end of the message list.
1165 : //
1166 7515 : auto acc = account_.lock();
1167 7515 : if (!acc)
1168 0 : return {};
1169 7515 : auto username = acc->getUsername();
1170 7515 : if (messageReceived && (&history == &loadedHistory_ && history.loading)) {
1171 0 : std::unique_lock lk(history.mutex);
1172 0 : history.cv.wait(lk, [&] { return !history.loading; });
1173 0 : }
1174 :
1175 : // Only set messages' status on history for client
1176 7515 : bool needToSetMessageStatus = !commitFromSelf && &history == &loadedHistory_;
1177 :
1178 7515 : std::vector<std::shared_ptr<libjami::SwarmMessage>> sharedCommits;
1179 15064 : for (const auto& commit : commits) {
1180 15098 : auto commitId = commit.at("id");
1181 7549 : if (history.quickAccess.find(commitId) != history.quickAccess.end())
1182 4 : continue; // Already present
1183 7545 : auto typeIt = commit.find("type");
1184 : // Nothing to show for the client, skip
1185 7545 : if (typeIt != commit.end() && typeIt->second == "merge")
1186 15 : continue;
1187 :
1188 7530 : auto sharedCommit = std::make_shared<libjami::SwarmMessage>();
1189 7530 : sharedCommit->fromMapStringString(commit);
1190 :
1191 7530 : if (needToSetMessageStatus) {
1192 : // Check if we already have status information for the commit.
1193 866 : auto itFuture = futureStatus.find(sharedCommit->id);
1194 866 : if (itFuture != futureStatus.end()) {
1195 12 : sharedCommit->status = std::move(itFuture->second);
1196 12 : futureStatus.erase(itFuture);
1197 : }
1198 : }
1199 :
1200 7530 : sharedCommits.emplace_back(sharedCommit);
1201 7549 : }
1202 :
1203 7515 : if (needToSetMessageStatus) {
1204 859 : constexpr int32_t SENDING = static_cast<int32_t>(libjami::Account::MessageStates::SENDING);
1205 859 : constexpr int32_t SENT = static_cast<int32_t>(libjami::Account::MessageStates::SENT);
1206 859 : constexpr int32_t DISPLAYED = static_cast<int32_t>(libjami::Account::MessageStates::DISPLAYED);
1207 :
1208 859 : std::lock_guard lk(messageStatusMtx_);
1209 13326 : for (const auto& member: repository_->members()) {
1210 : // For each member, we iterate over the commits to add in reverse chronological
1211 : // order (i.e. from newest to oldest) and set their status from the point of view
1212 : // of that member (as best we can given the information we have).
1213 : //
1214 : // The key assumption made in order to compute the status is that it can never decrease
1215 : // (with respect to the ordering SENDING < SENT < DISPLAYED) as we go back in time. We
1216 : // therefore start by setting the "status" variable below to the lowest possible value,
1217 : // and increase it when we encounter a commit for which it is justified to do so.
1218 : //
1219 : // If messageReceived is true, then the commits we are adding are the most recent in the
1220 : // conversation history, so the lowest possible value is SENDING.
1221 : //
1222 : // If messageReceived is false, then the commits we are adding are older than the ones
1223 : // that are already in the history, so the lowest possible value is the status of the
1224 : // oldest message in the history so far, which is stored in memberToStatus.
1225 12467 : auto status = SENDING;
1226 12467 : if (!messageReceived) {
1227 12 : auto cache = memberToStatus[member.uri];
1228 12 : if (cache > status)
1229 9 : status = cache;
1230 : }
1231 :
1232 24955 : for (auto it = sharedCommits.rbegin(); it != sharedCommits.rend(); it++) {
1233 12488 : auto sharedCommit = *it;
1234 12488 : auto previousStatus = status;
1235 :
1236 : // Compute status for the current commit.
1237 12488 : if (status < SENT && messagesStatus_[member.uri]["fetched"] == sharedCommit->id) {
1238 11 : status = SENT;
1239 : }
1240 12488 : if (messagesStatus_[member.uri]["read"] == sharedCommit->id) {
1241 1 : status = DISPLAYED;
1242 : }
1243 12488 : if (member.uri == sharedCommit->body.at("author")) {
1244 865 : status = DISPLAYED;
1245 : }
1246 12488 : if(status < sharedCommit->status[member.uri]){
1247 1 : status = sharedCommit->status[member.uri];
1248 : }
1249 :
1250 : // Store computed value.
1251 12488 : sharedCommit->status[member.uri] = status;
1252 :
1253 : // Update messagesStatus_ if needed.
1254 12488 : if (previousStatus == SENDING && status >= SENT) {
1255 866 : messagesStatus_[member.uri]["fetched"] = sharedCommit->id;
1256 : }
1257 12488 : if (previousStatus <= SENT && status == DISPLAYED) {
1258 855 : messagesStatus_[member.uri]["read"] = sharedCommit->id;
1259 : }
1260 12488 : }
1261 :
1262 12467 : if (!messageReceived) {
1263 : // Update memberToStatus with the status of the last (i.e. oldest) added commit.
1264 12 : memberToStatus[member.uri] = status;
1265 : }
1266 859 : }
1267 859 : }
1268 :
1269 7515 : std::vector<std::shared_ptr<libjami::SwarmMessage>> messages;
1270 15045 : for (const auto& sharedCommit : sharedCommits) {
1271 7530 : history.quickAccess[sharedCommit->id] = sharedCommit;
1272 :
1273 7530 : auto reactToIt = sharedCommit->body.find("react-to");
1274 7530 : auto editIt = sharedCommit->body.find("edit");
1275 7530 : if (reactToIt != sharedCommit->body.end() && !reactToIt->second.empty()) {
1276 0 : handleReaction(history, sharedCommit);
1277 7530 : } else if (editIt != sharedCommit->body.end() && !editIt->second.empty()) {
1278 1 : handleEdition(history, sharedCommit, messageReceived);
1279 7529 : } else if (handleMessage(history, sharedCommit, messageReceived)) {
1280 6440 : messages.emplace_back(sharedCommit);
1281 : }
1282 7530 : rectifyStatus(sharedCommit, history);
1283 : }
1284 :
1285 7515 : return messages;
1286 7515 : }
1287 :
1288 135 : Conversation::Conversation(const std::shared_ptr<JamiAccount>& account,
1289 : ConversationMode mode,
1290 135 : const std::string& otherMember)
1291 135 : : pimpl_ {new Impl {account, mode, otherMember}}
1292 135 : {}
1293 :
1294 8 : Conversation::Conversation(const std::shared_ptr<JamiAccount>& account,
1295 8 : const std::string& conversationId)
1296 8 : : pimpl_ {new Impl {account, conversationId}}
1297 8 : {}
1298 :
1299 144 : Conversation::Conversation(const std::shared_ptr<JamiAccount>& account,
1300 : const std::string& remoteDevice,
1301 144 : const std::string& conversationId)
1302 144 : : pimpl_ {new Impl {account, remoteDevice, conversationId}}
1303 144 : {}
1304 :
1305 284 : Conversation::~Conversation() {}
1306 :
1307 : std::string
1308 4106 : Conversation::id() const
1309 : {
1310 4106 : return pimpl_->repository_ ? pimpl_->repository_->id() : "";
1311 : }
1312 :
1313 : void
1314 110 : Conversation::addMember(const std::string& contactUri, const OnDoneCb& cb)
1315 : {
1316 : try {
1317 110 : if (mode() == ConversationMode::ONE_TO_ONE) {
1318 : // Only authorize to add left members
1319 1 : auto initialMembers = getInitialMembers();
1320 1 : auto it = std::find(initialMembers.begin(), initialMembers.end(), contactUri);
1321 1 : if (it == initialMembers.end()) {
1322 1 : JAMI_WARN("Unable to add new member in one to one conversation");
1323 1 : cb(false, "");
1324 1 : return;
1325 : }
1326 1 : }
1327 0 : } catch (const std::exception& e) {
1328 0 : JAMI_WARN("Unable to get mode: %s", e.what());
1329 0 : cb(false, "");
1330 0 : return;
1331 0 : }
1332 109 : if (isMember(contactUri, true)) {
1333 0 : JAMI_WARN("Unable to add member %s because it's already a member", contactUri.c_str());
1334 0 : cb(false, "");
1335 0 : return;
1336 : }
1337 109 : if (isBanned(contactUri)) {
1338 2 : if (pimpl_->isAdmin()) {
1339 3 : dht::ThreadPool::io().run(
1340 2 : [w = weak(), contactUri = std::move(contactUri), cb = std::move(cb)] {
1341 1 : if (auto sthis = w.lock()) {
1342 1 : auto members = sthis->pimpl_->repository_->members();
1343 1 : auto type = sthis->pimpl_->bannedType(contactUri);
1344 1 : if (type.empty()) {
1345 0 : cb(false, {});
1346 0 : return;
1347 : }
1348 1 : sthis->pimpl_->voteUnban(contactUri, type, cb);
1349 2 : }
1350 : });
1351 : } else {
1352 1 : JAMI_WARN("Unable to add member %s because this member is blocked", contactUri.c_str());
1353 1 : cb(false, "");
1354 : }
1355 2 : return;
1356 : }
1357 :
1358 107 : dht::ThreadPool::io().run([w = weak(), contactUri = std::move(contactUri), cb = std::move(cb)] {
1359 107 : if (auto sthis = w.lock()) {
1360 : // Add member files and commit
1361 107 : std::unique_lock lk(sthis->pimpl_->writeMtx_);
1362 107 : auto commit = sthis->pimpl_->repository_->addMember(contactUri);
1363 107 : sthis->pimpl_->announce(commit, true);
1364 107 : lk.unlock();
1365 107 : if (cb)
1366 107 : cb(!commit.empty(), commit);
1367 214 : }
1368 107 : });
1369 : }
1370 :
1371 : std::shared_ptr<dhtnet::ChannelSocket>
1372 4664 : Conversation::gitSocket(const DeviceId& deviceId) const
1373 : {
1374 4664 : return pimpl_->gitSocket(deviceId);
1375 : }
1376 :
1377 : void
1378 2125 : Conversation::addGitSocket(const DeviceId& deviceId,
1379 : const std::shared_ptr<dhtnet::ChannelSocket>& socket)
1380 : {
1381 2125 : pimpl_->addGitSocket(deviceId, socket);
1382 2125 : }
1383 :
1384 : void
1385 750 : Conversation::removeGitSocket(const DeviceId& deviceId)
1386 : {
1387 750 : pimpl_->removeGitSocket(deviceId);
1388 750 : }
1389 :
1390 : void
1391 290 : Conversation::shutdownConnections()
1392 : {
1393 290 : pimpl_->fallbackTimer_->cancel();
1394 290 : pimpl_->gitSocketList_.clear();
1395 290 : if (pimpl_->swarmManager_)
1396 290 : pimpl_->swarmManager_->shutdown();
1397 290 : std::lock_guard lk(pimpl_->membersMtx_);
1398 290 : pimpl_->checkedMembers_.clear();
1399 290 : }
1400 :
1401 : void
1402 0 : Conversation::connectivityChanged()
1403 : {
1404 0 : if (pimpl_->swarmManager_)
1405 0 : pimpl_->swarmManager_->maintainBuckets();
1406 0 : }
1407 :
1408 : std::vector<jami::DeviceId>
1409 0 : Conversation::getDeviceIdList() const
1410 : {
1411 0 : return pimpl_->swarmManager_->getRoutingTable().getAllNodes();
1412 : }
1413 :
1414 : std::shared_ptr<Typers>
1415 9 : Conversation::typers() const
1416 : {
1417 9 : return pimpl_->typers_;
1418 : }
1419 :
1420 : bool
1421 0 : Conversation::hasSwarmChannel(const std::string& deviceId)
1422 : {
1423 0 : if (!pimpl_->swarmManager_)
1424 0 : return false;
1425 0 : return pimpl_->swarmManager_->isConnectedWith(DeviceId(deviceId));
1426 : }
1427 :
1428 : void
1429 1 : Conversation::Impl::voteUnban(const std::string& contactUri,
1430 : const std::string_view type,
1431 : const OnDoneCb& cb)
1432 : {
1433 : // Check if admin
1434 1 : if (!isAdmin()) {
1435 0 : JAMI_WARN("You're not an admin of this repo. Unable to unblock %s", contactUri.c_str());
1436 0 : cb(false, {});
1437 0 : return;
1438 : }
1439 :
1440 : // Vote for removal
1441 1 : std::unique_lock lk(writeMtx_);
1442 1 : auto voteCommit = repository_->voteUnban(contactUri, type);
1443 1 : if (voteCommit.empty()) {
1444 0 : JAMI_WARN("Unbanning %s failed", contactUri.c_str());
1445 0 : cb(false, "");
1446 0 : return;
1447 : }
1448 :
1449 1 : auto lastId = voteCommit;
1450 1 : std::vector<std::string> commits;
1451 1 : commits.emplace_back(voteCommit);
1452 :
1453 : // If admin, check vote
1454 2 : auto resolveCommit = repository_->resolveVote(contactUri, type, "unban");
1455 1 : if (!resolveCommit.empty()) {
1456 1 : commits.emplace_back(resolveCommit);
1457 1 : lastId = resolveCommit;
1458 1 : JAMI_WARN("Vote solved for unbanning %s.", contactUri.c_str());
1459 : }
1460 1 : announce(commits, true);
1461 1 : lk.unlock();
1462 1 : if (cb)
1463 1 : cb(!lastId.empty(), lastId);
1464 1 : }
1465 :
1466 : void
1467 13 : Conversation::removeMember(const std::string& contactUri, bool isDevice, const OnDoneCb& cb)
1468 : {
1469 39 : dht::ThreadPool::io().run([w = weak(),
1470 13 : contactUri = std::move(contactUri),
1471 13 : isDevice = std::move(isDevice),
1472 13 : cb = std::move(cb)] {
1473 13 : if (auto sthis = w.lock()) {
1474 : // Check if admin
1475 13 : if (!sthis->pimpl_->isAdmin()) {
1476 1 : JAMI_WARN("You're not an admin of this repo. Unable to block %s", contactUri.c_str());
1477 1 : cb(false, {});
1478 2 : return;
1479 : }
1480 :
1481 : // Get current user type
1482 12 : std::string type;
1483 12 : if (isDevice) {
1484 0 : type = "devices";
1485 : } else {
1486 12 : auto members = sthis->pimpl_->repository_->members();
1487 25 : for (const auto& member : members) {
1488 25 : if (member.uri == contactUri) {
1489 12 : if (member.role == MemberRole::INVITED) {
1490 2 : type = "invited";
1491 10 : } else if (member.role == MemberRole::ADMIN) {
1492 1 : type = "admins";
1493 9 : } else if (member.role == MemberRole::MEMBER) {
1494 9 : type = "members";
1495 : }
1496 12 : break;
1497 : }
1498 : }
1499 12 : if (type.empty()) {
1500 0 : cb(false, {});
1501 0 : return;
1502 : }
1503 12 : }
1504 :
1505 : // Vote for removal
1506 12 : std::unique_lock lk(sthis->pimpl_->writeMtx_);
1507 12 : auto voteCommit = sthis->pimpl_->repository_->voteKick(contactUri, type);
1508 12 : if (voteCommit.empty()) {
1509 1 : JAMI_WARN("Kicking %s failed", contactUri.c_str());
1510 1 : cb(false, "");
1511 1 : return;
1512 : }
1513 :
1514 11 : auto lastId = voteCommit;
1515 11 : std::vector<std::string> commits;
1516 11 : commits.emplace_back(voteCommit);
1517 :
1518 : // If admin, check vote
1519 22 : auto resolveCommit = sthis->pimpl_->repository_->resolveVote(contactUri, type, "ban");
1520 11 : if (!resolveCommit.empty()) {
1521 11 : commits.emplace_back(resolveCommit);
1522 11 : lastId = resolveCommit;
1523 11 : JAMI_WARN("Vote solved for %s. %s banned",
1524 : contactUri.c_str(),
1525 : isDevice ? "Device" : "Member");
1526 11 : sthis->pimpl_->disconnectFromPeer(contactUri);
1527 : }
1528 :
1529 11 : sthis->pimpl_->announce(commits, true);
1530 11 : lk.unlock();
1531 11 : cb(!lastId.empty(), lastId);
1532 27 : }
1533 : });
1534 13 : }
1535 :
1536 : std::vector<std::map<std::string, std::string>>
1537 350 : Conversation::getMembers(bool includeInvited, bool includeLeft, bool includeBanned) const
1538 : {
1539 350 : return pimpl_->getMembers(includeInvited, includeLeft, includeBanned);
1540 : }
1541 :
1542 : std::set<std::string>
1543 1765 : Conversation::memberUris(std::string_view filter, const std::set<MemberRole>& filteredRoles) const
1544 : {
1545 1765 : return pimpl_->repository_->memberUris(filter, filteredRoles);
1546 : }
1547 :
1548 : std::vector<NodeId>
1549 1563 : Conversation::peersToSyncWith() const
1550 : {
1551 1563 : const auto& routingTable = pimpl_->swarmManager_->getRoutingTable();
1552 1563 : const auto& nodes = routingTable.getNodes();
1553 1563 : const auto& mobiles = routingTable.getMobileNodes();
1554 1563 : std::vector<NodeId> s;
1555 1563 : s.reserve(nodes.size() + mobiles.size());
1556 1563 : s.insert(s.end(), nodes.begin(), nodes.end());
1557 1563 : s.insert(s.end(), mobiles.begin(), mobiles.end());
1558 13264 : for (const auto& [deviceId, _] : pimpl_->gitSocketList_)
1559 11701 : if (std::find(s.cbegin(), s.cend(), deviceId) == s.cend())
1560 5764 : s.emplace_back(deviceId);
1561 3126 : return s;
1562 1563 : }
1563 :
1564 : bool
1565 1563 : Conversation::isBootstraped() const
1566 : {
1567 1563 : const auto& routingTable = pimpl_->swarmManager_->getRoutingTable();
1568 1563 : return !routingTable.getNodes().empty();
1569 : }
1570 :
1571 : std::string
1572 13751 : Conversation::uriFromDevice(const std::string& deviceId) const
1573 : {
1574 13751 : return pimpl_->repository_->uriFromDevice(deviceId);
1575 : }
1576 :
1577 : void
1578 0 : Conversation::monitor()
1579 : {
1580 0 : pimpl_->swarmManager_->getRoutingTable().printRoutingTable();
1581 0 : }
1582 :
1583 : std::string
1584 143 : Conversation::join()
1585 : {
1586 143 : return pimpl_->repository_->join();
1587 : }
1588 :
1589 : bool
1590 3246 : Conversation::isMember(const std::string& uri, bool includeInvited) const
1591 : {
1592 3246 : auto repoPath = pimpl_->repoPath();
1593 3246 : auto invitedPath = repoPath / "invited";
1594 3246 : auto adminsPath = repoPath / "admins";
1595 3246 : auto membersPath = repoPath / "members";
1596 12984 : std::vector<std::filesystem::path> pathsToCheck = {adminsPath, membersPath};
1597 3246 : if (includeInvited)
1598 2635 : pathsToCheck.emplace_back(invitedPath);
1599 6956 : for (const auto& path : pathsToCheck) {
1600 27173 : for (const auto& certificate : dhtnet::fileutils::readDirectory(path)) {
1601 23463 : std::string_view crtUri = certificate;
1602 23463 : auto crtIt = crtUri.find(".crt");
1603 23463 : if (path != invitedPath && crtIt == std::string_view::npos) {
1604 0 : JAMI_WARNING("Incorrect file found: {}/{}", path, certificate);
1605 0 : continue;
1606 0 : }
1607 23463 : if (crtIt != std::string_view::npos)
1608 22951 : crtUri = crtUri.substr(0, crtIt);
1609 23463 : if (crtUri == uri)
1610 2864 : return true;
1611 6574 : }
1612 : }
1613 :
1614 382 : if (includeInvited && mode() == ConversationMode::ONE_TO_ONE) {
1615 3 : for (const auto& member : getInitialMembers()) {
1616 2 : if (member == uri)
1617 0 : return true;
1618 1 : }
1619 : }
1620 :
1621 382 : return false;
1622 3246 : }
1623 :
1624 : bool
1625 6156 : Conversation::isBanned(const std::string& uri) const
1626 : {
1627 6156 : return !pimpl_->bannedType(uri).empty();
1628 : }
1629 :
1630 : void
1631 0 : Conversation::sendMessage(std::string&& message,
1632 : const std::string& type,
1633 : const std::string& replyTo,
1634 : OnCommitCb&& onCommit,
1635 : OnDoneCb&& cb)
1636 : {
1637 0 : Json::Value json;
1638 0 : json["body"] = std::move(message);
1639 0 : json["type"] = type;
1640 0 : sendMessage(std::move(json), replyTo, std::move(onCommit), std::move(cb));
1641 0 : }
1642 :
1643 : void
1644 98 : Conversation::sendMessage(Json::Value&& value,
1645 : const std::string& replyTo,
1646 : OnCommitCb&& onCommit,
1647 : OnDoneCb&& cb)
1648 : {
1649 98 : if (!replyTo.empty()) {
1650 0 : auto commit = pimpl_->repository_->getCommit(replyTo);
1651 0 : if (commit == std::nullopt) {
1652 0 : JAMI_ERR("Replying to invalid commit %s", replyTo.c_str());
1653 0 : return;
1654 : }
1655 0 : value["reply-to"] = replyTo;
1656 0 : }
1657 392 : dht::ThreadPool::io().run(
1658 294 : [w = weak(), value = std::move(value), onCommit = std::move(onCommit), cb = std::move(cb)] {
1659 98 : if (auto sthis = w.lock()) {
1660 98 : std::unique_lock lk(sthis->pimpl_->writeMtx_);
1661 98 : auto commit = sthis->pimpl_->repository_->commitMessage(json::toString(value));
1662 98 : lk.unlock();
1663 98 : if (onCommit)
1664 13 : onCommit(commit);
1665 98 : sthis->pimpl_->announce(commit, true);
1666 98 : if (cb)
1667 98 : cb(!commit.empty(), commit);
1668 196 : }
1669 98 : });
1670 : }
1671 :
1672 : void
1673 0 : Conversation::sendMessages(std::vector<Json::Value>&& messages, OnMultiDoneCb&& cb)
1674 : {
1675 0 : dht::ThreadPool::io().run([w = weak(), messages = std::move(messages), cb = std::move(cb)] {
1676 0 : if (auto sthis = w.lock()) {
1677 0 : std::vector<std::string> commits;
1678 0 : commits.reserve(messages.size());
1679 0 : std::unique_lock lk(sthis->pimpl_->writeMtx_);
1680 0 : for (const auto& message : messages) {
1681 0 : auto commit = sthis->pimpl_->repository_->commitMessage(json::toString(message));
1682 0 : commits.emplace_back(std::move(commit));
1683 0 : }
1684 0 : lk.unlock();
1685 0 : sthis->pimpl_->announce(commits, true);
1686 0 : if (cb)
1687 0 : cb(commits);
1688 0 : }
1689 0 : });
1690 0 : }
1691 :
1692 : std::optional<std::map<std::string, std::string>>
1693 12781 : Conversation::getCommit(const std::string& commitId) const
1694 : {
1695 12781 : auto commit = pimpl_->repository_->getCommit(commitId);
1696 12781 : if (commit == std::nullopt)
1697 1731 : return std::nullopt;
1698 11050 : return pimpl_->repository_->convCommitToMap(*commit);
1699 12781 : }
1700 :
1701 : void
1702 0 : Conversation::loadMessages(OnLoadMessages cb, const LogOptions& options)
1703 : {
1704 0 : if (!cb)
1705 0 : return;
1706 0 : dht::ThreadPool::io().run([w = weak(), cb = std::move(cb), options] {
1707 0 : if (auto sthis = w.lock()) {
1708 0 : cb(sthis->pimpl_->loadMessages(options));
1709 0 : }
1710 0 : });
1711 : }
1712 :
1713 : void
1714 2 : Conversation::loadMessages2(const OnLoadMessages2& cb, const LogOptions& options)
1715 : {
1716 2 : if (!cb)
1717 0 : return;
1718 2 : dht::ThreadPool::io().run([w = weak(), cb = std::move(cb), options] {
1719 2 : if (auto sthis = w.lock()) {
1720 2 : std::unique_lock lk(sthis->pimpl_->loadedHistory_.mutex);
1721 2 : auto result = sthis->pimpl_->loadMessages2(options);
1722 2 : lk.unlock();
1723 2 : cb(std::move(result));
1724 4 : }
1725 2 : });
1726 : }
1727 :
1728 : void
1729 0 : Conversation::clearCache()
1730 : {
1731 0 : std::lock_guard lk(pimpl_->loadedHistory_.mutex);
1732 0 : pimpl_->loadedHistory_.messageList.clear();
1733 0 : pimpl_->loadedHistory_.quickAccess.clear();
1734 0 : pimpl_->loadedHistory_.pendingEditions.clear();
1735 0 : pimpl_->loadedHistory_.pendingReactions.clear();
1736 : {
1737 0 : std::lock_guard lk(pimpl_->messageStatusMtx_);
1738 0 : pimpl_->memberToStatus.clear();
1739 0 : }
1740 0 : }
1741 :
1742 : std::string
1743 2261 : Conversation::lastCommitId() const
1744 : {
1745 : {
1746 2261 : std::lock_guard lk(pimpl_->loadedHistory_.mutex);
1747 2261 : if (!pimpl_->loadedHistory_.messageList.empty())
1748 3842 : return (*pimpl_->loadedHistory_.messageList.begin())->id;
1749 2261 : }
1750 340 : LogOptions options;
1751 340 : options.nbOfCommits = 1;
1752 340 : options.skipMerge = true;
1753 340 : History optHistory;
1754 340 : std::scoped_lock lock(pimpl_->writeMtx_, optHistory.mutex);
1755 340 : auto res = pimpl_->loadMessages2(options, &optHistory);
1756 340 : if (res.empty())
1757 0 : return {};
1758 680 : return (*optHistory.messageList.begin())->id;
1759 340 : }
1760 :
1761 : std::vector<std::map<std::string, std::string>>
1762 1844 : Conversation::Impl::mergeHistory(const std::string& uri)
1763 : {
1764 1844 : if (not repository_) {
1765 0 : JAMI_WARNING("{} Invalid repo. Abort merge", toString());
1766 0 : return {};
1767 : }
1768 3688 : auto remoteHead = repository_->remoteHead(uri);
1769 1844 : if (remoteHead.empty()) {
1770 0 : JAMI_WARNING("{} Unable to get HEAD of {}", toString(), uri);
1771 0 : return {};
1772 : }
1773 :
1774 : // Validate commit
1775 1844 : auto [newCommits, err] = repository_->validFetch(uri);
1776 1844 : if (newCommits.empty()) {
1777 992 : if (err)
1778 24 : JAMI_ERROR("{} Unable to validate history with {}", toString(), uri);
1779 992 : repository_->removeBranchWith(uri);
1780 992 : return {};
1781 : }
1782 :
1783 : // If validated, merge
1784 852 : auto [ok, cid] = repository_->merge(remoteHead);
1785 852 : if (!ok) {
1786 3 : JAMI_ERROR("{} Unable to merge history with {}", toString(), uri);
1787 1 : repository_->removeBranchWith(uri);
1788 1 : return {};
1789 : }
1790 851 : if (!cid.empty()) {
1791 : // A merge commit was generated, should be added in new commits
1792 7 : auto commit = repository_->getCommit(cid);
1793 7 : if (commit != std::nullopt)
1794 7 : newCommits.emplace_back(*commit);
1795 7 : }
1796 :
1797 2553 : JAMI_LOG("{} Successfully merged history with {:s}", toString(), uri);
1798 851 : auto result = repository_->convCommitsToMap(newCommits);
1799 1716 : for (auto& commit : result) {
1800 865 : auto it = commit.find("type");
1801 865 : if (it != commit.end() && it->second == "member") {
1802 738 : repository_->refreshMembers();
1803 :
1804 738 : if (commit["action"] == "ban")
1805 5 : disconnectFromPeer(commit["uri"]);
1806 : }
1807 : }
1808 851 : return result;
1809 1844 : }
1810 :
1811 : bool
1812 1982 : Conversation::pull(const std::string& deviceId, OnPullCb&& cb, std::string commitId)
1813 : {
1814 1982 : std::lock_guard lk(pimpl_->pullcbsMtx_);
1815 1982 : auto [it, notInProgress] = pimpl_->fetchingRemotes_.emplace(deviceId, std::deque<std::pair<std::string, OnPullCb>>());
1816 1982 : auto& pullcbs = it->second;
1817 1982 : auto itPull = std::find_if(pullcbs.begin(),
1818 0 : pullcbs.end(),
1819 3964 : [&](const auto& elem) { return std::get<0>(elem) == commitId; });
1820 1982 : if (itPull != pullcbs.end()) {
1821 0 : JAMI_DEBUG("{} Ignoring request to pull from {:s} with commit {:s}: pull already in progress", pimpl_->toString(), deviceId, commitId);
1822 0 : cb(false);
1823 0 : return false;
1824 : }
1825 5946 : JAMI_DEBUG("{} [device {}] Pulling '{:s}'", pimpl_->toString(), deviceId, commitId);
1826 1982 : pullcbs.emplace_back(std::move(commitId), std::move(cb));
1827 1982 : if (notInProgress)
1828 1961 : dht::ThreadPool::io().run([w = weak(), deviceId] {
1829 1961 : if (auto sthis_ = w.lock())
1830 1961 : sthis_->pimpl_->pull(deviceId);
1831 1961 : });
1832 1982 : return true;
1833 1982 : }
1834 :
1835 : void
1836 1961 : Conversation::Impl::pull(const std::string& deviceId)
1837 : {
1838 1961 : auto& repo = repository_;
1839 :
1840 1961 : std::string commitId;
1841 1961 : OnPullCb cb;
1842 : while (true) {
1843 : {
1844 3943 : std::lock_guard lk(pullcbsMtx_);
1845 3943 : auto it = fetchingRemotes_.find(deviceId);
1846 3943 : if (it == fetchingRemotes_.end()) {
1847 0 : JAMI_ERROR("Could not find device {:s} in fetchingRemotes", deviceId);
1848 0 : break;
1849 : }
1850 3943 : auto& pullcbs = it->second;
1851 3943 : if (pullcbs.empty()) {
1852 1961 : fetchingRemotes_.erase(it);
1853 1961 : break;
1854 : }
1855 1982 : auto& elem = pullcbs.front();
1856 1982 : commitId = std::move(std::get<0>(elem));
1857 1982 : cb = std::move(std::get<1>(elem));
1858 1982 : pullcbs.pop_front();
1859 3943 : }
1860 : // If recently fetched, the commit can already be there, so no need to do complex operations
1861 1982 : if (commitId != "" && repo->getCommit(commitId, false) != std::nullopt) {
1862 110 : cb(true);
1863 138 : continue;
1864 : }
1865 : // Pull from remote
1866 1872 : auto fetched = repo->fetch(deviceId);
1867 1872 : if (!fetched) {
1868 28 : cb(false);
1869 28 : continue;
1870 : }
1871 1844 : auto oldHead = repo->getHead();
1872 1844 : std::string newHead = oldHead;
1873 1844 : std::unique_lock lk(writeMtx_);
1874 1844 : auto commits = mergeHistory(deviceId);
1875 1844 : if (!commits.empty()) {
1876 851 : newHead = commits.rbegin()->at("id");
1877 : // Note: Because clients needs to linearize the history, they need to know all commits
1878 : // that can be updated.
1879 : // In this case, all commits until the common merge base should be announced.
1880 : // The client ill need to update it's model after this.
1881 851 : std::string mergeBase = oldHead; // If fast-forward, the merge base is the previous head
1882 851 : auto newHeadCommit = repo->getCommit(newHead);
1883 851 : if (newHeadCommit != std::nullopt && newHeadCommit->parents.size() > 1) {
1884 8 : mergeBase = repo->mergeBase(newHeadCommit->parents[0], newHeadCommit->parents[1]);
1885 8 : LogOptions options;
1886 8 : options.to = mergeBase;
1887 8 : auto updatedCommits = loadMessages(options);
1888 : // We announce commits from oldest to update to newest. This generally avoid
1889 : // to get detached commits until they are all announced.
1890 8 : std::reverse(std::begin(updatedCommits), std::end(updatedCommits));
1891 8 : announce(updatedCommits);
1892 8 : } else {
1893 843 : announce(commits);
1894 : }
1895 851 : }
1896 1844 : lk.unlock();
1897 :
1898 1844 : bool commitFound = false;
1899 1844 : if (commitId != "") {
1900 : // If `commitId` is non-empty, then we were attempting to pull a specific commit.
1901 : // We need to check if we actually got it; the fact that the fetch above was
1902 : // successful doesn't guarantee that we did.
1903 1480 : for (const auto& commit : commits) {
1904 854 : if (commit.at("id") == commitId) {
1905 848 : commitFound = true;
1906 848 : break;
1907 : }
1908 : }
1909 : } else {
1910 370 : commitFound = true;
1911 : }
1912 1844 : if (!commitFound)
1913 1878 : JAMI_WARNING("Successfully fetched from device {} but didn't receive expected commit {}",
1914 : deviceId, commitId);
1915 : // WARNING: If its argument is `true`, this callback will attempt to send a message notification
1916 : // for commit `commitId` to other members of the swarm. It's important that we only
1917 : // send these notifications if we actually have the commit. Otherwise, we can end up
1918 : // in a situation where the members of the swarm keep sending notifications to each
1919 : // other for a commit that none of them have (note that we are unable to rule this out, as
1920 : // nothing prevents a malicious user from intentionally sending a notification with
1921 : // a fake commit ID).
1922 1844 : if (cb)
1923 1844 : cb(commitFound);
1924 : // Announce if profile changed
1925 1844 : if (oldHead != newHead) {
1926 851 : auto diffStats = repo->diffStats(newHead, oldHead);
1927 851 : auto changedFiles = repo->changedFiles(diffStats);
1928 851 : if (find(changedFiles.begin(), changedFiles.end(), "profile.vcf")
1929 1702 : != changedFiles.end()) {
1930 3 : emitSignal<libjami::ConversationSignal::ConversationProfileUpdated>(
1931 6 : accountId_, repo->id(), repo->infos());
1932 : }
1933 851 : }
1934 3826 : }
1935 1961 : }
1936 :
1937 : void
1938 1982 : Conversation::sync(const std::string& member,
1939 : const std::string& deviceId,
1940 : OnPullCb&& cb,
1941 : std::string commitId)
1942 : {
1943 1982 : pull(deviceId, std::move(cb), commitId);
1944 1982 : dht::ThreadPool::io().run([member, deviceId, w = weak_from_this()] {
1945 1982 : auto sthis = w.lock();
1946 : // For waiting request, downloadFile
1947 1983 : for (const auto& wr : sthis->dataTransfer()->waitingRequests()) {
1948 1 : sthis->downloadFile(wr.interactionId, wr.fileId, wr.path, member, deviceId);
1949 1982 : }
1950 1982 : });
1951 1982 : }
1952 :
1953 : std::map<std::string, std::string>
1954 211 : Conversation::generateInvitation() const
1955 : {
1956 : // Invite the new member to the conversation
1957 211 : Json::Value root;
1958 211 : auto& metadata = root[ConversationMapKeys::METADATAS];
1959 424 : for (const auto& [k, v] : infos()) {
1960 213 : if (v.size() >= 64000) {
1961 0 : JAMI_WARNING("Cutting invite because the SIP message will be too long");
1962 0 : continue;
1963 0 : }
1964 213 : metadata[k] = v;
1965 211 : }
1966 211 : root[ConversationMapKeys::CONVERSATIONID] = id();
1967 633 : return {{"application/invite+json", json::toString(root)}};
1968 211 : }
1969 :
1970 : std::string
1971 4 : Conversation::leave()
1972 : {
1973 4 : setRemovingFlag();
1974 4 : std::lock_guard lk(pimpl_->writeMtx_);
1975 8 : return pimpl_->repository_->leave();
1976 4 : }
1977 :
1978 : void
1979 5 : Conversation::setRemovingFlag()
1980 : {
1981 5 : pimpl_->isRemoving_ = true;
1982 5 : }
1983 :
1984 : bool
1985 3777 : Conversation::isRemoving()
1986 : {
1987 3777 : return pimpl_->isRemoving_;
1988 : }
1989 :
1990 : void
1991 16 : Conversation::erase()
1992 : {
1993 16 : if (pimpl_->conversationDataPath_ != "")
1994 16 : dhtnet::fileutils::removeAll(pimpl_->conversationDataPath_, true);
1995 16 : if (!pimpl_->repository_)
1996 0 : return;
1997 16 : std::lock_guard lk(pimpl_->writeMtx_);
1998 16 : pimpl_->repository_->erase();
1999 16 : }
2000 :
2001 : ConversationMode
2002 4018 : Conversation::mode() const
2003 : {
2004 4018 : return pimpl_->repository_->mode();
2005 : }
2006 :
2007 : std::vector<std::string>
2008 25 : Conversation::getInitialMembers() const
2009 : {
2010 25 : return pimpl_->repository_->getInitialMembers();
2011 : }
2012 :
2013 : bool
2014 0 : Conversation::isInitialMember(const std::string& uri) const
2015 : {
2016 0 : auto members = getInitialMembers();
2017 0 : return std::find(members.begin(), members.end(), uri) != members.end();
2018 0 : }
2019 :
2020 : void
2021 4 : Conversation::updateInfos(const std::map<std::string, std::string>& map, const OnDoneCb& cb)
2022 : {
2023 4 : dht::ThreadPool::io().run([w = weak(), map = std::move(map), cb = std::move(cb)] {
2024 4 : if (auto sthis = w.lock()) {
2025 4 : auto& repo = sthis->pimpl_->repository_;
2026 4 : std::unique_lock lk(sthis->pimpl_->writeMtx_);
2027 4 : auto commit = repo->updateInfos(map);
2028 4 : sthis->pimpl_->announce(commit, true);
2029 4 : lk.unlock();
2030 4 : if (cb)
2031 4 : cb(!commit.empty(), commit);
2032 4 : emitSignal<libjami::ConversationSignal::ConversationProfileUpdated>(
2033 8 : sthis->pimpl_->accountId_, repo->id(), repo->infos());
2034 8 : }
2035 4 : });
2036 4 : }
2037 :
2038 : std::map<std::string, std::string>
2039 254 : Conversation::infos() const
2040 : {
2041 254 : return pimpl_->repository_->infos();
2042 : }
2043 :
2044 : void
2045 1 : Conversation::updatePreferences(const std::map<std::string, std::string>& map)
2046 : {
2047 1 : auto filePath = pimpl_->conversationDataPath_ / "preferences";
2048 1 : auto prefs = map;
2049 1 : auto itLast = prefs.find(LAST_MODIFIED);
2050 1 : if (itLast != prefs.end()) {
2051 0 : if (std::filesystem::is_regular_file(filePath)) {
2052 0 : auto lastModified = fileutils::lastWriteTimeInSeconds(filePath);
2053 : try {
2054 0 : if (lastModified >= std::stoul(itLast->second))
2055 0 : return;
2056 0 : } catch (...) {
2057 0 : return;
2058 0 : }
2059 : }
2060 0 : prefs.erase(itLast);
2061 : }
2062 :
2063 1 : std::ofstream file(filePath, std::ios::trunc | std::ios::binary);
2064 1 : msgpack::pack(file, prefs);
2065 2 : emitSignal<libjami::ConversationSignal::ConversationPreferencesUpdated>(pimpl_->accountId_,
2066 2 : id(),
2067 1 : std::move(prefs));
2068 1 : }
2069 :
2070 : std::map<std::string, std::string>
2071 74 : Conversation::preferences(bool includeLastModified) const
2072 : {
2073 : try {
2074 74 : std::map<std::string, std::string> preferences;
2075 74 : auto filePath = pimpl_->conversationDataPath_ / "preferences";
2076 146 : auto file = fileutils::loadFile(filePath);
2077 2 : msgpack::object_handle oh = msgpack::unpack((const char*) file.data(), file.size());
2078 2 : oh.get().convert(preferences);
2079 2 : if (includeLastModified)
2080 1 : preferences[LAST_MODIFIED] = std::to_string(fileutils::lastWriteTimeInSeconds(filePath));
2081 2 : return preferences;
2082 218 : } catch (const std::exception& e) {
2083 72 : }
2084 72 : return {};
2085 : }
2086 :
2087 : std::vector<uint8_t>
2088 0 : Conversation::vCard() const
2089 : {
2090 : try {
2091 0 : return fileutils::loadFile(pimpl_->repoPath() / "profile.vcf");
2092 0 : } catch (...) {
2093 0 : }
2094 0 : return {};
2095 : }
2096 :
2097 : std::shared_ptr<TransferManager>
2098 2076 : Conversation::dataTransfer() const
2099 : {
2100 2076 : return pimpl_->transferManager_;
2101 : }
2102 :
2103 : bool
2104 13 : Conversation::onFileChannelRequest(const std::string& member,
2105 : const std::string& fileId,
2106 : std::filesystem::path& path,
2107 : std::string& sha3sum) const
2108 : {
2109 13 : if (!isMember(member))
2110 0 : return false;
2111 :
2112 13 : auto sep = fileId.find('_');
2113 13 : if (sep == std::string::npos)
2114 0 : return false;
2115 :
2116 13 : auto interactionId = fileId.substr(0, sep);
2117 13 : auto commit = getCommit(interactionId);
2118 26 : if (commit == std::nullopt || commit->find("type") == commit->end()
2119 26 : || commit->find("tid") == commit->end() || commit->find("sha3sum") == commit->end()
2120 26 : || commit->at("type") != "application/data-transfer+json") {
2121 0 : JAMI_WARNING("[Account {:s}] {} requested invalid file transfer commit {}",
2122 : pimpl_->accountId_,
2123 : member,
2124 : interactionId);
2125 0 : return false;
2126 : }
2127 :
2128 13 : path = dataTransfer()->path(fileId);
2129 13 : sha3sum = commit->at("sha3sum");
2130 :
2131 13 : return true;
2132 13 : }
2133 :
2134 : bool
2135 14 : Conversation::downloadFile(const std::string& interactionId,
2136 : const std::string& fileId,
2137 : const std::string& path,
2138 : const std::string&,
2139 : const std::string& deviceId)
2140 : {
2141 14 : auto commit = getCommit(interactionId);
2142 14 : if (commit == std::nullopt || commit->at("type") != "application/data-transfer+json") {
2143 0 : JAMI_ERROR("Commit doesn't exists or is not a file transfer {} (Conversation: {}) ", interactionId, id());
2144 0 : return false;
2145 : }
2146 14 : auto tid = commit->find("tid");
2147 14 : auto sha3sum = commit->find("sha3sum");
2148 14 : auto size_str = commit->find("totalSize");
2149 :
2150 14 : if (tid == commit->end() || sha3sum == commit->end() || size_str == commit->end()) {
2151 0 : JAMI_ERROR("Invalid file transfer commit (missing tid, size or sha3)");
2152 0 : return false;
2153 : }
2154 :
2155 14 : auto totalSize = to_int<ssize_t>(size_str->second, (ssize_t) -1);
2156 14 : if (totalSize < 0) {
2157 0 : JAMI_ERROR("Invalid file size {}", totalSize);
2158 0 : return false;
2159 : }
2160 :
2161 : // Be sure to not lock conversation
2162 28 : dht::ThreadPool().io().run([w = weak(),
2163 : deviceId,
2164 : fileId,
2165 : interactionId,
2166 14 : sha3sum = sha3sum->second,
2167 : path,
2168 27 : totalSize] {
2169 14 : if (auto shared = w.lock()) {
2170 14 : std::filesystem::path filePath(path);
2171 14 : if (filePath.empty()) {
2172 0 : filePath = shared->dataTransfer()->path(fileId);
2173 : }
2174 :
2175 14 : if (fileutils::size(filePath) == totalSize) {
2176 1 : if (fileutils::sha3File(filePath) == sha3sum) {
2177 3 : JAMI_WARNING("Ignoring request to download existing file: {}", filePath);
2178 1 : return;
2179 : }
2180 : }
2181 :
2182 13 : std::filesystem::path tempFilePath(filePath);
2183 13 : tempFilePath += ".tmp";
2184 13 : auto start = fileutils::size(tempFilePath);
2185 13 : if (start < 0)
2186 11 : start = 0;
2187 13 : size_t end = 0;
2188 :
2189 13 : auto acc = shared->pimpl_->account_.lock();
2190 13 : if (!acc)
2191 0 : return;
2192 13 : shared->dataTransfer()->waitForTransfer(fileId, interactionId, sha3sum, path, totalSize);
2193 13 : acc->askForFileChannel(shared->id(), deviceId, interactionId, fileId, start, end);
2194 28 : }
2195 : });
2196 14 : return true;
2197 14 : }
2198 :
2199 : void
2200 1004 : Conversation::hasFetched(const std::string& deviceId, const std::string& commitId)
2201 : {
2202 1004 : dht::ThreadPool::io().run([w = weak(), deviceId, commitId]() {
2203 1004 : auto sthis = w.lock();
2204 1004 : if (!sthis)
2205 0 : return;
2206 : // Update fetched for Uri
2207 1004 : auto uri = sthis->uriFromDevice(deviceId);
2208 1004 : if (uri.empty() || uri == sthis->pimpl_->userId_)
2209 31 : return;
2210 : // When a user fetches a commit, the message is sent for this person
2211 973 : sthis->pimpl_->updateStatus(uri, libjami::Account::MessageStates::SENT, commitId, std::to_string(std::time(nullptr)), true);
2212 1035 : });
2213 1004 : }
2214 :
2215 :
2216 : void
2217 1005 : Conversation::Impl::updateStatus(const std::string& uri,
2218 : libjami::Account::MessageStates st,
2219 : const std::string& commitId,
2220 : const std::string& ts,
2221 : bool emit)
2222 : {
2223 : // This method can be called if peer send us a status or if another device sync. Emit will be true if a peer send us a status and will emit to other connected devices.
2224 1005 : LogOptions options;
2225 1005 : std::map<std::string, std::map<std::string, std::string>> newStatus;
2226 : {
2227 : // Update internal structures.
2228 1005 : std::lock_guard lk(messageStatusMtx_);
2229 1005 : auto& status = messagesStatus_[uri];
2230 1005 : auto& oldStatus = status[st == libjami::Account::MessageStates::SENT ? "fetched" : "read"];
2231 1005 : if (oldStatus == commitId)
2232 5 : return; // Nothing to do
2233 1000 : options.to = oldStatus;
2234 1000 : options.from = commitId;
2235 1000 : oldStatus = commitId;
2236 1000 : status[st == libjami::Account::MessageStates::SENT ? "fetched_ts" : "read_ts"] = ts;
2237 1000 : saveStatus();
2238 1000 : if (emit)
2239 979 : newStatus[uri].insert(status.begin(), status.end());
2240 1005 : }
2241 1000 : if (emit && messageStatusCb_) {
2242 979 : messageStatusCb_(newStatus);
2243 : }
2244 : // Update messages status for all commit between the old and new one
2245 1000 : options.logIfNotFound = false;
2246 1000 : options.fastLog = true;
2247 1000 : History optHistory;
2248 1000 : std::lock_guard lk(optHistory.mutex); // Avoid to announce messages while updating status.
2249 1000 : auto res = loadMessages2(options, &optHistory);
2250 1000 : if (res.size() == 0) {
2251 : // In this case, commit is not received yet, so we cache it
2252 2 : futureStatus[commitId][uri] = static_cast<int32_t>(st);
2253 : }
2254 7092 : for (const auto& [cid, _]: optHistory.quickAccess) {
2255 6092 : auto message = loadedHistory_.quickAccess.find(cid);
2256 6092 : if (message != loadedHistory_.quickAccess.end()) {
2257 : // Update message and emit to client,
2258 3216 : if(static_cast<int32_t>(st) > message->second->status[uri]){
2259 3216 : message->second->status[uri] = static_cast<int32_t>(st);
2260 6432 : emitSignal<libjami::ConfigurationSignal::AccountMessageStatusChanged>(
2261 3216 : accountId_,
2262 3216 : repository_->id(),
2263 : uri,
2264 : cid,
2265 : static_cast<int>(st));
2266 : }
2267 : } else {
2268 : // In this case, commit is not loaded by client, so we cache it
2269 : // No need to emit to client, they will get a correct status on load.
2270 2876 : futureStatus[cid][uri] = static_cast<int32_t>(st);
2271 : }
2272 : }
2273 1010 : }
2274 :
2275 : bool
2276 12 : Conversation::setMessageDisplayed(const std::string& uri, const std::string& interactionId)
2277 : {
2278 12 : std::lock_guard lk(pimpl_->messageStatusMtx_);
2279 12 : if (pimpl_->messagesStatus_[uri]["read"] == interactionId)
2280 1 : return false; // Nothing to do
2281 11 : dht::ThreadPool::io().run([w = weak(), uri, interactionId]() {
2282 11 : auto sthis = w.lock();
2283 11 : if (!sthis)
2284 0 : return;
2285 11 : sthis->pimpl_->updateStatus(uri, libjami::Account::MessageStates::DISPLAYED, interactionId, std::to_string(std::time(nullptr)), true);
2286 11 : });
2287 11 : return true;
2288 12 : }
2289 :
2290 : std::map<std::string, std::map<std::string, std::string>>
2291 63 : Conversation::messageStatus() const
2292 : {
2293 63 : std::lock_guard lk(pimpl_->messageStatusMtx_);
2294 126 : return pimpl_->messagesStatus_;
2295 63 : }
2296 :
2297 : void
2298 41 : Conversation::updateMessageStatus(const std::map<std::string, std::map<std::string, std::string>>& messageStatus)
2299 : {
2300 41 : std::unique_lock lk(pimpl_->messageStatusMtx_);
2301 41 : std::vector<std::tuple<libjami::Account::MessageStates, std::string, std::string, std::string>> stVec;
2302 89 : for (const auto& [uri, status] : messageStatus) {
2303 48 : auto& oldMs = pimpl_->messagesStatus_[uri];
2304 48 : if (status.find("fetched_ts") != status.end() && status.at("fetched") != oldMs["fetched"]) {
2305 18 : if (oldMs["fetched_ts"].empty() || std::stol(oldMs["fetched_ts"]) <= std::stol(status.at("fetched_ts"))) {
2306 18 : stVec.emplace_back(libjami::Account::MessageStates::SENT, uri, status.at("fetched"), status.at("fetched_ts"));
2307 : }
2308 : }
2309 48 : if (status.find("read_ts") != status.end() && status.at("read") != oldMs["read"]) {
2310 3 : if (oldMs["read_ts"].empty() || std::stol(oldMs["read_ts"]) <= std::stol(status.at("read_ts"))) {
2311 3 : stVec.emplace_back(libjami::Account::MessageStates::DISPLAYED, uri, status.at("read"), status.at("read_ts"));
2312 : }
2313 : }
2314 : }
2315 41 : lk.unlock();
2316 :
2317 62 : for (const auto& [status, uri, commitId, ts] : stVec) {
2318 21 : pimpl_->updateStatus(uri, status, commitId, ts);
2319 : }
2320 41 : }
2321 :
2322 : void
2323 286 : Conversation::onMessageStatusChanged(const std::function<void(const std::map<std::string, std::map<std::string, std::string>>&)>& cb)
2324 : {
2325 286 : std::unique_lock lk(pimpl_->messageStatusMtx_);
2326 286 : pimpl_->messageStatusCb_ = cb;
2327 286 : }
2328 :
2329 : #ifdef LIBJAMI_TEST
2330 : void
2331 413 : Conversation::onBootstrapStatus(const std::function<void(std::string, BootstrapStatus)>& cb)
2332 : {
2333 413 : pimpl_->bootstrapCbTest_ = cb;
2334 413 : }
2335 : #endif
2336 :
2337 : void
2338 501 : Conversation::checkBootstrapMember(const asio::error_code& ec,
2339 : std::vector<std::map<std::string, std::string>> members)
2340 : {
2341 501 : if (ec == asio::error::operation_aborted)
2342 268 : return;
2343 480 : auto acc = pimpl_->account_.lock();
2344 480 : if (pimpl_->swarmManager_->getRoutingTable().getNodes().size() > 0 or not acc)
2345 22 : return;
2346 : // We bootstrap the DRT with devices who already wrote in the repository.
2347 : // However, in a conversation, a large number of devices may just watch
2348 : // the conversation, but never write any message.
2349 458 : std::unique_lock lock(pimpl_->membersMtx_);
2350 :
2351 458 : std::string uri;
2352 690 : while (!members.empty()) {
2353 242 : auto member = std::move(members.back());
2354 242 : members.pop_back();
2355 242 : uri = std::move(member.at("uri"));
2356 242 : if (uri != pimpl_->userId_
2357 242 : && pimpl_->checkedMembers_.find(uri) == pimpl_->checkedMembers_.end())
2358 10 : break;
2359 242 : }
2360 226 : auto fallbackFailed = [](auto sthis) {
2361 678 : JAMI_LOG("{} Bootstrap: Fallback failed. Wait for remote connections.",
2362 : sthis->pimpl_->toString());
2363 : #ifdef LIBJAMI_TEST
2364 226 : if (sthis->pimpl_->bootstrapCbTest_)
2365 9 : sthis->pimpl_->bootstrapCbTest_(sthis->id(), BootstrapStatus::FAILED);
2366 : #endif
2367 226 : };
2368 : // If members is empty, we finished the fallback un-successfully
2369 458 : if (members.empty() && uri.empty()) {
2370 225 : lock.unlock();
2371 225 : fallbackFailed(this);
2372 225 : return;
2373 : }
2374 :
2375 : // Fallback, check devices of a member (we didn't check yet) in the conversation
2376 233 : pimpl_->checkedMembers_.emplace(uri);
2377 233 : auto devices = std::make_shared<std::vector<NodeId>>();
2378 932 : acc->forEachDevice(
2379 466 : dht::InfoHash(uri),
2380 232 : [w = weak(), devices](const std::shared_ptr<dht::crypto::PublicKey>& dev) {
2381 : // Test if already sent
2382 232 : if (auto sthis = w.lock()) {
2383 232 : if (!sthis->pimpl_->swarmManager_->getRoutingTable().hasKnownNode(dev->getLongId()))
2384 219 : devices->emplace_back(dev->getLongId());
2385 232 : }
2386 232 : },
2387 233 : [w = weak(), devices, members = std::move(members), uri, fallbackFailed=std::move(fallbackFailed)](bool ok) {
2388 233 : auto sthis = w.lock();
2389 233 : if (!sthis)
2390 0 : return;
2391 233 : auto checkNext = true;
2392 233 : if (ok && devices->size() != 0) {
2393 : #ifdef LIBJAMI_TEST
2394 219 : if (sthis->pimpl_->bootstrapCbTest_)
2395 5 : sthis->pimpl_->bootstrapCbTest_(sthis->id(), BootstrapStatus::FALLBACK);
2396 : #endif
2397 657 : JAMI_LOG("{} Bootstrap: Fallback with member: {}",
2398 : sthis->pimpl_->toString(),
2399 : uri);
2400 219 : if (sthis->pimpl_->swarmManager_->setKnownNodes(*devices))
2401 1 : checkNext = false;
2402 : }
2403 233 : if (checkNext) {
2404 : // Check next member
2405 232 : sthis->pimpl_->fallbackTimer_->expires_at(std::chrono::steady_clock::now());
2406 464 : sthis->pimpl_->fallbackTimer_->async_wait(
2407 464 : std::bind(&Conversation::checkBootstrapMember,
2408 : sthis,
2409 : std::placeholders::_1,
2410 232 : std::move(members)));
2411 : } else {
2412 : // In this case, all members are checked. Fallback failed
2413 1 : fallbackFailed(sthis);
2414 : }
2415 233 : });
2416 930 : }
2417 :
2418 : void
2419 412 : Conversation::bootstrap(std::function<void()> onBootstraped,
2420 : const std::vector<DeviceId>& knownDevices)
2421 : {
2422 412 : if (!pimpl_ || !pimpl_->repository_ || !pimpl_->swarmManager_)
2423 0 : return;
2424 : // Here, we bootstrap the DRT with devices who already wrote in the conversation
2425 : // If this doesn't work, it will attempt to fallback with checkBootstrapMember
2426 : // If it works, the callback onConnectionChanged will be called with ok=true
2427 412 : pimpl_->bootstrapCb_ = std::move(onBootstraped);
2428 412 : std::vector<DeviceId> devices = knownDevices;
2429 1296 : for (const auto& [member, memberDevices] : pimpl_->repository_->devices()) {
2430 884 : if (!isBanned(member))
2431 884 : devices.insert(devices.end(), memberDevices.begin(), memberDevices.end());
2432 412 : }
2433 1236 : JAMI_DEBUG("{} Bootstrap with {} device(s)",
2434 : pimpl_->toString(),
2435 : devices.size());
2436 : // set callback
2437 270 : auto fallback = [](auto sthis, bool now = false) {
2438 : // Fallback
2439 270 : auto acc = sthis->pimpl_->account_.lock();
2440 270 : if (!acc)
2441 0 : return;
2442 270 : auto members = sthis->getMembers(false, false);
2443 270 : std::shuffle(members.begin(), members.end(), acc->rand);
2444 270 : if (now) {
2445 236 : sthis->pimpl_->fallbackTimer_->expires_at(std::chrono::steady_clock::now());
2446 : } else {
2447 34 : auto timeForBootstrap = std::min(static_cast<size_t>(8), members.size());
2448 102 : sthis->pimpl_->fallbackTimer_->expires_at(std::chrono::steady_clock::now() + 20s
2449 68 : - std::chrono::seconds(timeForBootstrap));
2450 102 : JAMI_DEBUG("{} Fallback in {} seconds",
2451 : sthis->pimpl_->toString(),
2452 : (20 - timeForBootstrap));
2453 : }
2454 540 : sthis->pimpl_->fallbackTimer_->async_wait(std::bind(&Conversation::checkBootstrapMember,
2455 : sthis,
2456 : std::placeholders::_1,
2457 270 : std::move(members)));
2458 270 : };
2459 :
2460 412 : pimpl_->swarmManager_->onConnectionChanged([w = weak(), fallback](bool ok) {
2461 : // This will call methods from accounts, so trigger on another thread.
2462 300 : dht::ThreadPool::io().run([w, ok, fallback=std::move(fallback)] {
2463 300 : auto sthis = w.lock();
2464 300 : if (!sthis)
2465 1 : return;
2466 299 : if (ok) {
2467 : // Bootstrap succeeded!
2468 : {
2469 265 : std::lock_guard lock(sthis->pimpl_->membersMtx_);
2470 265 : sthis->pimpl_->checkedMembers_.clear();
2471 265 : }
2472 265 : if (sthis->pimpl_->bootstrapCb_)
2473 265 : sthis->pimpl_->bootstrapCb_();
2474 : #ifdef LIBJAMI_TEST
2475 265 : if (sthis->pimpl_->bootstrapCbTest_)
2476 8 : sthis->pimpl_->bootstrapCbTest_(sthis->id(), BootstrapStatus::SUCCESS);
2477 : #endif
2478 265 : return;
2479 : }
2480 34 : fallback(sthis);
2481 300 : });
2482 300 : });
2483 : {
2484 412 : std::lock_guard lock(pimpl_->membersMtx_);
2485 412 : pimpl_->checkedMembers_.clear();
2486 412 : }
2487 : // If is shutdown, the conversation was re-added, causing no new nodes to be connected, but just a classic connectivity change
2488 412 : if (pimpl_->swarmManager_->isShutdown()) {
2489 14 : pimpl_->swarmManager_->restart();
2490 14 : pimpl_->swarmManager_->maintainBuckets();
2491 398 : } else if (!pimpl_->swarmManager_->setKnownNodes(devices)) {
2492 236 : fallback(this, true);
2493 : }
2494 412 : }
2495 :
2496 : std::vector<std::string>
2497 8 : Conversation::commitsEndedCalls()
2498 : {
2499 8 : pimpl_->loadActiveCalls();
2500 8 : pimpl_->loadHostedCalls();
2501 8 : auto commits = pimpl_->commitsEndedCalls();
2502 8 : if (!commits.empty()) {
2503 : // Announce to client
2504 0 : dht::ThreadPool::io().run([w = weak(), commits] {
2505 0 : if (auto sthis = w.lock())
2506 0 : sthis->pimpl_->announce(commits, true);
2507 0 : });
2508 : }
2509 8 : return commits;
2510 0 : }
2511 :
2512 : void
2513 286 : Conversation::onMembersChanged(OnMembersChanged&& cb)
2514 : {
2515 286 : pimpl_->onMembersChanged_ = std::move(cb);
2516 286 : pimpl_->repository_->onMembersChanged([w=weak()] (const std::set<std::string>& memberUris) {
2517 982 : if (auto sthis = w.lock())
2518 982 : sthis->pimpl_->onMembersChanged_(memberUris);
2519 982 : });
2520 286 : }
2521 :
2522 : void
2523 286 : Conversation::onNeedSocket(NeedSocketCb needSocket)
2524 : {
2525 572 : pimpl_->swarmManager_->needSocketCb_ = [needSocket = std::move(needSocket),
2526 : w=weak()](const std::string& deviceId, ChannelCb&& cb) {
2527 609 : if (auto sthis = w.lock())
2528 609 : needSocket(sthis->id(), deviceId, std::move(cb), "application/im-gitmessage-id");
2529 1181 : };
2530 286 : }
2531 :
2532 : void
2533 598 : Conversation::addSwarmChannel(std::shared_ptr<dhtnet::ChannelSocket> channel)
2534 : {
2535 598 : auto deviceId = channel->deviceId();
2536 : // Transmit avatar if necessary
2537 : // We do this here, because at this point we know both sides are connected and in
2538 : // the same conversation
2539 : // addSwarmChannel is a bit more complex, but it should be the best moment to do this.
2540 598 : auto cert = channel->peerCertificate();
2541 598 : if (!cert || !cert->issuer)
2542 0 : return;
2543 598 : auto member = cert->issuer->getId().toString();
2544 598 : pimpl_->swarmManager_->addChannel(std::move(channel));
2545 598 : dht::ThreadPool::io().run([member, deviceId, a = pimpl_->account_, w = weak_from_this()] {
2546 598 : auto sthis = w.lock();
2547 598 : if (auto account = a.lock()) {
2548 598 : account->sendProfile(sthis->id(), member, deviceId.toString());
2549 598 : }
2550 598 : });
2551 598 : }
2552 :
2553 : uint32_t
2554 0 : Conversation::countInteractions(const std::string& toId,
2555 : const std::string& fromId,
2556 : const std::string& authorUri) const
2557 : {
2558 0 : LogOptions options;
2559 0 : options.to = toId;
2560 0 : options.from = fromId;
2561 0 : options.authorUri = authorUri;
2562 0 : options.logIfNotFound = false;
2563 0 : options.fastLog = true;
2564 0 : History history;
2565 0 : std::lock_guard lk(history.mutex);
2566 0 : auto res = pimpl_->loadMessages2(options, &history);
2567 0 : return res.size();
2568 0 : }
2569 :
2570 : void
2571 0 : Conversation::search(uint32_t req,
2572 : const Filter& filter,
2573 : const std::shared_ptr<std::atomic_int>& flag) const
2574 : {
2575 : // Because logging a conversation can take quite some time,
2576 : // do it asynchronously
2577 0 : dht::ThreadPool::io().run([w = weak(), req, filter, flag] {
2578 0 : if (auto sthis = w.lock()) {
2579 0 : History history;
2580 0 : std::vector<std::map<std::string, std::string>> commits {};
2581 : // std::regex_constants::ECMAScript is the default flag.
2582 0 : auto re = std::regex(filter.regexSearch,
2583 0 : filter.caseSensitive ? std::regex_constants::ECMAScript
2584 0 : : std::regex_constants::icase);
2585 0 : sthis->pimpl_->repository_->log(
2586 0 : [&](const auto& id, const auto& author, auto& commit) {
2587 0 : if (!filter.author.empty()
2588 0 : && filter.author != sthis->uriFromDevice(author.email)) {
2589 : // Filter author
2590 0 : return CallbackResult::Skip;
2591 : }
2592 0 : auto commitTime = git_commit_time(commit.get());
2593 0 : if (filter.before && filter.before < commitTime) {
2594 : // Only get commits before this date
2595 0 : return CallbackResult::Skip;
2596 : }
2597 0 : if (filter.after && filter.after > commitTime) {
2598 : // Only get commits before this date
2599 0 : if (git_commit_parentcount(commit.get()) <= 1)
2600 0 : return CallbackResult::Break;
2601 : else
2602 0 : return CallbackResult::Skip; // Because we are sorting it with
2603 : // GIT_SORT_TOPOLOGICAL | GIT_SORT_TIME
2604 : }
2605 :
2606 0 : return CallbackResult::Ok; // Continue
2607 : },
2608 0 : [&](auto&& cc) {
2609 0 : if (auto optMessage = sthis->pimpl_->repository_->convCommitToMap(cc))
2610 0 : sthis->pimpl_->addToHistory(history, {optMessage.value()}, false, false);
2611 0 : },
2612 0 : [&](auto id, auto, auto) {
2613 0 : if (id == filter.lastId)
2614 0 : return true;
2615 0 : return false;
2616 : },
2617 : "",
2618 : false);
2619 : // Search on generated history
2620 0 : for (auto& message : history.messageList) {
2621 0 : auto contentType = message->type;
2622 0 : auto isSearchable = contentType == "text/plain"
2623 0 : || contentType == "application/data-transfer+json";
2624 0 : if (filter.type.empty() && !isSearchable) {
2625 : // Not searchable, at least for now
2626 0 : continue;
2627 0 : } else if (contentType == filter.type || filter.type.empty()) {
2628 0 : if (isSearchable) {
2629 : // If it's a text match the body, else the display name
2630 0 : auto body = contentType == "text/plain" ? message->body.at("body")
2631 0 : : message->body.at("displayName");
2632 0 : std::smatch body_match;
2633 0 : if (std::regex_search(body, body_match, re)) {
2634 0 : auto commit = message->body;
2635 0 : commit["id"] = message->id;
2636 0 : commit["type"] = message->type;
2637 0 : commits.emplace_back(commit);
2638 0 : }
2639 0 : } else {
2640 : // Matching type, just add it to the results
2641 0 : commits.emplace_back(message->body);
2642 : }
2643 :
2644 0 : if (filter.maxResult != 0 && commits.size() == filter.maxResult)
2645 0 : break;
2646 : }
2647 0 : }
2648 :
2649 0 : if (commits.size() > 0)
2650 0 : emitSignal<libjami::ConversationSignal::MessagesFound>(req,
2651 0 : sthis->pimpl_->accountId_,
2652 0 : sthis->id(),
2653 0 : std::move(commits));
2654 : // If we're the latest thread, inform client that the search is finished
2655 0 : if ((*flag)-- == 1 /* decrement return the old value */) {
2656 0 : emitSignal<libjami::ConversationSignal::MessagesFound>(
2657 : req,
2658 0 : sthis->pimpl_->accountId_,
2659 0 : std::string {},
2660 0 : std::vector<std::map<std::string, std::string>> {});
2661 : }
2662 0 : }
2663 0 : });
2664 0 : }
2665 :
2666 : void
2667 14 : Conversation::hostConference(Json::Value&& message, OnDoneCb&& cb)
2668 : {
2669 14 : if (!message.isMember("confId")) {
2670 0 : JAMI_ERROR("{}Malformed commit: no confId", pimpl_->toString());
2671 0 : return;
2672 : }
2673 :
2674 14 : auto now = std::chrono::system_clock::now();
2675 14 : auto nowSecs = std::chrono::duration_cast<std::chrono::seconds>(now.time_since_epoch()).count();
2676 : {
2677 14 : std::lock_guard lk(pimpl_->activeCallsMtx_);
2678 14 : pimpl_->hostedCalls_[message["confId"].asString()] = nowSecs;
2679 14 : pimpl_->saveHostedCalls();
2680 14 : }
2681 :
2682 14 : sendMessage(std::move(message), "", {}, std::move(cb));
2683 : }
2684 :
2685 : bool
2686 20 : Conversation::isHosting(const std::string& confId) const
2687 : {
2688 20 : auto info = infos();
2689 20 : if (info["rdvDevice"] == pimpl_->deviceId_ && info["rdvHost"] == pimpl_->userId_)
2690 0 : return true; // We are the current device Host
2691 20 : std::lock_guard lk(pimpl_->activeCallsMtx_);
2692 20 : return pimpl_->hostedCalls_.find(confId) != pimpl_->hostedCalls_.end();
2693 20 : }
2694 :
2695 : void
2696 11 : Conversation::removeActiveConference(Json::Value&& message, OnDoneCb&& cb)
2697 : {
2698 11 : if (!message.isMember("confId")) {
2699 0 : JAMI_ERROR("{}Malformed commit: no confId", pimpl_->toString());
2700 0 : return;
2701 : }
2702 :
2703 11 : auto erased = false;
2704 : {
2705 11 : std::lock_guard lk(pimpl_->activeCallsMtx_);
2706 11 : erased = pimpl_->hostedCalls_.erase(message["confId"].asString());
2707 11 : }
2708 11 : if (erased) {
2709 11 : pimpl_->saveHostedCalls();
2710 11 : sendMessage(std::move(message), "", {}, std::move(cb));
2711 : } else
2712 0 : cb(false, "");
2713 : }
2714 :
2715 : std::vector<std::map<std::string, std::string>>
2716 39 : Conversation::currentCalls() const
2717 : {
2718 39 : std::lock_guard lk(pimpl_->activeCallsMtx_);
2719 78 : return pimpl_->activeCalls_;
2720 39 : }
2721 : } // namespace jami
|