Line data Source code
1 : /*
2 : * Copyright (C) 2004-2025 Savoir-faire Linux Inc.
3 : *
4 : * This program is free software: you can redistribute it and/or modify
5 : * it under the terms of the GNU General Public License as published by
6 : * the Free Software Foundation, either version 3 of the License, or
7 : * (at your option) any later version.
8 : *
9 : * This program is distributed in the hope that it will be useful,
10 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 : * GNU General Public License for more details.
13 : *
14 : * You should have received a copy of the GNU General Public License
15 : * along with this program. If not, see <https://www.gnu.org/licenses/>.
16 : */
17 :
18 : #include "conversation.h"
19 :
20 : #include "account_const.h"
21 : #include "jamiaccount.h"
22 : #include "client/ring_signal.h"
23 : #include "swarm/swarm_manager.h"
24 : #ifdef ENABLE_PLUGIN
25 : #include "manager.h"
26 : #include "plugin/jamipluginmanager.h"
27 : #include "plugin/streamdata.h"
28 : #endif
29 : #include "jami/conversation_interface.h"
30 :
31 : #include "fileutils.h"
32 : #include "json_utils.h"
33 :
34 : #include <opendht/thread_pool.h>
35 : #include <fmt/compile.h>
36 :
37 : #include <charconv>
38 : #include <string_view>
39 : #include <tuple>
40 : #include <optional>
41 :
42 : namespace jami {
43 :
44 : static const char* const LAST_MODIFIED = "lastModified";
45 :
46 13 : ConvInfo::ConvInfo(const Json::Value& json)
47 : {
48 13 : id = json[ConversationMapKeys::ID].asString();
49 13 : created = json[ConversationMapKeys::CREATED].asLargestUInt();
50 13 : removed = json[ConversationMapKeys::REMOVED].asLargestUInt();
51 13 : erased = json[ConversationMapKeys::ERASED].asLargestUInt();
52 33 : for (const auto& v : json[ConversationMapKeys::MEMBERS]) {
53 20 : members.emplace(v["uri"].asString());
54 : }
55 13 : lastDisplayed = json[ConversationMapKeys::LAST_DISPLAYED].asString();
56 13 : }
57 :
58 : Json::Value
59 25 : ConvInfo::toJson() const
60 : {
61 25 : Json::Value json;
62 25 : json[ConversationMapKeys::ID] = id;
63 25 : json[ConversationMapKeys::CREATED] = Json::Int64(created);
64 25 : if (removed) {
65 1 : json[ConversationMapKeys::REMOVED] = Json::Int64(removed);
66 : }
67 25 : if (erased) {
68 1 : json[ConversationMapKeys::ERASED] = Json::Int64(erased);
69 : }
70 64 : for (const auto& m : members) {
71 39 : Json::Value member;
72 39 : member["uri"] = m;
73 39 : json[ConversationMapKeys::MEMBERS].append(member);
74 39 : }
75 25 : json[ConversationMapKeys::LAST_DISPLAYED] = lastDisplayed;
76 25 : return json;
77 0 : }
78 :
79 : // ConversationRequest
80 262 : ConversationRequest::ConversationRequest(const Json::Value& json)
81 : {
82 262 : received = json[ConversationMapKeys::RECEIVED].asLargestUInt();
83 262 : declined = json[ConversationMapKeys::DECLINED].asLargestUInt();
84 262 : from = json[ConversationMapKeys::FROM].asString();
85 262 : conversationId = json[ConversationMapKeys::CONVERSATIONID].asString();
86 262 : auto& md = json[ConversationMapKeys::METADATAS];
87 527 : for (const auto& member : md.getMemberNames()) {
88 265 : metadatas.emplace(member, md[member].asString());
89 262 : }
90 262 : }
91 :
92 : Json::Value
93 2 : ConversationRequest::toJson() const
94 : {
95 2 : Json::Value json;
96 2 : json[ConversationMapKeys::CONVERSATIONID] = conversationId;
97 2 : json[ConversationMapKeys::FROM] = from;
98 2 : json[ConversationMapKeys::RECEIVED] = static_cast<uint32_t>(received);
99 2 : if (declined)
100 0 : json[ConversationMapKeys::DECLINED] = static_cast<uint32_t>(declined);
101 4 : for (const auto& [key, value] : metadatas) {
102 2 : json[ConversationMapKeys::METADATAS][key] = value;
103 : }
104 2 : return json;
105 0 : }
106 :
107 : std::map<std::string, std::string>
108 226 : ConversationRequest::toMap() const
109 : {
110 226 : auto result = metadatas;
111 226 : result[ConversationMapKeys::ID] = conversationId;
112 226 : result[ConversationMapKeys::FROM] = from;
113 226 : if (declined)
114 0 : result[ConversationMapKeys::DECLINED] = std::to_string(declined);
115 226 : result[ConversationMapKeys::RECEIVED] = std::to_string(received);
116 226 : return result;
117 0 : }
118 :
119 : using MessageList = std::list<std::shared_ptr<libjami::SwarmMessage>>;
120 :
121 : struct History
122 : {
123 : // While loading the history, we need to avoid:
124 : // - reloading history (can just be ignored)
125 : // - adding new commits (should wait for history to be loaded)
126 : std::mutex mutex {};
127 : std::condition_variable cv {};
128 : bool loading {false};
129 : MessageList messageList {};
130 : std::map<std::string, std::shared_ptr<libjami::SwarmMessage>> quickAccess {};
131 : std::map<std::string, std::list<std::shared_ptr<libjami::SwarmMessage>>> pendingEditions {};
132 : std::map<std::string, std::list<std::map<std::string, std::string>>> pendingReactions {};
133 : };
134 :
135 : class Conversation::Impl
136 : {
137 : private:
138 394 : Impl(std::unique_ptr<ConversationRepository>&& repository,
139 : const std::shared_ptr<JamiAccount>& account,
140 : std::vector<ConversationCommit>&& commits = {})
141 394 : : repository_(std::move(repository ? repository : throw std::logic_error("Invalid repository")))
142 389 : , account_(account)
143 389 : , accountId_(account->getAccountID())
144 389 : , userId_(account->getUsername())
145 389 : , deviceId_(account->currentDeviceId())
146 389 : , swarmManager_(std::make_shared<SwarmManager>(NodeId(deviceId_),
147 778 : Manager::instance().getSeededRandomEngine(),
148 389 : [account = account_](const DeviceId& deviceId) {
149 2603 : if (auto acc = account.lock()) {
150 2603 : return acc->isConnectedWith(deviceId);
151 2603 : }
152 0 : return false;
153 : }))
154 389 : , transferManager_(std::make_shared<TransferManager>(accountId_,
155 : "",
156 389 : repository_->id(),
157 389 : Manager::instance().getSeededRandomEngine()))
158 389 : , repoPath_(fileutils::get_data_dir() / accountId_ / "conversations" / repository_->id())
159 389 : , conversationDataPath_(fileutils::get_data_dir() / accountId_ / "conversation_data" / repository_->id())
160 389 : , fetchedPath_(conversationDataPath_ / ConversationDirectories::FETCHED)
161 389 : , sendingPath_(conversationDataPath_ / ConversationDirectories::SENDING)
162 389 : , preferencesPath_(conversationDataPath_ / ConversationDirectories::PREFERENCES)
163 389 : , statusPath_(conversationDataPath_ / ConversationDirectories::STATUS)
164 389 : , hostedCallsPath_(conversationDataPath_ / ConversationDirectories::HOSTED_CALLS)
165 389 : , activeCallsPath_(conversationDataPath_ / ConversationDirectories::ACTIVE_CALLS)
166 389 : , ioContext_(Manager::instance().ioContext())
167 389 : , fallbackTimer_(std::make_unique<asio::steady_timer>(*ioContext_))
168 2339 : , typers_(std::make_shared<Typers>(account, repository_->id()))
169 : {
170 389 : if (!commits.empty())
171 185 : initActiveCalls(repository_->convCommitsToMap(commits));
172 389 : loadStatus();
173 409 : }
174 :
175 190 : Impl(std::pair<std::unique_ptr<ConversationRepository>, std::vector<ConversationCommit>>&& repoAndCommits,
176 : const std::shared_ptr<JamiAccount>& account)
177 190 : : Impl(std::move(repoAndCommits.first), account, std::move(repoAndCommits.second))
178 185 : {}
179 :
180 : public:
181 186 : Impl(const std::shared_ptr<JamiAccount>& account, ConversationMode mode, const std::string& otherMember = "")
182 186 : : Impl(ConversationRepository::createConversation(account, mode, otherMember), account)
183 186 : {}
184 :
185 18 : Impl(const std::shared_ptr<JamiAccount>& account, const std::string& conversationId)
186 18 : : Impl(std::make_unique<ConversationRepository>(account, conversationId), account)
187 18 : {}
188 :
189 191 : Impl(const std::shared_ptr<JamiAccount>& account, const std::string& remoteDevice, const std::string& conversationId)
190 191 : : Impl(ConversationRepository::cloneConversation(account, remoteDevice, conversationId), account)
191 185 : {}
192 :
193 3894 : std::string toString() const
194 : {
195 15576 : return fmt::format(FMT_COMPILE("[Account {}] [Conversation {}]"), accountId_, repository_->id());
196 : }
197 : mutable std::string fmtStr_;
198 :
199 389 : ~Impl()
200 5835 : {
201 : try {
202 389 : if (fallbackTimer_)
203 389 : fallbackTimer_->cancel();
204 0 : } catch (const std::exception& e) {
205 0 : JAMI_ERROR("{:s} {:s}", toString(), e.what());
206 0 : }
207 389 : }
208 :
/**
 * If, for whatever reason, the daemon is stopped while hosting a conference,
 * we need to announce the end of this call when restarting,
 * to avoid keeping active calls forever.
 */
214 : std::vector<std::string> commitsEndedCalls();
215 : bool isAdmin() const;
216 :
217 282 : void announce(const std::string& commitId, bool commitFromSelf = false)
218 : {
219 282 : std::vector<std::string> vec;
220 282 : if (!commitId.empty())
221 280 : vec.emplace_back(commitId);
222 282 : announce(vec, commitFromSelf);
223 282 : }
224 :
225 295 : void announce(const std::vector<std::string>& commits, bool commitFromSelf = false)
226 : {
227 295 : std::vector<ConversationCommit> convcommits;
228 295 : convcommits.reserve(commits.size());
229 601 : for (const auto& cid : commits) {
230 306 : if (auto commit = repository_->getCommit(cid)) {
231 306 : convcommits.emplace_back(*commit);
232 306 : }
233 : }
234 295 : announce(repository_->convCommitsToMap(convcommits), commitFromSelf);
235 295 : }
236 :
237 : /**
238 : * Initialize activeCalls_ from the list of commits in the repository
239 : * @param commits Commits in reverse chronological order (i.e. from newest to oldest)
240 : */
241 185 : void initActiveCalls(const std::vector<std::map<std::string, std::string>>& commits) const
242 : {
243 185 : std::unordered_set<std::string> invalidHostUris;
244 185 : std::unordered_set<std::string> invalidCallIds;
245 :
246 185 : std::lock_guard lk(activeCallsMtx_);
247 1161 : for (const auto& commit : commits) {
248 976 : if (commit.at("type") == "member") {
249 : // Each commit of type "member" has an "action" field whose value can be one
250 : // of the following: "add", "join", "remove", "ban", "unban"
251 : // In the case of "remove" and "ban", we need to add the member's URI to
252 : // invalidHostUris to ensure that any call they may have started in the past
253 : // is no longer considered active.
254 : // For the other actions, there's no harm in adding the member's URI anyway,
255 : // since it's not possible to start hosting a call before joining the swarm (or
256 : // before getting unbanned in the case of previously banned members).
257 770 : invalidHostUris.emplace(commit.at("uri"));
258 413 : } else if (commit.find("confId") != commit.end() && commit.find("uri") != commit.end()
259 413 : && commit.find("device") != commit.end()) {
260 : // The commit indicates either the end or the beginning of a call
261 : // (depending on whether there is a "duration" field or not).
262 1 : auto convId = repository_->id();
263 2 : auto confId = commit.at("confId");
264 2 : auto uri = commit.at("uri");
265 2 : auto device = commit.at("device");
266 :
267 3 : if (commit.find("duration") == commit.end() && invalidCallIds.find(confId) == invalidCallIds.end()
268 3 : && invalidHostUris.find(uri) == invalidHostUris.end()) {
269 1 : std::map<std::string, std::string> activeCall;
270 1 : activeCall["id"] = confId;
271 1 : activeCall["uri"] = uri;
272 1 : activeCall["device"] = device;
273 1 : activeCalls_.emplace_back(activeCall);
274 0 : fmt::print("swarm:{} new active call detected: {} (on device {}, account {})\n",
275 : convId,
276 : confId,
277 : device,
278 : uri);
279 1 : }
280 : // Even if the call was active, we still add its ID to invalidCallIds to make sure it
281 : // doesn't get added a second time. (This shouldn't happen normally, but in practice
282 : // there are sometimes multiple commits indicating the beginning of the same call.)
283 1 : invalidCallIds.emplace(confId);
284 1 : }
285 : }
286 185 : saveActiveCalls();
287 185 : emitSignal<libjami::ConfigurationSignal::ActiveCallsChanged>(accountId_, repository_->id(), activeCalls_);
288 185 : }
289 :
290 : /**
291 : * Update activeCalls_ via announced commits (in load or via new commits)
292 : * @param commit Commit to check
293 : * @param eraseOnly If we want to ignore added commits
294 : * @param emitSig If we want to emit to client
295 : * @note eraseOnly is used by loadMessages. This is a fail-safe, this SHOULD NOT happen
296 : */
297 78 : void updateActiveCalls(const std::map<std::string, std::string>& commit,
298 : bool eraseOnly = false,
299 : bool emitSig = true) const
300 : {
301 78 : if (!repository_)
302 0 : return;
303 78 : if (commit.at("type") == "member") {
304 : // In this case, we need to check if we are not removing a hosting member or device
305 19 : std::lock_guard lk(activeCallsMtx_);
306 19 : auto it = activeCalls_.begin();
307 19 : auto updateActives = false;
308 20 : while (it != activeCalls_.end()) {
309 1 : if (it->at("uri") == commit.at("uri") || it->at("device") == commit.at("uri")) {
310 4 : JAMI_DEBUG("Removing {:s} from the active calls, because {:s} left", it->at("id"), commit.at("uri"));
311 1 : it = activeCalls_.erase(it);
312 1 : updateActives = true;
313 : } else {
314 0 : ++it;
315 : }
316 : }
317 19 : if (updateActives) {
318 1 : saveActiveCalls();
319 1 : if (emitSig)
320 1 : emitSignal<libjami::ConfigurationSignal::ActiveCallsChanged>(accountId_,
321 1 : repository_->id(),
322 1 : activeCalls_);
323 : }
324 19 : return;
325 19 : }
326 : // Else, it's a call information
327 174 : if (commit.find("confId") != commit.end() && commit.find("uri") != commit.end()
328 174 : && commit.find("device") != commit.end()) {
329 56 : auto convId = repository_->id();
330 112 : auto confId = commit.at("confId");
331 112 : auto uri = commit.at("uri");
332 112 : auto device = commit.at("device");
333 56 : std::lock_guard lk(activeCallsMtx_);
334 56 : auto itActive = std::find_if(activeCalls_.begin(), activeCalls_.end(), [&](const auto& value) {
335 25 : return value.at("id") == confId && value.at("uri") == uri && value.at("device") == device;
336 : });
337 56 : if (commit.find("duration") == commit.end()) {
338 31 : if (itActive == activeCalls_.end() && !eraseOnly) {
339 124 : JAMI_DEBUG("swarm:{:s} new current call detected: {:s} on device {:s}, account {:s}",
340 : convId,
341 : confId,
342 : device,
343 : uri);
344 155 : activeCalls_.emplace_back(std::map<std::string, std::string> {
345 : {"id", confId},
346 : {"uri", uri },
347 : {"device", device},
348 124 : });
349 31 : saveActiveCalls();
350 31 : if (emitSig)
351 31 : emitSignal<libjami::ConfigurationSignal::ActiveCallsChanged>(accountId_,
352 31 : repository_->id(),
353 31 : activeCalls_);
354 : }
355 : } else {
356 25 : if (itActive != activeCalls_.end()) {
357 25 : itActive = activeCalls_.erase(itActive);
358 : // Unlikely, but we must ensure that no duplicate exists
359 25 : while (itActive != activeCalls_.end()) {
360 0 : itActive = std::find_if(itActive, activeCalls_.end(), [&](const auto& value) {
361 0 : return value.at("id") == confId && value.at("uri") == uri && value.at("device") == device;
362 : });
363 0 : if (itActive != activeCalls_.end()) {
364 0 : JAMI_ERROR("Duplicate call found. (This is a bug)");
365 0 : itActive = activeCalls_.erase(itActive);
366 : }
367 : }
368 :
369 25 : if (eraseOnly) {
370 0 : JAMI_WARNING("previous swarm:{:s} call finished detected: {:s} on device "
371 : "{:s}, account {:s}",
372 : convId,
373 : confId,
374 : device,
375 : uri);
376 : } else {
377 100 : JAMI_DEBUG("swarm:{:s} call finished: {:s} on device {:s}, account {:s}",
378 : convId,
379 : confId,
380 : device,
381 : uri);
382 : }
383 : }
384 25 : saveActiveCalls();
385 25 : if (emitSig)
386 25 : emitSignal<libjami::ConfigurationSignal::ActiveCallsChanged>(accountId_,
387 25 : repository_->id(),
388 25 : activeCalls_);
389 : }
390 56 : }
391 : }
392 :
393 1201 : void announce(const std::vector<std::map<std::string, std::string>>& commits, bool commitFromSelf = false)
394 : {
395 1201 : if (!repository_)
396 0 : return;
397 1201 : auto convId = repository_->id();
398 1201 : auto ok = !commits.empty();
399 3601 : auto lastId = ok ? commits.rbegin()->at(ConversationMapKeys::ID) : "";
400 1201 : addToHistory(loadedHistory_, commits, true, commitFromSelf);
401 1201 : if (ok) {
402 1199 : bool announceMember = false;
403 2451 : for (const auto& c : commits) {
404 : // Announce member events
405 1252 : if (c.at("type") == "member") {
406 930 : if (c.find("uri") != c.end() && c.find("action") != c.end()) {
407 930 : const auto& uri = c.at("uri");
408 930 : const auto& actionStr = c.at("action");
409 930 : auto action = -1;
410 930 : if (actionStr == "add")
411 442 : action = 0;
412 488 : else if (actionStr == "join")
413 468 : action = 1;
414 20 : else if (actionStr == "remove")
415 1 : action = 2;
416 19 : else if (actionStr == "ban")
417 18 : action = 3;
418 1 : else if (actionStr == "unban")
419 1 : action = 4;
420 930 : if (actionStr == "ban" || actionStr == "remove") {
421 : // In this case, a potential host was removed during a call.
422 19 : updateActiveCalls(c);
423 19 : typers_->removeTyper(uri);
424 : }
425 930 : if (action != -1) {
426 930 : announceMember = true;
427 930 : emitSignal<libjami::ConversationSignal::ConversationMemberEvent>(accountId_,
428 : convId,
429 : uri,
430 : action);
431 : }
432 : }
433 322 : } else if (c.at("type") == "application/call-history+json") {
434 59 : updateActiveCalls(c);
435 : }
436 : #ifdef ENABLE_PLUGIN
437 1252 : auto& pluginChatManager = Manager::instance().getJamiPluginManager().getChatServicesManager();
438 1252 : if (pluginChatManager.hasHandlers()) {
439 0 : auto cm = std::make_shared<JamiMessage>(accountId_, convId, c.at("author") != userId_, c, false);
440 0 : cm->isSwarm = true;
441 0 : pluginChatManager.publishMessage(std::move(cm));
442 0 : }
443 : #endif
444 : }
445 :
446 1199 : if (announceMember && onMembersChanged_) {
447 921 : onMembersChanged_(repository_->memberUris("", {}));
448 : }
449 : }
450 1201 : }
451 :
452 389 : void loadStatus()
453 : {
454 : try {
455 : // read file
456 776 : auto file = fileutils::loadFile(statusPath_);
457 : // load values
458 2 : msgpack::object_handle oh = msgpack::unpack((const char*) file.data(), file.size());
459 2 : std::lock_guard lk {messageStatusMtx_};
460 2 : oh.get().convert(messagesStatus_);
461 389 : } catch (const std::exception& e) {
462 387 : }
463 389 : }
464 1113 : void saveStatus()
465 : {
466 1113 : std::ofstream file(statusPath_, std::ios::trunc | std::ios::binary);
467 1113 : msgpack::pack(file, messagesStatus_);
468 1113 : }
469 :
470 18 : void loadActiveCalls() const
471 : {
472 : try {
473 : // read file
474 29 : auto file = fileutils::loadFile(activeCallsPath_);
475 : // load values
476 7 : msgpack::object_handle oh = msgpack::unpack((const char*) file.data(), file.size());
477 7 : std::lock_guard lk {activeCallsMtx_};
478 7 : oh.get().convert(activeCalls_);
479 18 : } catch (const std::exception& e) {
480 11 : return;
481 11 : }
482 : }
483 :
484 260 : void saveActiveCalls() const
485 : {
486 260 : std::ofstream file(activeCallsPath_, std::ios::trunc | std::ios::binary);
487 260 : msgpack::pack(file, activeCalls_);
488 260 : }
489 :
490 18 : void loadHostedCalls() const
491 : {
492 : try {
493 : // read file
494 30 : auto file = fileutils::loadFile(hostedCallsPath_);
495 : // load values
496 6 : msgpack::object_handle oh = msgpack::unpack((const char*) file.data(), file.size());
497 6 : std::lock_guard lk {activeCallsMtx_};
498 6 : oh.get().convert(hostedCalls_);
499 18 : } catch (const std::exception& e) {
500 12 : return;
501 12 : }
502 : }
503 :
504 42 : void saveHostedCalls() const
505 : {
506 42 : std::ofstream file(hostedCallsPath_, std::ios::trunc | std::ios::binary);
507 42 : msgpack::pack(file, hostedCalls_);
508 42 : }
509 :
510 : void voteUnban(const std::string& contactUri, const std::string_view type, const OnDoneCb& cb);
511 :
512 : std::vector<std::map<std::string, std::string>> getMembers(bool includeInvited,
513 : bool includeLeft,
514 : bool includeBanned) const;
515 :
516 6501 : std::string_view bannedType(const std::string& uri) const
517 : {
518 0 : auto crt = fmt::format("{}.crt", uri);
519 13002 : auto bannedMember = repoPath_ / MemberPath::BANNED / MemberPath::MEMBERS / crt;
520 6501 : if (std::filesystem::is_regular_file(bannedMember))
521 16 : return "members"sv;
522 12970 : auto bannedAdmin = repoPath_ / MemberPath::BANNED / MemberPath::ADMINS / crt;
523 6485 : if (std::filesystem::is_regular_file(bannedAdmin))
524 0 : return "admins"sv;
525 12970 : auto bannedInvited = repoPath_ / MemberPath::BANNED / MemberPath::INVITED / uri;
526 6485 : if (std::filesystem::is_regular_file(bannedInvited))
527 0 : return "invited"sv;
528 12970 : auto bannedDevice = repoPath_ / MemberPath::BANNED / MemberPath::DEVICES / crt;
529 6484 : if (std::filesystem::is_regular_file(bannedDevice))
530 0 : return "devices"sv;
531 6483 : return {};
532 6499 : }
533 :
534 4561 : std::shared_ptr<dhtnet::ChannelSocket> gitSocket(const DeviceId& deviceId) const
535 : {
536 4561 : auto deviceSockets = gitSocketList_.find(deviceId);
537 9121 : return (deviceSockets != gitSocketList_.end()) ? deviceSockets->second : nullptr;
538 : }
539 :
540 2043 : void addGitSocket(const DeviceId& deviceId, const std::shared_ptr<dhtnet::ChannelSocket>& socket)
541 : {
542 2043 : gitSocketList_[deviceId] = socket;
543 2043 : }
544 830 : void removeGitSocket(const DeviceId& deviceId)
545 : {
546 830 : auto deviceSockets = gitSocketList_.find(deviceId);
547 830 : if (deviceSockets != gitSocketList_.end())
548 444 : gitSocketList_.erase(deviceSockets);
549 830 : }
550 :
551 : /**
552 : * Remove all git sockets and all DRT nodes associated with the given peer.
553 : * This is used when a swarm member is banned to ensure that we stop syncing
554 : * with them or sending them message notifications.
555 : */
556 : void disconnectFromPeer(const std::string& peerUri);
557 :
558 : std::vector<std::map<std::string, std::string>> getMembers(bool includeInvited, bool includeLeft) const;
559 :
560 : std::mutex membersMtx_ {};
561 : std::set<std::string> checkedMembers_; // Store members we tried
562 : std::function<void()> bootstrapCb_;
563 : std::mutex bootstrapMtx_ {};
564 : #ifdef LIBJAMI_TEST
565 : std::function<void(std::string, BootstrapStatus)> bootstrapCbTest_;
566 : #endif
567 :
568 : std::mutex writeMtx_ {};
569 : const std::unique_ptr<ConversationRepository> repository_;
570 : const std::weak_ptr<JamiAccount> account_;
571 : const std::string accountId_;
572 : const std::string userId_;
573 : const std::string deviceId_;
574 : const std::shared_ptr<SwarmManager> swarmManager_;
575 : std::atomic_bool isRemoving_ {false};
576 : std::vector<std::map<std::string, std::string>> loadMessages(const LogOptions& options);
577 : std::vector<libjami::SwarmMessage> loadMessages2(const LogOptions& options, History* optHistory = nullptr);
578 : void pull(const std::string& deviceId);
579 : std::vector<std::map<std::string, std::string>> mergeHistory(const std::string& uri);
580 :
581 : // Avoid multiple fetch/merges at the same time.
582 : std::mutex pullcbsMtx_ {};
583 : // store current remote in fetch
584 : std::map<std::string, std::deque<std::pair<std::string, OnPullCb>>> fetchingRemotes_ {};
585 : const std::shared_ptr<TransferManager> transferManager_ {};
586 : const std::filesystem::path repoPath_ {};
587 : const std::filesystem::path conversationDataPath_ {};
588 : const std::filesystem::path fetchedPath_ {};
589 :
590 : // Manage last message displayed and status
591 : const std::filesystem::path sendingPath_ {};
592 : const std::filesystem::path preferencesPath_ {};
593 : const std::filesystem::path statusPath_ {};
594 :
595 : OnMembersChanged onMembersChanged_ {};
596 :
597 : // Manage hosted calls on this device
598 : std::filesystem::path hostedCallsPath_ {};
599 : mutable std::map<std::string, uint64_t /* start time */> hostedCalls_ {};
600 : // Manage active calls for this conversation (can be hosted by other devices)
601 : std::filesystem::path activeCallsPath_ {};
602 : mutable std::mutex activeCallsMtx_ {};
603 : mutable std::vector<std::map<std::string, std::string>> activeCalls_ {};
604 :
605 : GitSocketList gitSocketList_ {};
606 :
607 : // Bootstrap
608 : const std::shared_ptr<asio::io_context> ioContext_;
609 : const std::unique_ptr<asio::steady_timer> fallbackTimer_;
610 :
611 : /**
612 : * Loaded history represents the linearized history to show for clients
613 : */
614 : History loadedHistory_ {};
615 : std::vector<std::shared_ptr<libjami::SwarmMessage>> addToHistory(
616 : History& history,
617 : const std::vector<std::map<std::string, std::string>>& commits,
618 : bool messageReceived = false,
619 : bool commitFromSelf = false);
620 :
621 : void handleReaction(History& history, const std::shared_ptr<libjami::SwarmMessage>& sharedCommit) const;
622 : void handleEdition(History& history,
623 : const std::shared_ptr<libjami::SwarmMessage>& sharedCommit,
624 : bool messageReceived) const;
625 : bool handleMessage(History& history,
626 : const std::shared_ptr<libjami::SwarmMessage>& sharedCommit,
627 : bool messageReceived) const;
628 : void rectifyStatus(const std::shared_ptr<libjami::SwarmMessage>& message, History& history) const;
629 : /**
630 : * {uri, {
631 : * {"fetch", "commitId"},
632 : * {"fetched_ts", "timestamp"},
633 : * {"read", "commitId"},
634 : * {"read_ts", "timestamp"}
635 : * }
636 : * }
637 : */
638 : mutable std::mutex messageStatusMtx_;
639 : std::function<void(const std::map<std::string, std::map<std::string, std::string>>&)> messageStatusCb_ {};
640 : std::map<std::string, std::map<std::string, std::string>> messagesStatus_ {};
641 : /**
 * Status: 0 = committed, 1 = fetched, 2 = read
 * This caches the current status to add to the messages
644 : */
645 : // Note: only store int32_t cause it's easy to pass to dbus this way
646 : // memberToStatus serves as a cache for loading messages
647 : std::map<std::string, int32_t> memberToStatus;
648 :
649 : // futureStatus is used to store the status for receiving messages
650 : // (because we're not sure to fetch the commit before receiving a status change for this)
651 : std::map<std::string, std::map<std::string, int32_t>> futureStatus;
652 : // Update internal structures regarding status
653 : void updateStatus(const std::string& uri,
654 : libjami::Account::MessageStates status,
655 : const std::string& commitId,
656 : const std::string& ts,
657 : bool emit = false);
658 :
659 : std::shared_ptr<Typers> typers_;
660 : };
661 :
662 : bool
663 17 : Conversation::Impl::isAdmin() const
664 : {
665 17 : auto adminsPath = repoPath_ / MemberPath::ADMINS;
666 34 : return std::filesystem::is_regular_file(fileutils::getFullPath(adminsPath, userId_ + ".crt"));
667 17 : }
668 :
669 : void
670 17 : Conversation::Impl::disconnectFromPeer(const std::string& peerUri)
671 : {
672 : // Remove nodes from swarmManager
673 17 : const auto nodes = swarmManager_->getAllNodes();
674 17 : std::vector<NodeId> toRemove;
675 39 : for (const auto node : nodes)
676 22 : if (peerUri == repository_->uriFromDevice(node.toString()))
677 11 : toRemove.emplace_back(node);
678 17 : swarmManager_->deleteNode(toRemove);
679 :
680 : // Remove git sockets with this member
681 39 : for (auto it = gitSocketList_.begin(); it != gitSocketList_.end();) {
682 22 : if (peerUri == repository_->uriFromDevice(it->first.toString()))
683 11 : it = gitSocketList_.erase(it);
684 : else
685 11 : ++it;
686 : }
687 17 : }
688 :
689 : std::vector<std::map<std::string, std::string>>
690 419 : Conversation::Impl::getMembers(bool includeInvited, bool includeLeft, bool includeBanned) const
691 : {
692 419 : std::vector<std::map<std::string, std::string>> result;
693 419 : auto members = repository_->members();
694 419 : std::lock_guard lk(messageStatusMtx_);
695 1165 : for (const auto& member : members) {
696 746 : if (member.role == MemberRole::BANNED && !includeBanned) {
697 169 : continue;
698 : }
699 746 : if (member.role == MemberRole::INVITED && !includeInvited)
700 169 : continue;
701 577 : if (member.role == MemberRole::LEFT && !includeLeft)
702 0 : continue;
703 577 : auto mm = member.map();
704 577 : auto it = messagesStatus_.find(member.uri);
705 577 : if (it != messagesStatus_.end()) {
706 210 : auto readIt = it->second.find("read");
707 210 : if (readIt != it->second.end())
708 148 : mm[ConversationMapKeys::LAST_DISPLAYED] = readIt->second;
709 : }
710 577 : result.emplace_back(std::move(mm));
711 577 : }
712 838 : return result;
713 419 : }
714 :
715 : std::vector<std::string>
716 18 : Conversation::Impl::commitsEndedCalls()
717 : {
718 : // Handle current calls
719 18 : std::vector<std::string> commits {};
720 18 : std::unique_lock lk(writeMtx_);
721 18 : std::unique_lock lkA(activeCallsMtx_);
722 18 : for (const auto& hostedCall : hostedCalls_) {
723 : // In this case, this means that we left
724 : // the conference while still hosting it, so activeCalls
725 : // will not be correctly updated
726 : // We don't need to send notifications there, as peers will sync with presence
727 0 : Json::Value value;
728 0 : value["uri"] = userId_;
729 0 : value["device"] = deviceId_;
730 0 : value["confId"] = hostedCall.first;
731 0 : value["type"] = "application/call-history+json";
732 0 : auto now = std::chrono::system_clock::now();
733 0 : auto nowConverted = std::chrono::duration_cast<std::chrono::seconds>(now.time_since_epoch()).count();
734 0 : value["duration"] = std::to_string((nowConverted - hostedCall.second) * 1000);
735 0 : auto itActive = std::find_if(activeCalls_.begin(),
736 : activeCalls_.end(),
737 0 : [this, confId = hostedCall.first](const auto& value) {
738 0 : return value.at("id") == confId && value.at("uri") == userId_
739 0 : && value.at("device") == deviceId_;
740 : });
741 0 : if (itActive != activeCalls_.end())
742 0 : activeCalls_.erase(itActive);
743 0 : commits.emplace_back(repository_->commitMessage(json::toString(value)));
744 :
745 0 : JAMI_DEBUG("Removing hosted conference... {:s}", hostedCall.first);
746 0 : }
747 18 : hostedCalls_.clear();
748 18 : saveActiveCalls();
749 18 : saveHostedCalls();
750 36 : return commits;
751 18 : }
752 :
753 : std::vector<std::map<std::string, std::string>>
754 15 : Conversation::Impl::loadMessages(const LogOptions& options)
755 : {
756 15 : if (!repository_)
757 0 : return {};
758 15 : std::vector<ConversationCommit> commits;
759 15 : auto startLogging = options.from == "";
760 15 : auto breakLogging = false;
761 30 : repository_->log(
762 64 : [&](const auto& id, const auto& author, const auto& commit) {
763 64 : if (!commits.empty()) {
764 : // Set linearized parent
765 49 : commits.rbegin()->linearized_parent = id;
766 : }
767 64 : if (options.skipMerge && git_commit_parentcount(commit.get()) > 1) {
768 0 : return CallbackResult::Skip;
769 : }
770 64 : if ((options.nbOfCommits != 0 && commits.size() == options.nbOfCommits))
771 0 : return CallbackResult::Break; // Stop logging
772 64 : if (breakLogging)
773 0 : return CallbackResult::Break; // Stop logging
774 64 : if (id == options.to) {
775 15 : if (options.includeTo)
776 0 : breakLogging = true; // For the next commit
777 : else
778 15 : return CallbackResult::Break; // Stop logging
779 : }
780 :
781 49 : if (!startLogging && options.from != "" && options.from == id)
782 0 : startLogging = true;
783 49 : if (!startLogging)
784 0 : return CallbackResult::Skip; // Start logging after this one
785 :
786 49 : if (options.fastLog) {
787 0 : if (options.authorUri != "") {
788 0 : if (options.authorUri == repository_->uriFromDevice(author.email)) {
789 0 : return CallbackResult::Break; // Found author, stop
790 : }
791 : }
792 : // Used to only count commit
793 0 : commits.emplace(commits.end(), ConversationCommit {});
794 0 : return CallbackResult::Skip;
795 : }
796 :
797 49 : return CallbackResult::Ok; // Continue
798 : },
799 49 : [&](auto&& cc) { commits.emplace(commits.end(), std::forward<decltype(cc)>(cc)); },
800 49 : [](auto, auto, auto) { return false; },
801 15 : options.from,
802 15 : options.logIfNotFound);
803 15 : return repository_->convCommitsToMap(commits);
804 15 : }
805 :
806 : std::vector<libjami::SwarmMessage>
807 1523 : Conversation::Impl::loadMessages2(const LogOptions& options, History* optHistory)
808 : {
809 1523 : auto history = optHistory ? optHistory : &loadedHistory_;
810 :
811 : // history->mutex is locked by the caller
812 1523 : if (!repository_ || history->loading) {
813 0 : return {};
814 : }
815 1523 : history->loading = true;
816 :
817 : // By convention, if options.nbOfCommits is zero, then we
818 : // don't impose a limit on the number of commits returned.
819 1523 : bool limitNbOfCommits = options.nbOfCommits > 0;
820 :
821 1523 : auto startLogging = options.from == "";
822 1523 : auto breakLogging = false;
823 1523 : auto currentHistorySize = loadedHistory_.messageList.size();
824 1523 : std::vector<std::string> replies;
825 1523 : std::vector<std::shared_ptr<libjami::SwarmMessage>> msgList;
826 3046 : repository_->log(
827 : /* preCondition */
828 8401 : [&](const auto& id, const auto& author, const auto& commit) {
829 8401 : if (options.skipMerge && git_commit_parentcount(commit.get()) > 1) {
830 1 : return CallbackResult::Skip;
831 : }
832 8402 : if (id == options.to) {
833 700 : if (options.includeTo)
834 0 : breakLogging = true; // For the next commit
835 : }
836 8393 : if (replies.empty()) { // This avoid load until
837 : // NOTE: in the future, we may want to add "Reply-Body" in commit to avoid to load
838 : // until this commit
839 16802 : if ((limitNbOfCommits
840 8401 : && (loadedHistory_.messageList.size() - currentHistorySize) == options.nbOfCommits))
841 5 : return CallbackResult::Break; // Stop logging
842 8396 : if (breakLogging)
843 0 : return CallbackResult::Break; // Stop logging
844 8396 : if (id == options.to && !options.includeTo) {
845 699 : return CallbackResult::Break; // Stop logging
846 : }
847 : }
848 :
849 7701 : if (!startLogging && options.from != "" && options.from == id)
850 1107 : startLogging = true;
851 7705 : if (!startLogging)
852 24 : return CallbackResult::Skip; // Start logging after this one
853 :
854 7681 : if (options.fastLog) {
855 6998 : if (options.authorUri != "") {
856 1 : if (options.authorUri == repository_->uriFromDevice(author.email)) {
857 1 : return CallbackResult::Break; // Found author, stop
858 : }
859 : }
860 : }
861 :
862 7674 : return CallbackResult::Ok; // Continue
863 : },
864 : /* emplaceCb */
865 7679 : [&](auto&& cc) {
866 7679 : if (limitNbOfCommits && (msgList.size() == options.nbOfCommits))
867 274 : return;
868 7406 : auto optMessage = repository_->convCommitToMap(cc);
869 7408 : if (!optMessage.has_value())
870 1 : return;
871 7402 : auto message = optMessage.value();
872 7404 : if (message.find("reply-to") != message.end()) {
873 1 : auto it = std::find(replies.begin(), replies.end(), message.at("reply-to"));
874 1 : if (it == replies.end()) {
875 1 : replies.emplace_back(message.at("reply-to"));
876 : }
877 : }
878 7405 : auto it = std::find(replies.begin(), replies.end(), message.at("id"));
879 7406 : if (it != replies.end()) {
880 1 : replies.erase(it);
881 : }
882 7404 : std::shared_ptr<libjami::SwarmMessage> firstMsg;
883 7404 : if ((history == &loadedHistory_) && msgList.empty() && !loadedHistory_.messageList.empty()) {
884 0 : firstMsg = *loadedHistory_.messageList.rbegin();
885 : }
886 22212 : auto added = addToHistory(*history, {message}, false, false);
887 7402 : if (!added.empty() && firstMsg) {
888 0 : emitSignal<libjami::ConversationSignal::SwarmMessageUpdated>(accountId_, repository_->id(), *firstMsg);
889 : }
890 7403 : msgList.insert(msgList.end(), added.begin(), added.end());
891 7398 : },
892 : /* postCondition */
893 7678 : [&](auto, auto, auto) {
894 : // Stop logging if there was a limit set on the number of commits
895 : // to return and we reached it. This isn't strictly necessary since
896 : // the check at the beginning of `emplaceCb` ensures that we won't
897 : // return too many messages, but it prevents us from needlessly
898 : // iterating over a (potentially) large number of commits.
899 7678 : return limitNbOfCommits && (msgList.size() == options.nbOfCommits);
900 : },
901 1523 : options.from,
902 1523 : options.logIfNotFound);
903 :
904 1523 : history->loading = false;
905 1523 : history->cv.notify_all();
906 :
907 : // Convert for client (remove ptr)
908 1523 : std::vector<libjami::SwarmMessage> ret;
909 1523 : ret.reserve(msgList.size());
910 8922 : for (const auto& msg : msgList) {
911 7399 : ret.emplace_back(*msg);
912 : }
913 1524 : return ret;
914 1523 : }
915 :
916 : void
917 3 : Conversation::Impl::handleReaction(History& history, const std::shared_ptr<libjami::SwarmMessage>& sharedCommit) const
918 : {
919 3 : auto it = history.quickAccess.find(sharedCommit->body.at("react-to"));
920 3 : auto peditIt = history.pendingEditions.find(sharedCommit->id);
921 3 : if (peditIt != history.pendingEditions.end()) {
922 0 : auto oldBody = sharedCommit->body;
923 0 : sharedCommit->body["body"] = peditIt->second.front()->body["body"];
924 0 : if (sharedCommit->body.at("body").empty())
925 0 : return;
926 0 : history.pendingEditions.erase(peditIt);
927 0 : }
928 3 : if (it != history.quickAccess.end()) {
929 3 : it->second->reactions.emplace_back(sharedCommit->body);
930 3 : emitSignal<libjami::ConversationSignal::ReactionAdded>(accountId_,
931 3 : repository_->id(),
932 3 : it->second->id,
933 3 : sharedCommit->body);
934 : } else {
935 0 : history.pendingReactions[sharedCommit->body.at("react-to")].emplace_back(sharedCommit->body);
936 : }
937 : }
938 :
939 : void
940 8 : Conversation::Impl::handleEdition(History& history,
941 : const std::shared_ptr<libjami::SwarmMessage>& sharedCommit,
942 : bool messageReceived) const
943 : {
944 16 : auto editId = sharedCommit->body.at("edit");
945 8 : auto it = history.quickAccess.find(editId);
946 8 : if (it != history.quickAccess.end()) {
947 6 : auto baseCommit = it->second;
948 6 : if (baseCommit) {
949 6 : auto itReact = baseCommit->body.find("react-to");
950 6 : std::string toReplace = (baseCommit->type == "application/data-transfer+json") ? "tid" : "body";
951 6 : auto body = sharedCommit->body.at(toReplace);
952 : // Edit reaction
953 6 : if (itReact != baseCommit->body.end()) {
954 1 : baseCommit->body[toReplace] = body; // Replace body if pending
955 1 : it = history.quickAccess.find(itReact->second);
956 1 : auto itPending = history.pendingReactions.find(itReact->second);
957 1 : if (it != history.quickAccess.end()) {
958 1 : baseCommit = it->second; // Base commit
959 1 : auto itPreviousReact = std::find_if(baseCommit->reactions.begin(),
960 1 : baseCommit->reactions.end(),
961 1 : [&](const auto& reaction) {
962 1 : return reaction.at("id") == editId;
963 : });
964 1 : if (itPreviousReact != baseCommit->reactions.end()) {
965 1 : (*itPreviousReact)[toReplace] = body;
966 1 : if (body.empty()) {
967 1 : baseCommit->reactions.erase(itPreviousReact);
968 2 : emitSignal<libjami::ConversationSignal::ReactionRemoved>(accountId_,
969 1 : repository_->id(),
970 1 : baseCommit->id,
971 : editId);
972 : }
973 : }
974 0 : } else if (itPending != history.pendingReactions.end()) {
975 : // Else edit if pending
976 0 : auto itReaction = std::find_if(itPending->second.begin(),
977 0 : itPending->second.end(),
978 0 : [&](const auto& reaction) { return reaction.at("id") == editId; });
979 0 : if (itReaction != itPending->second.end()) {
980 0 : (*itReaction)[toReplace] = body;
981 0 : if (body.empty())
982 0 : itPending->second.erase(itReaction);
983 : }
984 : } else {
985 : // Add to pending edtions
986 0 : messageReceived ? history.pendingEditions[editId].emplace_front(sharedCommit)
987 0 : : history.pendingEditions[editId].emplace_back(sharedCommit);
988 : }
989 : } else {
990 : // Normal message
991 5 : it->second->editions.emplace(it->second->editions.begin(), it->second->body);
992 5 : it->second->body[toReplace] = sharedCommit->body[toReplace];
993 5 : if (toReplace == "tid") {
994 : // Avoid to replace fileId in client
995 1 : it->second->body["fileId"] = "";
996 : }
997 : // Remove reactions
998 5 : if (sharedCommit->body.at(toReplace).empty())
999 2 : it->second->reactions.clear();
1000 5 : emitSignal<libjami::ConversationSignal::SwarmMessageUpdated>(accountId_, repository_->id(), *it->second);
1001 : }
1002 6 : }
1003 6 : } else {
1004 2 : messageReceived ? history.pendingEditions[editId].emplace_front(sharedCommit)
1005 2 : : history.pendingEditions[editId].emplace_back(sharedCommit);
1006 : }
1007 8 : }
1008 :
1009 : bool
1010 8630 : Conversation::Impl::handleMessage(History& history,
1011 : const std::shared_ptr<libjami::SwarmMessage>& sharedCommit,
1012 : bool messageReceived) const
1013 : {
1014 8630 : if (messageReceived) {
1015 : // For a received message, we place it at the beginning of the list
1016 1217 : if (!history.messageList.empty())
1017 972 : sharedCommit->linearizedParent = (*history.messageList.begin())->id;
1018 1217 : history.messageList.emplace_front(sharedCommit);
1019 : } else {
1020 : // For a loaded message, we load from newest to oldest
1021 : // So we change the parent of the last message.
1022 7413 : if (!history.messageList.empty())
1023 5898 : (*history.messageList.rbegin())->linearizedParent = sharedCommit->id;
1024 7412 : history.messageList.emplace_back(sharedCommit);
1025 : }
1026 : // Handle pending reactions/editions
1027 8630 : auto reactIt = history.pendingReactions.find(sharedCommit->id);
1028 8615 : if (reactIt != history.pendingReactions.end()) {
1029 0 : for (const auto& commitBody : reactIt->second)
1030 0 : sharedCommit->reactions.emplace_back(commitBody);
1031 0 : history.pendingReactions.erase(reactIt);
1032 : }
1033 8616 : auto peditIt = history.pendingEditions.find(sharedCommit->id);
1034 8622 : if (peditIt != history.pendingEditions.end()) {
1035 0 : auto oldBody = sharedCommit->body;
1036 0 : if (sharedCommit->type == "application/data-transfer+json") {
1037 0 : sharedCommit->body["tid"] = peditIt->second.front()->body["tid"];
1038 0 : sharedCommit->body["fileId"] = "";
1039 : } else {
1040 0 : sharedCommit->body["body"] = peditIt->second.front()->body["body"];
1041 : }
1042 0 : peditIt->second.pop_front();
1043 0 : for (const auto& commit : peditIt->second) {
1044 0 : sharedCommit->editions.emplace_back(commit->body);
1045 : }
1046 0 : sharedCommit->editions.emplace_back(oldBody);
1047 0 : history.pendingEditions.erase(peditIt);
1048 0 : }
1049 : // Announce to client
1050 8624 : if (messageReceived)
1051 1217 : emitSignal<libjami::ConversationSignal::SwarmMessageReceived>(accountId_, repository_->id(), *sharedCommit);
1052 8627 : return !messageReceived;
1053 : }
1054 :
1055 : void
1056 8641 : Conversation::Impl::rectifyStatus(const std::shared_ptr<libjami::SwarmMessage>& message, History& history) const
1057 : {
1058 8641 : auto parentIt = history.quickAccess.find(message->linearizedParent);
1059 8641 : auto currentMessage = message;
1060 :
1061 20670 : while (parentIt != history.quickAccess.end()) {
1062 12028 : const auto& parent = parentIt->second;
1063 13355 : for (const auto& [peer, value] : message->status) {
1064 12257 : auto parentStatusIt = parent->status.find(peer);
1065 12257 : if (parentStatusIt == parent->status.end() || parentStatusIt->second < value) {
1066 1328 : parent->status[peer] = value;
1067 2656 : emitSignal<libjami::ConfigurationSignal::AccountMessageStatusChanged>(accountId_,
1068 1328 : repository_->id(),
1069 : peer,
1070 1328 : parent->id,
1071 : value);
1072 10929 : } else if (parentStatusIt->second >= value) {
1073 10929 : break;
1074 : }
1075 : }
1076 12028 : currentMessage = parent;
1077 12028 : parentIt = history.quickAccess.find(parent->linearizedParent);
1078 : }
1079 8640 : }
1080 :
1081 : std::vector<std::shared_ptr<libjami::SwarmMessage>>
1082 8627 : Conversation::Impl::addToHistory(History& history,
1083 : const std::vector<std::map<std::string, std::string>>& commits,
1084 : bool messageReceived,
1085 : bool commitFromSelf)
1086 : {
1087 : //
1088 : // NOTE: This function makes the following assumptions on its arguments:
1089 : // - The messages in "history" are in reverse chronological order (newest message
1090 : // first, oldest message last).
1091 : // - If messageReceived is true, then the commits in "commits" are assumed to be in
1092 : // chronological order (oldest to newest) and to be newer than the ones in "history".
1093 : // They are therefore inserted at the beginning of the message list.
1094 : // - If messageReceived is false, then the commits in "commits" are assumed to be in
1095 : // reverse chronological order (newest to oldest) and to be older than the ones in
1096 : // "history". They are therefore appended at the end of the message list.
1097 : //
1098 8627 : auto acc = account_.lock();
1099 8626 : if (!acc)
1100 0 : return {};
1101 8626 : auto username = acc->getUsername();
1102 8625 : if (messageReceived && (&history == &loadedHistory_ && history.loading)) {
1103 0 : std::unique_lock lk(history.mutex);
1104 0 : history.cv.wait(lk, [&] { return !history.loading; });
1105 0 : }
1106 :
1107 : // Only set messages' status on history for client
1108 8625 : bool needToSetMessageStatus = !commitFromSelf && &history == &loadedHistory_;
1109 :
1110 8625 : std::vector<std::shared_ptr<libjami::SwarmMessage>> sharedCommits;
1111 17298 : for (const auto& commit : commits) {
1112 17352 : auto commitId = commit.at("id");
1113 8675 : if (history.quickAccess.find(commitId) != history.quickAccess.end())
1114 6 : continue; // Already present
1115 8666 : auto typeIt = commit.find("type");
1116 : // Nothing to show for the client, skip
1117 8663 : if (typeIt != commit.end() && typeIt->second == "merge")
1118 26 : continue;
1119 :
1120 8640 : auto sharedCommit = std::make_shared<libjami::SwarmMessage>();
1121 8641 : sharedCommit->fromMapStringString(commit);
1122 :
1123 8643 : if (needToSetMessageStatus) {
1124 928 : std::lock_guard lk(messageStatusMtx_);
1125 : // Check if we already have status information for the commit.
1126 928 : auto itFuture = futureStatus.find(sharedCommit->id);
1127 928 : if (itFuture != futureStatus.end()) {
1128 16 : sharedCommit->status = std::move(itFuture->second);
1129 16 : futureStatus.erase(itFuture);
1130 : }
1131 928 : }
1132 8643 : sharedCommits.emplace_back(sharedCommit);
1133 8671 : }
1134 :
1135 8627 : if (needToSetMessageStatus) {
1136 914 : constexpr int32_t SENDING = static_cast<int32_t>(libjami::Account::MessageStates::SENDING);
1137 914 : constexpr int32_t SENT = static_cast<int32_t>(libjami::Account::MessageStates::SENT);
1138 914 : constexpr int32_t DISPLAYED = static_cast<int32_t>(libjami::Account::MessageStates::DISPLAYED);
1139 :
1140 914 : std::lock_guard lk(messageStatusMtx_);
1141 13500 : for (const auto& member : repository_->members()) {
1142 : // For each member, we iterate over the commits to add in reverse chronological
1143 : // order (i.e. from newest to oldest) and set their status from the point of view
1144 : // of that member (as best we can given the information we have).
1145 : //
1146 : // The key assumption made in order to compute the status is that it can never decrease
1147 : // (with respect to the ordering SENDING < SENT < DISPLAYED) as we go back in time. We
1148 : // therefore start by setting the "status" variable below to the lowest possible value,
1149 : // and increase it when we encounter a commit for which it is justified to do so.
1150 : //
1151 : // If messageReceived is true, then the commits we are adding are the most recent in the
1152 : // conversation history, so the lowest possible value is SENDING.
1153 : //
1154 : // If messageReceived is false, then the commits we are adding are older than the ones
1155 : // that are already in the history, so the lowest possible value is the status of the
1156 : // oldest message in the history so far, which is stored in memberToStatus.
1157 12586 : auto status = SENDING;
1158 12586 : if (!messageReceived) {
1159 12 : auto cache = memberToStatus[member.uri];
1160 12 : if (cache > status)
1161 9 : status = cache;
1162 : }
1163 12586 : auto& messagesStatus = messagesStatus_[member.uri];
1164 :
1165 25205 : for (auto it = sharedCommits.rbegin(); it != sharedCommits.rend(); it++) {
1166 12622 : auto sharedCommit = *it;
1167 12624 : auto previousStatus = status;
1168 12624 : auto& commitStatus = sharedCommit->status[member.uri];
1169 :
1170 : // Compute status for the current commit.
1171 12620 : if (status < SENT && messagesStatus["fetched"] == sharedCommit->id) {
1172 13 : status = SENT;
1173 : }
1174 12623 : if (messagesStatus["read"] == sharedCommit->id) {
1175 1 : status = DISPLAYED;
1176 : }
1177 12622 : if (member.uri == sharedCommit->body.at("author")) {
1178 927 : status = DISPLAYED;
1179 : }
1180 12622 : if (status < commitStatus) {
1181 1 : status = commitStatus;
1182 : }
1183 :
1184 : // Store computed value.
1185 12622 : commitStatus = status;
1186 :
1187 : // Update messagesStatus_ if needed.
1188 12622 : if (previousStatus == SENDING && status >= SENT) {
1189 926 : messagesStatus["fetched"] = sharedCommit->id;
1190 : }
1191 12622 : if (previousStatus <= SENT && status == DISPLAYED) {
1192 913 : messagesStatus["read"] = sharedCommit->id;
1193 : }
1194 12622 : }
1195 :
1196 12586 : if (!messageReceived) {
1197 : // Update memberToStatus with the status of the last (i.e. oldest) added commit.
1198 12 : memberToStatus[member.uri] = status;
1199 : }
1200 914 : }
1201 914 : }
1202 :
1203 8627 : std::vector<std::shared_ptr<libjami::SwarmMessage>> messages;
1204 17259 : for (const auto& sharedCommit : sharedCommits) {
1205 8632 : history.quickAccess[sharedCommit->id] = sharedCommit;
1206 :
1207 8641 : auto reactToIt = sharedCommit->body.find("react-to");
1208 8641 : auto editIt = sharedCommit->body.find("edit");
1209 8639 : if (reactToIt != sharedCommit->body.end() && !reactToIt->second.empty()) {
1210 3 : handleReaction(history, sharedCommit);
1211 8633 : } else if (editIt != sharedCommit->body.end() && !editIt->second.empty()) {
1212 8 : handleEdition(history, sharedCommit, messageReceived);
1213 8625 : } else if (handleMessage(history, sharedCommit, messageReceived)) {
1214 7407 : messages.emplace_back(sharedCommit);
1215 : }
1216 8638 : rectifyStatus(sharedCommit, history);
1217 : }
1218 :
1219 8615 : return messages;
1220 8617 : }
1221 :
1222 186 : Conversation::Conversation(const std::shared_ptr<JamiAccount>& account,
1223 : ConversationMode mode,
1224 186 : const std::string& otherMember)
1225 186 : : pimpl_ {new Impl {account, mode, otherMember}}
1226 186 : {}
1227 :
1228 18 : Conversation::Conversation(const std::shared_ptr<JamiAccount>& account, const std::string& conversationId)
1229 18 : : pimpl_ {new Impl {account, conversationId}}
1230 18 : {}
1231 :
1232 191 : Conversation::Conversation(const std::shared_ptr<JamiAccount>& account,
1233 : const std::string& remoteDevice,
1234 191 : const std::string& conversationId)
1235 191 : : pimpl_ {new Impl {account, remoteDevice, conversationId}}
1236 191 : {}
1237 :
1238 389 : Conversation::~Conversation() {}
1239 :
1240 : std::string
1241 4743 : Conversation::id() const
1242 : {
1243 4743 : return pimpl_->repository_ ? pimpl_->repository_->id() : "";
1244 : }
1245 :
1246 : void
1247 138 : Conversation::addMember(const std::string& contactUri, const OnDoneCb& cb)
1248 : {
1249 : try {
1250 138 : if (mode() == ConversationMode::ONE_TO_ONE) {
1251 : // Only authorize to add left members
1252 1 : auto initialMembers = getInitialMembers();
1253 1 : auto it = std::find(initialMembers.begin(), initialMembers.end(), contactUri);
1254 1 : if (it == initialMembers.end()) {
1255 1 : JAMI_WARN("Unable to add new member in one to one conversation");
1256 1 : cb(false, "");
1257 1 : return;
1258 : }
1259 1 : }
1260 0 : } catch (const std::exception& e) {
1261 0 : JAMI_WARN("Unable to get mode: %s", e.what());
1262 0 : cb(false, "");
1263 0 : return;
1264 0 : }
1265 137 : if (isMember(contactUri, true)) {
1266 0 : JAMI_WARN("Unable to add member %s because it's already a member", contactUri.c_str());
1267 0 : cb(false, "");
1268 0 : return;
1269 : }
1270 137 : if (isBanned(contactUri)) {
1271 2 : if (pimpl_->isAdmin()) {
1272 1 : dht::ThreadPool::io().run([w = weak(), contactUri = std::move(contactUri), cb = std::move(cb)] {
1273 1 : if (auto sthis = w.lock()) {
1274 1 : auto members = sthis->pimpl_->repository_->members();
1275 1 : auto type = sthis->pimpl_->bannedType(contactUri);
1276 1 : if (type.empty()) {
1277 0 : cb(false, {});
1278 0 : return;
1279 : }
1280 1 : sthis->pimpl_->voteUnban(contactUri, type, cb);
1281 2 : }
1282 : });
1283 : } else {
1284 1 : JAMI_WARN("Unable to add member %s because this member is blocked", contactUri.c_str());
1285 1 : cb(false, "");
1286 : }
1287 2 : return;
1288 : }
1289 :
1290 135 : dht::ThreadPool::io().run([w = weak(), contactUri = std::move(contactUri), cb = std::move(cb)] {
1291 135 : if (auto sthis = w.lock()) {
1292 : // Add member files and commit
1293 135 : std::unique_lock lk(sthis->pimpl_->writeMtx_);
1294 135 : auto commit = sthis->pimpl_->repository_->addMember(contactUri);
1295 135 : sthis->pimpl_->announce(commit, true);
1296 135 : lk.unlock();
1297 135 : if (cb)
1298 135 : cb(!commit.empty(), commit);
1299 270 : }
1300 135 : });
1301 : }
1302 :
1303 : std::shared_ptr<dhtnet::ChannelSocket>
1304 4561 : Conversation::gitSocket(const DeviceId& deviceId) const
1305 : {
1306 4561 : return pimpl_->gitSocket(deviceId);
1307 : }
1308 :
1309 : void
1310 2043 : Conversation::addGitSocket(const DeviceId& deviceId, const std::shared_ptr<dhtnet::ChannelSocket>& socket)
1311 : {
1312 2043 : pimpl_->addGitSocket(deviceId, socket);
1313 2043 : }
1314 :
1315 : void
1316 830 : Conversation::removeGitSocket(const DeviceId& deviceId)
1317 : {
1318 830 : pimpl_->removeGitSocket(deviceId);
1319 830 : }
1320 :
1321 : void
1322 380 : Conversation::shutdownConnections()
1323 : {
1324 380 : pimpl_->fallbackTimer_->cancel();
1325 380 : pimpl_->gitSocketList_.clear();
1326 380 : if (pimpl_->swarmManager_)
1327 380 : pimpl_->swarmManager_->shutdown();
1328 380 : std::lock_guard lk(pimpl_->membersMtx_);
1329 380 : pimpl_->checkedMembers_.clear();
1330 380 : }
1331 :
1332 : void
1333 0 : Conversation::connectivityChanged()
1334 : {
1335 0 : if (pimpl_->swarmManager_)
1336 0 : pimpl_->swarmManager_->maintainBuckets();
1337 0 : }
1338 :
1339 : std::vector<jami::DeviceId>
1340 0 : Conversation::getDeviceIdList() const
1341 : {
1342 0 : return pimpl_->swarmManager_->getAllNodes();
1343 : }
1344 :
1345 : std::shared_ptr<Typers>
1346 9 : Conversation::typers() const
1347 : {
1348 9 : return pimpl_->typers_;
1349 : }
1350 :
1351 : bool
1352 0 : Conversation::hasSwarmChannel(const std::string& deviceId)
1353 : {
1354 0 : if (!pimpl_->swarmManager_)
1355 0 : return false;
1356 0 : return pimpl_->swarmManager_->isConnectedWith(DeviceId(deviceId));
1357 : }
1358 :
1359 : void
1360 1 : Conversation::Impl::voteUnban(const std::string& contactUri, const std::string_view type, const OnDoneCb& cb)
1361 : {
1362 : // Check if admin
1363 1 : if (!isAdmin()) {
1364 0 : JAMI_WARN("You're not an admin of this repo. Unable to unblock %s", contactUri.c_str());
1365 0 : cb(false, {});
1366 0 : return;
1367 : }
1368 :
1369 : // Vote for removal
1370 1 : std::unique_lock lk(writeMtx_);
1371 1 : auto voteCommit = repository_->voteUnban(contactUri, type);
1372 1 : if (voteCommit.empty()) {
1373 0 : JAMI_WARN("Unbanning %s failed", contactUri.c_str());
1374 0 : cb(false, "");
1375 0 : return;
1376 : }
1377 :
1378 1 : auto lastId = voteCommit;
1379 1 : std::vector<std::string> commits;
1380 1 : commits.emplace_back(voteCommit);
1381 :
1382 : // If admin, check vote
1383 2 : auto resolveCommit = repository_->resolveVote(contactUri, type, "unban");
1384 1 : if (!resolveCommit.empty()) {
1385 1 : commits.emplace_back(resolveCommit);
1386 1 : lastId = resolveCommit;
1387 1 : JAMI_WARN("Vote solved for unbanning %s.", contactUri.c_str());
1388 : }
1389 1 : announce(commits, true);
1390 1 : lk.unlock();
1391 1 : if (cb)
1392 1 : cb(!lastId.empty(), lastId);
1393 1 : }
1394 :
1395 : void
1396 14 : Conversation::removeMember(const std::string& contactUri, bool isDevice, const OnDoneCb& cb)
1397 : {
1398 42 : dht::ThreadPool::io().run(
1399 28 : [w = weak(), contactUri = std::move(contactUri), isDevice = std::move(isDevice), cb = std::move(cb)] {
1400 14 : if (auto sthis = w.lock()) {
1401 : // Check if admin
1402 14 : if (!sthis->pimpl_->isAdmin()) {
1403 1 : JAMI_WARN("You're not an admin of this repo. Unable to block %s", contactUri.c_str());
1404 1 : cb(false, {});
1405 2 : return;
1406 : }
1407 :
1408 : // Get current user type
1409 13 : std::string type;
1410 13 : if (isDevice) {
1411 0 : type = "devices";
1412 : } else {
1413 13 : auto members = sthis->pimpl_->repository_->members();
1414 29 : for (const auto& member : members) {
1415 29 : if (member.uri == contactUri) {
1416 13 : if (member.role == MemberRole::INVITED) {
1417 2 : type = "invited";
1418 11 : } else if (member.role == MemberRole::ADMIN) {
1419 1 : type = "admins";
1420 10 : } else if (member.role == MemberRole::MEMBER) {
1421 10 : type = "members";
1422 : }
1423 13 : break;
1424 : }
1425 : }
1426 13 : if (type.empty()) {
1427 0 : cb(false, {});
1428 0 : return;
1429 : }
1430 13 : }
1431 :
1432 : // Vote for removal
1433 13 : std::unique_lock lk(sthis->pimpl_->writeMtx_);
1434 13 : auto voteCommit = sthis->pimpl_->repository_->voteKick(contactUri, type);
1435 13 : if (voteCommit.empty()) {
1436 1 : JAMI_WARN("Kicking %s failed", contactUri.c_str());
1437 1 : cb(false, "");
1438 1 : return;
1439 : }
1440 :
1441 12 : auto lastId = voteCommit;
1442 12 : std::vector<std::string> commits;
1443 12 : commits.emplace_back(voteCommit);
1444 :
1445 : // If admin, check vote
1446 24 : auto resolveCommit = sthis->pimpl_->repository_->resolveVote(contactUri, type, "ban");
1447 12 : if (!resolveCommit.empty()) {
1448 12 : commits.emplace_back(resolveCommit);
1449 12 : lastId = resolveCommit;
1450 12 : JAMI_WARN("Vote solved for %s. %s banned", contactUri.c_str(), isDevice ? "Device" : "Member");
1451 12 : sthis->pimpl_->disconnectFromPeer(contactUri);
1452 : }
1453 :
1454 12 : sthis->pimpl_->announce(commits, true);
1455 12 : lk.unlock();
1456 12 : cb(!lastId.empty(), lastId);
1457 29 : }
1458 : });
1459 14 : }
1460 :
1461 : std::vector<std::map<std::string, std::string>>
1462 419 : Conversation::getMembers(bool includeInvited, bool includeLeft, bool includeBanned) const
1463 : {
1464 419 : return pimpl_->getMembers(includeInvited, includeLeft, includeBanned);
1465 : }
1466 :
1467 : std::set<std::string>
1468 1996 : Conversation::memberUris(std::string_view filter, const std::set<MemberRole>& filteredRoles) const
1469 : {
1470 1996 : return pimpl_->repository_->memberUris(filter, filteredRoles);
1471 : }
1472 :
1473 : std::vector<NodeId>
1474 1720 : Conversation::peersToSyncWith() const
1475 : {
1476 1720 : auto s = pimpl_->swarmManager_->getConnectedNodes();
1477 12851 : for (const auto& [deviceId, _] : pimpl_->gitSocketList_)
1478 11130 : if (std::find(s.cbegin(), s.cend(), deviceId) == s.cend())
1479 5104 : s.emplace_back(deviceId);
1480 1721 : return s;
1481 0 : }
1482 :
1483 : bool
1484 1720 : Conversation::isBootstrapped() const
1485 : {
1486 1720 : return pimpl_->swarmManager_->isConnected();
1487 : }
1488 :
1489 : std::string
1490 13104 : Conversation::uriFromDevice(const std::string& deviceId) const
1491 : {
1492 13104 : return pimpl_->repository_->uriFromDevice(deviceId);
1493 : }
1494 :
1495 : void
1496 0 : Conversation::monitor()
1497 : {
1498 0 : pimpl_->swarmManager_->getRoutingTable().printRoutingTable();
1499 0 : }
1500 :
1501 : std::string
1502 185 : Conversation::join()
1503 : {
1504 185 : return pimpl_->repository_->join();
1505 : }
1506 :
1507 : bool
1508 3326 : Conversation::isMember(const std::string& uri, bool includeInvited) const
1509 : {
1510 3326 : auto uriCrt = uri + ".crt"sv;
1511 6652 : if (std::filesystem::is_regular_file(pimpl_->repoPath_ / MemberPath::ADMINS / uriCrt)
1512 6652 : || std::filesystem::is_regular_file(pimpl_->repoPath_ / MemberPath::MEMBERS / uriCrt)) {
1513 2213 : return true;
1514 : }
1515 1113 : if (includeInvited) {
1516 894 : if (std::filesystem::is_regular_file(pimpl_->repoPath_ / MemberPath::INVITED / uri)) {
1517 616 : return true;
1518 : }
1519 278 : if (mode() == ConversationMode::ONE_TO_ONE) {
1520 3 : for (const auto& member : getInitialMembers()) {
1521 2 : if (member == uri)
1522 0 : return true;
1523 1 : }
1524 : }
1525 : }
1526 497 : return false;
1527 3326 : }
1528 :
1529 : bool
1530 6500 : Conversation::isBanned(const std::string& uri) const
1531 : {
1532 6500 : return !pimpl_->bannedType(uri).empty();
1533 : }
1534 :
1535 : void
1536 0 : Conversation::sendMessage(
1537 : std::string&& message, const std::string& type, const std::string& replyTo, OnCommitCb&& onCommit, OnDoneCb&& cb)
1538 : {
1539 0 : Json::Value json;
1540 0 : json["body"] = std::move(message);
1541 0 : json["type"] = type;
1542 0 : sendMessage(std::move(json), replyTo, std::move(onCommit), std::move(cb));
1543 0 : }
1544 :
1545 : void
1546 139 : Conversation::sendMessage(Json::Value&& value, const std::string& replyTo, OnCommitCb&& onCommit, OnDoneCb&& cb)
1547 : {
1548 139 : if (!replyTo.empty()) {
1549 2 : auto commit = pimpl_->repository_->getCommit(replyTo);
1550 2 : if (commit == std::nullopt) {
1551 1 : JAMI_ERR("Replying to invalid commit %s", replyTo.c_str());
1552 1 : return;
1553 : }
1554 1 : value["reply-to"] = replyTo;
1555 2 : }
1556 552 : dht::ThreadPool::io().run(
1557 414 : [w = weak(), value = std::move(value), onCommit = std::move(onCommit), cb = std::move(cb)] {
1558 138 : if (auto sthis = w.lock()) {
1559 138 : std::unique_lock lk(sthis->pimpl_->writeMtx_);
1560 138 : auto commit = sthis->pimpl_->repository_->commitMessage(json::toString(value));
1561 138 : lk.unlock();
1562 138 : if (onCommit)
1563 13 : onCommit(commit);
1564 138 : sthis->pimpl_->announce(commit, true);
1565 138 : if (cb)
1566 138 : cb(!commit.empty(), commit);
1567 276 : }
1568 138 : });
1569 : }
1570 :
1571 : void
1572 0 : Conversation::sendMessages(std::vector<Json::Value>&& messages, OnMultiDoneCb&& cb)
1573 : {
1574 0 : dht::ThreadPool::io().run([w = weak(), messages = std::move(messages), cb = std::move(cb)] {
1575 0 : if (auto sthis = w.lock()) {
1576 0 : std::vector<std::string> commits;
1577 0 : commits.reserve(messages.size());
1578 0 : std::unique_lock lk(sthis->pimpl_->writeMtx_);
1579 0 : for (const auto& message : messages) {
1580 0 : auto commit = sthis->pimpl_->repository_->commitMessage(json::toString(message));
1581 0 : commits.emplace_back(std::move(commit));
1582 0 : }
1583 0 : lk.unlock();
1584 0 : sthis->pimpl_->announce(commits, true);
1585 0 : if (cb)
1586 0 : cb(commits);
1587 0 : }
1588 0 : });
1589 0 : }
1590 :
1591 : std::optional<std::map<std::string, std::string>>
1592 11953 : Conversation::getCommit(const std::string& commitId) const
1593 : {
1594 11953 : auto commit = pimpl_->repository_->getCommit(commitId);
1595 11951 : if (commit == std::nullopt)
1596 1674 : return std::nullopt;
1597 10277 : return pimpl_->repository_->convCommitToMap(*commit);
1598 11953 : }
1599 :
1600 : void
1601 0 : Conversation::loadMessages(OnLoadMessages cb, const LogOptions& options)
1602 : {
1603 0 : if (!cb)
1604 0 : return;
1605 0 : dht::ThreadPool::io().run([w = weak(), cb = std::move(cb), options] {
1606 0 : if (auto sthis = w.lock()) {
1607 0 : cb(sthis->pimpl_->loadMessages(options));
1608 0 : }
1609 0 : });
1610 : }
1611 :
1612 : void
1613 2 : Conversation::loadMessages2(const OnLoadMessages2& cb, const LogOptions& options)
1614 : {
1615 2 : if (!cb)
1616 0 : return;
1617 2 : dht::ThreadPool::io().run([w = weak(), cb = std::move(cb), options] {
1618 2 : if (auto sthis = w.lock()) {
1619 2 : std::unique_lock lk(sthis->pimpl_->loadedHistory_.mutex);
1620 2 : auto result = sthis->pimpl_->loadMessages2(options);
1621 2 : lk.unlock();
1622 2 : cb(std::move(result));
1623 4 : }
1624 2 : });
1625 : }
1626 :
1627 : void
1628 0 : Conversation::clearCache()
1629 : {
1630 0 : std::lock_guard lk(pimpl_->loadedHistory_.mutex);
1631 0 : pimpl_->loadedHistory_.messageList.clear();
1632 0 : pimpl_->loadedHistory_.quickAccess.clear();
1633 0 : pimpl_->loadedHistory_.pendingEditions.clear();
1634 0 : pimpl_->loadedHistory_.pendingReactions.clear();
1635 : {
1636 0 : std::lock_guard lk(pimpl_->messageStatusMtx_);
1637 0 : pimpl_->memberToStatus.clear();
1638 0 : }
1639 0 : }
1640 :
1641 : std::string
1642 2180 : Conversation::lastCommitId() const
1643 : {
1644 : {
1645 2180 : std::lock_guard lk(pimpl_->loadedHistory_.mutex);
1646 2180 : if (!pimpl_->loadedHistory_.messageList.empty())
1647 3552 : return (*pimpl_->loadedHistory_.messageList.begin())->id;
1648 2180 : }
1649 404 : LogOptions options;
1650 404 : options.nbOfCommits = 1;
1651 404 : options.skipMerge = true;
1652 404 : History optHistory;
1653 404 : std::scoped_lock lock(pimpl_->writeMtx_, optHistory.mutex);
1654 404 : auto res = pimpl_->loadMessages2(options, &optHistory);
1655 404 : if (res.empty())
1656 2 : return {};
1657 804 : return (*optHistory.messageList.begin())->id;
1658 404 : }
1659 :
1660 : std::vector<std::map<std::string, std::string>>
1661 1770 : Conversation::Impl::mergeHistory(const std::string& uri)
1662 : {
1663 1770 : if (not repository_) {
1664 0 : JAMI_WARNING("{} Invalid repo. Abort merge", toString());
1665 0 : return {};
1666 : }
1667 3540 : auto remoteHead = repository_->remoteHead(uri);
1668 1770 : if (remoteHead.empty()) {
1669 4 : JAMI_WARNING("{} Unable to get HEAD of {}", toString(), uri);
1670 1 : return {};
1671 : }
1672 :
1673 : // Validate commit
1674 1769 : auto [newCommits, err] = repository_->validFetch(uri);
1675 1769 : if (newCommits.empty()) {
1676 863 : if (err)
1677 68 : JAMI_ERROR("{} Unable to validate history with {}", toString(), uri);
1678 863 : repository_->removeBranchWith(uri);
1679 863 : return {};
1680 : }
1681 :
1682 : // If validated, merge
1683 906 : auto [ok, cid] = repository_->merge(remoteHead);
1684 906 : if (!ok) {
1685 0 : JAMI_ERROR("{} Unable to merge history with {}", toString(), uri);
1686 0 : repository_->removeBranchWith(uri);
1687 0 : return {};
1688 : }
1689 906 : if (!cid.empty()) {
1690 : // A merge commit was generated, should be added in new commits
1691 12 : auto commit = repository_->getCommit(cid);
1692 12 : if (commit != std::nullopt)
1693 12 : newCommits.emplace_back(*commit);
1694 12 : }
1695 :
1696 3624 : JAMI_LOG("{} Successfully merged history with {:s}", toString(), uri);
1697 906 : auto result = repository_->convCommitsToMap(newCommits);
1698 1835 : for (auto& commit : result) {
1699 929 : auto it = commit.find("type");
1700 929 : if (it != commit.end() && it->second == "member") {
1701 770 : repository_->refreshMembers();
1702 :
1703 770 : if (commit["action"] == "ban")
1704 5 : disconnectFromPeer(commit["uri"]);
1705 : }
1706 : }
1707 906 : return result;
1708 1770 : }
1709 :
1710 : bool
1711 1857 : Conversation::pull(const std::string& deviceId, OnPullCb&& cb, std::string commitId)
1712 : {
1713 1857 : std::lock_guard lk(pimpl_->pullcbsMtx_);
1714 5571 : auto [it, notInProgress] = pimpl_->fetchingRemotes_.emplace(deviceId,
1715 3713 : std::deque<std::pair<std::string, OnPullCb>>());
1716 1858 : auto& pullcbs = it->second;
1717 1858 : auto itPull = std::find_if(pullcbs.begin(), pullcbs.end(), [&](const auto& elem) {
1718 0 : return std::get<0>(elem) == commitId;
1719 3716 : });
1720 1858 : if (itPull != pullcbs.end()) {
1721 0 : JAMI_DEBUG("{} Ignoring request to pull from {:s} with commit {:s}: pull already in progress",
1722 : pimpl_->toString(),
1723 : deviceId,
1724 : commitId);
1725 0 : cb(false);
1726 0 : return false;
1727 : }
1728 7431 : JAMI_DEBUG("{} [device {}] Pulling '{:s}'", pimpl_->toString(), deviceId, commitId);
1729 1858 : pullcbs.emplace_back(std::move(commitId), std::move(cb));
1730 1858 : if (notInProgress)
1731 1850 : dht::ThreadPool::io().run([w = weak(), deviceId] {
1732 1850 : if (auto sthis_ = w.lock())
1733 1850 : sthis_->pimpl_->pull(deviceId);
1734 1850 : });
1735 1858 : return true;
1736 1858 : }
1737 :
1738 : void
1739 1850 : Conversation::Impl::pull(const std::string& deviceId)
1740 : {
1741 1850 : auto& repo = repository_;
1742 :
1743 1850 : std::string commitId;
1744 1850 : OnPullCb cb;
1745 : while (true) {
1746 : {
1747 3708 : std::lock_guard lk(pullcbsMtx_);
1748 3708 : auto it = fetchingRemotes_.find(deviceId);
1749 3708 : if (it == fetchingRemotes_.end()) {
1750 0 : JAMI_ERROR("Could not find device {:s} in fetchingRemotes", deviceId);
1751 0 : break;
1752 : }
1753 3708 : auto& pullcbs = it->second;
1754 3708 : if (pullcbs.empty()) {
1755 1850 : fetchingRemotes_.erase(it);
1756 1850 : break;
1757 : }
1758 1858 : auto& elem = pullcbs.front();
1759 1858 : commitId = std::move(std::get<0>(elem));
1760 1858 : cb = std::move(std::get<1>(elem));
1761 1858 : pullcbs.pop_front();
1762 3708 : }
1763 : // If recently fetched, the commit can already be there, so no need to do complex operations
1764 1858 : if (commitId != "" && repo->getCommit(commitId, false) != std::nullopt) {
1765 64 : cb(true);
1766 88 : continue;
1767 : }
1768 : // Pull from remote
1769 1794 : auto fetched = repo->fetch(deviceId);
1770 1794 : if (!fetched) {
1771 24 : cb(false);
1772 24 : continue;
1773 : }
1774 1770 : auto oldHead = repo->getHead();
1775 1770 : std::string newHead = oldHead;
1776 1770 : std::unique_lock lk(writeMtx_);
1777 1770 : auto commits = mergeHistory(deviceId);
1778 1770 : if (!commits.empty()) {
1779 906 : newHead = commits.rbegin()->at("id");
1780 : // Note: Because clients needs to linearize the history, they need to know all commits
1781 : // that can be updated.
1782 : // In this case, all commits until the common merge base should be announced.
1783 : // The client ill need to update it's model after this.
1784 906 : std::string mergeBase = oldHead; // If fast-forward, the merge base is the previous head
1785 906 : auto newHeadCommit = repo->getCommit(newHead);
1786 906 : if (newHeadCommit != std::nullopt && newHeadCommit->parents.size() > 1) {
1787 15 : mergeBase = repo->mergeBase(newHeadCommit->parents[0], newHeadCommit->parents[1]);
1788 15 : LogOptions options;
1789 15 : options.to = mergeBase;
1790 15 : auto updatedCommits = loadMessages(options);
1791 : // We announce commits from oldest to update to newest. This generally avoid
1792 : // to get detached commits until they are all announced.
1793 15 : std::reverse(std::begin(updatedCommits), std::end(updatedCommits));
1794 15 : announce(updatedCommits);
1795 15 : } else {
1796 891 : announce(commits);
1797 : }
1798 906 : }
1799 1770 : lk.unlock();
1800 :
1801 1770 : bool commitFound = false;
1802 1770 : if (commitId != "") {
1803 : // If `commitId` is non-empty, then we were attempting to pull a specific commit.
1804 : // We need to check if we actually got it; the fact that the fetch above was
1805 : // successful doesn't guarantee that we did.
1806 1441 : for (const auto& commit : commits) {
1807 909 : if (commit.at("id") == commitId) {
1808 897 : commitFound = true;
1809 897 : break;
1810 : }
1811 : }
1812 : } else {
1813 341 : commitFound = true;
1814 : }
1815 1770 : if (!commitFound)
1816 2128 : JAMI_WARNING("Successfully fetched from device {} but didn't receive expected commit {}",
1817 : deviceId,
1818 : commitId);
1819 : // WARNING: If its argument is `true`, this callback will attempt to send a message notification
1820 : // for commit `commitId` to other members of the swarm. It's important that we only
1821 : // send these notifications if we actually have the commit. Otherwise, we can end up
1822 : // in a situation where the members of the swarm keep sending notifications to each
1823 : // other for a commit that none of them have (note that we are unable to rule this out, as
1824 : // nothing prevents a malicious user from intentionally sending a notification with
1825 : // a fake commit ID).
1826 1770 : if (cb)
1827 1770 : cb(commitFound);
1828 : // Announce if profile changed
1829 1770 : if (oldHead != newHead) {
1830 906 : auto diffStats = repo->diffStats(newHead, oldHead);
1831 906 : auto changedFiles = repo->changedFiles(diffStats);
1832 906 : if (find(changedFiles.begin(), changedFiles.end(), "profile.vcf") != changedFiles.end()) {
1833 5 : emitSignal<libjami::ConversationSignal::ConversationProfileUpdated>(accountId_,
1834 5 : repo->id(),
1835 10 : repo->infos());
1836 : }
1837 906 : }
1838 3628 : }
1839 1850 : }
1840 :
1841 : void
1842 1857 : Conversation::sync(const std::string& member, const std::string& deviceId, OnPullCb&& cb, std::string commitId)
1843 : {
1844 1857 : pull(deviceId, std::move(cb), commitId);
1845 1858 : dht::ThreadPool::io().run([member, deviceId, w = weak_from_this()] {
1846 1858 : auto sthis = w.lock();
1847 : // For waiting request, downloadFile
1848 1858 : for (const auto& wr : sthis->dataTransfer()->waitingRequests()) {
1849 1 : sthis->downloadFile(wr.interactionId, wr.fileId, wr.path, member, deviceId);
1850 1858 : }
1851 1858 : });
1852 1858 : }
1853 :
1854 : std::map<std::string, std::string>
1855 267 : Conversation::generateInvitation() const
1856 : {
1857 : // Invite the new member to the conversation
1858 267 : Json::Value root;
1859 267 : auto& metadata = root[ConversationMapKeys::METADATAS];
1860 540 : for (const auto& [k, v] : infos()) {
1861 273 : if (v.size() >= 64000) {
1862 0 : JAMI_WARNING("Cutting invite because the SIP message will be too long");
1863 0 : continue;
1864 0 : }
1865 273 : metadata[k] = v;
1866 267 : }
1867 267 : root[ConversationMapKeys::CONVERSATIONID] = id();
1868 801 : return {{"application/invite+json", json::toString(root)}};
1869 267 : }
1870 :
1871 : std::string
1872 7 : Conversation::leave()
1873 : {
1874 7 : setRemovingFlag();
1875 7 : std::lock_guard lk(pimpl_->writeMtx_);
1876 14 : return pimpl_->repository_->leave();
1877 7 : }
1878 :
1879 : void
1880 10 : Conversation::setRemovingFlag()
1881 : {
1882 10 : pimpl_->isRemoving_ = true;
1883 10 : }
1884 :
1885 : bool
1886 3842 : Conversation::isRemoving()
1887 : {
1888 3842 : return pimpl_->isRemoving_;
1889 : }
1890 :
1891 : void
1892 21 : Conversation::erase()
1893 : {
1894 21 : if (pimpl_->conversationDataPath_ != "")
1895 21 : dhtnet::fileutils::removeAll(pimpl_->conversationDataPath_, true);
1896 21 : if (!pimpl_->repository_)
1897 0 : return;
1898 21 : std::lock_guard lk(pimpl_->writeMtx_);
1899 21 : pimpl_->repository_->erase();
1900 21 : }
1901 :
1902 : ConversationMode
1903 4754 : Conversation::mode() const
1904 : {
1905 4754 : return pimpl_->repository_->mode();
1906 : }
1907 :
1908 : std::vector<std::string>
1909 30 : Conversation::getInitialMembers() const
1910 : {
1911 30 : return pimpl_->repository_->getInitialMembers();
1912 : }
1913 :
1914 : bool
1915 0 : Conversation::isInitialMember(const std::string& uri) const
1916 : {
1917 0 : auto members = getInitialMembers();
1918 0 : return std::find(members.begin(), members.end(), uri) != members.end();
1919 0 : }
1920 :
1921 : void
1922 9 : Conversation::updateInfos(const std::map<std::string, std::string>& map, const OnDoneCb& cb)
1923 : {
1924 9 : dht::ThreadPool::io().run([w = weak(), map = std::move(map), cb = std::move(cb)] {
1925 9 : if (auto sthis = w.lock()) {
1926 9 : auto& repo = sthis->pimpl_->repository_;
1927 9 : std::unique_lock lk(sthis->pimpl_->writeMtx_);
1928 9 : auto commit = repo->updateInfos(map);
1929 9 : sthis->pimpl_->announce(commit, true);
1930 9 : lk.unlock();
1931 9 : if (cb)
1932 9 : cb(!commit.empty(), commit);
1933 9 : emitSignal<libjami::ConversationSignal::ConversationProfileUpdated>(sthis->pimpl_->accountId_,
1934 9 : repo->id(),
1935 18 : repo->infos());
1936 18 : }
1937 9 : });
1938 9 : }
1939 :
1940 : std::map<std::string, std::string>
1941 314 : Conversation::infos() const
1942 : {
1943 314 : return pimpl_->repository_->infos();
1944 : }
1945 :
1946 : void
1947 8 : Conversation::updatePreferences(const std::map<std::string, std::string>& map)
1948 : {
1949 8 : const auto& filePath = pimpl_->preferencesPath_;
1950 8 : auto prefs = map;
1951 8 : auto itLast = prefs.find(LAST_MODIFIED);
1952 8 : if (itLast != prefs.end()) {
1953 3 : std::error_code ec;
1954 3 : if (std::filesystem::is_regular_file(filePath, ec)) {
1955 1 : auto lastModified = fileutils::lastWriteTimeInSeconds(filePath);
1956 : try {
1957 1 : if (lastModified >= to_int<uint64_t>(itLast->second))
1958 0 : return;
1959 0 : } catch (...) {
1960 0 : return;
1961 0 : }
1962 : }
1963 3 : prefs.erase(itLast);
1964 : }
1965 :
1966 8 : std::ofstream file(filePath, std::ios::trunc | std::ios::binary);
1967 8 : msgpack::pack(file, prefs);
1968 8 : emitSignal<libjami::ConversationSignal::ConversationPreferencesUpdated>(pimpl_->accountId_, id(), std::move(prefs));
1969 8 : }
1970 :
1971 : std::map<std::string, std::string>
1972 102 : Conversation::preferences(bool includeLastModified) const
1973 : {
1974 : try {
1975 102 : std::map<std::string, std::string> preferences;
1976 102 : const auto& filePath = pimpl_->preferencesPath_;
1977 194 : auto file = fileutils::loadFile(filePath);
1978 10 : msgpack::object_handle oh = msgpack::unpack((const char*) file.data(), file.size());
1979 10 : oh.get().convert(preferences);
1980 10 : if (includeLastModified)
1981 7 : preferences[LAST_MODIFIED] = std::to_string(fileutils::lastWriteTimeInSeconds(filePath));
1982 10 : return preferences;
1983 193 : } catch (const std::exception& e) {
1984 92 : }
1985 92 : return {};
1986 : }
1987 :
1988 : std::vector<uint8_t>
1989 0 : Conversation::vCard() const
1990 : {
1991 : try {
1992 0 : return fileutils::loadFile(pimpl_->repoPath_ / "profile.vcf");
1993 0 : } catch (...) {
1994 0 : }
1995 0 : return {};
1996 : }
1997 :
1998 : std::shared_ptr<TransferManager>
1999 1961 : Conversation::dataTransfer() const
2000 : {
2001 1961 : return pimpl_->transferManager_;
2002 : }
2003 :
2004 : bool
2005 13 : Conversation::onFileChannelRequest(const std::string& member,
2006 : const std::string& fileId,
2007 : std::filesystem::path& path,
2008 : std::string& sha3sum) const
2009 : {
2010 13 : if (!isMember(member))
2011 0 : return false;
2012 :
2013 13 : auto sep = fileId.find('_');
2014 13 : if (sep == std::string::npos)
2015 0 : return false;
2016 :
2017 13 : auto interactionId = fileId.substr(0, sep);
2018 13 : auto commit = getCommit(interactionId);
2019 26 : if (commit == std::nullopt || commit->find("type") == commit->end() || commit->find("tid") == commit->end()
2020 26 : || commit->find("sha3sum") == commit->end() || commit->at("type") != "application/data-transfer+json") {
2021 0 : JAMI_WARNING("[Account {:s}] {} requested invalid file transfer commit {}",
2022 : pimpl_->accountId_,
2023 : member,
2024 : interactionId);
2025 0 : return false;
2026 : }
2027 :
2028 13 : path = dataTransfer()->path(fileId);
2029 13 : sha3sum = commit->at("sha3sum");
2030 :
2031 13 : return true;
2032 13 : }
2033 :
2034 : bool
2035 14 : Conversation::downloadFile(const std::string& interactionId,
2036 : const std::string& fileId,
2037 : const std::string& path,
2038 : const std::string&,
2039 : const std::string& deviceId)
2040 : {
2041 14 : auto commit = getCommit(interactionId);
2042 14 : if (commit == std::nullopt || commit->at("type") != "application/data-transfer+json") {
2043 0 : JAMI_ERROR("Commit doesn't exists or is not a file transfer {} (Conversation: {}) ", interactionId, id());
2044 0 : return false;
2045 : }
2046 14 : auto tid = commit->find("tid");
2047 14 : auto sha3sum = commit->find("sha3sum");
2048 14 : auto size_str = commit->find("totalSize");
2049 :
2050 14 : if (tid == commit->end() || sha3sum == commit->end() || size_str == commit->end()) {
2051 0 : JAMI_ERROR("Invalid file transfer commit (missing tid, size or sha3)");
2052 0 : return false;
2053 : }
2054 :
2055 14 : auto totalSize = to_int<ssize_t>(size_str->second, (ssize_t) -1);
2056 14 : if (totalSize < 0) {
2057 0 : JAMI_ERROR("Invalid file size {}", totalSize);
2058 0 : return false;
2059 : }
2060 :
2061 : // Be sure to not lock conversation
2062 28 : dht::ThreadPool().io().run(
2063 14 : [w = weak(), deviceId, fileId, interactionId, sha3sum = sha3sum->second, path, totalSize] {
2064 14 : if (auto shared = w.lock()) {
2065 14 : std::filesystem::path filePath(path);
2066 14 : if (filePath.empty()) {
2067 0 : filePath = shared->dataTransfer()->path(fileId);
2068 : }
2069 :
2070 14 : if (fileutils::size(filePath) == totalSize) {
2071 1 : if (fileutils::sha3File(filePath) == sha3sum) {
2072 4 : JAMI_WARNING("Ignoring request to download existing file: {}", filePath);
2073 1 : return;
2074 : }
2075 : }
2076 :
2077 13 : std::filesystem::path tempFilePath(filePath);
2078 13 : tempFilePath += ".tmp";
2079 13 : auto start = fileutils::size(tempFilePath);
2080 13 : if (start < 0)
2081 11 : start = 0;
2082 13 : size_t end = 0;
2083 :
2084 13 : auto acc = shared->pimpl_->account_.lock();
2085 13 : if (!acc)
2086 0 : return;
2087 13 : shared->dataTransfer()->waitForTransfer(fileId, interactionId, sha3sum, path, totalSize);
2088 13 : acc->askForFileChannel(shared->id(), deviceId, interactionId, fileId, start, end);
2089 28 : }
2090 : });
2091 14 : return true;
2092 14 : }
2093 :
2094 : void
2095 1114 : Conversation::hasFetched(const std::string& deviceId, const std::string& commitId)
2096 : {
2097 1114 : dht::ThreadPool::io().run([w = weak(), deviceId, commitId]() {
2098 1114 : auto sthis = w.lock();
2099 1114 : if (!sthis)
2100 0 : return;
2101 : // Update fetched for Uri
2102 1114 : auto uri = sthis->uriFromDevice(deviceId);
2103 1114 : if (uri.empty() || uri == sthis->pimpl_->userId_)
2104 45 : return;
2105 : // When a user fetches a commit, the message is sent for this person
2106 1069 : sthis->pimpl_->updateStatus(uri,
2107 : libjami::Account::MessageStates::SENT,
2108 1069 : commitId,
2109 2138 : std::to_string(std::time(nullptr)),
2110 : true);
2111 1159 : });
2112 1114 : }
2113 :
2114 : void
2115 1115 : Conversation::Impl::updateStatus(const std::string& uri,
2116 : libjami::Account::MessageStates st,
2117 : const std::string& commitId,
2118 : const std::string& ts,
2119 : bool emit)
2120 : {
2121 : // This method can be called if peer send us a status or if another device sync. Emit will be true if a peer
2122 : // send us a status and will emit to other connected devices.
2123 1115 : LogOptions options;
2124 1115 : std::map<std::string, std::map<std::string, std::string>> newStatus;
2125 : {
2126 : // Update internal structures.
2127 1115 : std::lock_guard lk(messageStatusMtx_);
2128 1115 : auto& status = messagesStatus_[uri];
2129 1115 : auto& oldStatus = status[st == libjami::Account::MessageStates::SENT ? "fetched" : "read"];
2130 1115 : if (oldStatus == commitId)
2131 2 : return; // Nothing to do
2132 1113 : options.to = oldStatus;
2133 1113 : options.from = commitId;
2134 1113 : oldStatus = commitId;
2135 1113 : status[st == libjami::Account::MessageStates::SENT ? "fetched_ts" : "read_ts"] = ts;
2136 1113 : saveStatus();
2137 1113 : if (emit)
2138 1084 : newStatus[uri].insert(status.begin(), status.end());
2139 1115 : }
2140 1113 : if (emit && messageStatusCb_) {
2141 1084 : messageStatusCb_(newStatus);
2142 : }
2143 : // Update messages status for all commit between the old and new one
2144 1113 : options.logIfNotFound = false;
2145 1113 : options.fastLog = true;
2146 1113 : History optHistory;
2147 1113 : std::unique_lock lk(optHistory.mutex); // Avoid to announce messages while updating status.
2148 1113 : auto res = loadMessages2(options, &optHistory);
2149 1113 : std::unique_lock mlk(messageStatusMtx_);
2150 1113 : std::vector<std::pair<std::string, int32_t>> statusToUpdate;
2151 1113 : if (res.size() == 0) {
2152 : // In this case, commit is not received yet, so we cache it
2153 8 : futureStatus[commitId][uri] = static_cast<int32_t>(st);
2154 : }
2155 8100 : for (const auto& [cid, _] : optHistory.quickAccess) {
2156 6987 : auto message = loadedHistory_.quickAccess.find(cid);
2157 6987 : if (message != loadedHistory_.quickAccess.end()) {
2158 : // Update message and emit to client,
2159 3540 : if (static_cast<int32_t>(st) > message->second->status[uri]) {
2160 3536 : message->second->status[uri] = static_cast<int32_t>(st);
2161 3536 : statusToUpdate.emplace_back(cid, static_cast<int32_t>(st));
2162 : }
2163 : } else {
2164 : // In this case, commit is not loaded by client, so we cache it
2165 : // No need to emit to client, they will get a correct status on load.
2166 3447 : futureStatus[cid][uri] = static_cast<int32_t>(st);
2167 : }
2168 : }
2169 1113 : mlk.unlock();
2170 1113 : lk.unlock();
2171 4649 : for (const auto& [cid, status] : statusToUpdate)
2172 7072 : emitSignal<libjami::ConfigurationSignal::AccountMessageStatusChanged>(accountId_,
2173 3536 : repository_->id(),
2174 : uri,
2175 : cid,
2176 : static_cast<int>(status));
2177 1117 : }
2178 :
2179 : bool
2180 18 : Conversation::setMessageDisplayed(const std::string& uri, const std::string& interactionId)
2181 : {
2182 18 : std::lock_guard lk(pimpl_->messageStatusMtx_);
2183 18 : if (pimpl_->messagesStatus_[uri]["read"] == interactionId)
2184 1 : return false; // Nothing to do
2185 17 : dht::ThreadPool::io().run([w = weak(), uri, interactionId]() {
2186 17 : auto sthis = w.lock();
2187 17 : if (!sthis)
2188 0 : return;
2189 17 : sthis->pimpl_->updateStatus(uri,
2190 : libjami::Account::MessageStates::DISPLAYED,
2191 17 : interactionId,
2192 34 : std::to_string(std::time(nullptr)),
2193 : true);
2194 17 : });
2195 17 : return true;
2196 18 : }
2197 :
2198 : std::map<std::string, std::map<std::string, std::string>>
2199 84 : Conversation::messageStatus() const
2200 : {
2201 84 : std::lock_guard lk(pimpl_->messageStatusMtx_);
2202 168 : return pimpl_->messagesStatus_;
2203 84 : }
2204 :
2205 : void
2206 45 : Conversation::updateMessageStatus(const std::map<std::string, std::map<std::string, std::string>>& messageStatus)
2207 : {
2208 45 : std::unique_lock lk(pimpl_->messageStatusMtx_);
2209 45 : std::vector<std::tuple<libjami::Account::MessageStates, std::string, std::string, std::string>> stVec;
2210 95 : for (const auto& [uri, status] : messageStatus) {
2211 50 : auto& oldMs = pimpl_->messagesStatus_[uri];
2212 50 : if (status.find("fetched_ts") != status.end() && status.at("fetched") != oldMs["fetched"]) {
2213 26 : if (oldMs["fetched_ts"].empty() || std::stol(oldMs["fetched_ts"]) <= std::stol(status.at("fetched_ts"))) {
2214 0 : stVec.emplace_back(libjami::Account::MessageStates::SENT,
2215 : uri,
2216 26 : status.at("fetched"),
2217 52 : status.at("fetched_ts"));
2218 : }
2219 : }
2220 50 : if (status.find("read_ts") != status.end() && status.at("read") != oldMs["read"]) {
2221 3 : if (oldMs["read_ts"].empty() || std::stol(oldMs["read_ts"]) <= std::stol(status.at("read_ts"))) {
2222 0 : stVec.emplace_back(libjami::Account::MessageStates::DISPLAYED,
2223 : uri,
2224 3 : status.at("read"),
2225 6 : status.at("read_ts"));
2226 : }
2227 : }
2228 : }
2229 45 : lk.unlock();
2230 :
2231 74 : for (const auto& [status, uri, commitId, ts] : stVec) {
2232 29 : pimpl_->updateStatus(uri, status, commitId, ts);
2233 : }
2234 45 : }
2235 :
2236 : void
2237 389 : Conversation::onMessageStatusChanged(
2238 : const std::function<void(const std::map<std::string, std::map<std::string, std::string>>&)>& cb)
2239 : {
2240 389 : std::unique_lock lk(pimpl_->messageStatusMtx_);
2241 389 : pimpl_->messageStatusCb_ = cb;
2242 389 : }
2243 :
2244 : #ifdef LIBJAMI_TEST
2245 : void
2246 513 : Conversation::onBootstrapStatus(const std::function<void(std::string, BootstrapStatus)>& cb)
2247 : {
2248 513 : pimpl_->bootstrapCbTest_ = cb;
2249 513 : }
2250 : #endif
2251 :
2252 : void
2253 617 : Conversation::checkBootstrapMember(const asio::error_code& ec, std::vector<std::map<std::string, std::string>> members)
2254 : {
2255 617 : if (ec == asio::error::operation_aborted)
2256 321 : return;
2257 591 : auto acc = pimpl_->account_.lock();
2258 591 : if (pimpl_->swarmManager_->isConnected() or not acc)
2259 10 : return;
2260 : // We bootstrap the DRT with devices who already wrote in the repository.
2261 : // However, in a conversation, a large number of devices may just watch
2262 : // the conversation, but never write any message.
2263 581 : std::unique_lock lock(pimpl_->membersMtx_);
2264 :
2265 581 : std::string uri;
2266 870 : while (!members.empty()) {
2267 299 : auto member = std::move(members.back());
2268 299 : members.pop_back();
2269 299 : uri = std::move(member.at("uri"));
2270 299 : if (uri != pimpl_->userId_ && pimpl_->checkedMembers_.find(uri) == pimpl_->checkedMembers_.end())
2271 10 : break;
2272 299 : }
2273 285 : auto fallbackFailed = [](auto sthis) {
2274 1140 : JAMI_LOG("{} Bootstrap: Fallback failed. Wait for remote connections.", sthis->pimpl_->toString());
2275 : #ifdef LIBJAMI_TEST
2276 285 : if (sthis->pimpl_->bootstrapCbTest_)
2277 8 : sthis->pimpl_->bootstrapCbTest_(sthis->id(), BootstrapStatus::FAILED);
2278 : #endif
2279 285 : };
2280 : // If members is empty, we finished the fallback un-successfully
2281 581 : if (members.empty() && uri.empty()) {
2282 285 : lock.unlock();
2283 285 : fallbackFailed(this);
2284 285 : return;
2285 : }
2286 :
2287 : // Fallback, check devices of a member (we didn't check yet) in the conversation
2288 296 : pimpl_->checkedMembers_.emplace(uri);
2289 296 : auto devices = std::make_shared<std::vector<NodeId>>();
2290 1184 : acc->forEachDevice(
2291 592 : dht::InfoHash(uri),
2292 294 : [w = weak(), devices](const std::shared_ptr<dht::crypto::PublicKey>& dev) {
2293 : // Test if already sent
2294 294 : if (auto sthis = w.lock()) {
2295 292 : if (!sthis->pimpl_->swarmManager_->getRoutingTable().hasKnownNode(dev->getLongId()))
2296 281 : devices->emplace_back(dev->getLongId());
2297 294 : }
2298 294 : },
2299 296 : [w = weak(), devices, members = std::move(members), uri, fallbackFailed = std::move(fallbackFailed)](bool ok) {
2300 296 : auto sthis = w.lock();
2301 296 : if (!sthis)
2302 2 : return;
2303 294 : auto checkNext = true;
2304 294 : if (ok && devices->size() != 0) {
2305 : #ifdef LIBJAMI_TEST
2306 281 : if (sthis->pimpl_->bootstrapCbTest_)
2307 7 : sthis->pimpl_->bootstrapCbTest_(sthis->id(), BootstrapStatus::FALLBACK);
2308 : #endif
2309 1124 : JAMI_LOG("{} Bootstrap: Fallback with member: {}", sthis->pimpl_->toString(), uri);
2310 281 : if (sthis->pimpl_->swarmManager_->setKnownNodes(*devices))
2311 0 : checkNext = false;
2312 : }
2313 294 : if (checkNext) {
2314 : // Check next member
2315 294 : sthis->pimpl_->fallbackTimer_->expires_at(std::chrono::steady_clock::now());
2316 588 : sthis->pimpl_->fallbackTimer_->async_wait(
2317 588 : std::bind(&Conversation::checkBootstrapMember, sthis, std::placeholders::_1, std::move(members)));
2318 : } else {
2319 : // In this case, all members are checked. Fallback failed
2320 0 : fallbackFailed(sthis);
2321 : }
2322 296 : });
2323 1161 : }
2324 :
2325 : void
2326 512 : Conversation::bootstrap(std::function<void()> onBootstrapped, const std::vector<DeviceId>& knownDevices)
2327 : {
2328 512 : std::lock_guard lock(pimpl_->bootstrapMtx_);
2329 512 : if (!pimpl_ || !pimpl_->repository_ || !pimpl_->swarmManager_)
2330 0 : return;
2331 : // Here, we bootstrap the DRT with devices who already wrote in the conversation
2332 : // If this doesn't work, it will attempt to fallback with checkBootstrapMember
2333 : // If it works, the callback onConnectionChanged will be called with ok=true
2334 512 : pimpl_->bootstrapCb_ = std::move(onBootstrapped);
2335 512 : std::vector<DeviceId> devices = knownDevices;
2336 1530 : for (const auto& [member, memberDevices] : pimpl_->repository_->devices()) {
2337 1018 : if (!isBanned(member))
2338 1018 : devices.insert(devices.end(), memberDevices.begin(), memberDevices.end());
2339 512 : }
2340 2048 : JAMI_DEBUG("{} Bootstrap with {} device(s)", pimpl_->toString(), devices.size());
2341 : // set callback
2342 323 : auto fallback = [](std::shared_ptr<Conversation> sthis, bool now = false) {
2343 : // Fallback
2344 323 : auto acc = sthis->pimpl_->account_.lock();
2345 323 : if (!acc)
2346 0 : return;
2347 323 : auto members = sthis->getMembers(false, false);
2348 323 : std::shuffle(members.begin(), members.end(), acc->rand);
2349 323 : if (now) {
2350 289 : sthis->pimpl_->fallbackTimer_->expires_at(std::chrono::steady_clock::now());
2351 : } else {
2352 34 : auto timeForBootstrap = std::min(static_cast<size_t>(8), members.size());
2353 102 : sthis->pimpl_->fallbackTimer_->expires_at(std::chrono::steady_clock::now() + 20s
2354 68 : - std::chrono::seconds(timeForBootstrap));
2355 136 : JAMI_DEBUG("{} Fallback in {} seconds", sthis->pimpl_->toString(), (20 - timeForBootstrap));
2356 : }
2357 646 : sthis->pimpl_->fallbackTimer_->async_wait(
2358 646 : std::bind(&Conversation::checkBootstrapMember, sthis, std::placeholders::_1, std::move(members)));
2359 323 : };
2360 :
2361 512 : pimpl_->swarmManager_->onConnectionChanged([w = weak(), fallback](bool ok) {
2362 : // This will call methods from accounts, so trigger on another thread.
2363 342 : dht::ThreadPool::io().run([w, ok, fallback = std::move(fallback)] {
2364 342 : auto sthis = w.lock();
2365 342 : if (!sthis)
2366 0 : return;
2367 342 : if (ok) {
2368 : // Bootstrap succeeded!
2369 : {
2370 308 : std::lock_guard lock(sthis->pimpl_->membersMtx_);
2371 308 : sthis->pimpl_->checkedMembers_.clear();
2372 308 : }
2373 308 : if (sthis->pimpl_->bootstrapCb_)
2374 308 : sthis->pimpl_->bootstrapCb_();
2375 : #ifdef LIBJAMI_TEST
2376 307 : if (sthis->pimpl_->bootstrapCbTest_)
2377 9 : sthis->pimpl_->bootstrapCbTest_(sthis->id(), BootstrapStatus::SUCCESS);
2378 : #endif
2379 308 : return;
2380 : }
2381 34 : fallback(sthis);
2382 342 : });
2383 342 : });
2384 : {
2385 512 : std::lock_guard lock(pimpl_->membersMtx_);
2386 512 : pimpl_->checkedMembers_.clear();
2387 512 : }
2388 : // If is shutdown, the conversation was re-added, causing no new nodes to be connected, but just a classic
2389 : // connectivity change
2390 512 : if (pimpl_->swarmManager_->isShutdown()) {
2391 26 : pimpl_->swarmManager_->restart();
2392 26 : pimpl_->swarmManager_->maintainBuckets();
2393 486 : } else if (!pimpl_->swarmManager_->setKnownNodes(devices)) {
2394 289 : fallback(shared_from_this(), true);
2395 : }
2396 512 : }
2397 :
2398 : std::vector<std::string>
2399 18 : Conversation::commitsEndedCalls()
2400 : {
2401 18 : pimpl_->loadActiveCalls();
2402 18 : pimpl_->loadHostedCalls();
2403 18 : auto commits = pimpl_->commitsEndedCalls();
2404 18 : if (!commits.empty()) {
2405 : // Announce to client
2406 0 : dht::ThreadPool::io().run([w = weak(), commits] {
2407 0 : if (auto sthis = w.lock())
2408 0 : sthis->pimpl_->announce(commits, true);
2409 0 : });
2410 : }
2411 18 : return commits;
2412 0 : }
2413 :
2414 : void
2415 389 : Conversation::onMembersChanged(OnMembersChanged&& cb)
2416 : {
2417 389 : pimpl_->onMembersChanged_ = std::move(cb);
2418 389 : pimpl_->repository_->onMembersChanged([w = weak()](const std::set<std::string>& memberUris) {
2419 1080 : if (auto sthis = w.lock())
2420 1080 : sthis->pimpl_->onMembersChanged_(memberUris);
2421 1080 : });
2422 389 : }
2423 :
2424 : void
2425 389 : Conversation::onNeedSocket(NeedSocketCb needSocket)
2426 : {
2427 778 : pimpl_->swarmManager_->needSocketCb_ = [needSocket = std::move(needSocket), w = weak()](const std::string& deviceId,
2428 : ChannelCb&& cb) {
2429 682 : if (auto sthis = w.lock())
2430 682 : needSocket(sthis->id(), deviceId, std::move(cb), "application/im-gitmessage-id");
2431 1460 : };
2432 389 : }
2433 :
2434 : void
2435 631 : Conversation::addSwarmChannel(std::shared_ptr<dhtnet::ChannelSocket> channel)
2436 : {
2437 631 : auto deviceId = channel->deviceId();
2438 : // Transmit avatar if necessary
2439 : // We do this here, because at this point we know both sides are connected and in
2440 : // the same conversation
2441 : // addSwarmChannel is a bit more complex, but it should be the best moment to do this.
2442 631 : auto cert = channel->peerCertificate();
2443 631 : if (!cert || !cert->issuer)
2444 0 : return;
2445 631 : auto member = cert->issuer->getId().toString();
2446 631 : pimpl_->swarmManager_->addChannel(std::move(channel));
2447 631 : dht::ThreadPool::io().run([member, deviceId, a = pimpl_->account_, w = weak_from_this()] {
2448 631 : auto sthis = w.lock();
2449 631 : if (auto account = a.lock()) {
2450 630 : account->sendProfile(sthis->id(), member, deviceId.toString());
2451 631 : }
2452 631 : });
2453 631 : }
2454 :
2455 : uint32_t
2456 4 : Conversation::countInteractions(const std::string& toId, const std::string& fromId, const std::string& authorUri) const
2457 : {
2458 4 : LogOptions options;
2459 4 : options.to = toId;
2460 4 : options.from = fromId;
2461 4 : options.authorUri = authorUri;
2462 4 : options.logIfNotFound = false;
2463 4 : options.fastLog = true;
2464 4 : History history;
2465 4 : std::lock_guard lk(history.mutex);
2466 4 : auto res = pimpl_->loadMessages2(options, &history);
2467 8 : return res.size();
2468 4 : }
2469 :
2470 : void
2471 4 : Conversation::search(uint32_t req, const Filter& filter, const std::shared_ptr<std::atomic_int>& flag) const
2472 : {
2473 : // Because logging a conversation can take quite some time,
2474 : // do it asynchronously
2475 4 : dht::ThreadPool::io().run([w = weak(), req, filter, flag] {
2476 4 : if (auto sthis = w.lock()) {
2477 4 : History history;
2478 4 : std::vector<std::map<std::string, std::string>> commits {};
2479 : // std::regex_constants::ECMAScript is the default flag.
2480 4 : auto re = std::regex(filter.regexSearch,
2481 4 : filter.caseSensitive ? std::regex_constants::ECMAScript : std::regex_constants::icase);
2482 4 : sthis->pimpl_->repository_->log(
2483 20 : [&](const auto& id, const auto& author, auto& commit) {
2484 20 : if (!filter.author.empty() && filter.author != sthis->uriFromDevice(author.email)) {
2485 : // Filter author
2486 0 : return CallbackResult::Skip;
2487 : }
2488 20 : auto commitTime = git_commit_time(commit.get());
2489 20 : if (filter.before && filter.before < commitTime) {
2490 : // Only get commits before this date
2491 0 : return CallbackResult::Skip;
2492 : }
2493 20 : if (filter.after && filter.after > commitTime) {
2494 : // Only get commits before this date
2495 0 : if (git_commit_parentcount(commit.get()) <= 1)
2496 0 : return CallbackResult::Break;
2497 : else
2498 0 : return CallbackResult::Skip; // Because we are sorting it with
2499 : // GIT_SORT_TOPOLOGICAL | GIT_SORT_TIME
2500 : }
2501 :
2502 20 : return CallbackResult::Ok; // Continue
2503 : },
2504 20 : [&](auto&& cc) {
2505 40 : if (auto optMessage = sthis->pimpl_->repository_->convCommitToMap(cc))
2506 40 : sthis->pimpl_->addToHistory(history, {optMessage.value()}, false, false);
2507 20 : },
2508 20 : [&](auto id, auto, auto) {
2509 20 : if (id == filter.lastId)
2510 0 : return true;
2511 20 : return false;
2512 : },
2513 : "",
2514 : false);
2515 : // Search on generated history
2516 24 : for (auto& message : history.messageList) {
2517 20 : auto contentType = message->type;
2518 20 : auto isSearchable = contentType == "text/plain" || contentType == "application/data-transfer+json";
2519 20 : if (filter.type.empty() && !isSearchable) {
2520 : // Not searchable, at least for now
2521 8 : continue;
2522 12 : } else if (contentType == filter.type || filter.type.empty()) {
2523 12 : if (isSearchable) {
2524 : // If it's a text match the body, else the display name
2525 48 : auto body = contentType == "text/plain" ? message->body.at("body")
2526 36 : : message->body.at("displayName");
2527 12 : std::smatch body_match;
2528 12 : if (std::regex_search(body, body_match, re)) {
2529 5 : auto commit = message->body;
2530 5 : commit["id"] = message->id;
2531 5 : commit["type"] = message->type;
2532 5 : commits.emplace_back(commit);
2533 5 : }
2534 12 : } else {
2535 : // Matching type, just add it to the results
2536 0 : commits.emplace_back(message->body);
2537 : }
2538 :
2539 12 : if (filter.maxResult != 0 && commits.size() == filter.maxResult)
2540 0 : break;
2541 : }
2542 20 : }
2543 :
2544 4 : if (commits.size() > 0)
2545 9 : emitSignal<libjami::ConversationSignal::MessagesFound>(req,
2546 3 : sthis->pimpl_->accountId_,
2547 6 : sthis->id(),
2548 3 : std::move(commits));
2549 : // If we're the latest thread, inform client that the search is finished
2550 4 : if ((*flag)-- == 1 /* decrement return the old value */) {
2551 4 : emitSignal<libjami::ConversationSignal::MessagesFound>(
2552 8 : req, sthis->pimpl_->accountId_, std::string {}, std::vector<std::map<std::string, std::string>> {});
2553 : }
2554 8 : }
2555 4 : });
2556 4 : }
2557 :
2558 : void
2559 14 : Conversation::hostConference(Json::Value&& message, OnDoneCb&& cb)
2560 : {
2561 14 : if (!message.isMember("confId")) {
2562 0 : JAMI_ERROR("{}Malformed commit: no confId", pimpl_->toString());
2563 0 : return;
2564 : }
2565 :
2566 14 : auto now = std::chrono::system_clock::now();
2567 14 : auto nowSecs = std::chrono::duration_cast<std::chrono::seconds>(now.time_since_epoch()).count();
2568 : {
2569 14 : std::lock_guard lk(pimpl_->activeCallsMtx_);
2570 14 : pimpl_->hostedCalls_[message["confId"].asString()] = nowSecs;
2571 14 : pimpl_->saveHostedCalls();
2572 14 : }
2573 :
2574 14 : sendMessage(std::move(message), "", {}, std::move(cb));
2575 : }
2576 :
2577 : bool
2578 20 : Conversation::isHosting(const std::string& confId) const
2579 : {
2580 20 : auto info = infos();
2581 20 : if (info["rdvDevice"] == pimpl_->deviceId_ && info["rdvHost"] == pimpl_->userId_)
2582 0 : return true; // We are the current device Host
2583 20 : std::lock_guard lk(pimpl_->activeCallsMtx_);
2584 20 : return pimpl_->hostedCalls_.find(confId) != pimpl_->hostedCalls_.end();
2585 20 : }
2586 :
2587 : void
2588 10 : Conversation::removeActiveConference(Json::Value&& message, OnDoneCb&& cb)
2589 : {
2590 10 : if (!message.isMember("confId")) {
2591 0 : JAMI_ERROR("{}Malformed commit: no confId", pimpl_->toString());
2592 0 : return;
2593 : }
2594 :
2595 10 : auto erased = false;
2596 : {
2597 10 : std::lock_guard lk(pimpl_->activeCallsMtx_);
2598 10 : erased = pimpl_->hostedCalls_.erase(message["confId"].asString());
2599 10 : }
2600 10 : if (erased) {
2601 10 : pimpl_->saveHostedCalls();
2602 10 : sendMessage(std::move(message), "", {}, std::move(cb));
2603 : } else
2604 0 : cb(false, "");
2605 : }
2606 :
2607 : std::vector<std::map<std::string, std::string>>
2608 38 : Conversation::currentCalls() const
2609 : {
2610 38 : std::lock_guard lk(pimpl_->activeCallsMtx_);
2611 76 : return pimpl_->activeCalls_;
2612 38 : }
2613 : } // namespace jami
|