Line data Source code
1 : /*
2 : * Copyright (C) 2004-2026 Savoir-faire Linux Inc.
3 : *
4 : * This program is free software: you can redistribute it and/or modify
5 : * it under the terms of the GNU General Public License as published by
6 : * the Free Software Foundation, either version 3 of the License, or
7 : * (at your option) any later version.
8 : *
9 : * This program is distributed in the hope that it will be useful,
10 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 : * GNU General Public License for more details.
13 : *
14 : * You should have received a copy of the GNU General Public License
15 : * along with this program. If not, see <https://www.gnu.org/licenses/>.
16 : */
17 :
18 : #include "conversation.h"
19 :
20 : #include "account_const.h"
21 : #include "jamiaccount.h"
22 : #include "client/jami_signal.h"
23 : #include "swarm/swarm_manager.h"
24 : #include "conversationrepository.h"
25 : #ifdef ENABLE_PLUGIN
26 : #include "manager.h"
27 : #include "plugin/jamipluginmanager.h"
28 : #include "plugin/streamdata.h"
29 : #endif
30 : #include "jami/conversation_interface.h"
31 : #include "jami/configurationmanager_interface.h"
32 :
33 : #include "fileutils.h"
34 : #include "json_utils.h"
35 : #include "logger.h"
36 :
37 : #include <opendht/thread_pool.h>
38 : #include <opendht/infohash.h>
39 : #include <fmt/compile.h>
40 : #include <asio/error_code.hpp>
41 :
42 : #include <algorithm>
43 : #include <memory>
44 : #include <charconv>
45 : #include <string_view>
46 : #include <tuple>
47 : #include <optional>
48 : #include <utility>
49 : #include <deque>
50 : #include <chrono>
51 : #include <ctime>
52 : #include <functional>
53 : #include <atomic>
54 : #include <regex>
55 :
56 : namespace jami {
57 :
58 : static const char* const LAST_MODIFIED = "lastModified";
59 :
60 13 : ConvInfo::ConvInfo(const Json::Value& json)
61 : {
62 13 : id = json[ConversationMapKeys::ID].asString();
63 13 : created = json[ConversationMapKeys::CREATED].asLargestUInt();
64 13 : removed = json[ConversationMapKeys::REMOVED].asLargestUInt();
65 13 : erased = json[ConversationMapKeys::ERASED].asLargestUInt();
66 33 : for (const auto& v : json[ConversationMapKeys::MEMBERS]) {
67 20 : members.emplace(v["uri"].asString());
68 : }
69 13 : lastDisplayed = json[ConversationMapKeys::LAST_DISPLAYED].asString();
70 13 : }
71 :
72 : Json::Value
73 25 : ConvInfo::toJson() const
74 : {
75 25 : Json::Value json;
76 25 : json[ConversationMapKeys::ID] = id;
77 25 : json[ConversationMapKeys::CREATED] = Json::Int64(created);
78 25 : if (removed) {
79 1 : json[ConversationMapKeys::REMOVED] = Json::Int64(removed);
80 : }
81 25 : if (erased) {
82 1 : json[ConversationMapKeys::ERASED] = Json::Int64(erased);
83 : }
84 64 : for (const auto& m : members) {
85 39 : Json::Value member;
86 39 : member["uri"] = m;
87 39 : json[ConversationMapKeys::MEMBERS].append(member);
88 39 : }
89 25 : json[ConversationMapKeys::LAST_DISPLAYED] = lastDisplayed;
90 25 : return json;
91 0 : }
92 :
93 : // ConversationRequest
94 261 : ConversationRequest::ConversationRequest(const Json::Value& json)
95 : {
96 261 : received = json[ConversationMapKeys::RECEIVED].asLargestUInt();
97 261 : declined = json[ConversationMapKeys::DECLINED].asLargestUInt();
98 261 : from = json[ConversationMapKeys::FROM].asString();
99 261 : conversationId = json[ConversationMapKeys::CONVERSATIONID].asString();
100 261 : auto& md = json[ConversationMapKeys::METADATAS];
101 524 : for (const auto& member : md.getMemberNames()) {
102 263 : metadatas.emplace(member, md[member].asString());
103 261 : }
104 261 : }
105 :
106 : Json::Value
107 2 : ConversationRequest::toJson() const
108 : {
109 2 : Json::Value json;
110 2 : json[ConversationMapKeys::CONVERSATIONID] = conversationId;
111 2 : json[ConversationMapKeys::FROM] = from;
112 2 : json[ConversationMapKeys::RECEIVED] = static_cast<uint32_t>(received);
113 2 : if (declined)
114 0 : json[ConversationMapKeys::DECLINED] = static_cast<uint32_t>(declined);
115 4 : for (const auto& [key, value] : metadatas) {
116 2 : json[ConversationMapKeys::METADATAS][key] = value;
117 : }
118 2 : return json;
119 0 : }
120 :
121 : std::map<std::string, std::string>
122 217 : ConversationRequest::toMap() const
123 : {
124 217 : auto result = metadatas;
125 217 : result[ConversationMapKeys::ID] = conversationId;
126 217 : result[ConversationMapKeys::FROM] = from;
127 217 : if (declined)
128 0 : result[ConversationMapKeys::DECLINED] = std::to_string(declined);
129 217 : result[ConversationMapKeys::RECEIVED] = std::to_string(received);
130 217 : return result;
131 0 : }
132 :
133 : using MessageList = std::list<std::shared_ptr<libjami::SwarmMessage>>;
134 :
135 : struct History
136 : {
137 : // While loading the history, we need to avoid:
138 : // - reloading history (can just be ignored)
139 : // - adding new commits (should wait for history to be loaded)
140 : std::mutex mutex {};
141 : std::condition_variable cv {};
142 : bool loading {false};
143 : MessageList messageList {};
144 : std::map<std::string, std::shared_ptr<libjami::SwarmMessage>> quickAccess {};
145 : std::map<std::string, std::list<std::shared_ptr<libjami::SwarmMessage>>> pendingEditions {};
146 : std::map<std::string, std::list<std::map<std::string, std::string>>> pendingReactions {};
147 : };
148 :
149 : class Conversation::Impl
150 : {
151 : private:
152 394 : Impl(std::unique_ptr<ConversationRepository>&& repository,
153 : const std::shared_ptr<JamiAccount>& account,
154 : std::vector<ConversationCommit>&& commits = {})
155 394 : : repository_(std::move(repository ? repository : throw std::logic_error("Invalid repository")))
156 389 : , account_(account)
157 389 : , accountId_(account->getAccountID())
158 389 : , userId_(account->getUsername())
159 389 : , deviceId_(account->currentDeviceId())
160 389 : , swarmManager_(std::make_shared<SwarmManager>(NodeId(deviceId_),
161 778 : Manager::instance().getSeededRandomEngine(),
162 389 : [account = account_](const DeviceId& deviceId) {
163 2630 : if (auto acc = account.lock()) {
164 2630 : return acc->isConnectedWith(deviceId);
165 2630 : }
166 0 : return false;
167 : }))
168 389 : , transferManager_(std::make_shared<TransferManager>(accountId_,
169 : "",
170 389 : repository_->id(),
171 389 : Manager::instance().getSeededRandomEngine()))
172 389 : , repoPath_(fileutils::get_data_dir() / accountId_ / "conversations" / repository_->id())
173 389 : , conversationDataPath_(fileutils::get_data_dir() / accountId_ / "conversation_data" / repository_->id())
174 389 : , fetchedPath_(conversationDataPath_ / ConversationDirectories::FETCHED)
175 389 : , sendingPath_(conversationDataPath_ / ConversationDirectories::SENDING)
176 389 : , preferencesPath_(conversationDataPath_ / ConversationDirectories::PREFERENCES)
177 389 : , statusPath_(conversationDataPath_ / ConversationDirectories::STATUS)
178 389 : , hostedCallsPath_(conversationDataPath_ / ConversationDirectories::HOSTED_CALLS)
179 389 : , activeCallsPath_(conversationDataPath_ / ConversationDirectories::ACTIVE_CALLS)
180 389 : , ioContext_(Manager::instance().ioContext())
181 389 : , fallbackTimer_(std::make_unique<asio::steady_timer>(*ioContext_))
182 2339 : , typers_(std::make_shared<Typers>(account, repository_->id()))
183 : {
184 389 : if (!commits.empty())
185 185 : initActiveCalls(repository_->convCommitsToMap(commits));
186 389 : loadStatus();
187 409 : }
188 :
189 190 : Impl(std::pair<std::unique_ptr<ConversationRepository>, std::vector<ConversationCommit>>&& repoAndCommits,
190 : const std::shared_ptr<JamiAccount>& account)
191 190 : : Impl(std::move(repoAndCommits.first), account, std::move(repoAndCommits.second))
192 185 : {}
193 :
194 : public:
195 186 : Impl(const std::shared_ptr<JamiAccount>& account, ConversationMode mode, const std::string& otherMember = "")
196 186 : : Impl(ConversationRepository::createConversation(account, mode, otherMember), account)
197 186 : {}
198 :
199 18 : Impl(const std::shared_ptr<JamiAccount>& account, const std::string& conversationId)
200 18 : : Impl(std::make_unique<ConversationRepository>(account, conversationId), account)
201 18 : {}
202 :
203 191 : Impl(const std::shared_ptr<JamiAccount>& account, const std::string& remoteDevice, const std::string& conversationId)
204 191 : : Impl(ConversationRepository::cloneConversation(account, remoteDevice, conversationId), account)
205 185 : {}
206 :
207 3102 : std::string toString() const
208 : {
209 12408 : return fmt::format(FMT_COMPILE("[Account {}] [Conversation {}]"), accountId_, repository_->id());
210 : }
211 : mutable std::string fmtStr_;
212 :
213 388 : ~Impl()
214 5820 : {
215 : try {
216 388 : if (fallbackTimer_)
217 388 : fallbackTimer_->cancel();
218 0 : } catch (const std::exception& e) {
219 0 : JAMI_ERROR("{:s} {:s}", toString(), e.what());
220 0 : }
221 388 : }
222 :
223 : /**
224 : * If, for whatever reason, the daemon is stopped while hosting a conference,
225 : * we need to announce the end of this call when restarting.
226 : * To avoid to keep active calls forever.
227 : */
228 : std::vector<std::string> commitsEndedCalls();
229 : bool isAdmin() const;
230 :
231 282 : void announce(const std::string& commitId, bool commitFromSelf = false)
232 : {
233 282 : std::vector<std::string> vec;
234 282 : if (!commitId.empty())
235 280 : vec.emplace_back(commitId);
236 282 : announce(vec, commitFromSelf);
237 282 : }
238 :
239 294 : void announce(const std::vector<std::string>& commits, bool commitFromSelf = false)
240 : {
241 294 : std::vector<ConversationCommit> convcommits;
242 294 : convcommits.reserve(commits.size());
243 598 : for (const auto& cid : commits) {
244 304 : if (auto commit = repository_->getCommit(cid)) {
245 304 : convcommits.emplace_back(*commit);
246 304 : }
247 : }
248 294 : announce(repository_->convCommitsToMap(convcommits), commitFromSelf);
249 294 : }
250 :
251 : /**
252 : * Initialize activeCalls_ from the list of commits in the repository
253 : * @param commits Commits in reverse chronological order (i.e. from newest to oldest)
254 : */
255 185 : void initActiveCalls(const std::vector<std::map<std::string, std::string>>& commits) const
256 : {
257 185 : std::unordered_set<std::string> invalidHostUris;
258 185 : std::unordered_set<std::string> invalidCallIds;
259 :
260 185 : std::lock_guard lk(activeCallsMtx_);
261 1161 : for (const auto& commit : commits) {
262 976 : if (commit.at("type") == "member") {
263 : // Each commit of type "member" has an "action" field whose value can be one
264 : // of the following: "add", "join", "remove", "ban", "unban"
265 : // In the case of "remove" and "ban", we need to add the member's URI to
266 : // invalidHostUris to ensure that any call they may have started in the past
267 : // is no longer considered active.
268 : // For the other actions, there's no harm in adding the member's URI anyway,
269 : // since it's not possible to start hosting a call before joining the swarm (or
270 : // before getting unbanned in the case of previously banned members).
271 770 : invalidHostUris.emplace(commit.at("uri"));
272 413 : } else if (commit.find("confId") != commit.end() && commit.find("uri") != commit.end()
273 413 : && commit.find("device") != commit.end()) {
274 : // The commit indicates either the end or the beginning of a call
275 : // (depending on whether there is a "duration" field or not).
276 1 : auto convId = repository_->id();
277 2 : auto confId = commit.at("confId");
278 2 : auto uri = commit.at("uri");
279 2 : auto device = commit.at("device");
280 :
281 3 : if (commit.find("duration") == commit.end() && invalidCallIds.find(confId) == invalidCallIds.end()
282 3 : && invalidHostUris.find(uri) == invalidHostUris.end()) {
283 1 : std::map<std::string, std::string> activeCall;
284 1 : activeCall["id"] = confId;
285 1 : activeCall["uri"] = uri;
286 1 : activeCall["device"] = device;
287 1 : activeCalls_.emplace_back(activeCall);
288 1 : fmt::print("swarm:{} new active call detected: {} (on device {}, account {})\n",
289 : convId,
290 : confId,
291 : device,
292 : uri);
293 1 : }
294 : // Even if the call was active, we still add its ID to invalidCallIds to make sure it
295 : // doesn't get added a second time. (This shouldn't happen normally, but in practice
296 : // there are sometimes multiple commits indicating the beginning of the same call.)
297 1 : invalidCallIds.emplace(confId);
298 1 : }
299 : }
300 185 : saveActiveCalls();
301 185 : emitSignal<libjami::ConfigurationSignal::ActiveCallsChanged>(accountId_, repository_->id(), activeCalls_);
302 185 : }
303 :
304 : /**
305 : * Update activeCalls_ via announced commits (in load or via new commits)
306 : * @param commit Commit to check
307 : * @param eraseOnly If we want to ignore added commits
308 : * @param emitSig If we want to emit to client
309 : * @note eraseOnly is used by loadMessages. This is a fail-safe, this SHOULD NOT happen
310 : */
311 75 : void updateActiveCalls(const std::map<std::string, std::string>& commit,
312 : bool eraseOnly = false,
313 : bool emitSig = true) const
314 : {
315 75 : if (!repository_)
316 0 : return;
317 75 : if (commit.at("type") == "member") {
318 : // In this case, we need to check if we are not removing a hosting member or device
319 17 : std::lock_guard lk(activeCallsMtx_);
320 17 : auto it = activeCalls_.begin();
321 17 : auto updateActives = false;
322 17 : while (it != activeCalls_.end()) {
323 0 : if (it->at("uri") == commit.at("uri") || it->at("device") == commit.at("uri")) {
324 0 : JAMI_DEBUG("Removing {:s} from the active calls, because {:s} left", it->at("id"), commit.at("uri"));
325 0 : it = activeCalls_.erase(it);
326 0 : updateActives = true;
327 : } else {
328 0 : ++it;
329 : }
330 : }
331 17 : if (updateActives) {
332 0 : saveActiveCalls();
333 0 : if (emitSig)
334 0 : emitSignal<libjami::ConfigurationSignal::ActiveCallsChanged>(accountId_,
335 0 : repository_->id(),
336 0 : activeCalls_);
337 : }
338 17 : return;
339 17 : }
340 : // Else, it's a call information
341 171 : if (commit.find("confId") != commit.end() && commit.find("uri") != commit.end()
342 171 : && commit.find("device") != commit.end()) {
343 55 : auto convId = repository_->id();
344 110 : auto confId = commit.at("confId");
345 110 : auto uri = commit.at("uri");
346 110 : auto device = commit.at("device");
347 55 : std::lock_guard lk(activeCallsMtx_);
348 55 : auto itActive = std::find_if(activeCalls_.begin(), activeCalls_.end(), [&](const auto& value) {
349 24 : return value.at("id") == confId && value.at("uri") == uri && value.at("device") == device;
350 : });
351 55 : if (commit.find("duration") == commit.end()) {
352 31 : if (itActive == activeCalls_.end() && !eraseOnly) {
353 124 : JAMI_DEBUG("swarm:{:s} new current call detected: {:s} on device {:s}, account {:s}",
354 : convId,
355 : confId,
356 : device,
357 : uri);
358 155 : activeCalls_.emplace_back(std::map<std::string, std::string> {
359 : {"id", confId},
360 : {"uri", uri},
361 : {"device", device},
362 124 : });
363 31 : saveActiveCalls();
364 31 : if (emitSig)
365 31 : emitSignal<libjami::ConfigurationSignal::ActiveCallsChanged>(accountId_,
366 31 : repository_->id(),
367 31 : activeCalls_);
368 : }
369 : } else {
370 24 : if (itActive != activeCalls_.end()) {
371 24 : itActive = activeCalls_.erase(itActive);
372 : // Unlikely, but we must ensure that no duplicate exists
373 24 : while (itActive != activeCalls_.end()) {
374 0 : itActive = std::find_if(itActive, activeCalls_.end(), [&](const auto& value) {
375 0 : return value.at("id") == confId && value.at("uri") == uri && value.at("device") == device;
376 : });
377 0 : if (itActive != activeCalls_.end()) {
378 0 : JAMI_ERROR("Duplicate call found. (This is a bug)");
379 0 : itActive = activeCalls_.erase(itActive);
380 : }
381 : }
382 :
383 24 : if (eraseOnly) {
384 0 : JAMI_WARNING("previous swarm:{:s} call finished detected: {:s} on device "
385 : "{:s}, account {:s}",
386 : convId,
387 : confId,
388 : device,
389 : uri);
390 : } else {
391 96 : JAMI_DEBUG("swarm:{:s} call finished: {:s} on device {:s}, account {:s}",
392 : convId,
393 : confId,
394 : device,
395 : uri);
396 : }
397 : }
398 24 : saveActiveCalls();
399 24 : if (emitSig)
400 24 : emitSignal<libjami::ConfigurationSignal::ActiveCallsChanged>(accountId_,
401 24 : repository_->id(),
402 24 : activeCalls_);
403 : }
404 55 : }
405 : }
406 :
407 1200 : void announce(const std::vector<std::map<std::string, std::string>>& commits, bool commitFromSelf = false)
408 : {
409 1200 : if (!repository_)
410 0 : return;
411 1200 : auto convId = repository_->id();
412 1200 : auto ok = !commits.empty();
413 3598 : auto lastId = ok ? commits.rbegin()->at(ConversationMapKeys::ID) : "";
414 1200 : addToHistory(loadedHistory_, commits, true, commitFromSelf);
415 1200 : if (ok) {
416 1198 : bool announceMember = false;
417 2439 : for (const auto& c : commits) {
418 : // Announce member events
419 1241 : if (c.at("type") == "member") {
420 921 : if (c.find("uri") != c.end() && c.find("action") != c.end()) {
421 921 : const auto& uri = c.at("uri");
422 921 : const auto& actionStr = c.at("action");
423 921 : auto action = -1;
424 921 : if (actionStr == "add")
425 442 : action = 0;
426 479 : else if (actionStr == "join")
427 461 : action = 1;
428 18 : else if (actionStr == "remove")
429 1 : action = 2;
430 17 : else if (actionStr == "ban")
431 16 : action = 3;
432 1 : else if (actionStr == "unban")
433 1 : action = 4;
434 921 : if (actionStr == "ban" || actionStr == "remove") {
435 : // In this case, a potential host was removed during a call.
436 17 : updateActiveCalls(c);
437 17 : typers_->removeTyper(uri);
438 : }
439 921 : if (action != -1) {
440 921 : announceMember = true;
441 921 : emitSignal<libjami::ConversationSignal::ConversationMemberEvent>(accountId_,
442 : convId,
443 : uri,
444 : action);
445 : }
446 : }
447 320 : } else if (c.at("type") == "application/call-history+json") {
448 58 : updateActiveCalls(c);
449 : }
450 : #ifdef ENABLE_PLUGIN
451 1241 : auto& pluginChatManager = Manager::instance().getJamiPluginManager().getChatServicesManager();
452 1241 : if (pluginChatManager.hasHandlers()) {
453 0 : auto cm = std::make_shared<JamiMessage>(accountId_, convId, c.at("author") != userId_, c, false);
454 0 : cm->isSwarm = true;
455 0 : pluginChatManager.publishMessage(std::move(cm));
456 0 : }
457 : #endif
458 : }
459 :
460 1198 : if (announceMember && onMembersChanged_) {
461 920 : onMembersChanged_(repository_->memberUris("", {}));
462 : }
463 : }
464 1200 : }
465 :
466 389 : void loadStatus()
467 : {
468 : try {
469 : // read file
470 776 : auto file = fileutils::loadFile(statusPath_);
471 : // load values
472 2 : msgpack::object_handle oh = msgpack::unpack((const char*) file.data(), file.size());
473 2 : std::lock_guard lk {messageStatusMtx_};
474 2 : oh.get().convert(messagesStatus_);
475 389 : } catch (const std::exception& e) {
476 387 : }
477 389 : }
478 1113 : void saveStatus()
479 : {
480 1113 : std::ofstream file(statusPath_, std::ios::trunc | std::ios::binary);
481 1113 : msgpack::pack(file, messagesStatus_);
482 1113 : }
483 :
484 18 : void loadActiveCalls() const
485 : {
486 : try {
487 : // read file
488 29 : auto file = fileutils::loadFile(activeCallsPath_);
489 : // load values
490 7 : msgpack::object_handle oh = msgpack::unpack((const char*) file.data(), file.size());
491 7 : std::lock_guard lk {activeCallsMtx_};
492 7 : oh.get().convert(activeCalls_);
493 18 : } catch (const std::exception& e) {
494 11 : return;
495 11 : }
496 : }
497 :
498 258 : void saveActiveCalls() const
499 : {
500 258 : std::ofstream file(activeCallsPath_, std::ios::trunc | std::ios::binary);
501 258 : msgpack::pack(file, activeCalls_);
502 258 : }
503 :
504 18 : void loadHostedCalls() const
505 : {
506 : try {
507 : // read file
508 30 : auto file = fileutils::loadFile(hostedCallsPath_);
509 : // load values
510 6 : msgpack::object_handle oh = msgpack::unpack((const char*) file.data(), file.size());
511 6 : std::lock_guard lk {activeCallsMtx_};
512 6 : oh.get().convert(hostedCalls_);
513 18 : } catch (const std::exception& e) {
514 12 : return;
515 12 : }
516 : }
517 :
518 42 : void saveHostedCalls() const
519 : {
520 42 : std::ofstream file(hostedCallsPath_, std::ios::trunc | std::ios::binary);
521 42 : msgpack::pack(file, hostedCalls_);
522 42 : }
523 :
524 : void voteUnban(const std::string& contactUri, const std::string_view type, const OnDoneCb& cb);
525 :
526 : std::vector<std::map<std::string, std::string>> getMembers(bool includeInvited,
527 : bool includeLeft,
528 : bool includeBanned) const;
529 :
530 6615 : std::string_view bannedType(const std::string& uri) const
531 : {
532 6615 : auto crt = fmt::format("{}.crt", uri);
533 13230 : auto bannedMember = repoPath_ / MemberPath::BANNED / MemberPath::MEMBERS / crt;
534 6614 : if (std::filesystem::is_regular_file(bannedMember))
535 15 : return "members"sv;
536 13196 : auto bannedAdmin = repoPath_ / MemberPath::BANNED / MemberPath::ADMINS / crt;
537 6598 : if (std::filesystem::is_regular_file(bannedAdmin))
538 0 : return "admins"sv;
539 13197 : auto bannedInvited = repoPath_ / MemberPath::BANNED / MemberPath::INVITED / uri;
540 6599 : if (std::filesystem::is_regular_file(bannedInvited))
541 0 : return "invited"sv;
542 13197 : auto bannedDevice = repoPath_ / MemberPath::BANNED / MemberPath::DEVICES / crt;
543 6598 : if (std::filesystem::is_regular_file(bannedDevice))
544 0 : return "devices"sv;
545 6600 : return {};
546 6614 : }
547 :
548 4791 : std::shared_ptr<dhtnet::ChannelSocket> gitSocket(const DeviceId& deviceId) const
549 : {
550 4791 : auto deviceSockets = gitSocketList_.find(deviceId);
551 9582 : return (deviceSockets != gitSocketList_.end()) ? deviceSockets->second : nullptr;
552 : }
553 :
554 2181 : void addGitSocket(const DeviceId& deviceId, const std::shared_ptr<dhtnet::ChannelSocket>& socket)
555 : {
556 2181 : gitSocketList_[deviceId] = socket;
557 2180 : }
558 819 : void removeGitSocket(const DeviceId& deviceId)
559 : {
560 819 : auto deviceSockets = gitSocketList_.find(deviceId);
561 819 : if (deviceSockets != gitSocketList_.end())
562 429 : gitSocketList_.erase(deviceSockets);
563 819 : }
564 :
565 : /**
566 : * Remove all git sockets and all DRT nodes associated with the given peer.
567 : * This is used when a swarm member is banned to ensure that we stop syncing
568 : * with them or sending them message notifications.
569 : */
570 : void disconnectFromPeer(const std::string& peerUri);
571 :
572 : std::vector<std::map<std::string, std::string>> getMembers(bool includeInvited, bool includeLeft) const;
573 :
574 : std::mutex membersMtx_ {};
575 : std::set<std::string> checkedMembers_; // Store members we tried
576 : std::function<void()> bootstrapCb_;
577 : std::mutex bootstrapMtx_ {};
578 : #ifdef LIBJAMI_TEST
579 : std::function<void(std::string, BootstrapStatus)> bootstrapCbTest_;
580 : #endif
581 :
582 : std::mutex writeMtx_ {};
583 : const std::unique_ptr<ConversationRepository> repository_;
584 : const std::weak_ptr<JamiAccount> account_;
585 : const std::string accountId_;
586 : const std::string userId_;
587 : const std::string deviceId_;
588 : const std::shared_ptr<SwarmManager> swarmManager_;
589 : std::atomic_bool isRemoving_ {false};
590 : std::vector<libjami::SwarmMessage> loadMessages(const LogOptions& options, History* optHistory = nullptr);
591 : void pull(const std::string& deviceId);
592 :
593 : // Avoid multiple fetch/merges at the same time.
594 : std::mutex pullcbsMtx_ {};
595 : // store current remote in fetch
596 : std::map<std::string, std::deque<std::pair<std::string, OnPullCb>>> fetchingRemotes_ {};
597 : const std::shared_ptr<TransferManager> transferManager_ {};
598 : const std::filesystem::path repoPath_ {};
599 : const std::filesystem::path conversationDataPath_ {};
600 : const std::filesystem::path fetchedPath_ {};
601 :
602 : // Manage last message displayed and status
603 : const std::filesystem::path sendingPath_ {};
604 : const std::filesystem::path preferencesPath_ {};
605 : const std::filesystem::path statusPath_ {};
606 :
607 : OnMembersChanged onMembersChanged_ {};
608 :
609 : // Manage hosted calls on this device
610 : std::filesystem::path hostedCallsPath_ {};
611 : mutable std::map<std::string, uint64_t /* start time */> hostedCalls_ {};
612 : // Manage active calls for this conversation (can be hosted by other devices)
613 : std::filesystem::path activeCallsPath_ {};
614 : mutable std::mutex activeCallsMtx_ {};
615 : mutable std::vector<std::map<std::string, std::string>> activeCalls_ {};
616 :
617 : GitSocketList gitSocketList_ {};
618 :
619 : // Bootstrap
620 : const std::shared_ptr<asio::io_context> ioContext_;
621 : const std::unique_ptr<asio::steady_timer> fallbackTimer_;
622 :
623 : /**
624 : * Loaded history represents the linearized history to show for clients
625 : */
626 : History loadedHistory_ {};
627 : std::vector<std::shared_ptr<libjami::SwarmMessage>> addToHistory(
628 : History& history,
629 : const std::vector<std::map<std::string, std::string>>& commits,
630 : bool messageReceived = false,
631 : bool commitFromSelf = false);
632 :
633 : void handleReaction(History& history, const std::shared_ptr<libjami::SwarmMessage>& sharedCommit) const;
634 : void handleEdition(History& history,
635 : const std::shared_ptr<libjami::SwarmMessage>& sharedCommit,
636 : bool messageReceived) const;
637 : bool handleMessage(History& history,
638 : const std::shared_ptr<libjami::SwarmMessage>& sharedCommit,
639 : bool messageReceived) const;
640 : void rectifyStatus(const std::shared_ptr<libjami::SwarmMessage>& message, History& history) const;
641 : /**
642 : * {uri, {
643 : * {"fetch", "commitId"},
644 : * {"fetched_ts", "timestamp"},
645 : * {"read", "commitId"},
646 : * {"read_ts", "timestamp"}
647 : * }
648 : * }
649 : */
650 : mutable std::mutex messageStatusMtx_;
651 : std::function<void(const std::map<std::string, std::map<std::string, std::string>>&)> messageStatusCb_ {};
652 : std::map<std::string, std::map<std::string, std::string>> messagesStatus_ {};
653 : /**
654 : * Status: 0 = commited, 1 = fetched, 2 = read
655 : * This cache the curent status to add in the messages
656 : */
657 : // Note: only store int32_t cause it's easy to pass to dbus this way
658 : // memberToStatus serves as a cache for loading messages
659 : std::map<std::string, int32_t> memberToStatus;
660 :
661 : // futureStatus is used to store the status for receiving messages
662 : // (because we're not sure to fetch the commit before receiving a status change for this)
663 : std::map<std::string, std::map<std::string, int32_t>> futureStatus;
664 : // Update internal structures regarding status
665 : void updateStatus(const std::string& uri,
666 : libjami::Account::MessageStates status,
667 : const std::string& commitId,
668 : const std::string& ts,
669 : bool emit = false);
670 :
671 : std::shared_ptr<Typers> typers_;
672 : };
673 :
674 : bool
675 16 : Conversation::Impl::isAdmin() const
676 : {
677 16 : auto adminsPath = repoPath_ / MemberPath::ADMINS;
678 32 : return std::filesystem::is_regular_file(fileutils::getFullPath(adminsPath, userId_ + ".crt"));
679 16 : }
680 :
681 : void
682 16 : Conversation::Impl::disconnectFromPeer(const std::string& peerUri)
683 : {
684 : // Remove nodes from swarmManager
685 16 : const auto nodes = swarmManager_->getAllNodes();
686 16 : std::vector<NodeId> toRemove;
687 39 : for (const auto node : nodes)
688 23 : if (peerUri == repository_->uriFromDevice(node.toString()))
689 11 : toRemove.emplace_back(node);
690 16 : swarmManager_->deleteNode(toRemove);
691 :
692 : // Remove git sockets with this member
693 38 : for (auto it = gitSocketList_.begin(); it != gitSocketList_.end();) {
694 22 : if (peerUri == repository_->uriFromDevice(it->first.toString()))
695 11 : it = gitSocketList_.erase(it);
696 : else
697 11 : ++it;
698 : }
699 16 : }
700 :
701 : std::vector<std::map<std::string, std::string>>
702 422 : Conversation::Impl::getMembers(bool includeInvited, bool includeLeft, bool includeBanned) const
703 : {
704 422 : std::vector<std::map<std::string, std::string>> result;
705 422 : auto members = repository_->members();
706 422 : std::lock_guard lk(messageStatusMtx_);
707 1173 : for (const auto& member : members) {
708 751 : if (member.role == MemberRole::BANNED && !includeBanned) {
709 170 : continue;
710 : }
711 751 : if (member.role == MemberRole::INVITED && !includeInvited)
712 170 : continue;
713 581 : if (member.role == MemberRole::LEFT && !includeLeft)
714 0 : continue;
715 581 : auto mm = member.map();
716 581 : auto it = messagesStatus_.find(member.uri);
717 581 : if (it != messagesStatus_.end()) {
718 209 : auto readIt = it->second.find("read");
719 209 : if (readIt != it->second.end())
720 151 : mm[ConversationMapKeys::LAST_DISPLAYED] = readIt->second;
721 : }
722 581 : result.emplace_back(std::move(mm));
723 581 : }
724 844 : return result;
725 422 : }
726 :
727 : std::vector<std::string>
728 18 : Conversation::Impl::commitsEndedCalls()
729 : {
730 : // Handle current calls
731 18 : std::vector<std::string> commits {};
732 18 : std::unique_lock lk(writeMtx_);
733 18 : std::unique_lock lkA(activeCallsMtx_);
734 18 : for (const auto& hostedCall : hostedCalls_) {
735 : // In this case, this means that we left
736 : // the conference while still hosting it, so activeCalls
737 : // will not be correctly updated
738 : // We don't need to send notifications there, as peers will sync with presence
739 0 : Json::Value value;
740 0 : value["uri"] = userId_;
741 0 : value["device"] = deviceId_;
742 0 : value["confId"] = hostedCall.first;
743 0 : value["type"] = "application/call-history+json";
744 0 : auto now = std::chrono::system_clock::now();
745 0 : auto nowConverted = std::chrono::duration_cast<std::chrono::seconds>(now.time_since_epoch()).count();
746 0 : value["duration"] = std::to_string((nowConverted - hostedCall.second) * 1000);
747 0 : auto itActive = std::find_if(activeCalls_.begin(),
748 : activeCalls_.end(),
749 0 : [this, confId = hostedCall.first](const auto& value) {
750 0 : return value.at("id") == confId && value.at("uri") == userId_
751 0 : && value.at("device") == deviceId_;
752 : });
753 0 : if (itActive != activeCalls_.end())
754 0 : activeCalls_.erase(itActive);
755 0 : commits.emplace_back(repository_->commitMessage(json::toString(value)));
756 :
757 0 : JAMI_DEBUG("Removing hosted conference... {:s}", hostedCall.first);
758 0 : }
759 18 : hostedCalls_.clear();
760 18 : saveActiveCalls();
761 18 : saveHostedCalls();
762 36 : return commits;
763 18 : }
764 :
765 : std::vector<libjami::SwarmMessage>
766 1531 : Conversation::Impl::loadMessages(const LogOptions& options, History* optHistory)
767 : {
768 1531 : auto history = optHistory ? optHistory : &loadedHistory_;
769 :
770 : // history->mutex is locked by the caller
771 1531 : if (!repository_ || history->loading) {
772 0 : return {};
773 : }
774 1531 : history->loading = true;
775 :
776 : // By convention, if options.nbOfCommits is zero, then we
777 : // don't impose a limit on the number of commits returned.
778 1531 : bool limitNbOfCommits = options.nbOfCommits > 0;
779 :
780 1531 : auto startLogging = options.from == "";
781 1531 : auto breakLogging = false;
782 1531 : auto currentHistorySize = loadedHistory_.messageList.size();
783 1531 : std::vector<std::string> replies;
784 1531 : std::vector<std::shared_ptr<libjami::SwarmMessage>> msgList;
785 3062 : repository_->log(
786 : /* preCondition */
787 7453 : [&](const auto& id, const auto& author, const auto& commit) {
788 7453 : if (options.skipMerge && git_commit_parentcount(commit.get()) > 1) {
789 1 : return CallbackResult::Skip;
790 : }
791 7454 : if (id == options.to) {
792 710 : if (options.includeTo)
793 0 : breakLogging = true; // For the next commit
794 : }
795 7447 : if (replies.empty()) { // This avoid load until
796 : // NOTE: in the future, we may want to add "Reply-Body" in commit to avoid to load
797 : // until this commit
798 14898 : if ((limitNbOfCommits
799 7449 : && (loadedHistory_.messageList.size() - currentHistorySize) == options.nbOfCommits))
800 5 : return CallbackResult::Break; // Stop logging
801 7444 : if (breakLogging)
802 0 : return CallbackResult::Break; // Stop logging
803 7444 : if (id == options.to && !options.includeTo) {
804 709 : return CallbackResult::Break; // Stop logging
805 : }
806 : }
807 :
808 6740 : if (!startLogging && options.from != "" && options.from == id)
809 1108 : startLogging = true;
810 6740 : if (!startLogging)
811 24 : return CallbackResult::Skip; // Start logging after this one
812 :
813 6716 : if (options.fastLog) {
814 6012 : if (options.authorUri != "") {
815 1 : if (options.authorUri == repository_->uriFromDevice(author.email)) {
816 1 : return CallbackResult::Break; // Found author, stop
817 : }
818 : }
819 : }
820 :
821 6707 : return CallbackResult::Ok; // Continue
822 : },
823 : /* emplaceCb */
824 6712 : [&](auto&& cc) {
825 6712 : if (limitNbOfCommits && (msgList.size() == options.nbOfCommits))
826 285 : return;
827 6428 : auto optMessage = repository_->convCommitToMap(cc);
828 6431 : if (!optMessage.has_value())
829 1 : return;
830 6430 : auto message = optMessage.value();
831 6426 : if (message.find("reply-to") != message.end()) {
832 1 : auto it = std::find(replies.begin(), replies.end(), message.at("reply-to"));
833 1 : if (it == replies.end()) {
834 1 : replies.emplace_back(message.at("reply-to"));
835 : }
836 : }
837 6430 : auto it = std::find(replies.begin(), replies.end(), message.at("id"));
838 6428 : if (it != replies.end()) {
839 1 : replies.erase(it);
840 : }
841 6429 : std::shared_ptr<libjami::SwarmMessage> firstMsg;
842 6429 : if ((history == &loadedHistory_) && msgList.empty() && !loadedHistory_.messageList.empty()) {
843 0 : firstMsg = *loadedHistory_.messageList.rbegin();
844 : }
845 19289 : auto added = addToHistory(*history, {message}, false, false);
846 6432 : if (!added.empty() && firstMsg) {
847 0 : emitSignal<libjami::ConversationSignal::SwarmMessageUpdated>(accountId_, repository_->id(), *firstMsg);
848 : }
849 6432 : msgList.insert(msgList.end(), added.begin(), added.end());
850 6433 : },
851 : /* postCondition */
852 6705 : [&](auto, auto, auto) {
853 : // Stop logging if there was a limit set on the number of commits
854 : // to return and we reached it. This isn't strictly necessary since
855 : // the check at the beginning of `emplaceCb` ensures that we won't
856 : // return too many messages, but it prevents us from needlessly
857 : // iterating over a (potentially) large number of commits.
858 6705 : return limitNbOfCommits && (msgList.size() == options.nbOfCommits);
859 : },
860 1531 : options.from,
861 1531 : options.logIfNotFound);
862 :
863 1531 : history->loading = false;
864 1531 : history->cv.notify_all();
865 :
866 : // Convert for client (remove ptr)
867 1531 : std::vector<libjami::SwarmMessage> ret;
868 1531 : ret.reserve(msgList.size());
869 7954 : for (const auto& msg : msgList) {
870 6422 : ret.emplace_back(*msg);
871 : }
872 1532 : return ret;
873 1531 : }
874 :
875 : void
876 3 : Conversation::Impl::handleReaction(History& history, const std::shared_ptr<libjami::SwarmMessage>& sharedCommit) const
877 : {
878 3 : auto it = history.quickAccess.find(sharedCommit->body.at("react-to"));
879 3 : auto peditIt = history.pendingEditions.find(sharedCommit->id);
880 3 : if (peditIt != history.pendingEditions.end()) {
881 0 : auto oldBody = sharedCommit->body;
882 0 : sharedCommit->body["body"] = peditIt->second.front()->body["body"];
883 0 : if (sharedCommit->body.at("body").empty())
884 0 : return;
885 0 : history.pendingEditions.erase(peditIt);
886 0 : }
887 3 : if (it != history.quickAccess.end()) {
888 3 : it->second->reactions.emplace_back(sharedCommit->body);
889 3 : emitSignal<libjami::ConversationSignal::ReactionAdded>(accountId_,
890 3 : repository_->id(),
891 3 : it->second->id,
892 3 : sharedCommit->body);
893 : } else {
894 0 : history.pendingReactions[sharedCommit->body.at("react-to")].emplace_back(sharedCommit->body);
895 : }
896 : }
897 :
898 : void
899 8 : Conversation::Impl::handleEdition(History& history,
900 : const std::shared_ptr<libjami::SwarmMessage>& sharedCommit,
901 : bool messageReceived) const
902 : {
903 16 : auto editId = sharedCommit->body.at("edit");
904 8 : auto it = history.quickAccess.find(editId);
905 8 : if (it != history.quickAccess.end()) {
906 6 : auto baseCommit = it->second;
907 6 : if (baseCommit) {
908 6 : auto itReact = baseCommit->body.find("react-to");
909 6 : std::string toReplace = (baseCommit->type == "application/data-transfer+json") ? "tid" : "body";
910 6 : auto body = sharedCommit->body.at(toReplace);
911 : // Edit reaction
912 6 : if (itReact != baseCommit->body.end()) {
913 1 : baseCommit->body[toReplace] = body; // Replace body if pending
914 1 : it = history.quickAccess.find(itReact->second);
915 1 : auto itPending = history.pendingReactions.find(itReact->second);
916 1 : if (it != history.quickAccess.end()) {
917 1 : baseCommit = it->second; // Base commit
918 1 : auto itPreviousReact = std::find_if(baseCommit->reactions.begin(),
919 1 : baseCommit->reactions.end(),
920 1 : [&](const auto& reaction) {
921 1 : return reaction.at("id") == editId;
922 : });
923 1 : if (itPreviousReact != baseCommit->reactions.end()) {
924 1 : (*itPreviousReact)[toReplace] = body;
925 1 : if (body.empty()) {
926 1 : baseCommit->reactions.erase(itPreviousReact);
927 2 : emitSignal<libjami::ConversationSignal::ReactionRemoved>(accountId_,
928 1 : repository_->id(),
929 1 : baseCommit->id,
930 : editId);
931 : }
932 : }
933 0 : } else if (itPending != history.pendingReactions.end()) {
934 : // Else edit if pending
935 0 : auto itReaction = std::find_if(itPending->second.begin(),
936 0 : itPending->second.end(),
937 0 : [&](const auto& reaction) { return reaction.at("id") == editId; });
938 0 : if (itReaction != itPending->second.end()) {
939 0 : (*itReaction)[toReplace] = body;
940 0 : if (body.empty())
941 0 : itPending->second.erase(itReaction);
942 : }
943 : } else {
944 : // Add to pending edtions
945 0 : messageReceived ? history.pendingEditions[editId].emplace_front(sharedCommit)
946 0 : : history.pendingEditions[editId].emplace_back(sharedCommit);
947 : }
948 : } else {
949 : // Normal message
950 5 : it->second->editions.emplace(it->second->editions.begin(), it->second->body);
951 5 : it->second->body[toReplace] = sharedCommit->body[toReplace];
952 5 : if (toReplace == "tid") {
953 : // Avoid to replace fileId in client
954 1 : it->second->body["fileId"] = "";
955 : }
956 : // Remove reactions
957 5 : if (sharedCommit->body.at(toReplace).empty())
958 2 : it->second->reactions.clear();
959 5 : emitSignal<libjami::ConversationSignal::SwarmMessageUpdated>(accountId_, repository_->id(), *it->second);
960 : }
961 6 : }
962 6 : } else {
963 2 : messageReceived ? history.pendingEditions[editId].emplace_front(sharedCommit)
964 2 : : history.pendingEditions[editId].emplace_back(sharedCommit);
965 : }
966 8 : }
967 :
968 : bool
969 7646 : Conversation::Impl::handleMessage(History& history,
970 : const std::shared_ptr<libjami::SwarmMessage>& sharedCommit,
971 : bool messageReceived) const
972 : {
973 7646 : if (messageReceived) {
974 : // For a received message, we place it at the beginning of the list
975 1207 : if (!history.messageList.empty())
976 963 : sharedCommit->linearizedParent = (*history.messageList.begin())->id;
977 1207 : history.messageList.emplace_front(sharedCommit);
978 : } else {
979 : // For a loaded message, we load from newest to oldest
980 : // So we change the parent of the last message.
981 6439 : if (!history.messageList.empty())
982 4910 : (*history.messageList.rbegin())->linearizedParent = sharedCommit->id;
983 6438 : history.messageList.emplace_back(sharedCommit);
984 : }
985 : // Handle pending reactions/editions
986 7650 : auto reactIt = history.pendingReactions.find(sharedCommit->id);
987 7644 : if (reactIt != history.pendingReactions.end()) {
988 0 : for (const auto& commitBody : reactIt->second)
989 0 : sharedCommit->reactions.emplace_back(commitBody);
990 0 : history.pendingReactions.erase(reactIt);
991 : }
992 7647 : auto peditIt = history.pendingEditions.find(sharedCommit->id);
993 7646 : if (peditIt != history.pendingEditions.end()) {
994 0 : auto oldBody = sharedCommit->body;
995 0 : if (sharedCommit->type == "application/data-transfer+json") {
996 0 : sharedCommit->body["tid"] = peditIt->second.front()->body["tid"];
997 0 : sharedCommit->body["fileId"] = "";
998 : } else {
999 0 : sharedCommit->body["body"] = peditIt->second.front()->body["body"];
1000 : }
1001 0 : peditIt->second.pop_front();
1002 0 : for (const auto& commit : peditIt->second) {
1003 0 : sharedCommit->editions.emplace_back(commit->body);
1004 : }
1005 0 : sharedCommit->editions.emplace_back(oldBody);
1006 0 : history.pendingEditions.erase(peditIt);
1007 0 : }
1008 : // Announce to client
1009 7646 : if (messageReceived)
1010 1207 : emitSignal<libjami::ConversationSignal::SwarmMessageReceived>(accountId_, repository_->id(), *sharedCommit);
1011 7648 : return !messageReceived;
1012 : }
1013 :
1014 : void
1015 7662 : Conversation::Impl::rectifyStatus(const std::shared_ptr<libjami::SwarmMessage>& message, History& history) const
1016 : {
1017 7662 : auto parentIt = history.quickAccess.find(message->linearizedParent);
1018 7661 : auto currentMessage = message;
1019 :
1020 19691 : while (parentIt != history.quickAccess.end()) {
1021 12028 : const auto& parent = parentIt->second;
1022 12428 : for (const auto& [peer, value] : message->status) {
1023 11337 : auto parentStatusIt = parent->status.find(peer);
1024 11328 : if (parentStatusIt == parent->status.end() || parentStatusIt->second < value) {
1025 397 : parent->status[peer] = value;
1026 794 : emitSignal<libjami::ConfigurationSignal::AccountMessageStatusChanged>(accountId_,
1027 397 : repository_->id(),
1028 : peer,
1029 397 : parent->id,
1030 : value);
1031 10937 : } else if (parentStatusIt->second >= value) {
1032 10937 : break;
1033 : }
1034 : }
1035 12032 : currentMessage = parent;
1036 12033 : parentIt = history.quickAccess.find(parent->linearizedParent);
1037 : }
1038 7661 : }
1039 :
1040 : std::vector<std::shared_ptr<libjami::SwarmMessage>>
1041 7653 : Conversation::Impl::addToHistory(History& history,
1042 : const std::vector<std::map<std::string, std::string>>& commits,
1043 : bool messageReceived,
1044 : bool commitFromSelf)
1045 : {
1046 : //
1047 : // NOTE: This function makes the following assumptions on its arguments:
1048 : // - The messages in "history" are in reverse chronological order (newest message
1049 : // first, oldest message last).
1050 : // - If messageReceived is true, then the commits in "commits" are assumed to be in
1051 : // chronological order (oldest to newest) and to be newer than the ones in "history".
1052 : // They are therefore inserted at the beginning of the message list.
1053 : // - If messageReceived is false, then the commits in "commits" are assumed to be in
1054 : // reverse chronological order (newest to oldest) and to be older than the ones in
1055 : // "history". They are therefore appended at the end of the message list.
1056 : //
1057 7653 : auto acc = account_.lock();
1058 7652 : if (!acc)
1059 0 : return {};
1060 7653 : auto username = acc->getUsername();
1061 7652 : if (messageReceived && (&history == &loadedHistory_ && history.loading)) {
1062 0 : std::unique_lock lk(history.mutex);
1063 0 : history.cv.wait(lk, [&] { return !history.loading; });
1064 0 : }
1065 :
1066 : // Only set messages' status on history for client
1067 7652 : bool needToSetMessageStatus = !commitFromSelf && &history == &loadedHistory_;
1068 :
1069 7652 : std::vector<std::shared_ptr<libjami::SwarmMessage>> sharedCommits;
1070 15337 : for (const auto& commit : commits) {
1071 15382 : auto commitId = commit.at("id");
1072 7688 : if (history.quickAccess.find(commitId) != history.quickAccess.end())
1073 1 : continue; // Already present
1074 7691 : auto typeIt = commit.find("type");
1075 : // Nothing to show for the client, skip
1076 7685 : if (typeIt != commit.end() && typeIt->second == "merge")
1077 31 : continue;
1078 :
1079 7655 : auto sharedCommit = std::make_shared<libjami::SwarmMessage>();
1080 7657 : sharedCommit->fromMapStringString(commit);
1081 :
1082 7656 : if (needToSetMessageStatus) {
1083 920 : std::lock_guard lk(messageStatusMtx_);
1084 : // Check if we already have status information for the commit.
1085 920 : auto itFuture = futureStatus.find(sharedCommit->id);
1086 920 : if (itFuture != futureStatus.end()) {
1087 10 : sharedCommit->status = std::move(itFuture->second);
1088 10 : futureStatus.erase(itFuture);
1089 : }
1090 920 : }
1091 7656 : sharedCommits.emplace_back(sharedCommit);
1092 7688 : }
1093 :
1094 7646 : if (needToSetMessageStatus) {
1095 914 : constexpr int32_t SENDING = static_cast<int32_t>(libjami::Account::MessageStates::SENDING);
1096 914 : constexpr int32_t SENT = static_cast<int32_t>(libjami::Account::MessageStates::SENT);
1097 914 : constexpr int32_t DISPLAYED = static_cast<int32_t>(libjami::Account::MessageStates::DISPLAYED);
1098 :
1099 914 : std::lock_guard lk(messageStatusMtx_);
1100 13521 : for (const auto& member : repository_->members()) {
1101 : // For each member, we iterate over the commits to add in reverse chronological
1102 : // order (i.e. from newest to oldest) and set their status from the point of view
1103 : // of that member (as best we can given the information we have).
1104 : //
1105 : // The key assumption made in order to compute the status is that it can never decrease
1106 : // (with respect to the ordering SENDING < SENT < DISPLAYED) as we go back in time. We
1107 : // therefore start by setting the "status" variable below to the lowest possible value,
1108 : // and increase it when we encounter a commit for which it is justified to do so.
1109 : //
1110 : // If messageReceived is true, then the commits we are adding are the most recent in the
1111 : // conversation history, so the lowest possible value is SENDING.
1112 : //
1113 : // If messageReceived is false, then the commits we are adding are older than the ones
1114 : // that are already in the history, so the lowest possible value is the status of the
1115 : // oldest message in the history so far, which is stored in memberToStatus.
1116 12602 : auto status = SENDING;
1117 12602 : if (!messageReceived) {
1118 12 : auto cache = memberToStatus[member.uri];
1119 12 : if (cache > status)
1120 9 : status = cache;
1121 : }
1122 12602 : auto& messagesStatus = messagesStatus_[member.uri];
1123 :
1124 25222 : for (auto it = sharedCommits.rbegin(); it != sharedCommits.rend(); it++) {
1125 12624 : auto sharedCommit = *it;
1126 12625 : auto previousStatus = status;
1127 12625 : auto& commitStatus = sharedCommit->status[member.uri];
1128 :
1129 : // Compute status for the current commit.
1130 12621 : if (status < SENT && messagesStatus["fetched"] == sharedCommit->id) {
1131 9 : status = SENT;
1132 : }
1133 12607 : if (messagesStatus["read"] == sharedCommit->id) {
1134 1 : status = DISPLAYED;
1135 : }
1136 12614 : if (member.uri == sharedCommit->body.at("author")) {
1137 919 : status = DISPLAYED;
1138 : }
1139 12612 : if (status < commitStatus) {
1140 1 : status = commitStatus;
1141 : }
1142 :
1143 : // Store computed value.
1144 12612 : commitStatus = status;
1145 :
1146 : // Update messagesStatus_ if needed.
1147 12612 : if (previousStatus == SENDING && status >= SENT) {
1148 916 : messagesStatus["fetched"] = sharedCommit->id;
1149 : }
1150 12612 : if (previousStatus <= SENT && status == DISPLAYED) {
1151 907 : messagesStatus["read"] = sharedCommit->id;
1152 : }
1153 12612 : }
1154 :
1155 12607 : if (!messageReceived) {
1156 : // Update memberToStatus with the status of the last (i.e. oldest) added commit.
1157 12 : memberToStatus[member.uri] = status;
1158 : }
1159 913 : }
1160 914 : }
1161 :
1162 7646 : std::vector<std::shared_ptr<libjami::SwarmMessage>> messages;
1163 15308 : for (const auto& sharedCommit : sharedCommits) {
1164 7660 : history.quickAccess[sharedCommit->id] = sharedCommit;
1165 :
1166 7660 : auto reactToIt = sharedCommit->body.find("react-to");
1167 7658 : auto editIt = sharedCommit->body.find("edit");
1168 7655 : if (reactToIt != sharedCommit->body.end() && !reactToIt->second.empty()) {
1169 3 : handleReaction(history, sharedCommit);
1170 7657 : } else if (editIt != sharedCommit->body.end() && !editIt->second.empty()) {
1171 8 : handleEdition(history, sharedCommit, messageReceived);
1172 7647 : } else if (handleMessage(history, sharedCommit, messageReceived)) {
1173 6438 : messages.emplace_back(sharedCommit);
1174 : }
1175 7660 : rectifyStatus(sharedCommit, history);
1176 : }
1177 :
1178 7651 : return messages;
1179 7651 : }
1180 :
1181 186 : Conversation::Conversation(const std::shared_ptr<JamiAccount>& account,
1182 : ConversationMode mode,
1183 186 : const std::string& otherMember)
1184 186 : : pimpl_ {new Impl {account, mode, otherMember}}
1185 186 : {}
1186 :
1187 18 : Conversation::Conversation(const std::shared_ptr<JamiAccount>& account, const std::string& conversationId)
1188 18 : : pimpl_ {new Impl {account, conversationId}}
1189 18 : {}
1190 :
1191 191 : Conversation::Conversation(const std::shared_ptr<JamiAccount>& account,
1192 : const std::string& remoteDevice,
1193 191 : const std::string& conversationId)
1194 191 : : pimpl_ {new Impl {account, remoteDevice, conversationId}}
1195 191 : {}
1196 :
1197 388 : Conversation::~Conversation() {}
1198 :
1199 : std::string
1200 4847 : Conversation::id() const
1201 : {
1202 4847 : return pimpl_->repository_ ? pimpl_->repository_->id() : "";
1203 : }
1204 :
1205 : void
1206 138 : Conversation::addMember(const std::string& contactUri, const OnDoneCb& cb)
1207 : {
1208 : try {
1209 138 : if (mode() == ConversationMode::ONE_TO_ONE) {
1210 : // Only authorize to add left members
1211 1 : auto initialMembers = getInitialMembers();
1212 1 : auto it = std::find(initialMembers.begin(), initialMembers.end(), contactUri);
1213 1 : if (it == initialMembers.end()) {
1214 1 : JAMI_WARN("Unable to add new member in one to one conversation");
1215 1 : cb(false, "");
1216 1 : return;
1217 : }
1218 1 : }
1219 0 : } catch (const std::exception& e) {
1220 0 : JAMI_WARN("Unable to get mode: %s", e.what());
1221 0 : cb(false, "");
1222 0 : return;
1223 0 : }
1224 137 : if (isMember(contactUri, true)) {
1225 0 : JAMI_WARN("Unable to add member %s because it's already a member", contactUri.c_str());
1226 0 : cb(false, "");
1227 0 : return;
1228 : }
1229 137 : if (isBanned(contactUri)) {
1230 2 : if (pimpl_->isAdmin()) {
1231 1 : dht::ThreadPool::io().run([w = weak(), contactUri = std::move(contactUri), cb = std::move(cb)] {
1232 1 : if (auto sthis = w.lock()) {
1233 1 : auto members = sthis->pimpl_->repository_->members();
1234 1 : auto type = sthis->pimpl_->bannedType(contactUri);
1235 1 : if (type.empty()) {
1236 0 : cb(false, {});
1237 0 : return;
1238 : }
1239 1 : sthis->pimpl_->voteUnban(contactUri, type, cb);
1240 2 : }
1241 : });
1242 : } else {
1243 1 : JAMI_WARN("Unable to add member %s because this member is blocked", contactUri.c_str());
1244 1 : cb(false, "");
1245 : }
1246 2 : return;
1247 : }
1248 :
1249 135 : dht::ThreadPool::io().run([w = weak(), contactUri = std::move(contactUri), cb = std::move(cb)] {
1250 135 : if (auto sthis = w.lock()) {
1251 : // Add member files and commit
1252 135 : std::unique_lock lk(sthis->pimpl_->writeMtx_);
1253 135 : auto commit = sthis->pimpl_->repository_->addMember(contactUri);
1254 135 : if (not commit.empty())
1255 135 : sthis->pimpl_->announce(commit, true);
1256 135 : lk.unlock();
1257 135 : if (cb)
1258 135 : cb(!commit.empty(), commit);
1259 270 : }
1260 135 : });
1261 : }
1262 :
1263 : std::shared_ptr<dhtnet::ChannelSocket>
1264 4791 : Conversation::gitSocket(const DeviceId& deviceId) const
1265 : {
1266 4791 : return pimpl_->gitSocket(deviceId);
1267 : }
1268 :
1269 : void
1270 2181 : Conversation::addGitSocket(const DeviceId& deviceId, const std::shared_ptr<dhtnet::ChannelSocket>& socket)
1271 : {
1272 2181 : pimpl_->addGitSocket(deviceId, socket);
1273 2180 : }
1274 :
1275 : void
1276 819 : Conversation::removeGitSocket(const DeviceId& deviceId)
1277 : {
1278 819 : pimpl_->removeGitSocket(deviceId);
1279 819 : }
1280 :
1281 : void
1282 380 : Conversation::shutdownConnections()
1283 : {
1284 380 : pimpl_->fallbackTimer_->cancel();
1285 380 : pimpl_->gitSocketList_.clear();
1286 380 : if (pimpl_->swarmManager_)
1287 380 : pimpl_->swarmManager_->shutdown();
1288 380 : std::lock_guard lk(pimpl_->membersMtx_);
1289 380 : pimpl_->checkedMembers_.clear();
1290 380 : }
1291 :
1292 : void
1293 0 : Conversation::connectivityChanged()
1294 : {
1295 0 : if (pimpl_->swarmManager_)
1296 0 : pimpl_->swarmManager_->maintainBuckets();
1297 0 : }
1298 :
1299 : std::vector<jami::DeviceId>
1300 0 : Conversation::getDeviceIdList() const
1301 : {
1302 0 : return pimpl_->swarmManager_->getAllNodes();
1303 : }
1304 :
1305 : std::shared_ptr<Typers>
1306 9 : Conversation::typers() const
1307 : {
1308 9 : return pimpl_->typers_;
1309 : }
1310 :
1311 : bool
1312 0 : Conversation::hasSwarmChannel(const std::string& deviceId)
1313 : {
1314 0 : if (!pimpl_->swarmManager_)
1315 0 : return false;
1316 0 : return pimpl_->swarmManager_->isConnectedWith(DeviceId(deviceId));
1317 : }
1318 :
1319 : void
1320 1 : Conversation::Impl::voteUnban(const std::string& contactUri, const std::string_view type, const OnDoneCb& cb)
1321 : {
1322 : // Check if admin
1323 1 : if (!isAdmin()) {
1324 0 : JAMI_WARN("You're not an admin of this repo. Unable to unblock %s", contactUri.c_str());
1325 0 : cb(false, {});
1326 0 : return;
1327 : }
1328 :
1329 : // Vote for removal
1330 1 : std::unique_lock lk(writeMtx_);
1331 1 : auto voteCommit = repository_->voteUnban(contactUri, type);
1332 1 : if (voteCommit.empty()) {
1333 0 : JAMI_WARN("Unbanning %s failed", contactUri.c_str());
1334 0 : cb(false, "");
1335 0 : return;
1336 : }
1337 :
1338 1 : auto lastId = voteCommit;
1339 1 : std::vector<std::string> commits;
1340 1 : commits.emplace_back(voteCommit);
1341 :
1342 : // If admin, check vote
1343 2 : auto resolveCommit = repository_->resolveVote(contactUri, type, "unban");
1344 1 : if (!resolveCommit.empty()) {
1345 1 : commits.emplace_back(resolveCommit);
1346 1 : lastId = resolveCommit;
1347 1 : JAMI_WARN("Vote solved for unbanning %s.", contactUri.c_str());
1348 : }
1349 1 : announce(commits, true);
1350 1 : lk.unlock();
1351 1 : if (cb)
1352 1 : cb(!lastId.empty(), lastId);
1353 1 : }
1354 :
1355 : void
1356 13 : Conversation::removeMember(const std::string& contactUri, bool isDevice, const OnDoneCb& cb)
1357 : {
1358 39 : dht::ThreadPool::io().run(
1359 26 : [w = weak(), contactUri = std::move(contactUri), isDevice = std::move(isDevice), cb = std::move(cb)] {
1360 13 : if (auto sthis = w.lock()) {
1361 : // Check if admin
1362 13 : if (!sthis->pimpl_->isAdmin()) {
1363 1 : JAMI_WARN("You're not an admin of this repo. Unable to block %s", contactUri.c_str());
1364 1 : cb(false, {});
1365 2 : return;
1366 : }
1367 :
1368 : // Get current user type
1369 12 : std::string type;
1370 12 : if (isDevice) {
1371 0 : type = "devices";
1372 : } else {
1373 12 : auto members = sthis->pimpl_->repository_->members();
1374 26 : for (const auto& member : members) {
1375 26 : if (member.uri == contactUri) {
1376 12 : if (member.role == MemberRole::INVITED) {
1377 2 : type = "invited";
1378 10 : } else if (member.role == MemberRole::ADMIN) {
1379 1 : type = "admins";
1380 9 : } else if (member.role == MemberRole::MEMBER) {
1381 9 : type = "members";
1382 : }
1383 12 : break;
1384 : }
1385 : }
1386 12 : if (type.empty()) {
1387 0 : cb(false, {});
1388 0 : return;
1389 : }
1390 12 : }
1391 :
1392 : // Vote for removal
1393 12 : std::unique_lock lk(sthis->pimpl_->writeMtx_);
1394 12 : auto voteCommit = sthis->pimpl_->repository_->voteKick(contactUri, type);
1395 12 : if (voteCommit.empty()) {
1396 1 : JAMI_WARN("Kicking %s failed", contactUri.c_str());
1397 1 : cb(false, "");
1398 1 : return;
1399 : }
1400 :
1401 11 : auto lastId = voteCommit;
1402 11 : std::vector<std::string> commits;
1403 11 : commits.emplace_back(voteCommit);
1404 :
1405 : // If admin, check vote
1406 22 : auto resolveCommit = sthis->pimpl_->repository_->resolveVote(contactUri, type, "ban");
1407 11 : if (!resolveCommit.empty()) {
1408 11 : commits.emplace_back(resolveCommit);
1409 11 : lastId = resolveCommit;
1410 11 : JAMI_WARN("Vote solved for %s. %s banned", contactUri.c_str(), isDevice ? "Device" : "Member");
1411 11 : sthis->pimpl_->disconnectFromPeer(contactUri);
1412 : }
1413 :
1414 11 : sthis->pimpl_->announce(commits, true);
1415 11 : lk.unlock();
1416 11 : cb(!lastId.empty(), lastId);
1417 27 : }
1418 : });
1419 13 : }
1420 :
1421 : std::vector<std::map<std::string, std::string>>
1422 422 : Conversation::getMembers(bool includeInvited, bool includeLeft, bool includeBanned) const
1423 : {
1424 422 : return pimpl_->getMembers(includeInvited, includeLeft, includeBanned);
1425 : }
1426 :
1427 : std::set<std::string>
1428 2073 : Conversation::memberUris(std::string_view filter, const std::set<MemberRole>& filteredRoles) const
1429 : {
1430 2073 : return pimpl_->repository_->memberUris(filter, filteredRoles);
1431 : }
1432 :
1433 : std::vector<NodeId>
1434 1796 : Conversation::peersToSyncWith() const
1435 : {
1436 1796 : auto s = pimpl_->swarmManager_->getConnectedNodes();
1437 13674 : for (const auto& [deviceId, _] : pimpl_->gitSocketList_)
1438 11878 : if (std::find(s.cbegin(), s.cend(), deviceId) == s.cend())
1439 5697 : s.emplace_back(deviceId);
1440 1797 : return s;
1441 0 : }
1442 :
1443 : bool
1444 1797 : Conversation::isBootstrapped() const
1445 : {
1446 1797 : return pimpl_->swarmManager_->isConnected();
1447 : }
1448 :
1449 : std::string
1450 13831 : Conversation::uriFromDevice(const std::string& deviceId) const
1451 : {
1452 13831 : return pimpl_->repository_->uriFromDevice(deviceId);
1453 : }
1454 :
1455 : void
1456 0 : Conversation::monitor()
1457 : {
1458 0 : pimpl_->swarmManager_->getRoutingTable().printRoutingTable();
1459 0 : }
1460 :
1461 : std::string
1462 185 : Conversation::join()
1463 : {
1464 185 : return pimpl_->repository_->join();
1465 : }
1466 :
1467 : bool
1468 3416 : Conversation::isMember(const std::string& uri, bool includeInvited) const
1469 : {
1470 3416 : auto uriCrt = uri + ".crt"sv;
1471 6833 : if (std::filesystem::is_regular_file(pimpl_->repoPath_ / MemberPath::ADMINS / uriCrt)
1472 6834 : || std::filesystem::is_regular_file(pimpl_->repoPath_ / MemberPath::MEMBERS / uriCrt)) {
1473 2371 : return true;
1474 : }
1475 1045 : if (includeInvited) {
1476 871 : if (std::filesystem::is_regular_file(pimpl_->repoPath_ / MemberPath::INVITED / uri)) {
1477 594 : return true;
1478 : }
1479 277 : if (mode() == ConversationMode::ONE_TO_ONE) {
1480 3 : for (const auto& member : getInitialMembers()) {
1481 2 : if (member == uri)
1482 0 : return true;
1483 1 : }
1484 : }
1485 : }
1486 451 : return false;
1487 3416 : }
1488 :
1489 : bool
1490 6613 : Conversation::isBanned(const std::string& uri) const
1491 : {
1492 6613 : return !pimpl_->bannedType(uri).empty();
1493 : }
1494 :
1495 : void
1496 0 : Conversation::sendMessage(
1497 : std::string&& message, const std::string& type, const std::string& replyTo, OnCommitCb&& onCommit, OnDoneCb&& cb)
1498 : {
1499 0 : Json::Value json;
1500 0 : json["body"] = std::move(message);
1501 0 : json["type"] = type;
1502 0 : sendMessage(std::move(json), replyTo, std::move(onCommit), std::move(cb));
1503 0 : }
1504 :
1505 : void
1506 139 : Conversation::sendMessage(Json::Value&& value, const std::string& replyTo, OnCommitCb&& onCommit, OnDoneCb&& cb)
1507 : {
1508 139 : if (!replyTo.empty()) {
1509 2 : if (!pimpl_->repository_->hasCommit(replyTo)) {
1510 1 : JAMI_ERR("Replying to invalid commit %s", replyTo.c_str());
1511 1 : return;
1512 : }
1513 1 : value["reply-to"] = replyTo;
1514 : }
1515 552 : dht::ThreadPool::io().run(
1516 414 : [w = weak(), value = std::move(value), onCommit = std::move(onCommit), cb = std::move(cb)] {
1517 138 : if (auto sthis = w.lock()) {
1518 138 : std::unique_lock lk(sthis->pimpl_->writeMtx_);
1519 138 : auto commit = sthis->pimpl_->repository_->commitMessage(json::toString(value));
1520 138 : lk.unlock();
1521 138 : if (onCommit)
1522 13 : onCommit(commit);
1523 138 : sthis->pimpl_->announce(commit, true);
1524 138 : if (cb)
1525 138 : cb(!commit.empty(), commit);
1526 276 : }
1527 138 : });
1528 : }
1529 :
1530 : void
1531 0 : Conversation::sendMessages(std::vector<Json::Value>&& messages, OnMultiDoneCb&& cb)
1532 : {
1533 0 : dht::ThreadPool::io().run([w = weak(), messages = std::move(messages), cb = std::move(cb)] {
1534 0 : if (auto sthis = w.lock()) {
1535 0 : std::vector<std::string> commits;
1536 0 : commits.reserve(messages.size());
1537 0 : std::unique_lock lk(sthis->pimpl_->writeMtx_);
1538 0 : for (const auto& message : messages) {
1539 0 : auto commit = sthis->pimpl_->repository_->commitMessage(json::toString(message));
1540 0 : commits.emplace_back(std::move(commit));
1541 0 : }
1542 0 : lk.unlock();
1543 0 : sthis->pimpl_->announce(commits, true);
1544 0 : if (cb)
1545 0 : cb(commits);
1546 0 : }
1547 0 : });
1548 0 : }
1549 :
1550 : bool
1551 12391 : Conversation::hasCommit(const std::string& commitId) const
1552 : {
1553 12391 : return pimpl_->repository_->hasCommit(commitId);
1554 : }
1555 :
1556 : std::optional<std::map<std::string, std::string>>
1557 34 : Conversation::getCommit(const std::string& commitId) const
1558 : {
1559 34 : auto commit = pimpl_->repository_->getCommit(commitId);
1560 34 : if (commit == std::nullopt)
1561 1 : return std::nullopt;
1562 33 : return pimpl_->repository_->convCommitToMap(*commit);
1563 34 : }
1564 :
1565 : void
1566 2 : Conversation::loadMessages(const OnLoadMessages& cb, const LogOptions& options)
1567 : {
1568 2 : if (!cb)
1569 0 : return;
1570 2 : dht::ThreadPool::io().run([w = weak(), cb = std::move(cb), options] {
1571 2 : if (auto sthis = w.lock()) {
1572 2 : std::unique_lock lk(sthis->pimpl_->loadedHistory_.mutex);
1573 2 : auto result = sthis->pimpl_->loadMessages(options);
1574 2 : lk.unlock();
1575 2 : cb(std::move(result));
1576 4 : }
1577 2 : });
1578 : }
1579 :
1580 : void
1581 0 : Conversation::clearCache()
1582 : {
1583 0 : std::lock_guard lk(pimpl_->loadedHistory_.mutex);
1584 0 : pimpl_->loadedHistory_.messageList.clear();
1585 0 : pimpl_->loadedHistory_.quickAccess.clear();
1586 0 : pimpl_->loadedHistory_.pendingEditions.clear();
1587 0 : pimpl_->loadedHistory_.pendingReactions.clear();
1588 : {
1589 0 : std::lock_guard lk(pimpl_->messageStatusMtx_);
1590 0 : pimpl_->memberToStatus.clear();
1591 0 : }
1592 0 : }
1593 :
1594 : std::string
1595 2342 : Conversation::lastCommitId() const
1596 : {
1597 : {
1598 2342 : std::lock_guard lk(pimpl_->loadedHistory_.mutex);
1599 2341 : if (!pimpl_->loadedHistory_.messageList.empty())
1600 3859 : return (*pimpl_->loadedHistory_.messageList.begin())->id;
1601 2340 : }
1602 412 : LogOptions options;
1603 412 : options.nbOfCommits = 1;
1604 412 : options.skipMerge = true;
1605 412 : History optHistory;
1606 411 : std::scoped_lock lock(pimpl_->writeMtx_, optHistory.mutex);
1607 412 : auto res = pimpl_->loadMessages(options, &optHistory);
1608 412 : if (res.empty())
1609 0 : return {};
1610 824 : return (*optHistory.messageList.begin())->id;
1611 412 : }
1612 :
1613 : bool
1614 1996 : Conversation::pull(const std::string& deviceId, OnPullCb&& cb, std::string commitId)
1615 : {
1616 1996 : std::lock_guard lk(pimpl_->pullcbsMtx_);
1617 5988 : auto [it, notInProgress] = pimpl_->fetchingRemotes_.emplace(deviceId,
1618 3992 : std::deque<std::pair<std::string, OnPullCb>>());
1619 1995 : auto& pullcbs = it->second;
1620 1995 : auto itPull = std::find_if(pullcbs.begin(), pullcbs.end(), [&](const auto& elem) {
1621 1 : return std::get<0>(elem) == commitId;
1622 3991 : });
1623 1996 : if (itPull != pullcbs.end()) {
1624 4 : JAMI_DEBUG("{} Ignoring request to pull from {:s} with commit {:s}: pull already in progress",
1625 : pimpl_->toString(),
1626 : deviceId,
1627 : commitId);
1628 1 : cb(false);
1629 1 : return false;
1630 : }
1631 7980 : JAMI_DEBUG("{} [device {}] Pulling '{:s}'", pimpl_->toString(), deviceId, commitId);
1632 1995 : pullcbs.emplace_back(std::move(commitId), std::move(cb));
1633 1995 : if (notInProgress)
1634 1977 : dht::ThreadPool::io().run([w = weak(), deviceId] {
1635 1977 : if (auto sthis_ = w.lock())
1636 1977 : sthis_->pimpl_->pull(deviceId);
1637 1977 : });
1638 1995 : return true;
1639 1996 : }
1640 :
1641 : void
1642 1977 : Conversation::Impl::pull(const std::string& deviceId)
1643 : {
1644 1977 : auto& repo = repository_;
1645 :
1646 1977 : std::string commitId;
1647 1977 : OnPullCb cb;
1648 : while (true) {
1649 : {
1650 3972 : std::lock_guard lk(pullcbsMtx_);
1651 3972 : auto it = fetchingRemotes_.find(deviceId);
1652 3972 : if (it == fetchingRemotes_.end()) {
1653 0 : JAMI_ERROR("Could not find device {:s} in fetchingRemotes", deviceId);
1654 0 : break;
1655 : }
1656 3972 : auto& pullcbs = it->second;
1657 3972 : if (pullcbs.empty()) {
1658 1977 : fetchingRemotes_.erase(it);
1659 1977 : break;
1660 : }
1661 1995 : auto& elem = pullcbs.front();
1662 1995 : commitId = std::move(std::get<0>(elem));
1663 1995 : cb = std::move(std::get<1>(elem));
1664 1995 : pullcbs.pop_front();
1665 3972 : }
1666 : // If recently fetched, the commit can already be there, so no need to do complex operations
1667 1995 : if (commitId != "" && repo->hasCommit(commitId)) {
1668 75 : cb(true);
1669 102 : continue;
1670 : }
1671 : // Pull from remote
1672 1920 : auto fetched = repo->fetch(deviceId);
1673 1920 : if (!fetched) {
1674 27 : cb(false);
1675 27 : continue;
1676 : }
1677 :
1678 1893 : auto oldHead = repo->getHead();
1679 :
1680 1893 : std::unique_lock lk(writeMtx_);
1681 : auto commits = repo->mergeHistory(deviceId,
1682 1898 : [this](const std::string& peerUri) { this->disconnectFromPeer(peerUri); });
1683 1892 : if (!commits.empty()) {
1684 906 : announce(commits);
1685 : }
1686 1892 : lk.unlock();
1687 :
1688 1893 : bool commitFound = false;
1689 1893 : if (commitId != "") {
1690 : // If `commitId` is non-empty, then we were attempting to pull a specific commit.
1691 : // We need to check if we actually got it; the fact that the fetch above was
1692 : // successful doesn't guarantee that we did.
1693 1512 : for (const auto& commit : commits) {
1694 914 : if (commit.at("id") == commitId) {
1695 902 : commitFound = true;
1696 902 : break;
1697 : }
1698 : }
1699 : } else {
1700 393 : commitFound = true;
1701 : }
1702 1893 : if (!commitFound)
1703 2392 : JAMI_WARNING("Successfully fetched from device {} but didn't receive expected commit {}",
1704 : deviceId,
1705 : commitId);
1706 : // WARNING: If its argument is `true`, this callback will attempt to send a message notification
1707 : // for commit `commitId` to other members of the swarm. It's important that we only
1708 : // send these notifications if we actually have the commit. Otherwise, we can end up
1709 : // in a situation where the members of the swarm keep sending notifications to each
1710 : // other for a commit that none of them have (note that we are unable to rule this out, as
1711 : // nothing prevents a malicious user from intentionally sending a notification with
1712 : // a fake commit ID).
1713 1893 : if (cb)
1714 1893 : cb(commitFound);
1715 :
1716 : // Announce if profile changed
1717 1893 : if (!commits.empty()) {
1718 1812 : auto diffStats = repo->diffStats("HEAD", oldHead);
1719 906 : auto changedFiles = repo->changedFiles(diffStats);
1720 906 : if (find(changedFiles.begin(), changedFiles.end(), "profile.vcf") != changedFiles.end()) {
1721 5 : emitSignal<libjami::ConversationSignal::ConversationProfileUpdated>(accountId_,
1722 5 : repo->id(),
1723 10 : repo->infos());
1724 : }
1725 906 : }
1726 3888 : }
1727 1977 : }
1728 :
1729 : void
1730 1996 : Conversation::sync(const std::string& member, const std::string& deviceId, OnPullCb&& cb, std::string commitId)
1731 : {
1732 1996 : pull(deviceId, std::move(cb), commitId);
1733 1996 : dht::ThreadPool::io().run([member, deviceId, w = weak_from_this()] {
1734 1995 : auto sthis = w.lock();
1735 : // For waiting request, downloadFile
1736 1997 : for (const auto& wr : sthis->dataTransfer()->waitingRequests()) {
1737 1 : sthis->downloadFile(wr.interactionId, wr.fileId, wr.path, member, deviceId);
1738 1996 : }
1739 1996 : });
1740 1996 : }
1741 :
1742 : std::map<std::string, std::string>
1743 266 : Conversation::generateInvitation() const
1744 : {
1745 : // Invite the new member to the conversation
1746 266 : Json::Value root;
1747 266 : auto& metadata = root[ConversationMapKeys::METADATAS];
1748 538 : for (const auto& [k, v] : infos()) {
1749 272 : if (v.size() >= 64000) {
1750 0 : JAMI_WARNING("Cutting invite because the SIP message will be too long");
1751 0 : continue;
1752 0 : }
1753 272 : metadata[k] = v;
1754 266 : }
1755 266 : root[ConversationMapKeys::CONVERSATIONID] = id();
1756 798 : return {{"application/invite+json", json::toString(root)}};
1757 266 : }
1758 :
1759 : std::string
1760 7 : Conversation::leave()
1761 : {
1762 7 : setRemovingFlag();
1763 7 : std::lock_guard lk(pimpl_->writeMtx_);
1764 14 : return pimpl_->repository_->leave();
1765 7 : }
1766 :
1767 : void
1768 10 : Conversation::setRemovingFlag()
1769 : {
1770 10 : pimpl_->isRemoving_ = true;
1771 10 : }
1772 :
1773 : bool
1774 3934 : Conversation::isRemoving()
1775 : {
1776 3934 : return pimpl_->isRemoving_;
1777 : }
1778 :
1779 : void
1780 21 : Conversation::erase()
1781 : {
1782 21 : if (pimpl_->conversationDataPath_ != "")
1783 21 : dhtnet::fileutils::removeAll(pimpl_->conversationDataPath_, true);
1784 21 : if (!pimpl_->repository_)
1785 0 : return;
1786 21 : std::lock_guard lk(pimpl_->writeMtx_);
1787 21 : pimpl_->repository_->erase();
1788 21 : }
1789 :
1790 : ConversationMode
1791 4748 : Conversation::mode() const
1792 : {
1793 4748 : return pimpl_->repository_->mode();
1794 : }
1795 :
1796 : std::vector<std::string>
1797 30 : Conversation::getInitialMembers() const
1798 : {
1799 30 : return pimpl_->repository_->getInitialMembers();
1800 : }
1801 :
1802 : bool
1803 0 : Conversation::isInitialMember(const std::string& uri) const
1804 : {
1805 0 : auto members = getInitialMembers();
1806 0 : return std::find(members.begin(), members.end(), uri) != members.end();
1807 0 : }
1808 :
1809 : void
1810 9 : Conversation::updateInfos(const std::map<std::string, std::string>& map, const OnDoneCb& cb)
1811 : {
1812 9 : dht::ThreadPool::io().run([w = weak(), map = std::move(map), cb = std::move(cb)] {
1813 9 : if (auto sthis = w.lock()) {
1814 9 : auto& repo = sthis->pimpl_->repository_;
1815 9 : std::unique_lock lk(sthis->pimpl_->writeMtx_);
1816 9 : auto commit = repo->updateInfos(map);
1817 9 : sthis->pimpl_->announce(commit, true);
1818 9 : lk.unlock();
1819 9 : if (cb)
1820 9 : cb(!commit.empty(), commit);
1821 9 : emitSignal<libjami::ConversationSignal::ConversationProfileUpdated>(sthis->pimpl_->accountId_,
1822 9 : repo->id(),
1823 18 : repo->infos());
1824 18 : }
1825 9 : });
1826 9 : }
1827 :
1828 : std::map<std::string, std::string>
1829 313 : Conversation::infos() const
1830 : {
1831 313 : return pimpl_->repository_->infos();
1832 : }
1833 :
1834 : void
1835 8 : Conversation::updatePreferences(const std::map<std::string, std::string>& map)
1836 : {
1837 8 : const auto& filePath = pimpl_->preferencesPath_;
1838 8 : auto prefs = map;
1839 8 : auto itLast = prefs.find(LAST_MODIFIED);
1840 8 : if (itLast != prefs.end()) {
1841 3 : std::error_code ec;
1842 3 : if (std::filesystem::is_regular_file(filePath, ec)) {
1843 1 : auto lastModified = fileutils::lastWriteTimeInSeconds(filePath);
1844 : try {
1845 1 : if (lastModified >= to_int<uint64_t>(itLast->second))
1846 0 : return;
1847 0 : } catch (...) {
1848 0 : return;
1849 0 : }
1850 : }
1851 3 : prefs.erase(itLast);
1852 : }
1853 :
1854 8 : std::ofstream file(filePath, std::ios::trunc | std::ios::binary);
1855 8 : msgpack::pack(file, prefs);
1856 8 : emitSignal<libjami::ConversationSignal::ConversationPreferencesUpdated>(pimpl_->accountId_, id(), std::move(prefs));
1857 8 : }
1858 :
1859 : std::map<std::string, std::string>
1860 106 : Conversation::preferences(bool includeLastModified) const
1861 : {
1862 : try {
1863 106 : std::map<std::string, std::string> preferences;
1864 106 : const auto& filePath = pimpl_->preferencesPath_;
1865 202 : auto file = fileutils::loadFile(filePath);
1866 10 : msgpack::object_handle oh = msgpack::unpack((const char*) file.data(), file.size());
1867 10 : oh.get().convert(preferences);
1868 10 : if (includeLastModified)
1869 7 : preferences[LAST_MODIFIED] = std::to_string(fileutils::lastWriteTimeInSeconds(filePath));
1870 10 : return preferences;
1871 202 : } catch (const std::exception& e) {
1872 96 : }
1873 96 : return {};
1874 : }
1875 :
1876 : std::vector<uint8_t>
1877 0 : Conversation::vCard() const
1878 : {
1879 : try {
1880 0 : return fileutils::loadFile(pimpl_->repoPath_ / "profile.vcf");
1881 0 : } catch (...) {
1882 0 : }
1883 0 : return {};
1884 : }
1885 :
1886 : std::shared_ptr<TransferManager>
1887 2100 : Conversation::dataTransfer() const
1888 : {
1889 2100 : return pimpl_->transferManager_;
1890 : }
1891 :
1892 : bool
1893 13 : Conversation::onFileChannelRequest(const std::string& member,
1894 : const std::string& fileId,
1895 : std::filesystem::path& path,
1896 : std::string& sha3sum) const
1897 : {
1898 13 : if (!isMember(member))
1899 0 : return false;
1900 :
1901 13 : auto sep = fileId.find('_');
1902 13 : if (sep == std::string::npos)
1903 0 : return false;
1904 :
1905 13 : auto interactionId = fileId.substr(0, sep);
1906 13 : auto commit = getCommit(interactionId);
1907 26 : if (commit == std::nullopt || commit->find("type") == commit->end() || commit->find("tid") == commit->end()
1908 26 : || commit->find("sha3sum") == commit->end() || commit->at("type") != "application/data-transfer+json") {
1909 0 : JAMI_WARNING("[Account {:s}] {} requested invalid file transfer commit {}",
1910 : pimpl_->accountId_,
1911 : member,
1912 : interactionId);
1913 0 : return false;
1914 : }
1915 :
1916 13 : path = dataTransfer()->path(fileId);
1917 13 : sha3sum = commit->at("sha3sum");
1918 :
1919 13 : return true;
1920 13 : }
1921 :
1922 : bool
1923 14 : Conversation::downloadFile(const std::string& interactionId,
1924 : const std::string& fileId,
1925 : const std::string& path,
1926 : const std::string&,
1927 : const std::string& deviceId)
1928 : {
1929 14 : auto commit = getCommit(interactionId);
1930 14 : if (commit == std::nullopt || commit->at("type") != "application/data-transfer+json") {
1931 0 : JAMI_ERROR("Commit doesn't exists or is not a file transfer {} (Conversation: {}) ", interactionId, id());
1932 0 : return false;
1933 : }
1934 14 : auto tid = commit->find("tid");
1935 14 : auto sha3sum = commit->find("sha3sum");
1936 14 : auto size_str = commit->find("totalSize");
1937 :
1938 14 : if (tid == commit->end() || sha3sum == commit->end() || size_str == commit->end()) {
1939 0 : JAMI_ERROR("Invalid file transfer commit (missing tid, size or sha3)");
1940 0 : return false;
1941 : }
1942 :
1943 14 : auto totalSize = to_int<ssize_t>(size_str->second, (ssize_t) -1);
1944 14 : if (totalSize < 0) {
1945 0 : JAMI_ERROR("Invalid file size {}", totalSize);
1946 0 : return false;
1947 : }
1948 :
1949 : // Be sure to not lock conversation
1950 28 : dht::ThreadPool().io().run(
1951 14 : [w = weak(), deviceId, fileId, interactionId, sha3sum = sha3sum->second, path, totalSize] {
1952 14 : if (auto shared = w.lock()) {
1953 14 : std::filesystem::path filePath(path);
1954 14 : if (filePath.empty()) {
1955 0 : filePath = shared->dataTransfer()->path(fileId);
1956 : }
1957 :
1958 14 : if (fileutils::size(filePath) == totalSize) {
1959 1 : if (fileutils::sha3File(filePath) == sha3sum) {
1960 4 : JAMI_WARNING("Ignoring request to download existing file: {}", filePath);
1961 1 : return;
1962 : }
1963 : }
1964 :
1965 13 : std::filesystem::path tempFilePath(filePath);
1966 13 : tempFilePath += ".tmp";
1967 13 : auto start = fileutils::size(tempFilePath);
1968 13 : if (start < 0)
1969 11 : start = 0;
1970 13 : size_t end = 0;
1971 :
1972 13 : auto acc = shared->pimpl_->account_.lock();
1973 13 : if (!acc)
1974 0 : return;
1975 13 : shared->dataTransfer()->waitForTransfer(fileId, interactionId, sha3sum, path, totalSize);
1976 13 : acc->askForFileChannel(shared->id(), deviceId, interactionId, fileId, start, end);
1977 28 : }
1978 : });
1979 14 : return true;
1980 14 : }
1981 :
1982 : void
1983 1114 : Conversation::hasFetched(const std::string& deviceId, const std::string& commitId)
1984 : {
1985 1114 : dht::ThreadPool::io().run([w = weak(), deviceId, commitId]() {
1986 1114 : auto sthis = w.lock();
1987 1114 : if (!sthis)
1988 0 : return;
1989 : // Update fetched for Uri
1990 1114 : auto uri = sthis->uriFromDevice(deviceId);
1991 1114 : if (uri.empty() || uri == sthis->pimpl_->userId_)
1992 45 : return;
1993 : // When a user fetches a commit, the message is sent for this person
1994 1069 : sthis->pimpl_->updateStatus(uri,
1995 : libjami::Account::MessageStates::SENT,
1996 1069 : commitId,
1997 2138 : std::to_string(std::time(nullptr)),
1998 : true);
1999 1159 : });
2000 1114 : }
2001 :
// Moves the "fetched" (st == SENT) or "read" (any other state) marker of member `uri` to
// `commitId`, then propagates the new state to the messages located between the previous
// marker and the new one.
//
// @param uri       Member whose status changes.
// @param st        New message state.
// @param commitId  Commit the marker now points to.
// @param ts        Timestamp stored next to the marker ("fetched_ts" / "read_ts").
// @param emit      When true, messageStatusCb_ (if set) is called with the member's updated status.
2002 : void
2003 1114 : Conversation::Impl::updateStatus(const std::string& uri,
2004 : libjami::Account::MessageStates st,
2005 : const std::string& commitId,
2006 : const std::string& ts,
2007 : bool emit)
2008 : {
2009 : // This method can be called if peer send us a status or if another device sync. Emit will be true if a peer
2010 : // send us a status and will emit to other connected devices.
2011 1114 : LogOptions options;
2012 1114 : std::map<std::string, std::map<std::string, std::string>> newStatus;
2013 : {
2014 : // Update internal structures.
2015 1114 : std::lock_guard lk(messageStatusMtx_);
2016 1114 : auto& status = messagesStatus_[uri];
2017 1114 : auto& oldStatus = status[st == libjami::Account::MessageStates::SENT ? "fetched" : "read"];
2018 1114 : if (oldStatus == commitId)
2019 1 : return; // Nothing to do
// Log range for the propagation below: from the new marker back to the previous one.
2020 1113 : options.to = oldStatus;
2021 1113 : options.from = commitId;
2022 1113 : oldStatus = commitId;
2023 1113 : status[st == libjami::Account::MessageStates::SENT ? "fetched_ts" : "read_ts"] = ts;
2024 1113 : saveStatus();
2025 1113 : if (emit)
2026 1084 : newStatus[uri].insert(status.begin(), status.end());
2027 1114 : }
// NOTE(review): messageStatusCb_ is read here without messageStatusMtx_, while
// Conversation::onMessageStatusChanged() assigns it under that mutex - confirm this is safe.
2028 1113 : if (emit && messageStatusCb_) {
2029 1084 : messageStatusCb_(newStatus);
2030 : }
2031 : // Update messages status for all commit between the old and new one
2032 1113 : options.logIfNotFound = false;
2033 1113 : options.fastLog = true;
2034 1113 : History optHistory;
// NOTE(review): the mutex locked here belongs to the local `optHistory`; loadedHistory_.quickAccess
// is read below without loadedHistory_.mutex being visibly held in this function - confirm that
// loadMessages() or the callers provide the required synchronization.
2035 1113 : std::unique_lock lk(optHistory.mutex); // Avoid to announce messages while updating status.
2036 1113 : auto res = loadMessages(options, &optHistory);
2037 1113 : std::unique_lock mlk(messageStatusMtx_);
2038 1113 : std::vector<std::pair<std::string, int32_t>> statusToUpdate;
2039 1113 : if (res.size() == 0) {
2040 : // In this case, commit is not received yet, so we cache it
2041 4 : futureStatus[commitId][uri] = static_cast<int32_t>(st);
2042 : }
2043 7115 : for (const auto& [cid, _] : optHistory.quickAccess) {
2044 6002 : auto message = loadedHistory_.quickAccess.find(cid);
2045 6002 : if (message != loadedHistory_.quickAccess.end()) {
2046 : // Update message and emit to client,
// The stored status is only ever raised, never lowered.
2047 2868 : if (static_cast<int32_t>(st) > message->second->status[uri]) {
2048 2865 : message->second->status[uri] = static_cast<int32_t>(st);
2049 2865 : statusToUpdate.emplace_back(cid, static_cast<int32_t>(st));
2050 : }
2051 : } else {
2052 : // In this case, commit is not loaded by client, so we cache it
2053 : // No need to emit to client, they will get a correct status on load.
2054 3134 : futureStatus[cid][uri] = static_cast<int32_t>(st);
2055 : }
2056 : }
// Signals are emitted only after both locks have been released.
2057 1113 : mlk.unlock();
2058 1113 : lk.unlock();
2059 3978 : for (const auto& [cid, status] : statusToUpdate)
2060 5729 : emitSignal<libjami::ConfigurationSignal::AccountMessageStatusChanged>(accountId_,
2061 2865 : repository_->id(),
2062 : uri,
2063 : cid,
2064 : static_cast<int>(status));
2065 1115 : }
2066 :
2067 : bool
2068 18 : Conversation::setMessageDisplayed(const std::string& uri, const std::string& interactionId)
2069 : {
2070 18 : std::lock_guard lk(pimpl_->messageStatusMtx_);
2071 18 : if (pimpl_->messagesStatus_[uri]["read"] == interactionId)
2072 2 : return false; // Nothing to do
2073 16 : dht::ThreadPool::io().run([w = weak(), uri, interactionId]() {
2074 16 : auto sthis = w.lock();
2075 16 : if (!sthis)
2076 0 : return;
2077 16 : sthis->pimpl_->updateStatus(uri,
2078 : libjami::Account::MessageStates::DISPLAYED,
2079 16 : interactionId,
2080 32 : std::to_string(std::time(nullptr)),
2081 : true);
2082 16 : });
2083 16 : return true;
2084 18 : }
2085 :
2086 : std::map<std::string, std::map<std::string, std::string>>
2087 88 : Conversation::messageStatus() const
2088 : {
2089 88 : std::lock_guard lk(pimpl_->messageStatusMtx_);
2090 176 : return pimpl_->messagesStatus_;
2091 88 : }
2092 :
2093 : void
2094 44 : Conversation::updateMessageStatus(const std::map<std::string, std::map<std::string, std::string>>& messageStatus)
2095 : {
2096 44 : std::unique_lock lk(pimpl_->messageStatusMtx_);
2097 44 : std::vector<std::tuple<libjami::Account::MessageStates, std::string, std::string, std::string>> stVec;
2098 94 : for (const auto& [uri, status] : messageStatus) {
2099 50 : auto& oldMs = pimpl_->messagesStatus_[uri];
2100 50 : if (status.find("fetched_ts") != status.end() && status.at("fetched") != oldMs["fetched"]) {
2101 25 : if (oldMs["fetched_ts"].empty() || std::stol(oldMs["fetched_ts"]) <= std::stol(status.at("fetched_ts"))) {
2102 0 : stVec.emplace_back(libjami::Account::MessageStates::SENT,
2103 : uri,
2104 25 : status.at("fetched"),
2105 50 : status.at("fetched_ts"));
2106 : }
2107 : }
2108 50 : if (status.find("read_ts") != status.end() && status.at("read") != oldMs["read"]) {
2109 4 : if (oldMs["read_ts"].empty() || std::stol(oldMs["read_ts"]) <= std::stol(status.at("read_ts"))) {
2110 0 : stVec.emplace_back(libjami::Account::MessageStates::DISPLAYED,
2111 : uri,
2112 4 : status.at("read"),
2113 8 : status.at("read_ts"));
2114 : }
2115 : }
2116 : }
2117 44 : lk.unlock();
2118 :
2119 73 : for (const auto& [status, uri, commitId, ts] : stVec) {
2120 29 : pimpl_->updateStatus(uri, status, commitId, ts);
2121 : }
2122 44 : }
2123 :
2124 : void
2125 389 : Conversation::onMessageStatusChanged(
2126 : const std::function<void(const std::map<std::string, std::map<std::string, std::string>>&)>& cb)
2127 : {
2128 389 : std::unique_lock lk(pimpl_->messageStatusMtx_);
2129 389 : pimpl_->messageStatusCb_ = cb;
2130 389 : }
2131 :
2132 : #ifdef LIBJAMI_TEST
2133 : void
2134 513 : Conversation::onBootstrapStatus(const std::function<void(std::string, BootstrapStatus)>& cb)
2135 : {
2136 513 : pimpl_->bootstrapCbTest_ = cb;
2137 513 : }
2138 :
2139 : std::vector<libjami::SwarmMessage>
2140 0 : Conversation::loadMessagesSync(const LogOptions& options)
2141 : {
2142 0 : std::lock_guard lk(pimpl_->loadedHistory_.mutex);
2143 0 : auto result = pimpl_->loadMessages(options);
2144 0 : return result;
2145 0 : }
2146 :
2147 : void
2148 0 : Conversation::announce(const std::vector<std::map<std::string, std::string>>& commits, bool commitFromSelf)
2149 : {
2150 0 : pimpl_->announce(commits, commitFromSelf);
2151 0 : }
2152 :
2153 : void
2154 0 : Conversation::announce(const std::string& commitId, bool commitFromSelf)
2155 : {
2156 0 : pimpl_->announce(commitId, commitFromSelf);
2157 0 : }
2158 : #endif
2159 :
// Timer handler of the bootstrap fallback.
//
// Picks a member from `members`, asks the account for that member's devices, and hands the
// devices not yet known to the routing table over to the swarm manager. The timer is then
// re-armed with the remaining members; when called with no member left, the fallback is
// reported as failed.
//
// @param ec       Timer result; nothing is done when the wait was aborted.
// @param members  Members still to be tried (consumed from the back).
2160 : void
2161 614 : Conversation::checkBootstrapMember(const asio::error_code& ec, std::vector<std::map<std::string, std::string>> members)
2162 : {
2163 614 : if (ec == asio::error::operation_aborted)
2164 322 : return;
2165 590 : auto acc = pimpl_->account_.lock();
// Nothing to do once the swarm manager is connected or the account is gone.
2166 590 : if (pimpl_->swarmManager_->isConnected() or not acc)
2167 16 : return;
2168 : // We bootstrap the DRT with devices who already wrote in the repository.
2169 : // However, in a conversation, a large number of devices may just watch
2170 : // the conversation, but never write any message.
2171 574 : std::unique_lock lock(pimpl_->membersMtx_);
2172 :
2173 574 : std::string uri;
// Pop members until one is found that is neither ourselves nor already checked.
// NOTE(review): when the loop ends without `break`, `uri` still holds the last popped member
// (possibly ourselves or an already-checked one), so the failure test below only triggers
// when `members` was empty on entry - confirm this is intended.
2174 862 : while (!members.empty()) {
2175 295 : auto member = std::move(members.back());
2176 295 : members.pop_back();
2177 295 : uri = std::move(member.at("uri"));
2178 295 : if (uri != pimpl_->userId_ && pimpl_->checkedMembers_.find(uri) == pimpl_->checkedMembers_.end())
2179 7 : break;
2180 295 : }
// Generic lambda: called below with both `this` and a shared_ptr.
2181 282 : auto fallbackFailed = [](auto sthis) {
2182 1128 : JAMI_LOG("{} Bootstrap: Fallback failed. Wait for remote connections.", sthis->pimpl_->toString());
2183 : #ifdef LIBJAMI_TEST
2184 282 : if (sthis->pimpl_->bootstrapCbTest_)
2185 8 : sthis->pimpl_->bootstrapCbTest_(sthis->id(), BootstrapStatus::FAILED);
2186 : #endif
2187 282 : };
2188 : // If members is empty, we finished the fallback un-successfully
2189 574 : if (members.empty() && uri.empty()) {
2190 282 : lock.unlock();
2191 282 : fallbackFailed(this);
2192 282 : return;
2193 : }
2194 :
2195 : // Fallback, check devices of a member (we didn't check yet) in the conversation
2196 292 : pimpl_->checkedMembers_.emplace(uri);
// Shared between the two callbacks below: filled by the first, consumed by the second.
2197 292 : auto devices = std::make_shared<std::vector<NodeId>>();
2198 1168 : acc->forEachDevice(
2199 584 : dht::InfoHash(uri),
2200 293 : [w = weak(), devices](const std::shared_ptr<dht::crypto::PublicKey>& dev) {
2201 : // Test if already sent
2202 293 : if (auto sthis = w.lock()) {
2203 291 : if (!sthis->pimpl_->swarmManager_->getRoutingTable().hasKnownNode(dev->getLongId()))
2204 280 : devices->emplace_back(dev->getLongId());
2205 293 : }
2206 293 : },
2207 292 : [w = weak(), devices, members = std::move(members), uri, fallbackFailed = std::move(fallbackFailed)](bool ok) {
2208 292 : auto sthis = w.lock();
2209 292 : if (!sthis)
2210 4 : return;
2211 288 : auto checkNext = true;
2212 288 : if (ok && devices->size() != 0) {
2213 : #ifdef LIBJAMI_TEST
2214 278 : if (sthis->pimpl_->bootstrapCbTest_)
2215 7 : sthis->pimpl_->bootstrapCbTest_(sthis->id(), BootstrapStatus::FALLBACK);
2216 : #endif
2217 1112 : JAMI_LOG("{} Bootstrap: Fallback with member: {}", sthis->pimpl_->toString(), uri);
2218 278 : if (sthis->pimpl_->swarmManager_->setKnownNodes(*devices))
2219 0 : checkNext = false;
2220 : }
2221 288 : if (checkNext) {
2222 : // Check next member
// The timer expires immediately, so the next member is tried as soon as the handler can run.
2223 288 : sthis->pimpl_->fallbackTimer_->expires_at(std::chrono::steady_clock::now());
2224 576 : sthis->pimpl_->fallbackTimer_->async_wait(
2225 576 : std::bind(&Conversation::checkBootstrapMember, sthis, std::placeholders::_1, std::move(members)));
2226 : } else {
// NOTE(review): this branch is reached only when setKnownNodes() returned true, which does
// not obviously match "all members are checked" - confirm reporting a failure here is intended.
2227 : // In this case, all members are checked. Fallback failed
2228 0 : fallbackFailed(sthis);
2229 : }
2230 292 : });
2231 1154 : }
2232 :
2233 : void
2234 512 : Conversation::bootstrap(std::function<void()> onBootstrapped, const std::vector<DeviceId>& knownDevices)
2235 : {
2236 512 : std::lock_guard lock(pimpl_->bootstrapMtx_);
2237 512 : if (!pimpl_ || !pimpl_->repository_ || !pimpl_->swarmManager_)
2238 0 : return;
2239 : // Here, we bootstrap the DRT with devices who already wrote in the conversation
2240 : // If this doesn't work, it will attempt to fallback with checkBootstrapMember
2241 : // If it works, the callback onConnectionChanged will be called with ok=true
2242 512 : pimpl_->bootstrapCb_ = std::move(onBootstrapped);
2243 512 : std::vector<DeviceId> devices = knownDevices;
2244 1530 : for (const auto& [member, memberDevices] : pimpl_->repository_->devices()) {
2245 1018 : if (!isBanned(member))
2246 1018 : devices.insert(devices.end(), memberDevices.begin(), memberDevices.end());
2247 512 : }
2248 2048 : JAMI_DEBUG("{} Bootstrap with {} device(s)", pimpl_->toString(), devices.size());
2249 : // set callback
2250 327 : auto fallback = [](std::shared_ptr<Conversation> sthis, bool now = false) {
2251 : // Fallback
2252 327 : auto acc = sthis->pimpl_->account_.lock();
2253 327 : if (!acc)
2254 0 : return;
2255 327 : auto members = sthis->getMembers(false, false);
2256 327 : std::shuffle(members.begin(), members.end(), acc->rand);
2257 327 : if (now) {
2258 293 : sthis->pimpl_->fallbackTimer_->expires_at(std::chrono::steady_clock::now());
2259 : } else {
2260 34 : auto timeForBootstrap = std::min(static_cast<size_t>(8), members.size());
2261 102 : sthis->pimpl_->fallbackTimer_->expires_at(std::chrono::steady_clock::now() + 20s
2262 68 : - std::chrono::seconds(timeForBootstrap));
2263 136 : JAMI_DEBUG("{} Fallback in {} seconds", sthis->pimpl_->toString(), (20 - timeForBootstrap));
2264 : }
2265 654 : sthis->pimpl_->fallbackTimer_->async_wait(
2266 654 : std::bind(&Conversation::checkBootstrapMember, sthis, std::placeholders::_1, std::move(members)));
2267 327 : };
2268 :
2269 512 : pimpl_->swarmManager_->onConnectionChanged([w = weak(), fallback](bool ok) {
2270 : // This will call methods from accounts, so trigger on another thread.
2271 405 : dht::ThreadPool::io().run([w, ok, fallback = std::move(fallback)] {
2272 405 : auto sthis = w.lock();
2273 405 : if (!sthis)
2274 0 : return;
2275 405 : if (ok) {
2276 : // Bootstrap succeeded!
2277 : {
2278 371 : std::lock_guard lock(sthis->pimpl_->membersMtx_);
2279 371 : sthis->pimpl_->checkedMembers_.clear();
2280 371 : }
2281 371 : if (sthis->pimpl_->bootstrapCb_)
2282 371 : sthis->pimpl_->bootstrapCb_();
2283 : #ifdef LIBJAMI_TEST
2284 371 : if (sthis->pimpl_->bootstrapCbTest_)
2285 9 : sthis->pimpl_->bootstrapCbTest_(sthis->id(), BootstrapStatus::SUCCESS);
2286 : #endif
2287 371 : return;
2288 : }
2289 34 : fallback(sthis);
2290 405 : });
2291 405 : });
2292 : {
2293 512 : std::lock_guard lock(pimpl_->membersMtx_);
2294 512 : pimpl_->checkedMembers_.clear();
2295 512 : }
2296 : // If is shutdown, the conversation was re-added, causing no new nodes to be connected, but just a classic
2297 : // connectivity change
2298 512 : if (pimpl_->swarmManager_->isShutdown()) {
2299 23 : pimpl_->swarmManager_->restart();
2300 23 : pimpl_->swarmManager_->maintainBuckets();
2301 489 : } else if (!pimpl_->swarmManager_->setKnownNodes(devices)) {
2302 293 : fallback(shared_from_this(), true);
2303 : }
2304 512 : }
2305 :
2306 : std::vector<std::string>
2307 18 : Conversation::commitsEndedCalls()
2308 : {
2309 18 : pimpl_->loadActiveCalls();
2310 18 : pimpl_->loadHostedCalls();
2311 18 : auto commits = pimpl_->commitsEndedCalls();
2312 18 : if (!commits.empty()) {
2313 : // Announce to client
2314 0 : dht::ThreadPool::io().run([w = weak(), commits] {
2315 0 : if (auto sthis = w.lock())
2316 0 : sthis->pimpl_->announce(commits, true);
2317 0 : });
2318 : }
2319 18 : return commits;
2320 0 : }
2321 :
2322 : void
2323 389 : Conversation::onMembersChanged(OnMembersChanged&& cb)
2324 : {
2325 389 : pimpl_->onMembersChanged_ = std::move(cb);
2326 389 : pimpl_->repository_->onMembersChanged([w = weak()](const std::set<std::string>& memberUris) {
2327 1082 : if (auto sthis = w.lock())
2328 1082 : sthis->pimpl_->onMembersChanged_(memberUris);
2329 1082 : });
2330 389 : }
2331 :
2332 : void
2333 389 : Conversation::onNeedSocket(NeedSocketCb needSocket)
2334 : {
2335 778 : pimpl_->swarmManager_->needSocketCb_ = [needSocket = std::move(needSocket), w = weak()](const std::string& deviceId,
2336 : ChannelCb&& cb) {
2337 679 : if (auto sthis = w.lock())
2338 678 : needSocket(sthis->id(), deviceId, std::move(cb), "application/im-gitmessage-id");
2339 1457 : };
2340 389 : }
2341 :
2342 : void
2343 667 : Conversation::addSwarmChannel(std::shared_ptr<dhtnet::ChannelSocket> channel)
2344 : {
2345 667 : auto deviceId = channel->deviceId();
2346 : // Transmit avatar if necessary
2347 : // We do this here, because at this point we know both sides are connected and in
2348 : // the same conversation
2349 : // addSwarmChannel is a bit more complex, but it should be the best moment to do this.
2350 667 : auto cert = channel->peerCertificate();
2351 667 : if (!cert || !cert->issuer)
2352 0 : return;
2353 667 : auto member = cert->issuer->getId().toString();
2354 667 : pimpl_->swarmManager_->addChannel(std::move(channel));
2355 667 : dht::ThreadPool::io().run([member, deviceId, a = pimpl_->account_, w = weak_from_this()] {
2356 667 : auto sthis = w.lock();
2357 667 : if (auto account = a.lock()) {
2358 666 : account->sendProfile(sthis->id(), member, deviceId.toString());
2359 667 : }
2360 667 : });
2361 667 : }
2362 :
2363 : uint32_t
2364 4 : Conversation::countInteractions(const std::string& toId, const std::string& fromId, const std::string& authorUri) const
2365 : {
2366 4 : LogOptions options;
2367 4 : options.to = toId;
2368 4 : options.from = fromId;
2369 4 : options.authorUri = authorUri;
2370 4 : options.logIfNotFound = false;
2371 4 : options.fastLog = true;
2372 4 : History history;
2373 4 : std::lock_guard lk(history.mutex);
2374 4 : auto res = pimpl_->loadMessages(options, &history);
2375 8 : return res.size();
2376 4 : }
2377 :
2378 : void
2379 4 : Conversation::search(uint32_t req, const Filter& filter, const std::shared_ptr<std::atomic_int>& flag) const
2380 : {
2381 : // Because logging a conversation can take quite some time,
2382 : // do it asynchronously
2383 4 : dht::ThreadPool::io().run([w = weak(), req, filter, flag] {
2384 4 : if (auto sthis = w.lock()) {
2385 4 : History history;
2386 4 : std::vector<std::map<std::string, std::string>> commits {};
2387 : // std::regex_constants::ECMAScript is the default flag.
2388 4 : auto re = std::regex(filter.regexSearch,
2389 4 : filter.caseSensitive ? std::regex_constants::ECMAScript : std::regex_constants::icase);
2390 4 : sthis->pimpl_->repository_->log(
2391 20 : [&](const std::string& /*id*/, const GitAuthor& author, const GitCommit& commit) {
2392 20 : if (!filter.author.empty() && filter.author != sthis->uriFromDevice(author.email)) {
2393 : // Filter author
2394 0 : return CallbackResult::Skip;
2395 : }
2396 20 : auto commitTime = git_commit_time(commit.get());
2397 20 : if (filter.before && filter.before < commitTime) {
2398 : // Only get commits before this date
2399 0 : return CallbackResult::Skip;
2400 : }
2401 20 : if (filter.after && filter.after > commitTime) {
2402 : // Only get commits before this date
2403 0 : if (git_commit_parentcount(commit.get()) <= 1)
2404 0 : return CallbackResult::Break;
2405 : else
2406 0 : return CallbackResult::Skip; // Because we are sorting it with
2407 : // GIT_SORT_TOPOLOGICAL | GIT_SORT_TIME
2408 : }
2409 :
2410 20 : return CallbackResult::Ok; // Continue
2411 : },
2412 20 : [&](ConversationCommit&& cc) {
2413 20 : if (auto optMessage = sthis->pimpl_->repository_->convCommitToMap(cc))
2414 40 : sthis->pimpl_->addToHistory(history, {optMessage.value()}, false, false);
2415 20 : },
2416 20 : [&](const std::string& id, const GitAuthor&, ConversationCommit&) {
2417 20 : if (id == filter.lastId)
2418 0 : return true;
2419 20 : return false;
2420 : },
2421 : "",
2422 : false);
2423 : // Search on generated history
2424 24 : for (auto& message : history.messageList) {
2425 20 : auto contentType = message->type;
2426 20 : auto isSearchable = contentType == "text/plain" || contentType == "application/data-transfer+json";
2427 20 : if (filter.type.empty() && !isSearchable) {
2428 : // Not searchable, at least for now
2429 8 : continue;
2430 12 : } else if (contentType == filter.type || filter.type.empty()) {
2431 12 : if (isSearchable) {
2432 : // If it's a text match the body, else the display name
2433 48 : auto body = contentType == "text/plain" ? message->body.at("body")
2434 36 : : message->body.at("displayName");
2435 12 : std::smatch body_match;
2436 12 : if (std::regex_search(body, body_match, re)) {
2437 5 : auto commit = message->body;
2438 5 : commit["id"] = message->id;
2439 5 : commit["type"] = message->type;
2440 5 : commits.emplace_back(commit);
2441 5 : }
2442 12 : } else {
2443 : // Matching type, just add it to the results
2444 0 : commits.emplace_back(message->body);
2445 : }
2446 :
2447 12 : if (filter.maxResult != 0 && commits.size() == filter.maxResult)
2448 0 : break;
2449 : }
2450 20 : }
2451 :
2452 4 : if (commits.size() > 0)
2453 9 : emitSignal<libjami::ConversationSignal::MessagesFound>(req,
2454 3 : sthis->pimpl_->accountId_,
2455 6 : sthis->id(),
2456 3 : std::move(commits));
2457 : // If we're the latest thread, inform client that the search is finished
2458 4 : if ((*flag)-- == 1 /* decrement return the old value */) {
2459 4 : emitSignal<libjami::ConversationSignal::MessagesFound>(
2460 8 : req, sthis->pimpl_->accountId_, std::string {}, std::vector<std::map<std::string, std::string>> {});
2461 : }
2462 8 : }
2463 4 : });
2464 4 : }
2465 :
2466 : void
2467 14 : Conversation::hostConference(Json::Value&& message, OnDoneCb&& cb)
2468 : {
2469 14 : if (!message.isMember("confId")) {
2470 0 : JAMI_ERROR("{}Malformed commit: no confId", pimpl_->toString());
2471 0 : return;
2472 : }
2473 :
2474 14 : auto now = std::chrono::system_clock::now();
2475 14 : auto nowSecs = std::chrono::duration_cast<std::chrono::seconds>(now.time_since_epoch()).count();
2476 : {
2477 14 : std::lock_guard lk(pimpl_->activeCallsMtx_);
2478 14 : pimpl_->hostedCalls_[message["confId"].asString()] = nowSecs;
2479 14 : pimpl_->saveHostedCalls();
2480 14 : }
2481 :
2482 14 : sendMessage(std::move(message), "", {}, std::move(cb));
2483 : }
2484 :
2485 : bool
2486 20 : Conversation::isHosting(const std::string& confId) const
2487 : {
2488 20 : auto info = infos();
2489 20 : if (info["rdvDevice"] == pimpl_->deviceId_ && info["rdvHost"] == pimpl_->userId_)
2490 0 : return true; // We are the current device Host
2491 20 : std::lock_guard lk(pimpl_->activeCallsMtx_);
2492 20 : return pimpl_->hostedCalls_.find(confId) != pimpl_->hostedCalls_.end();
2493 20 : }
2494 :
2495 : void
2496 10 : Conversation::removeActiveConference(Json::Value&& message, OnDoneCb&& cb)
2497 : {
2498 10 : if (!message.isMember("confId")) {
2499 0 : JAMI_ERROR("{}Malformed commit: no confId", pimpl_->toString());
2500 0 : return;
2501 : }
2502 :
2503 10 : auto erased = false;
2504 : {
2505 10 : std::lock_guard lk(pimpl_->activeCallsMtx_);
2506 10 : erased = pimpl_->hostedCalls_.erase(message["confId"].asString());
2507 10 : }
2508 10 : if (erased) {
2509 10 : pimpl_->saveHostedCalls();
2510 10 : sendMessage(std::move(message), "", {}, std::move(cb));
2511 : } else
2512 0 : cb(false, "");
2513 : }
2514 :
2515 : std::vector<std::map<std::string, std::string>>
2516 38 : Conversation::currentCalls() const
2517 : {
2518 38 : std::lock_guard lk(pimpl_->activeCallsMtx_);
2519 76 : return pimpl_->activeCalls_;
2520 38 : }
2521 : } // namespace jami
|