Line data Source code
1 : /*
2 : * Copyright (C) 2004-2026 Savoir-faire Linux Inc.
3 : *
4 : * This program is free software: you can redistribute it and/or modify
5 : * it under the terms of the GNU General Public License as published by
6 : * the Free Software Foundation, either version 3 of the License, or
7 : * (at your option) any later version.
8 : *
9 : * This program is distributed in the hope that it will be useful,
10 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 : * GNU General Public License for more details.
13 : *
14 : * You should have received a copy of the GNU General Public License
15 : * along with this program. If not, see <https://www.gnu.org/licenses/>.
16 : */
17 :
18 : #include "conversation.h"
19 :
20 : #include "account_const.h"
21 : #include "jamiaccount.h"
22 : #include "client/jami_signal.h"
23 : #include "swarm/swarm_manager.h"
24 : #include "conversationrepository.h"
25 : #ifdef ENABLE_PLUGIN
26 : #include "manager.h"
27 : #include "plugin/jamipluginmanager.h"
28 : #include "plugin/streamdata.h"
29 : #endif
30 : #include "jami/conversation_interface.h"
31 : #include "jami/configurationmanager_interface.h"
32 :
33 : #include "fileutils.h"
34 : #include "json_utils.h"
35 : #include "logger.h"
36 :
37 : #include <opendht/thread_pool.h>
38 : #include <opendht/infohash.h>
39 : #include <fmt/compile.h>
40 : #include <asio/error_code.hpp>
41 :
42 : #include <algorithm>
43 : #include <memory>
44 : #include <charconv>
45 : #include <string_view>
46 : #include <tuple>
47 : #include <optional>
48 : #include <utility>
49 : #include <deque>
50 : #include <chrono>
51 : #include <ctime>
52 : #include <functional>
53 : #include <atomic>
54 : #include <regex>
55 :
56 : namespace jami {
57 :
58 : static const char* const LAST_MODIFIED = "lastModified";
59 :
60 13 : ConvInfo::ConvInfo(const Json::Value& json)
61 : {
62 13 : id = json[ConversationMapKeys::ID].asString();
63 13 : created = json[ConversationMapKeys::CREATED].asLargestUInt();
64 13 : removed = json[ConversationMapKeys::REMOVED].asLargestUInt();
65 13 : erased = json[ConversationMapKeys::ERASED].asLargestUInt();
66 33 : for (const auto& v : json[ConversationMapKeys::MEMBERS]) {
67 20 : members.emplace(v["uri"].asString());
68 : }
69 13 : lastDisplayed = json[ConversationMapKeys::LAST_DISPLAYED].asString();
70 13 : }
71 :
72 : Json::Value
73 25 : ConvInfo::toJson() const
74 : {
75 25 : Json::Value json;
76 25 : json[ConversationMapKeys::ID] = id;
77 25 : json[ConversationMapKeys::CREATED] = Json::Int64(created);
78 25 : if (removed) {
79 1 : json[ConversationMapKeys::REMOVED] = Json::Int64(removed);
80 : }
81 25 : if (erased) {
82 1 : json[ConversationMapKeys::ERASED] = Json::Int64(erased);
83 : }
84 64 : for (const auto& m : members) {
85 39 : Json::Value member;
86 39 : member["uri"] = m;
87 39 : json[ConversationMapKeys::MEMBERS].append(member);
88 39 : }
89 25 : json[ConversationMapKeys::LAST_DISPLAYED] = lastDisplayed;
90 25 : return json;
91 0 : }
92 :
93 : // ConversationRequest
94 261 : ConversationRequest::ConversationRequest(const Json::Value& json)
95 : {
96 261 : received = json[ConversationMapKeys::RECEIVED].asLargestUInt();
97 261 : declined = json[ConversationMapKeys::DECLINED].asLargestUInt();
98 261 : from = json[ConversationMapKeys::FROM].asString();
99 261 : conversationId = json[ConversationMapKeys::CONVERSATIONID].asString();
100 261 : auto& md = json[ConversationMapKeys::METADATAS];
101 525 : for (const auto& member : md.getMemberNames()) {
102 264 : metadatas.emplace(member, md[member].asString());
103 261 : }
104 261 : }
105 :
106 : Json::Value
107 2 : ConversationRequest::toJson() const
108 : {
109 2 : Json::Value json;
110 2 : json[ConversationMapKeys::CONVERSATIONID] = conversationId;
111 2 : json[ConversationMapKeys::FROM] = from;
112 2 : json[ConversationMapKeys::RECEIVED] = static_cast<uint32_t>(received);
113 2 : if (declined)
114 0 : json[ConversationMapKeys::DECLINED] = static_cast<uint32_t>(declined);
115 4 : for (const auto& [key, value] : metadatas) {
116 2 : json[ConversationMapKeys::METADATAS][key] = value;
117 : }
118 2 : return json;
119 0 : }
120 :
121 : std::map<std::string, std::string>
122 220 : ConversationRequest::toMap() const
123 : {
124 220 : auto result = metadatas;
125 220 : result[ConversationMapKeys::ID] = conversationId;
126 220 : result[ConversationMapKeys::FROM] = from;
127 220 : if (declined)
128 0 : result[ConversationMapKeys::DECLINED] = std::to_string(declined);
129 220 : result[ConversationMapKeys::RECEIVED] = std::to_string(received);
130 220 : return result;
131 0 : }
132 :
133 : using MessageList = std::list<std::shared_ptr<libjami::SwarmMessage>>;
134 :
135 : struct History
136 : {
137 : // While loading the history, we need to avoid:
138 : // - reloading history (can just be ignored)
139 : // - adding new commits (should wait for history to be loaded)
140 : std::mutex mutex {};
141 : std::condition_variable cv {};
142 : bool loading {false};
143 : MessageList messageList {};
144 : std::map<std::string, std::shared_ptr<libjami::SwarmMessage>> quickAccess {};
145 : std::map<std::string, std::list<std::shared_ptr<libjami::SwarmMessage>>> pendingEditions {};
146 : std::map<std::string, std::list<std::map<std::string, std::string>>> pendingReactions {};
147 : };
148 :
149 : class Conversation::Impl
150 : {
151 : private:
152 395 : Impl(std::unique_ptr<ConversationRepository>&& repository,
153 : const std::shared_ptr<JamiAccount>& account,
154 : std::vector<ConversationCommit>&& commits = {})
155 395 : : repository_(std::move(repository ? repository : throw std::logic_error("Invalid repository")))
156 390 : , account_(account)
157 390 : , accountId_(account->getAccountID())
158 390 : , userId_(account->getUsername())
159 390 : , deviceId_(account->currentDeviceId())
160 390 : , swarmManager_(std::make_shared<SwarmManager>(NodeId(deviceId_),
161 780 : Manager::instance().getSeededRandomEngine(),
162 390 : [account = account_](const DeviceId& deviceId) {
163 2629 : if (auto acc = account.lock()) {
164 2629 : return acc->isConnectedWith(deviceId);
165 2629 : }
166 0 : return false;
167 : }))
168 390 : , transferManager_(std::make_shared<TransferManager>(accountId_,
169 : "",
170 390 : repository_->id(),
171 390 : Manager::instance().getSeededRandomEngine()))
172 390 : , repoPath_(fileutils::get_data_dir() / accountId_ / "conversations" / repository_->id())
173 390 : , conversationDataPath_(fileutils::get_data_dir() / accountId_ / "conversation_data" / repository_->id())
174 390 : , fetchedPath_(conversationDataPath_ / ConversationDirectories::FETCHED)
175 390 : , sendingPath_(conversationDataPath_ / ConversationDirectories::SENDING)
176 390 : , preferencesPath_(conversationDataPath_ / ConversationDirectories::PREFERENCES)
177 390 : , statusPath_(conversationDataPath_ / ConversationDirectories::STATUS)
178 390 : , hostedCallsPath_(conversationDataPath_ / ConversationDirectories::HOSTED_CALLS)
179 390 : , activeCallsPath_(conversationDataPath_ / ConversationDirectories::ACTIVE_CALLS)
180 390 : , ioContext_(Manager::instance().ioContext())
181 390 : , fallbackTimer_(std::make_unique<asio::steady_timer>(*ioContext_))
182 2345 : , typers_(std::make_shared<Typers>(account, repository_->id()))
183 : {
184 390 : if (!commits.empty())
185 186 : initActiveCalls(repository_->convCommitsToMap(commits));
186 390 : loadStatus();
187 410 : }
188 :
189 191 : Impl(std::pair<std::unique_ptr<ConversationRepository>, std::vector<ConversationCommit>>&& repoAndCommits,
190 : const std::shared_ptr<JamiAccount>& account)
191 191 : : Impl(std::move(repoAndCommits.first), account, std::move(repoAndCommits.second))
192 186 : {}
193 :
194 : public:
195 186 : Impl(const std::shared_ptr<JamiAccount>& account, ConversationMode mode, const std::string& otherMember = "")
196 186 : : Impl(ConversationRepository::createConversation(account, mode, otherMember), account)
197 186 : {}
198 :
199 18 : Impl(const std::shared_ptr<JamiAccount>& account, const std::string& conversationId)
200 18 : : Impl(std::make_unique<ConversationRepository>(account, conversationId), account)
201 18 : {}
202 :
203 192 : Impl(const std::shared_ptr<JamiAccount>& account, const std::string& remoteDevice, const std::string& conversationId)
204 192 : : Impl(ConversationRepository::cloneConversation(account, remoteDevice, conversationId), account)
205 186 : {}
206 :
207 3155 : std::string toString() const
208 : {
209 12618 : return fmt::format(FMT_COMPILE("[Account {}] [Conversation {}]"), accountId_, repository_->id());
210 : }
211 : mutable std::string fmtStr_;
212 :
213 389 : ~Impl()
214 5835 : {
215 : try {
216 389 : if (fallbackTimer_)
217 389 : fallbackTimer_->cancel();
218 0 : } catch (const std::exception& e) {
219 0 : JAMI_ERROR("{:s} {:s}", toString(), e.what());
220 0 : }
221 389 : }
222 :
223 : /**
224 : * If, for whatever reason, the daemon is stopped while hosting a conference,
225 : * we need to announce the end of this call when restarting.
226 : * To avoid to keep active calls forever.
227 : */
228 : std::vector<std::string> commitsEndedCalls();
229 : bool isAdmin() const;
230 :
231 284 : void announce(const std::string& commitId, bool commitFromSelf = false)
232 : {
233 284 : std::vector<std::string> vec;
234 284 : if (!commitId.empty())
235 282 : vec.emplace_back(commitId);
236 284 : announce(vec, commitFromSelf);
237 284 : }
238 :
239 297 : void announce(const std::vector<std::string>& commits, bool commitFromSelf = false)
240 : {
241 297 : std::vector<ConversationCommit> convcommits;
242 297 : convcommits.reserve(commits.size());
243 605 : for (const auto& cid : commits) {
244 308 : if (auto commit = repository_->getCommit(cid)) {
245 308 : convcommits.emplace_back(*commit);
246 308 : }
247 : }
248 297 : announce(repository_->convCommitsToMap(convcommits), commitFromSelf);
249 297 : }
250 :
251 : /**
252 : * Initialize activeCalls_ from the list of commits in the repository
253 : * @param commits Commits in reverse chronological order (i.e. from newest to oldest)
254 : */
255 186 : void initActiveCalls(const std::vector<std::map<std::string, std::string>>& commits) const
256 : {
257 186 : std::unordered_set<std::string> invalidHostUris;
258 186 : std::unordered_set<std::string> invalidCallIds;
259 :
260 186 : std::lock_guard lk(activeCallsMtx_);
261 1164 : for (const auto& commit : commits) {
262 978 : if (commit.at("type") == "member") {
263 : // Each commit of type "member" has an "action" field whose value can be one
264 : // of the following: "add", "join", "remove", "ban", "unban"
265 : // In the case of "remove" and "ban", we need to add the member's URI to
266 : // invalidHostUris to ensure that any call they may have started in the past
267 : // is no longer considered active.
268 : // For the other actions, there's no harm in adding the member's URI anyway,
269 : // since it's not possible to start hosting a call before joining the swarm (or
270 : // before getting unbanned in the case of previously banned members).
271 771 : invalidHostUris.emplace(commit.at("uri"));
272 415 : } else if (commit.find("confId") != commit.end() && commit.find("uri") != commit.end()
273 415 : && commit.find("device") != commit.end()) {
274 : // The commit indicates either the end or the beginning of a call
275 : // (depending on whether there is a "duration" field or not).
276 1 : auto convId = repository_->id();
277 2 : auto confId = commit.at("confId");
278 2 : auto uri = commit.at("uri");
279 2 : auto device = commit.at("device");
280 :
281 3 : if (commit.find("duration") == commit.end() && invalidCallIds.find(confId) == invalidCallIds.end()
282 3 : && invalidHostUris.find(uri) == invalidHostUris.end()) {
283 1 : std::map<std::string, std::string> activeCall;
284 1 : activeCall["id"] = confId;
285 1 : activeCall["uri"] = uri;
286 1 : activeCall["device"] = device;
287 1 : activeCalls_.emplace_back(activeCall);
288 1 : fmt::print("swarm:{} new active call detected: {} (on device {}, account {})\n",
289 : convId,
290 : confId,
291 : device,
292 : uri);
293 1 : }
294 : // Even if the call was active, we still add its ID to invalidCallIds to make sure it
295 : // doesn't get added a second time. (This shouldn't happen normally, but in practice
296 : // there are sometimes multiple commits indicating the beginning of the same call.)
297 1 : invalidCallIds.emplace(confId);
298 1 : }
299 : }
300 186 : saveActiveCalls();
301 186 : emitSignal<libjami::ConfigurationSignal::ActiveCallsChanged>(accountId_, repository_->id(), activeCalls_);
302 186 : }
303 :
304 : /**
305 : * Update activeCalls_ via announced commits (in load or via new commits)
306 : * @param commit Commit to check
307 : * @param eraseOnly If we want to ignore added commits
308 : * @param emitSig If we want to emit to client
309 : * @note eraseOnly is used by loadMessages. This is a fail-safe, this SHOULD NOT happen
310 : */
311 77 : void updateActiveCalls(const std::map<std::string, std::string>& commit,
312 : bool eraseOnly = false,
313 : bool emitSig = true) const
314 : {
315 77 : if (!repository_)
316 0 : return;
317 77 : if (commit.at("type") == "member") {
318 : // In this case, we need to check if we are not removing a hosting member or device
319 18 : std::lock_guard lk(activeCallsMtx_);
320 18 : auto it = activeCalls_.begin();
321 18 : auto updateActives = false;
322 19 : while (it != activeCalls_.end()) {
323 1 : if (it->at("uri") == commit.at("uri") || it->at("device") == commit.at("uri")) {
324 4 : JAMI_DEBUG("Removing {:s} from the active calls, because {:s} left", it->at("id"), commit.at("uri"));
325 1 : it = activeCalls_.erase(it);
326 1 : updateActives = true;
327 : } else {
328 0 : ++it;
329 : }
330 : }
331 18 : if (updateActives) {
332 1 : saveActiveCalls();
333 1 : if (emitSig)
334 1 : emitSignal<libjami::ConfigurationSignal::ActiveCallsChanged>(accountId_,
335 1 : repository_->id(),
336 1 : activeCalls_);
337 : }
338 18 : return;
339 18 : }
340 : // Else, it's a call information
341 174 : if (commit.find("confId") != commit.end() && commit.find("uri") != commit.end()
342 174 : && commit.find("device") != commit.end()) {
343 56 : auto convId = repository_->id();
344 112 : auto confId = commit.at("confId");
345 112 : auto uri = commit.at("uri");
346 112 : auto device = commit.at("device");
347 56 : std::lock_guard lk(activeCallsMtx_);
348 56 : auto itActive = std::find_if(activeCalls_.begin(), activeCalls_.end(), [&](const auto& value) {
349 25 : return value.at("id") == confId && value.at("uri") == uri && value.at("device") == device;
350 : });
351 56 : if (commit.find("duration") == commit.end()) {
352 31 : if (itActive == activeCalls_.end() && !eraseOnly) {
353 124 : JAMI_DEBUG("swarm:{:s} new current call detected: {:s} on device {:s}, account {:s}",
354 : convId,
355 : confId,
356 : device,
357 : uri);
358 155 : activeCalls_.emplace_back(std::map<std::string, std::string> {
359 : {"id", confId},
360 : {"uri", uri},
361 : {"device", device},
362 124 : });
363 31 : saveActiveCalls();
364 31 : if (emitSig)
365 31 : emitSignal<libjami::ConfigurationSignal::ActiveCallsChanged>(accountId_,
366 31 : repository_->id(),
367 31 : activeCalls_);
368 : }
369 : } else {
370 25 : if (itActive != activeCalls_.end()) {
371 25 : itActive = activeCalls_.erase(itActive);
372 : // Unlikely, but we must ensure that no duplicate exists
373 25 : while (itActive != activeCalls_.end()) {
374 0 : itActive = std::find_if(itActive, activeCalls_.end(), [&](const auto& value) {
375 0 : return value.at("id") == confId && value.at("uri") == uri && value.at("device") == device;
376 : });
377 0 : if (itActive != activeCalls_.end()) {
378 0 : JAMI_ERROR("Duplicate call found. (This is a bug)");
379 0 : itActive = activeCalls_.erase(itActive);
380 : }
381 : }
382 :
383 25 : if (eraseOnly) {
384 0 : JAMI_WARNING("previous swarm:{:s} call finished detected: {:s} on device "
385 : "{:s}, account {:s}",
386 : convId,
387 : confId,
388 : device,
389 : uri);
390 : } else {
391 100 : JAMI_DEBUG("swarm:{:s} call finished: {:s} on device {:s}, account {:s}",
392 : convId,
393 : confId,
394 : device,
395 : uri);
396 : }
397 : }
398 25 : saveActiveCalls();
399 25 : if (emitSig)
400 25 : emitSignal<libjami::ConfigurationSignal::ActiveCallsChanged>(accountId_,
401 25 : repository_->id(),
402 25 : activeCalls_);
403 : }
404 56 : }
405 : }
406 :
407 1206 : void announce(const std::vector<std::map<std::string, std::string>>& commits, bool commitFromSelf = false)
408 : {
409 1206 : if (!repository_)
410 0 : return;
411 1206 : auto convId = repository_->id();
412 1206 : auto ok = !commits.empty();
413 3616 : auto lastId = ok ? commits.rbegin()->at(ConversationMapKeys::ID) : "";
414 1206 : addToHistory(loadedHistory_, commits, true, commitFromSelf);
415 1206 : if (ok) {
416 1204 : bool announceMember = false;
417 2453 : for (const auto& c : commits) {
418 : // Announce member events
419 1249 : if (c.at("type") == "member") {
420 921 : if (c.find("uri") != c.end() && c.find("action") != c.end()) {
421 921 : const auto& uri = c.at("uri");
422 921 : const auto& actionStr = c.at("action");
423 921 : auto action = -1;
424 921 : if (actionStr == "add")
425 442 : action = 0;
426 479 : else if (actionStr == "join")
427 460 : action = 1;
428 19 : else if (actionStr == "remove")
429 1 : action = 2;
430 18 : else if (actionStr == "ban")
431 17 : action = 3;
432 1 : else if (actionStr == "unban")
433 1 : action = 4;
434 921 : if (actionStr == "ban" || actionStr == "remove") {
435 : // In this case, a potential host was removed during a call.
436 18 : updateActiveCalls(c);
437 18 : typers_->removeTyper(uri);
438 : }
439 921 : if (action != -1) {
440 921 : announceMember = true;
441 921 : emitSignal<libjami::ConversationSignal::ConversationMemberEvent>(accountId_,
442 : convId,
443 : uri,
444 : action);
445 : }
446 : }
447 328 : } else if (c.at("type") == "application/call-history+json") {
448 59 : updateActiveCalls(c);
449 : }
450 : #ifdef ENABLE_PLUGIN
451 1249 : auto& pluginChatManager = Manager::instance().getJamiPluginManager().getChatServicesManager();
452 1249 : if (pluginChatManager.hasHandlers()) {
453 0 : auto cm = std::make_shared<JamiMessage>(accountId_, convId, c.at("author") != userId_, c, false);
454 0 : cm->isSwarm = true;
455 0 : pluginChatManager.publishMessage(std::move(cm));
456 0 : }
457 : #endif
458 : }
459 :
460 1204 : if (announceMember && onMembersChanged_) {
461 920 : onMembersChanged_(repository_->memberUris("", {}));
462 : }
463 : }
464 1206 : }
465 :
466 390 : void loadStatus()
467 : {
468 : try {
469 : // read file
470 778 : auto file = fileutils::loadFile(statusPath_);
471 : // load values
472 2 : msgpack::object_handle oh = msgpack::unpack((const char*) file.data(), file.size());
473 2 : std::lock_guard lk {messageStatusMtx_};
474 2 : oh.get().convert(messagesStatus_);
475 390 : } catch (const std::exception& e) {
476 388 : }
477 390 : }
478 1115 : void saveStatus()
479 : {
480 1115 : std::ofstream file(statusPath_, std::ios::trunc | std::ios::binary);
481 1115 : msgpack::pack(file, messagesStatus_);
482 1115 : }
483 :
484 18 : void loadActiveCalls() const
485 : {
486 : try {
487 : // read file
488 29 : auto file = fileutils::loadFile(activeCallsPath_);
489 : // load values
490 7 : msgpack::object_handle oh = msgpack::unpack((const char*) file.data(), file.size());
491 7 : std::lock_guard lk {activeCallsMtx_};
492 7 : oh.get().convert(activeCalls_);
493 18 : } catch (const std::exception& e) {
494 11 : return;
495 11 : }
496 : }
497 :
498 261 : void saveActiveCalls() const
499 : {
500 261 : std::ofstream file(activeCallsPath_, std::ios::trunc | std::ios::binary);
501 261 : msgpack::pack(file, activeCalls_);
502 261 : }
503 :
504 18 : void loadHostedCalls() const
505 : {
506 : try {
507 : // read file
508 30 : auto file = fileutils::loadFile(hostedCallsPath_);
509 : // load values
510 6 : msgpack::object_handle oh = msgpack::unpack((const char*) file.data(), file.size());
511 6 : std::lock_guard lk {activeCallsMtx_};
512 6 : oh.get().convert(hostedCalls_);
513 18 : } catch (const std::exception& e) {
514 12 : return;
515 12 : }
516 : }
517 :
518 42 : void saveHostedCalls() const
519 : {
520 42 : std::ofstream file(hostedCallsPath_, std::ios::trunc | std::ios::binary);
521 42 : msgpack::pack(file, hostedCalls_);
522 42 : }
523 :
524 : void voteUnban(const std::string& contactUri, const std::string_view type, const OnDoneCb& cb);
525 :
526 : std::vector<std::map<std::string, std::string>> getMembers(bool includeInvited,
527 : bool includeLeft,
528 : bool includeBanned) const;
529 :
530 6786 : std::string_view bannedType(const std::string& uri) const
531 : {
532 6786 : auto crt = fmt::format("{}.crt", uri);
533 13570 : auto bannedMember = repoPath_ / MemberPath::BANNED / MemberPath::MEMBERS / crt;
534 6785 : if (std::filesystem::is_regular_file(bannedMember))
535 17 : return "members"sv;
536 13541 : auto bannedAdmin = repoPath_ / MemberPath::BANNED / MemberPath::ADMINS / crt;
537 6770 : if (std::filesystem::is_regular_file(bannedAdmin))
538 0 : return "admins"sv;
539 13534 : auto bannedInvited = repoPath_ / MemberPath::BANNED / MemberPath::INVITED / uri;
540 6767 : if (std::filesystem::is_regular_file(bannedInvited))
541 0 : return "invited"sv;
542 13536 : auto bannedDevice = repoPath_ / MemberPath::BANNED / MemberPath::DEVICES / crt;
543 6768 : if (std::filesystem::is_regular_file(bannedDevice))
544 0 : return "devices"sv;
545 6770 : return {};
546 6786 : }
547 :
548 4910 : std::shared_ptr<dhtnet::ChannelSocket> gitSocket(const DeviceId& deviceId) const
549 : {
550 4910 : auto deviceSockets = gitSocketList_.find(deviceId);
551 9820 : return (deviceSockets != gitSocketList_.end()) ? deviceSockets->second : nullptr;
552 : }
553 :
554 2237 : void addGitSocket(const DeviceId& deviceId, const std::shared_ptr<dhtnet::ChannelSocket>& socket)
555 : {
556 2237 : gitSocketList_[deviceId] = socket;
557 2237 : }
558 825 : void removeGitSocket(const DeviceId& deviceId)
559 : {
560 825 : auto deviceSockets = gitSocketList_.find(deviceId);
561 825 : if (deviceSockets != gitSocketList_.end())
562 427 : gitSocketList_.erase(deviceSockets);
563 825 : }
564 :
565 : /**
566 : * Remove all git sockets and all DRT nodes associated with the given peer.
567 : * This is used when a swarm member is banned to ensure that we stop syncing
568 : * with them or sending them message notifications.
569 : */
570 : void disconnectFromPeer(const std::string& peerUri);
571 :
572 : std::vector<std::map<std::string, std::string>> getMembers(bool includeInvited, bool includeLeft) const;
573 :
574 : std::mutex membersMtx_ {};
575 : std::set<std::string> checkedMembers_; // Store members we tried
576 : std::function<void()> bootstrapCb_;
577 : std::mutex bootstrapMtx_ {};
578 : #ifdef LIBJAMI_TEST
579 : std::function<void(std::string, BootstrapStatus)> bootstrapCbTest_;
580 : #endif
581 :
582 : std::mutex writeMtx_ {};
583 : const std::unique_ptr<ConversationRepository> repository_;
584 : const std::weak_ptr<JamiAccount> account_;
585 : const std::string accountId_;
586 : const std::string userId_;
587 : const std::string deviceId_;
588 : const std::shared_ptr<SwarmManager> swarmManager_;
589 : std::atomic_bool isRemoving_ {false};
590 : std::vector<libjami::SwarmMessage> loadMessages(const LogOptions& options, History* optHistory = nullptr);
591 : void pull(const std::string& deviceId);
592 :
593 : // Avoid multiple fetch/merges at the same time.
594 : std::mutex pullcbsMtx_ {};
595 : // store current remote in fetch
596 : std::map<std::string, std::deque<std::pair<std::string, OnPullCb>>> fetchingRemotes_ {};
597 : const std::shared_ptr<TransferManager> transferManager_ {};
598 : const std::filesystem::path repoPath_ {};
599 : const std::filesystem::path conversationDataPath_ {};
600 : const std::filesystem::path fetchedPath_ {};
601 :
602 : // Manage last message displayed and status
603 : const std::filesystem::path sendingPath_ {};
604 : const std::filesystem::path preferencesPath_ {};
605 : const std::filesystem::path statusPath_ {};
606 :
607 : OnMembersChanged onMembersChanged_ {};
608 :
609 : // Manage hosted calls on this device
610 : std::filesystem::path hostedCallsPath_ {};
611 : mutable std::map<std::string, uint64_t /* start time */> hostedCalls_ {};
612 : // Manage active calls for this conversation (can be hosted by other devices)
613 : std::filesystem::path activeCallsPath_ {};
614 : mutable std::mutex activeCallsMtx_ {};
615 : mutable std::vector<std::map<std::string, std::string>> activeCalls_ {};
616 :
617 : GitSocketList gitSocketList_ {};
618 :
619 : // Bootstrap
620 : const std::shared_ptr<asio::io_context> ioContext_;
621 : const std::unique_ptr<asio::steady_timer> fallbackTimer_;
622 :
623 : /**
624 : * Loaded history represents the linearized history to show for clients
625 : */
626 : History loadedHistory_ {};
627 : std::vector<std::shared_ptr<libjami::SwarmMessage>> addToHistory(
628 : History& history,
629 : const std::vector<std::map<std::string, std::string>>& commits,
630 : bool messageReceived = false,
631 : bool commitFromSelf = false);
632 :
633 : void handleReaction(History& history, const std::shared_ptr<libjami::SwarmMessage>& sharedCommit) const;
634 : void handleEdition(History& history,
635 : const std::shared_ptr<libjami::SwarmMessage>& sharedCommit,
636 : bool messageReceived) const;
637 : bool handleMessage(History& history,
638 : const std::shared_ptr<libjami::SwarmMessage>& sharedCommit,
639 : bool messageReceived) const;
640 : void rectifyStatus(const std::shared_ptr<libjami::SwarmMessage>& message, History& history) const;
641 : /**
642 : * {uri, {
643 : * {"fetch", "commitId"},
644 : * {"fetched_ts", "timestamp"},
645 : * {"read", "commitId"},
646 : * {"read_ts", "timestamp"}
647 : * }
648 : * }
649 : */
650 : mutable std::mutex messageStatusMtx_;
651 : std::function<void(const std::map<std::string, std::map<std::string, std::string>>&)> messageStatusCb_ {};
652 : std::map<std::string, std::map<std::string, std::string>> messagesStatus_ {};
653 : /**
654 : * Status: 0 = commited, 1 = fetched, 2 = read
655 : * This cache the curent status to add in the messages
656 : */
657 : // Note: only store int32_t cause it's easy to pass to dbus this way
658 : // memberToStatus serves as a cache for loading messages
659 : std::map<std::string, int32_t> memberToStatus;
660 :
661 : // futureStatus is used to store the status for receiving messages
662 : // (because we're not sure to fetch the commit before receiving a status change for this)
663 : std::map<std::string, std::map<std::string, int32_t>> futureStatus;
664 : // Update internal structures regarding status
665 : void updateStatus(const std::string& uri,
666 : libjami::Account::MessageStates status,
667 : const std::string& commitId,
668 : const std::string& ts,
669 : bool emit = false);
670 :
671 : std::shared_ptr<Typers> typers_;
672 : };
673 :
674 : bool
675 17 : Conversation::Impl::isAdmin() const
676 : {
677 17 : auto adminsPath = repoPath_ / MemberPath::ADMINS;
678 34 : return std::filesystem::is_regular_file(fileutils::getFullPath(adminsPath, userId_ + ".crt"));
679 17 : }
680 :
681 : void
682 17 : Conversation::Impl::disconnectFromPeer(const std::string& peerUri)
683 : {
684 : // Remove nodes from swarmManager
685 17 : const auto nodes = swarmManager_->getAllNodes();
686 17 : std::vector<NodeId> toRemove;
687 40 : for (const auto node : nodes)
688 23 : if (peerUri == repository_->uriFromDevice(node.toString()))
689 12 : toRemove.emplace_back(node);
690 17 : swarmManager_->deleteNode(toRemove);
691 :
692 : // Remove git sockets with this member
693 39 : for (auto it = gitSocketList_.begin(); it != gitSocketList_.end();) {
694 22 : if (peerUri == repository_->uriFromDevice(it->first.toString()))
695 12 : it = gitSocketList_.erase(it);
696 : else
697 10 : ++it;
698 : }
699 17 : }
700 :
701 : std::vector<std::map<std::string, std::string>>
702 422 : Conversation::Impl::getMembers(bool includeInvited, bool includeLeft, bool includeBanned) const
703 : {
704 422 : std::vector<std::map<std::string, std::string>> result;
705 422 : auto members = repository_->members();
706 422 : std::lock_guard lk(messageStatusMtx_);
707 1153 : for (const auto& member : members) {
708 731 : if (member.role == MemberRole::BANNED && !includeBanned) {
709 171 : continue;
710 : }
711 730 : if (member.role == MemberRole::INVITED && !includeInvited)
712 170 : continue;
713 560 : if (member.role == MemberRole::LEFT && !includeLeft)
714 0 : continue;
715 560 : auto mm = member.map();
716 560 : auto it = messagesStatus_.find(member.uri);
717 560 : if (it != messagesStatus_.end()) {
718 191 : auto readIt = it->second.find("read");
719 191 : if (readIt != it->second.end())
720 128 : mm[ConversationMapKeys::LAST_DISPLAYED] = readIt->second;
721 : }
722 560 : result.emplace_back(std::move(mm));
723 560 : }
724 844 : return result;
725 422 : }
726 :
727 : std::vector<std::string>
728 18 : Conversation::Impl::commitsEndedCalls()
729 : {
730 : // Handle current calls
731 18 : std::vector<std::string> commits {};
732 18 : std::unique_lock lk(writeMtx_);
733 18 : std::unique_lock lkA(activeCallsMtx_);
734 18 : for (const auto& hostedCall : hostedCalls_) {
735 : // In this case, this means that we left
736 : // the conference while still hosting it, so activeCalls
737 : // will not be correctly updated
738 : // We don't need to send notifications there, as peers will sync with presence
739 0 : Json::Value value;
740 0 : value["uri"] = userId_;
741 0 : value["device"] = deviceId_;
742 0 : value["confId"] = hostedCall.first;
743 0 : value["type"] = "application/call-history+json";
744 0 : auto now = std::chrono::system_clock::now();
745 0 : auto nowConverted = std::chrono::duration_cast<std::chrono::seconds>(now.time_since_epoch()).count();
746 0 : value["duration"] = std::to_string((nowConverted - hostedCall.second) * 1000);
747 0 : auto itActive = std::find_if(activeCalls_.begin(),
748 : activeCalls_.end(),
749 0 : [this, confId = hostedCall.first](const auto& value) {
750 0 : return value.at("id") == confId && value.at("uri") == userId_
751 0 : && value.at("device") == deviceId_;
752 : });
753 0 : if (itActive != activeCalls_.end())
754 0 : activeCalls_.erase(itActive);
755 0 : commits.emplace_back(repository_->commitMessage(json::toString(value)));
756 :
757 0 : JAMI_DEBUG("Removing hosted conference... {:s}", hostedCall.first);
758 0 : }
759 18 : hostedCalls_.clear();
760 18 : saveActiveCalls();
761 18 : saveHostedCalls();
762 36 : return commits;
763 18 : }
764 :
765 : std::vector<libjami::SwarmMessage>
766 1554 : Conversation::Impl::loadMessages(const LogOptions& options, History* optHistory)
767 : {
768 1554 : auto history = optHistory ? optHistory : &loadedHistory_;
769 :
770 : // history->mutex is locked by the caller
771 1554 : if (!repository_ || history->loading) {
772 0 : return {};
773 : }
774 1555 : history->loading = true;
775 :
776 : // By convention, if options.nbOfCommits is zero, then we
777 : // don't impose a limit on the number of commits returned.
778 1555 : bool limitNbOfCommits = options.nbOfCommits > 0;
779 :
780 1555 : auto startLogging = options.from == "";
781 1554 : auto breakLogging = false;
782 1554 : auto currentHistorySize = loadedHistory_.messageList.size();
783 1555 : std::vector<std::string> replies;
784 1555 : std::vector<std::shared_ptr<libjami::SwarmMessage>> msgList;
785 3110 : repository_->log(
786 : /* preCondition */
787 7194 : [&](const auto& id, const auto& author, const auto& commit) {
788 7194 : if (options.skipMerge && git_commit_parentcount(commit.get()) > 1) {
789 2 : return CallbackResult::Skip;
790 : }
791 7193 : if (id == options.to) {
792 724 : if (options.includeTo)
793 0 : breakLogging = true; // For the next commit
794 : }
795 7188 : if (replies.empty()) { // This avoid load until
796 : // NOTE: in the future, we may want to add "Reply-Body" in commit to avoid to load
797 : // until this commit
798 14382 : if ((limitNbOfCommits
799 7191 : && (loadedHistory_.messageList.size() - currentHistorySize) == options.nbOfCommits))
800 6 : return CallbackResult::Break; // Stop logging
801 7185 : if (breakLogging)
802 0 : return CallbackResult::Break; // Stop logging
803 7185 : if (id == options.to && !options.includeTo) {
804 723 : return CallbackResult::Break; // Stop logging
805 : }
806 : }
807 :
808 6466 : if (!startLogging && options.from != "" && options.from == id)
809 1108 : startLogging = true;
810 6465 : if (!startLogging)
811 24 : return CallbackResult::Skip; // Start logging after this one
812 :
813 6441 : if (options.fastLog) {
814 5698 : if (options.authorUri != "") {
815 1 : if (options.authorUri == repository_->uriFromDevice(author.email)) {
816 1 : return CallbackResult::Break; // Found author, stop
817 : }
818 : }
819 : }
820 :
821 6438 : return CallbackResult::Ok; // Continue
822 : },
823 : /* emplaceCb */
824 6438 : [&](auto&& cc) {
825 6438 : if (limitNbOfCommits && (msgList.size() == options.nbOfCommits))
826 303 : return;
827 6136 : auto optMessage = repository_->convCommitToMap(cc);
828 6140 : if (!optMessage.has_value())
829 1 : return;
830 6138 : auto message = optMessage.value();
831 6137 : if (message.find("reply-to") != message.end()) {
832 1 : auto it = std::find(replies.begin(), replies.end(), message.at("reply-to"));
833 1 : if (it == replies.end()) {
834 1 : replies.emplace_back(message.at("reply-to"));
835 : }
836 : }
837 6138 : auto it = std::find(replies.begin(), replies.end(), message.at("id"));
838 6138 : if (it != replies.end()) {
839 1 : replies.erase(it);
840 : }
841 6140 : std::shared_ptr<libjami::SwarmMessage> firstMsg;
842 6140 : if ((history == &loadedHistory_) && msgList.empty() && !loadedHistory_.messageList.empty()) {
843 0 : firstMsg = *loadedHistory_.messageList.rbegin();
844 : }
845 18416 : auto added = addToHistory(*history, {message}, false, false);
846 6140 : if (!added.empty() && firstMsg) {
847 0 : emitSignal<libjami::ConversationSignal::SwarmMessageUpdated>(accountId_, repository_->id(), *firstMsg);
848 : }
849 6140 : msgList.insert(msgList.end(), added.begin(), added.end());
850 6142 : },
851 : /* postCondition */
852 6433 : [&](auto, auto, auto) {
853 : // Stop logging if there was a limit set on the number of commits
854 : // to return and we reached it. This isn't strictly necessary since
855 : // the check at the beginning of `emplaceCb` ensures that we won't
856 : // return too many messages, but it prevents us from needlessly
857 : // iterating over a (potentially) large number of commits.
858 6433 : return limitNbOfCommits && (msgList.size() == options.nbOfCommits);
859 : },
860 1555 : options.from,
861 1555 : options.logIfNotFound);
862 :
863 1555 : history->loading = false;
864 1555 : history->cv.notify_all();
865 :
866 : // Convert for client (remove ptr)
867 1555 : std::vector<libjami::SwarmMessage> ret;
868 1555 : ret.reserve(msgList.size());
869 7680 : for (const auto& msg : msgList) {
870 6126 : ret.emplace_back(*msg);
871 : }
872 1555 : return ret;
873 1555 : }
874 :
875 : void
876 3 : Conversation::Impl::handleReaction(History& history, const std::shared_ptr<libjami::SwarmMessage>& sharedCommit) const
877 : {
878 3 : auto it = history.quickAccess.find(sharedCommit->body.at("react-to"));
879 3 : auto peditIt = history.pendingEditions.find(sharedCommit->id);
880 3 : if (peditIt != history.pendingEditions.end()) {
881 0 : auto oldBody = sharedCommit->body;
882 0 : sharedCommit->body["body"] = peditIt->second.front()->body["body"];
883 0 : if (sharedCommit->body.at("body").empty())
884 0 : return;
885 0 : history.pendingEditions.erase(peditIt);
886 0 : }
887 3 : if (it != history.quickAccess.end()) {
888 3 : it->second->reactions.emplace_back(sharedCommit->body);
889 3 : emitSignal<libjami::ConversationSignal::ReactionAdded>(accountId_,
890 3 : repository_->id(),
891 3 : it->second->id,
892 3 : sharedCommit->body);
893 : } else {
894 0 : history.pendingReactions[sharedCommit->body.at("react-to")].emplace_back(sharedCommit->body);
895 : }
896 : }
897 :
898 : void
899 9 : Conversation::Impl::handleEdition(History& history,
900 : const std::shared_ptr<libjami::SwarmMessage>& sharedCommit,
901 : bool messageReceived) const
902 : {
903 18 : auto editId = sharedCommit->body.at("edit");
904 9 : auto it = history.quickAccess.find(editId);
905 9 : if (it != history.quickAccess.end()) {
906 7 : auto baseCommit = it->second;
907 7 : if (baseCommit) {
908 7 : auto itReact = baseCommit->body.find("react-to");
909 7 : std::string toReplace = (baseCommit->type == "application/data-transfer+json") ? "tid" : "body";
910 7 : auto body = sharedCommit->body.at(toReplace);
911 : // Edit reaction
912 7 : if (itReact != baseCommit->body.end()) {
913 1 : baseCommit->body[toReplace] = body; // Replace body if pending
914 1 : it = history.quickAccess.find(itReact->second);
915 1 : auto itPending = history.pendingReactions.find(itReact->second);
916 1 : if (it != history.quickAccess.end()) {
917 1 : baseCommit = it->second; // Base commit
918 1 : auto itPreviousReact = std::find_if(baseCommit->reactions.begin(),
919 1 : baseCommit->reactions.end(),
920 1 : [&](const auto& reaction) {
921 1 : return reaction.at("id") == editId;
922 : });
923 1 : if (itPreviousReact != baseCommit->reactions.end()) {
924 1 : (*itPreviousReact)[toReplace] = body;
925 1 : if (body.empty()) {
926 1 : baseCommit->reactions.erase(itPreviousReact);
927 2 : emitSignal<libjami::ConversationSignal::ReactionRemoved>(accountId_,
928 1 : repository_->id(),
929 1 : baseCommit->id,
930 : editId);
931 : }
932 : }
933 0 : } else if (itPending != history.pendingReactions.end()) {
934 : // Else edit if pending
935 0 : auto itReaction = std::find_if(itPending->second.begin(),
936 0 : itPending->second.end(),
937 0 : [&](const auto& reaction) { return reaction.at("id") == editId; });
938 0 : if (itReaction != itPending->second.end()) {
939 0 : (*itReaction)[toReplace] = body;
940 0 : if (body.empty())
941 0 : itPending->second.erase(itReaction);
942 : }
943 : } else {
944 : // Add to pending edtions
945 0 : messageReceived ? history.pendingEditions[editId].emplace_front(sharedCommit)
946 0 : : history.pendingEditions[editId].emplace_back(sharedCommit);
947 : }
948 : } else {
949 : // Normal message
950 6 : it->second->editions.emplace(it->second->editions.begin(), it->second->body);
951 6 : it->second->body[toReplace] = sharedCommit->body[toReplace];
952 6 : if (toReplace == "tid") {
953 : // Avoid to replace fileId in client
954 2 : it->second->body["fileId"] = "";
955 : }
956 : // Remove reactions
957 6 : if (sharedCommit->body.at(toReplace).empty())
958 3 : it->second->reactions.clear();
959 6 : emitSignal<libjami::ConversationSignal::SwarmMessageUpdated>(accountId_, repository_->id(), *it->second);
960 : }
961 7 : }
962 7 : } else {
963 2 : messageReceived ? history.pendingEditions[editId].emplace_front(sharedCommit)
964 2 : : history.pendingEditions[editId].emplace_back(sharedCommit);
965 : }
966 9 : }
967 :
968 : bool
969 7360 : Conversation::Impl::handleMessage(History& history,
970 : const std::shared_ptr<libjami::SwarmMessage>& sharedCommit,
971 : bool messageReceived) const
972 : {
973 7360 : if (messageReceived) {
974 : // For a received message, we place it at the beginning of the list
975 1210 : if (!history.messageList.empty())
976 964 : sharedCommit->linearizedParent = (*history.messageList.begin())->id;
977 1211 : history.messageList.emplace_front(sharedCommit);
978 : } else {
979 : // For a loaded message, we load from newest to oldest
980 : // So we change the parent of the last message.
981 6150 : if (!history.messageList.empty())
982 4604 : (*history.messageList.rbegin())->linearizedParent = sharedCommit->id;
983 6147 : history.messageList.emplace_back(sharedCommit);
984 : }
985 : // Handle pending reactions/editions
986 7358 : auto reactIt = history.pendingReactions.find(sharedCommit->id);
987 7357 : if (reactIt != history.pendingReactions.end()) {
988 0 : for (const auto& commitBody : reactIt->second)
989 0 : sharedCommit->reactions.emplace_back(commitBody);
990 0 : history.pendingReactions.erase(reactIt);
991 : }
992 7357 : auto peditIt = history.pendingEditions.find(sharedCommit->id);
993 7357 : if (peditIt != history.pendingEditions.end()) {
994 0 : auto oldBody = sharedCommit->body;
995 0 : if (sharedCommit->type == "application/data-transfer+json") {
996 0 : sharedCommit->body["tid"] = peditIt->second.front()->body["tid"];
997 0 : sharedCommit->body["fileId"] = "";
998 : } else {
999 0 : sharedCommit->body["body"] = peditIt->second.front()->body["body"];
1000 : }
1001 0 : peditIt->second.pop_front();
1002 0 : for (const auto& commit : peditIt->second) {
1003 0 : sharedCommit->editions.emplace_back(commit->body);
1004 : }
1005 0 : sharedCommit->editions.emplace_back(oldBody);
1006 0 : history.pendingEditions.erase(peditIt);
1007 0 : }
1008 : // Announce to client
1009 7360 : if (messageReceived)
1010 1211 : emitSignal<libjami::ConversationSignal::SwarmMessageReceived>(accountId_, repository_->id(), *sharedCommit);
1011 7358 : return !messageReceived;
1012 : }
1013 :
1014 : void
1015 7370 : Conversation::Impl::rectifyStatus(const std::shared_ptr<libjami::SwarmMessage>& message, History& history) const
1016 : {
1017 7370 : auto parentIt = history.quickAccess.find(message->linearizedParent);
1018 7369 : auto currentMessage = message;
1019 :
1020 19372 : while (parentIt != history.quickAccess.end()) {
1021 12005 : const auto& parent = parentIt->second;
1022 12522 : for (const auto& [peer, value] : message->status) {
1023 11427 : auto parentStatusIt = parent->status.find(peer);
1024 11422 : if (parentStatusIt == parent->status.end() || parentStatusIt->second < value) {
1025 520 : parent->status[peer] = value;
1026 1037 : emitSignal<libjami::ConfigurationSignal::AccountMessageStatusChanged>(accountId_,
1027 520 : repository_->id(),
1028 : peer,
1029 520 : parent->id,
1030 : value);
1031 10908 : } else if (parentStatusIt->second >= value) {
1032 10902 : break;
1033 : }
1034 : }
1035 11999 : currentMessage = parent;
1036 12006 : parentIt = history.quickAccess.find(parent->linearizedParent);
1037 : }
1038 7368 : }
1039 :
1040 : std::vector<std::shared_ptr<libjami::SwarmMessage>>
1041 7367 : Conversation::Impl::addToHistory(History& history,
1042 : const std::vector<std::map<std::string, std::string>>& commits,
1043 : bool messageReceived,
1044 : bool commitFromSelf)
1045 : {
1046 : //
1047 : // NOTE: This function makes the following assumptions on its arguments:
1048 : // - The messages in "history" are in reverse chronological order (newest message
1049 : // first, oldest message last).
1050 : // - If messageReceived is true, then the commits in "commits" are assumed to be in
1051 : // chronological order (oldest to newest) and to be newer than the ones in "history".
1052 : // They are therefore inserted at the beginning of the message list.
1053 : // - If messageReceived is false, then the commits in "commits" are assumed to be in
1054 : // reverse chronological order (newest to oldest) and to be older than the ones in
1055 : // "history". They are therefore appended at the end of the message list.
1056 : //
1057 7367 : auto acc = account_.lock();
1058 7366 : if (!acc)
1059 0 : return {};
1060 7365 : auto username = acc->getUsername();
1061 7360 : if (messageReceived && (&history == &loadedHistory_ && history.loading)) {
1062 0 : std::unique_lock lk(history.mutex);
1063 0 : history.cv.wait(lk, [&] { return !history.loading; });
1064 0 : }
1065 :
1066 : // Only set messages' status on history for client
1067 7360 : bool needToSetMessageStatus = !commitFromSelf && &history == &loadedHistory_;
1068 :
1069 7360 : std::vector<std::shared_ptr<libjami::SwarmMessage>> sharedCommits;
1070 14766 : for (const auto& commit : commits) {
1071 14810 : auto commitId = commit.at("id");
1072 7406 : if (history.quickAccess.find(commitId) != history.quickAccess.end())
1073 1 : continue; // Already present
1074 7407 : auto typeIt = commit.find("type");
1075 : // Nothing to show for the client, skip
1076 7402 : if (typeIt != commit.end() && typeIt->second == "merge")
1077 36 : continue;
1078 :
1079 7366 : auto sharedCommit = std::make_shared<libjami::SwarmMessage>();
1080 7370 : sharedCommit->fromMapStringString(commit);
1081 :
1082 7373 : if (needToSetMessageStatus) {
1083 921 : std::lock_guard lk(messageStatusMtx_);
1084 : // Check if we already have status information for the commit.
1085 921 : auto itFuture = futureStatus.find(sharedCommit->id);
1086 921 : if (itFuture != futureStatus.end()) {
1087 11 : sharedCommit->status = std::move(itFuture->second);
1088 11 : futureStatus.erase(itFuture);
1089 : }
1090 921 : }
1091 7373 : sharedCommits.emplace_back(sharedCommit);
1092 7408 : }
1093 :
1094 7364 : if (needToSetMessageStatus) {
1095 917 : constexpr int32_t SENDING = static_cast<int32_t>(libjami::Account::MessageStates::SENDING);
1096 917 : constexpr int32_t SENT = static_cast<int32_t>(libjami::Account::MessageStates::SENT);
1097 917 : constexpr int32_t DISPLAYED = static_cast<int32_t>(libjami::Account::MessageStates::DISPLAYED);
1098 :
1099 917 : std::lock_guard lk(messageStatusMtx_);
1100 13499 : for (const auto& member : repository_->members()) {
1101 : // For each member, we iterate over the commits to add in reverse chronological
1102 : // order (i.e. from newest to oldest) and set their status from the point of view
1103 : // of that member (as best we can given the information we have).
1104 : //
1105 : // The key assumption made in order to compute the status is that it can never decrease
1106 : // (with respect to the ordering SENDING < SENT < DISPLAYED) as we go back in time. We
1107 : // therefore start by setting the "status" variable below to the lowest possible value,
1108 : // and increase it when we encounter a commit for which it is justified to do so.
1109 : //
1110 : // If messageReceived is true, then the commits we are adding are the most recent in the
1111 : // conversation history, so the lowest possible value is SENDING.
1112 : //
1113 : // If messageReceived is false, then the commits we are adding are older than the ones
1114 : // that are already in the history, so the lowest possible value is the status of the
1115 : // oldest message in the history so far, which is stored in memberToStatus.
1116 12583 : auto status = SENDING;
1117 12583 : if (!messageReceived) {
1118 12 : auto cache = memberToStatus[member.uri];
1119 12 : if (cache > status)
1120 9 : status = cache;
1121 : }
1122 12583 : auto& messagesStatus = messagesStatus_[member.uri];
1123 :
1124 25173 : for (auto it = sharedCommits.rbegin(); it != sharedCommits.rend(); it++) {
1125 12597 : auto sharedCommit = *it;
1126 12598 : auto previousStatus = status;
1127 12598 : auto& commitStatus = sharedCommit->status[member.uri];
1128 :
1129 : // Compute status for the current commit.
1130 12604 : if (status < SENT && messagesStatus["fetched"] == sharedCommit->id) {
1131 9 : status = SENT;
1132 : }
1133 12583 : if (messagesStatus["read"] == sharedCommit->id) {
1134 1 : status = DISPLAYED;
1135 : }
1136 12589 : if (member.uri == sharedCommit->body.at("author")) {
1137 920 : status = DISPLAYED;
1138 : }
1139 12584 : if (status < commitStatus) {
1140 1 : status = commitStatus;
1141 : }
1142 :
1143 : // Store computed value.
1144 12584 : commitStatus = status;
1145 :
1146 : // Update messagesStatus_ if needed.
1147 12584 : if (previousStatus == SENDING && status >= SENT) {
1148 917 : messagesStatus["fetched"] = sharedCommit->id;
1149 : }
1150 12584 : if (previousStatus <= SENT && status == DISPLAYED) {
1151 908 : messagesStatus["read"] = sharedCommit->id;
1152 : }
1153 12583 : }
1154 :
1155 12582 : if (!messageReceived) {
1156 : // Update memberToStatus with the status of the last (i.e. oldest) added commit.
1157 12 : memberToStatus[member.uri] = status;
1158 : }
1159 915 : }
1160 917 : }
1161 :
1162 7364 : std::vector<std::shared_ptr<libjami::SwarmMessage>> messages;
1163 14735 : for (const auto& sharedCommit : sharedCommits) {
1164 7369 : history.quickAccess[sharedCommit->id] = sharedCommit;
1165 :
1166 7370 : auto reactToIt = sharedCommit->body.find("react-to");
1167 7369 : auto editIt = sharedCommit->body.find("edit");
1168 7367 : if (reactToIt != sharedCommit->body.end() && !reactToIt->second.empty()) {
1169 3 : handleReaction(history, sharedCommit);
1170 7369 : } else if (editIt != sharedCommit->body.end() && !editIt->second.empty()) {
1171 9 : handleEdition(history, sharedCommit, messageReceived);
1172 7359 : } else if (handleMessage(history, sharedCommit, messageReceived)) {
1173 6147 : messages.emplace_back(sharedCommit);
1174 : }
1175 7370 : rectifyStatus(sharedCommit, history);
1176 : }
1177 :
1178 7366 : return messages;
1179 7363 : }
1180 :
1181 186 : Conversation::Conversation(const std::shared_ptr<JamiAccount>& account,
1182 : ConversationMode mode,
1183 186 : const std::string& otherMember)
1184 186 : : pimpl_ {new Impl {account, mode, otherMember}}
1185 186 : {}
1186 :
1187 18 : Conversation::Conversation(const std::shared_ptr<JamiAccount>& account, const std::string& conversationId)
1188 18 : : pimpl_ {new Impl {account, conversationId}}
1189 18 : {}
1190 :
1191 192 : Conversation::Conversation(const std::shared_ptr<JamiAccount>& account,
1192 : const std::string& remoteDevice,
1193 192 : const std::string& conversationId)
1194 192 : : pimpl_ {new Impl {account, remoteDevice, conversationId}}
1195 192 : {}
1196 :
1197 389 : Conversation::~Conversation() {}
1198 :
1199 : std::string
1200 4931 : Conversation::id() const
1201 : {
1202 4931 : return pimpl_->repository_ ? pimpl_->repository_->id() : "";
1203 : }
1204 :
1205 : void
1206 138 : Conversation::addMember(const std::string& contactUri, const OnDoneCb& cb)
1207 : {
1208 : try {
1209 138 : if (mode() == ConversationMode::ONE_TO_ONE) {
1210 : // Only authorize to add left members
1211 1 : auto initialMembers = getInitialMembers();
1212 1 : auto it = std::find(initialMembers.begin(), initialMembers.end(), contactUri);
1213 1 : if (it == initialMembers.end()) {
1214 1 : JAMI_WARN("Unable to add new member in one to one conversation");
1215 1 : cb(false, "");
1216 1 : return;
1217 : }
1218 1 : }
1219 0 : } catch (const std::exception& e) {
1220 0 : JAMI_WARN("Unable to get mode: %s", e.what());
1221 0 : cb(false, "");
1222 0 : return;
1223 0 : }
1224 137 : if (isMember(contactUri, true)) {
1225 0 : JAMI_WARN("Unable to add member %s because it's already a member", contactUri.c_str());
1226 0 : cb(false, "");
1227 0 : return;
1228 : }
1229 137 : if (isBanned(contactUri)) {
1230 2 : if (pimpl_->isAdmin()) {
1231 1 : dht::ThreadPool::io().run([w = weak(), contactUri = std::move(contactUri), cb = std::move(cb)] {
1232 1 : if (auto sthis = w.lock()) {
1233 1 : auto members = sthis->pimpl_->repository_->members();
1234 1 : auto type = sthis->pimpl_->bannedType(contactUri);
1235 1 : if (type.empty()) {
1236 0 : cb(false, {});
1237 0 : return;
1238 : }
1239 1 : sthis->pimpl_->voteUnban(contactUri, type, cb);
1240 2 : }
1241 : });
1242 : } else {
1243 1 : JAMI_WARN("Unable to add member %s because this member is blocked", contactUri.c_str());
1244 1 : cb(false, "");
1245 : }
1246 2 : return;
1247 : }
1248 :
1249 135 : dht::ThreadPool::io().run([w = weak(), contactUri = std::move(contactUri), cb = std::move(cb)] {
1250 135 : if (auto sthis = w.lock()) {
1251 : // Add member files and commit
1252 135 : std::unique_lock lk(sthis->pimpl_->writeMtx_);
1253 135 : auto commit = sthis->pimpl_->repository_->addMember(contactUri);
1254 135 : if (not commit.empty())
1255 135 : sthis->pimpl_->announce(commit, true);
1256 135 : lk.unlock();
1257 135 : if (cb)
1258 135 : cb(!commit.empty(), commit);
1259 270 : }
1260 135 : });
1261 : }
1262 :
1263 : std::shared_ptr<dhtnet::ChannelSocket>
1264 4910 : Conversation::gitSocket(const DeviceId& deviceId) const
1265 : {
1266 4910 : return pimpl_->gitSocket(deviceId);
1267 : }
1268 :
1269 : void
1270 2236 : Conversation::addGitSocket(const DeviceId& deviceId, const std::shared_ptr<dhtnet::ChannelSocket>& socket)
1271 : {
1272 2236 : pimpl_->addGitSocket(deviceId, socket);
1273 2237 : }
1274 :
1275 : void
1276 825 : Conversation::removeGitSocket(const DeviceId& deviceId)
1277 : {
1278 825 : pimpl_->removeGitSocket(deviceId);
1279 825 : }
1280 :
1281 : void
1282 381 : Conversation::shutdownConnections()
1283 : {
1284 381 : pimpl_->fallbackTimer_->cancel();
1285 381 : pimpl_->gitSocketList_.clear();
1286 381 : if (pimpl_->swarmManager_)
1287 381 : pimpl_->swarmManager_->shutdown();
1288 381 : std::lock_guard lk(pimpl_->membersMtx_);
1289 381 : pimpl_->checkedMembers_.clear();
1290 381 : }
1291 :
1292 : void
1293 0 : Conversation::connectivityChanged()
1294 : {
1295 0 : if (pimpl_->swarmManager_)
1296 0 : pimpl_->swarmManager_->maintainBuckets();
1297 0 : }
1298 :
1299 : std::vector<jami::DeviceId>
1300 0 : Conversation::getDeviceIdList() const
1301 : {
1302 0 : return pimpl_->swarmManager_->getAllNodes();
1303 : }
1304 :
1305 : std::shared_ptr<Typers>
1306 9 : Conversation::typers() const
1307 : {
1308 9 : return pimpl_->typers_;
1309 : }
1310 :
1311 : bool
1312 0 : Conversation::hasSwarmChannel(const std::string& deviceId)
1313 : {
1314 0 : if (!pimpl_->swarmManager_)
1315 0 : return false;
1316 0 : return pimpl_->swarmManager_->isConnectedWith(DeviceId(deviceId));
1317 : }
1318 :
1319 : void
1320 1 : Conversation::Impl::voteUnban(const std::string& contactUri, const std::string_view type, const OnDoneCb& cb)
1321 : {
1322 : // Check if admin
1323 1 : if (!isAdmin()) {
1324 0 : JAMI_WARN("You're not an admin of this repo. Unable to unblock %s", contactUri.c_str());
1325 0 : cb(false, {});
1326 0 : return;
1327 : }
1328 :
1329 : // Vote for removal
1330 1 : std::unique_lock lk(writeMtx_);
1331 1 : auto voteCommit = repository_->voteUnban(contactUri, type);
1332 1 : if (voteCommit.empty()) {
1333 0 : JAMI_WARN("Unbanning %s failed", contactUri.c_str());
1334 0 : cb(false, "");
1335 0 : return;
1336 : }
1337 :
1338 1 : auto lastId = voteCommit;
1339 1 : std::vector<std::string> commits;
1340 1 : commits.emplace_back(voteCommit);
1341 :
1342 : // If admin, check vote
1343 2 : auto resolveCommit = repository_->resolveVote(contactUri, type, "unban");
1344 1 : if (!resolveCommit.empty()) {
1345 1 : commits.emplace_back(resolveCommit);
1346 1 : lastId = resolveCommit;
1347 1 : JAMI_WARN("Vote solved for unbanning %s.", contactUri.c_str());
1348 : }
1349 1 : announce(commits, true);
1350 1 : lk.unlock();
1351 1 : if (cb)
1352 1 : cb(!lastId.empty(), lastId);
1353 1 : }
1354 :
1355 : void
1356 14 : Conversation::removeMember(const std::string& contactUri, bool isDevice, const OnDoneCb& cb)
1357 : {
1358 42 : dht::ThreadPool::io().run(
1359 28 : [w = weak(), contactUri = std::move(contactUri), isDevice = std::move(isDevice), cb = std::move(cb)] {
1360 14 : if (auto sthis = w.lock()) {
1361 : // Check if admin
1362 14 : if (!sthis->pimpl_->isAdmin()) {
1363 1 : JAMI_WARN("You're not an admin of this repo. Unable to block %s", contactUri.c_str());
1364 1 : cb(false, {});
1365 2 : return;
1366 : }
1367 :
1368 : // Get current user type
1369 13 : std::string type;
1370 13 : if (isDevice) {
1371 0 : type = "devices";
1372 : } else {
1373 13 : auto members = sthis->pimpl_->repository_->members();
1374 30 : for (const auto& member : members) {
1375 30 : if (member.uri == contactUri) {
1376 13 : if (member.role == MemberRole::INVITED) {
1377 2 : type = "invited";
1378 11 : } else if (member.role == MemberRole::ADMIN) {
1379 1 : type = "admins";
1380 10 : } else if (member.role == MemberRole::MEMBER) {
1381 10 : type = "members";
1382 : }
1383 13 : break;
1384 : }
1385 : }
1386 13 : if (type.empty()) {
1387 0 : cb(false, {});
1388 0 : return;
1389 : }
1390 13 : }
1391 :
1392 : // Vote for removal
1393 13 : std::unique_lock lk(sthis->pimpl_->writeMtx_);
1394 13 : auto voteCommit = sthis->pimpl_->repository_->voteKick(contactUri, type);
1395 13 : if (voteCommit.empty()) {
1396 1 : JAMI_WARN("Kicking %s failed", contactUri.c_str());
1397 1 : cb(false, "");
1398 1 : return;
1399 : }
1400 :
1401 12 : auto lastId = voteCommit;
1402 12 : std::vector<std::string> commits;
1403 12 : commits.emplace_back(voteCommit);
1404 :
1405 : // If admin, check vote
1406 24 : auto resolveCommit = sthis->pimpl_->repository_->resolveVote(contactUri, type, "ban");
1407 12 : if (!resolveCommit.empty()) {
1408 12 : commits.emplace_back(resolveCommit);
1409 12 : lastId = resolveCommit;
1410 12 : JAMI_WARN("Vote solved for %s. %s banned", contactUri.c_str(), isDevice ? "Device" : "Member");
1411 12 : sthis->pimpl_->disconnectFromPeer(contactUri);
1412 : }
1413 :
1414 12 : sthis->pimpl_->announce(commits, true);
1415 12 : lk.unlock();
1416 12 : cb(!lastId.empty(), lastId);
1417 29 : }
1418 : });
1419 14 : }
1420 :
1421 : std::vector<std::map<std::string, std::string>>
1422 422 : Conversation::getMembers(bool includeInvited, bool includeLeft, bool includeBanned) const
1423 : {
1424 422 : return pimpl_->getMembers(includeInvited, includeLeft, includeBanned);
1425 : }
1426 :
1427 : std::set<std::string>
1428 2061 : Conversation::memberUris(std::string_view filter, const std::set<MemberRole>& filteredRoles) const
1429 : {
1430 2061 : return pimpl_->repository_->memberUris(filter, filteredRoles);
1431 : }
1432 :
1433 : std::vector<NodeId>
1434 1784 : Conversation::peersToSyncWith() const
1435 : {
1436 1784 : auto s = pimpl_->swarmManager_->getConnectedNodes();
1437 13195 : for (const auto& [deviceId, _] : pimpl_->gitSocketList_)
1438 11410 : if (std::find(s.cbegin(), s.cend(), deviceId) == s.cend())
1439 5188 : s.emplace_back(deviceId);
1440 1784 : return s;
1441 0 : }
1442 :
1443 : bool
1444 1784 : Conversation::isBootstrapped() const
1445 : {
1446 1784 : return pimpl_->swarmManager_->isConnected();
1447 : }
1448 :
1449 : std::string
1450 13403 : Conversation::uriFromDevice(const std::string& deviceId) const
1451 : {
1452 13403 : return pimpl_->repository_->uriFromDevice(deviceId);
1453 : }
1454 :
1455 : void
1456 0 : Conversation::monitor()
1457 : {
1458 0 : pimpl_->swarmManager_->getRoutingTable().printRoutingTable();
1459 0 : }
1460 :
1461 : std::string
1462 186 : Conversation::join()
1463 : {
1464 186 : return pimpl_->repository_->join();
1465 : }
1466 :
1467 : bool
1468 3560 : Conversation::isMember(const std::string& uri, bool includeInvited) const
1469 : {
1470 3560 : auto uriCrt = uri + ".crt"sv;
1471 7116 : if (std::filesystem::is_regular_file(pimpl_->repoPath_ / MemberPath::ADMINS / uriCrt)
1472 7118 : || std::filesystem::is_regular_file(pimpl_->repoPath_ / MemberPath::MEMBERS / uriCrt)) {
1473 2468 : return true;
1474 : }
1475 1091 : if (includeInvited) {
1476 901 : if (std::filesystem::is_regular_file(pimpl_->repoPath_ / MemberPath::INVITED / uri)) {
1477 624 : return true;
1478 : }
1479 277 : if (mode() == ConversationMode::ONE_TO_ONE) {
1480 3 : for (const auto& member : getInitialMembers()) {
1481 2 : if (member == uri)
1482 0 : return true;
1483 1 : }
1484 : }
1485 : }
1486 467 : return false;
1487 3559 : }
1488 :
1489 : bool
1490 6784 : Conversation::isBanned(const std::string& uri) const
1491 : {
1492 6784 : return !pimpl_->bannedType(uri).empty();
1493 : }
1494 :
1495 : void
1496 141 : Conversation::sendMessage(Json::Value&& value, const std::string& replyTo, OnCommitCb&& onCommit, OnDoneCb&& cb)
1497 : {
1498 141 : if (!replyTo.empty()) {
1499 2 : if (!pimpl_->repository_->hasCommit(replyTo)) {
1500 1 : JAMI_ERR("Replying to invalid commit %s", replyTo.c_str());
1501 1 : return;
1502 : }
1503 1 : value["reply-to"] = replyTo;
1504 : }
1505 560 : dht::ThreadPool::io().run(
1506 420 : [w = weak(), value = std::move(value), onCommit = std::move(onCommit), cb = std::move(cb)] {
1507 139 : if (auto sthis = w.lock()) {
1508 140 : std::unique_lock lk(sthis->pimpl_->writeMtx_);
1509 140 : auto commit = sthis->pimpl_->repository_->commitMessage(json::toString(value));
1510 140 : lk.unlock();
1511 140 : if (onCommit)
1512 14 : onCommit(commit);
1513 140 : sthis->pimpl_->announce(commit, true);
1514 140 : if (cb)
1515 140 : cb(!commit.empty(), commit);
1516 279 : }
1517 140 : });
1518 : }
1519 :
1520 : bool
1521 12195 : Conversation::hasCommit(const std::string& commitId) const
1522 : {
1523 12195 : return pimpl_->repository_->hasCommit(commitId);
1524 : }
1525 :
1526 : std::optional<std::map<std::string, std::string>>
1527 36 : Conversation::getCommit(const std::string& commitId) const
1528 : {
1529 36 : auto commit = pimpl_->repository_->getCommit(commitId);
1530 36 : if (commit == std::nullopt)
1531 1 : return std::nullopt;
1532 35 : return pimpl_->repository_->convCommitToMap(*commit);
1533 36 : }
1534 :
1535 : void
1536 2 : Conversation::loadMessages(const OnLoadMessages& cb, const LogOptions& options)
1537 : {
1538 2 : if (!cb)
1539 0 : return;
1540 2 : dht::ThreadPool::io().run([w = weak(), cb = std::move(cb), options] {
1541 2 : if (auto sthis = w.lock()) {
1542 2 : std::unique_lock lk(sthis->pimpl_->loadedHistory_.mutex);
1543 2 : auto result = sthis->pimpl_->loadMessages(options);
1544 2 : lk.unlock();
1545 2 : cb(std::move(result));
1546 4 : }
1547 2 : });
1548 : }
1549 :
1550 : void
1551 0 : Conversation::clearCache()
1552 : {
1553 0 : std::lock_guard lk(pimpl_->loadedHistory_.mutex);
1554 0 : pimpl_->loadedHistory_.messageList.clear();
1555 0 : pimpl_->loadedHistory_.quickAccess.clear();
1556 0 : pimpl_->loadedHistory_.pendingEditions.clear();
1557 0 : pimpl_->loadedHistory_.pendingReactions.clear();
1558 : {
1559 0 : std::lock_guard lk(pimpl_->messageStatusMtx_);
1560 0 : pimpl_->memberToStatus.clear();
1561 0 : }
1562 0 : }
1563 :
1564 : std::string
1565 2412 : Conversation::lastCommitId() const
1566 : {
1567 : {
1568 2412 : std::lock_guard lk(pimpl_->loadedHistory_.mutex);
1569 2412 : if (!pimpl_->loadedHistory_.messageList.empty())
1570 3955 : return (*pimpl_->loadedHistory_.messageList.begin())->id;
1571 2411 : }
1572 433 : LogOptions options;
1573 434 : options.nbOfCommits = 1;
1574 434 : options.skipMerge = true;
1575 434 : History optHistory;
1576 434 : std::scoped_lock lock(pimpl_->writeMtx_, optHistory.mutex);
1577 433 : auto res = pimpl_->loadMessages(options, &optHistory);
1578 434 : if (res.empty())
1579 2 : return {};
1580 864 : return (*optHistory.messageList.begin())->id;
1581 434 : }
1582 :
1583 : bool
1584 2050 : Conversation::pull(const std::string& deviceId, OnPullCb&& cb, std::string commitId)
1585 : {
1586 2050 : std::lock_guard lk(pimpl_->pullcbsMtx_);
1587 6151 : auto [it, notInProgress] = pimpl_->fetchingRemotes_.emplace(deviceId,
1588 4102 : std::deque<std::pair<std::string, OnPullCb>>());
1589 2051 : auto& pullcbs = it->second;
1590 2051 : auto itPull = std::find_if(pullcbs.begin(), pullcbs.end(), [&](const auto& elem) {
1591 1 : return std::get<0>(elem) == commitId;
1592 4102 : });
1593 2051 : if (itPull != pullcbs.end()) {
1594 4 : JAMI_DEBUG("{} Ignoring request to pull from {:s} with commit {:s}: pull already in progress",
1595 : pimpl_->toString(),
1596 : deviceId,
1597 : commitId);
1598 1 : cb(false);
1599 1 : return false;
1600 : }
1601 8197 : JAMI_DEBUG("{} [device {}] Pulling '{:s}'", pimpl_->toString(), deviceId, commitId);
1602 2050 : pullcbs.emplace_back(std::move(commitId), std::move(cb));
1603 2050 : if (notInProgress)
1604 2028 : dht::ThreadPool::io().run([w = weak(), deviceId] {
1605 2027 : if (auto sthis_ = w.lock())
1606 2026 : sthis_->pimpl_->pull(deviceId);
1607 2028 : });
1608 2050 : return true;
1609 2051 : }
1610 :
1611 : void
1612 2028 : Conversation::Impl::pull(const std::string& deviceId)
1613 : {
1614 2028 : auto& repo = repository_;
1615 :
1616 2028 : std::string commitId;
1617 2028 : OnPullCb cb;
1618 : while (true) {
1619 : {
1620 4078 : std::lock_guard lk(pullcbsMtx_);
1621 4078 : auto it = fetchingRemotes_.find(deviceId);
1622 4076 : if (it == fetchingRemotes_.end()) {
1623 0 : JAMI_ERROR("Could not find device {:s} in fetchingRemotes", deviceId);
1624 0 : break;
1625 : }
1626 4078 : auto& pullcbs = it->second;
1627 4078 : if (pullcbs.empty()) {
1628 2028 : fetchingRemotes_.erase(it);
1629 2028 : break;
1630 : }
1631 2050 : auto& elem = pullcbs.front();
1632 2049 : commitId = std::move(std::get<0>(elem));
1633 2049 : cb = std::move(std::get<1>(elem));
1634 2050 : pullcbs.pop_front();
1635 4077 : }
1636 : // If recently fetched, the commit can already be there, so no need to do complex operations
1637 2050 : if (commitId != "" && repo->hasCommit(commitId)) {
1638 98 : cb(true);
1639 131 : continue;
1640 : }
1641 : // Pull from remote
1642 1952 : auto fetched = repo->fetch(deviceId);
1643 1952 : if (!fetched) {
1644 33 : cb(false);
1645 33 : continue;
1646 : }
1647 :
1648 1919 : auto oldHead = repo->getHead();
1649 :
1650 1919 : std::unique_lock lk(writeMtx_);
1651 : auto commits = repo->mergeHistory(deviceId,
1652 1924 : [this](const std::string& peerUri) { this->disconnectFromPeer(peerUri); });
1653 1919 : if (!commits.empty()) {
1654 909 : announce(commits);
1655 : }
1656 1919 : lk.unlock();
1657 :
1658 1919 : bool commitFound = false;
1659 1919 : if (commitId != "") {
1660 : // If `commitId` is non-empty, then we were attempting to pull a specific commit.
1661 : // We need to check if we actually got it; the fact that the fetch above was
1662 : // successful doesn't guarantee that we did.
1663 1546 : for (const auto& commit : commits) {
1664 916 : if (commit.at("id") == commitId) {
1665 901 : commitFound = true;
1666 901 : break;
1667 : }
1668 : }
1669 : } else {
1670 388 : commitFound = true;
1671 : }
1672 1919 : if (!commitFound)
1673 2520 : JAMI_WARNING("Successfully fetched from device {} but didn't receive expected commit {}",
1674 : deviceId,
1675 : commitId);
1676 : // WARNING: If its argument is `true`, this callback will attempt to send a message notification
1677 : // for commit `commitId` to other members of the swarm. It's important that we only
1678 : // send these notifications if we actually have the commit. Otherwise, we can end up
1679 : // in a situation where the members of the swarm keep sending notifications to each
1680 : // other for a commit that none of them have (note that we are unable to rule this out, as
1681 : // nothing prevents a malicious user from intentionally sending a notification with
1682 : // a fake commit ID).
1683 1919 : if (cb)
1684 1919 : cb(commitFound);
1685 :
1686 : // Announce if profile changed
1687 1919 : if (!commits.empty()) {
1688 1818 : auto diffStats = repo->diffStats("HEAD", oldHead);
1689 909 : auto changedFiles = repo->changedFiles(diffStats);
1690 909 : if (find(changedFiles.begin(), changedFiles.end(), "profile.vcf") != changedFiles.end()) {
1691 5 : emitSignal<libjami::ConversationSignal::ConversationProfileUpdated>(accountId_,
1692 5 : repo->id(),
1693 10 : repo->infos());
1694 : }
1695 909 : }
1696 3969 : }
1697 2028 : }
1698 :
1699 : void
1700 2050 : Conversation::sync(const std::string& member, const std::string& deviceId, OnPullCb&& cb, std::string commitId)
1701 : {
1702 2050 : pull(deviceId, std::move(cb), commitId);
1703 2051 : dht::ThreadPool::io().run([member, deviceId, w = weak_from_this()] {
1704 2051 : auto sthis = w.lock();
1705 : // For waiting request, downloadFile
1706 2051 : for (const auto& wr : sthis->dataTransfer()->waitingRequests()) {
1707 1 : sthis->downloadFile(wr.interactionId, wr.fileId, wr.path, member, deviceId);
1708 2051 : }
1709 2051 : });
1710 2051 : }
1711 :
1712 : std::map<std::string, std::string>
1713 268 : Conversation::generateInvitation() const
1714 : {
1715 : // Invite the new member to the conversation
1716 268 : Json::Value root;
1717 268 : auto& metadata = root[ConversationMapKeys::METADATAS];
1718 542 : for (const auto& [k, v] : infos()) {
1719 274 : if (v.size() >= 64000) {
1720 0 : JAMI_WARNING("Cutting invite because the SIP message will be too long");
1721 0 : continue;
1722 0 : }
1723 274 : metadata[k] = v;
1724 268 : }
1725 268 : root[ConversationMapKeys::CONVERSATIONID] = id();
1726 804 : return {{"application/invite+json", json::toString(root)}};
1727 268 : }
1728 :
1729 : std::string
1730 7 : Conversation::leave()
1731 : {
1732 7 : setRemovingFlag();
1733 7 : std::lock_guard lk(pimpl_->writeMtx_);
1734 14 : return pimpl_->repository_->leave();
1735 7 : }
1736 :
1737 : void
1738 10 : Conversation::setRemovingFlag()
1739 : {
1740 10 : pimpl_->isRemoving_ = true;
1741 10 : }
1742 :
1743 : bool
1744 4076 : Conversation::isRemoving()
1745 : {
1746 4076 : return pimpl_->isRemoving_;
1747 : }
1748 :
1749 : void
1750 21 : Conversation::erase()
1751 : {
1752 21 : if (pimpl_->conversationDataPath_ != "")
1753 21 : dhtnet::fileutils::removeAll(pimpl_->conversationDataPath_, true);
1754 21 : if (!pimpl_->repository_)
1755 0 : return;
1756 21 : std::lock_guard lk(pimpl_->writeMtx_);
1757 21 : pimpl_->repository_->erase();
1758 21 : }
1759 :
1760 : ConversationMode
1761 4864 : Conversation::mode() const
1762 : {
1763 4864 : return pimpl_->repository_->mode();
1764 : }
1765 :
1766 : std::vector<std::string>
1767 30 : Conversation::getInitialMembers() const
1768 : {
1769 30 : return pimpl_->repository_->getInitialMembers();
1770 : }
1771 :
1772 : bool
1773 0 : Conversation::isInitialMember(const std::string& uri) const
1774 : {
1775 0 : auto members = getInitialMembers();
1776 0 : return std::find(members.begin(), members.end(), uri) != members.end();
1777 0 : }
1778 :
1779 : void
1780 9 : Conversation::updateInfos(const std::map<std::string, std::string>& map, const OnDoneCb& cb)
1781 : {
1782 9 : dht::ThreadPool::io().run([w = weak(), map = std::move(map), cb = std::move(cb)] {
1783 9 : if (auto sthis = w.lock()) {
1784 9 : auto& repo = sthis->pimpl_->repository_;
1785 9 : std::unique_lock lk(sthis->pimpl_->writeMtx_);
1786 9 : auto commit = repo->updateInfos(map);
1787 9 : sthis->pimpl_->announce(commit, true);
1788 9 : lk.unlock();
1789 9 : if (cb)
1790 9 : cb(!commit.empty(), commit);
1791 9 : emitSignal<libjami::ConversationSignal::ConversationProfileUpdated>(sthis->pimpl_->accountId_,
1792 9 : repo->id(),
1793 18 : repo->infos());
1794 18 : }
1795 9 : });
1796 9 : }
1797 :
1798 : std::map<std::string, std::string>
1799 315 : Conversation::infos() const
1800 : {
1801 315 : return pimpl_->repository_->infos();
1802 : }
1803 :
1804 : void
1805 8 : Conversation::updatePreferences(const std::map<std::string, std::string>& map)
1806 : {
1807 8 : const auto& filePath = pimpl_->preferencesPath_;
1808 8 : auto prefs = map;
1809 8 : auto itLast = prefs.find(LAST_MODIFIED);
1810 8 : if (itLast != prefs.end()) {
1811 3 : std::error_code ec;
1812 3 : if (std::filesystem::is_regular_file(filePath, ec)) {
1813 1 : auto lastModified = fileutils::lastWriteTimeInSeconds(filePath);
1814 : try {
1815 1 : if (lastModified >= to_int<uint64_t>(itLast->second))
1816 0 : return;
1817 0 : } catch (...) {
1818 0 : return;
1819 0 : }
1820 : }
1821 3 : prefs.erase(itLast);
1822 : }
1823 :
1824 8 : std::ofstream file(filePath, std::ios::trunc | std::ios::binary);
1825 8 : msgpack::pack(file, prefs);
1826 8 : emitSignal<libjami::ConversationSignal::ConversationPreferencesUpdated>(pimpl_->accountId_, id(), std::move(prefs));
1827 8 : }
1828 :
1829 : std::map<std::string, std::string>
1830 105 : Conversation::preferences(bool includeLastModified) const
1831 : {
1832 : try {
1833 105 : std::map<std::string, std::string> preferences;
1834 105 : const auto& filePath = pimpl_->preferencesPath_;
1835 199 : auto file = fileutils::loadFile(filePath);
1836 11 : msgpack::object_handle oh = msgpack::unpack((const char*) file.data(), file.size());
1837 11 : oh.get().convert(preferences);
1838 11 : if (includeLastModified)
1839 8 : preferences[LAST_MODIFIED] = std::to_string(fileutils::lastWriteTimeInSeconds(filePath));
1840 11 : return preferences;
1841 199 : } catch (const std::exception& e) {
1842 94 : }
1843 94 : return {};
1844 : }
1845 :
1846 : std::vector<uint8_t>
1847 0 : Conversation::vCard() const
1848 : {
1849 : try {
1850 0 : return fileutils::loadFile(pimpl_->repoPath_ / "profile.vcf");
1851 0 : } catch (...) {
1852 0 : }
1853 0 : return {};
1854 : }
1855 :
1856 : std::shared_ptr<TransferManager>
1857 2159 : Conversation::dataTransfer() const
1858 : {
1859 2159 : return pimpl_->transferManager_;
1860 : }
1861 :
1862 : bool
1863 14 : Conversation::onFileChannelRequest(const std::string& member,
1864 : const std::string& fileId,
1865 : std::filesystem::path& path,
1866 : std::string& sha3sum) const
1867 : {
1868 14 : if (!isMember(member))
1869 0 : return false;
1870 :
1871 14 : auto sep = fileId.find('_');
1872 14 : if (sep == std::string::npos)
1873 0 : return false;
1874 :
1875 14 : auto interactionId = fileId.substr(0, sep);
1876 14 : auto commit = getCommit(interactionId);
1877 28 : if (commit == std::nullopt || commit->find("type") == commit->end() || commit->find("tid") == commit->end()
1878 28 : || commit->find("sha3sum") == commit->end() || commit->at("type") != "application/data-transfer+json") {
1879 0 : JAMI_WARNING("[Account {:s}] {} requested invalid file transfer commit {}",
1880 : pimpl_->accountId_,
1881 : member,
1882 : interactionId);
1883 0 : return false;
1884 : }
1885 :
1886 14 : path = dataTransfer()->path(fileId);
1887 14 : sha3sum = commit->at("sha3sum");
1888 :
1889 14 : return true;
1890 14 : }
1891 :
1892 : bool
1893 14 : Conversation::downloadFile(const std::string& interactionId,
1894 : const std::string& fileId,
1895 : const std::string& path,
1896 : const std::string&,
1897 : const std::string& deviceId)
1898 : {
1899 14 : auto commit = getCommit(interactionId);
1900 14 : if (commit == std::nullopt || commit->at("type") != "application/data-transfer+json") {
1901 0 : JAMI_ERROR("Commit doesn't exists or is not a file transfer {} (Conversation: {}) ", interactionId, id());
1902 0 : return false;
1903 : }
1904 14 : auto tid = commit->find("tid");
1905 14 : auto sha3sum = commit->find("sha3sum");
1906 14 : auto size_str = commit->find("totalSize");
1907 :
1908 14 : if (tid == commit->end() || sha3sum == commit->end() || size_str == commit->end()) {
1909 0 : JAMI_ERROR("Invalid file transfer commit (missing tid, size or sha3)");
1910 0 : return false;
1911 : }
1912 :
1913 14 : auto totalSize = to_int<ssize_t>(size_str->second, (ssize_t) -1);
1914 14 : if (totalSize < 0) {
1915 0 : JAMI_ERROR("Invalid file size {}", totalSize);
1916 0 : return false;
1917 : }
1918 :
1919 : // Be sure to not lock conversation
1920 28 : dht::ThreadPool().io().run(
1921 14 : [w = weak(), deviceId, fileId, interactionId, sha3sum = sha3sum->second, path, totalSize] {
1922 14 : if (auto shared = w.lock()) {
1923 14 : std::filesystem::path filePath(path);
1924 14 : if (filePath.empty()) {
1925 0 : filePath = shared->dataTransfer()->path(fileId);
1926 : }
1927 :
1928 14 : std::error_code ec;
1929 14 : if (std::filesystem::file_size(filePath, ec) == static_cast<size_t>(totalSize)) {
1930 1 : if (fileutils::sha3File(filePath) == sha3sum) {
1931 4 : JAMI_WARNING("Ignoring request to download existing file: {}", filePath);
1932 1 : return;
1933 : }
1934 : }
1935 :
1936 13 : std::filesystem::path tempFilePath(filePath);
1937 13 : tempFilePath += ".tmp";
1938 13 : auto start = std::filesystem::file_size(tempFilePath, ec);
1939 13 : if (ec || start == static_cast<decltype(start)>(-1)) {
1940 11 : start = 0;
1941 : }
1942 13 : size_t end = 0;
1943 :
1944 13 : auto acc = shared->pimpl_->account_.lock();
1945 13 : if (!acc)
1946 0 : return;
1947 13 : shared->dataTransfer()->waitForTransfer(fileId, interactionId, sha3sum, path, totalSize);
1948 13 : acc->askForFileChannel(shared->id(), deviceId, interactionId, fileId, start, end);
1949 28 : }
1950 : });
1951 14 : return true;
1952 14 : }
1953 :
1954 : void
1955 1117 : Conversation::hasFetched(const std::string& deviceId, const std::string& commitId)
1956 : {
1957 1117 : dht::ThreadPool::io().run([w = weak(), deviceId, commitId]() {
1958 1117 : auto sthis = w.lock();
1959 1117 : if (!sthis)
1960 0 : return;
1961 : // Update fetched for Uri
1962 1117 : auto uri = sthis->uriFromDevice(deviceId);
1963 1117 : if (uri.empty() || uri == sthis->pimpl_->userId_)
1964 45 : return;
1965 : // When a user fetches a commit, the message is sent for this person
1966 1072 : sthis->pimpl_->updateStatus(uri,
1967 : libjami::Account::MessageStates::SENT,
1968 1072 : commitId,
1969 2144 : std::to_string(std::time(nullptr)),
1970 : true);
1971 1162 : });
1972 1117 : }
1973 :
1974 : void
1975 1118 : Conversation::Impl::updateStatus(const std::string& uri,
1976 : libjami::Account::MessageStates st,
1977 : const std::string& commitId,
1978 : const std::string& ts,
1979 : bool emit)
1980 : {
1981 : // This method can be called if peer send us a status or if another device sync. Emit will be true if a peer
1982 : // send us a status and will emit to other connected devices.
1983 1118 : LogOptions options;
1984 1118 : std::map<std::string, std::map<std::string, std::string>> newStatus;
1985 : {
1986 : // Update internal structures.
1987 1118 : std::lock_guard lk(messageStatusMtx_);
1988 1118 : auto& status = messagesStatus_[uri];
1989 1118 : auto& oldStatus = status[st == libjami::Account::MessageStates::SENT ? "fetched" : "read"];
1990 1118 : if (oldStatus == commitId)
1991 3 : return; // Nothing to do
1992 1115 : options.to = oldStatus;
1993 1115 : options.from = commitId;
1994 1115 : oldStatus = commitId;
1995 1115 : status[st == libjami::Account::MessageStates::SENT ? "fetched_ts" : "read_ts"] = ts;
1996 1115 : saveStatus();
1997 1115 : if (emit)
1998 1085 : newStatus[uri].insert(status.begin(), status.end());
1999 1118 : }
2000 1115 : if (emit && messageStatusCb_) {
2001 1085 : messageStatusCb_(newStatus);
2002 : }
2003 : // Update messages status for all commit between the old and new one
2004 1115 : options.logIfNotFound = false;
2005 1115 : options.fastLog = true;
2006 1115 : History optHistory;
2007 1115 : std::unique_lock lk(optHistory.mutex); // Avoid to announce messages while updating status.
2008 1115 : auto res = loadMessages(options, &optHistory);
2009 1115 : std::unique_lock mlk(messageStatusMtx_);
2010 1115 : std::vector<std::pair<std::string, int32_t>> statusToUpdate;
2011 1115 : if (res.size() == 0) {
2012 : // In this case, commit is not received yet, so we cache it
2013 10 : futureStatus[commitId][uri] = static_cast<int32_t>(st);
2014 : }
2015 6802 : for (const auto& [cid, _] : optHistory.quickAccess) {
2016 5687 : auto message = loadedHistory_.quickAccess.find(cid);
2017 5687 : if (message != loadedHistory_.quickAccess.end()) {
2018 : // Update message and emit to client,
2019 2885 : if (static_cast<int32_t>(st) > message->second->status[uri]) {
2020 2880 : message->second->status[uri] = static_cast<int32_t>(st);
2021 2880 : statusToUpdate.emplace_back(cid, static_cast<int32_t>(st));
2022 : }
2023 : } else {
2024 : // In this case, commit is not loaded by client, so we cache it
2025 : // No need to emit to client, they will get a correct status on load.
2026 2802 : futureStatus[cid][uri] = static_cast<int32_t>(st);
2027 : }
2028 : }
2029 1115 : mlk.unlock();
2030 1115 : lk.unlock();
2031 3995 : for (const auto& [cid, status] : statusToUpdate)
2032 5760 : emitSignal<libjami::ConfigurationSignal::AccountMessageStatusChanged>(accountId_,
2033 2880 : repository_->id(),
2034 : uri,
2035 : cid,
2036 : static_cast<int>(status));
2037 1121 : }
2038 :
2039 : bool
2040 18 : Conversation::setMessageDisplayed(const std::string& uri, const std::string& interactionId)
2041 : {
2042 18 : std::lock_guard lk(pimpl_->messageStatusMtx_);
2043 18 : if (pimpl_->messagesStatus_[uri]["read"] == interactionId)
2044 2 : return false; // Nothing to do
2045 16 : dht::ThreadPool::io().run([w = weak(), uri, interactionId]() {
2046 16 : auto sthis = w.lock();
2047 16 : if (!sthis)
2048 0 : return;
2049 16 : sthis->pimpl_->updateStatus(uri,
2050 : libjami::Account::MessageStates::DISPLAYED,
2051 16 : interactionId,
2052 32 : std::to_string(std::time(nullptr)),
2053 : true);
2054 16 : });
2055 16 : return true;
2056 18 : }
2057 :
2058 : std::map<std::string, std::map<std::string, std::string>>
2059 87 : Conversation::messageStatus() const
2060 : {
2061 87 : std::lock_guard lk(pimpl_->messageStatusMtx_);
2062 174 : return pimpl_->messagesStatus_;
2063 87 : }
2064 :
2065 : void
2066 44 : Conversation::updateMessageStatus(const std::map<std::string, std::map<std::string, std::string>>& messageStatus)
2067 : {
2068 44 : std::unique_lock lk(pimpl_->messageStatusMtx_);
2069 44 : std::vector<std::tuple<libjami::Account::MessageStates, std::string, std::string, std::string>> stVec;
2070 93 : for (const auto& [uri, status] : messageStatus) {
2071 49 : auto& oldMs = pimpl_->messagesStatus_[uri];
2072 49 : if (status.find("fetched_ts") != status.end() && status.at("fetched") != oldMs["fetched"]) {
2073 26 : if (oldMs["fetched_ts"].empty() || std::stol(oldMs["fetched_ts"]) <= std::stol(status.at("fetched_ts"))) {
2074 0 : stVec.emplace_back(libjami::Account::MessageStates::SENT,
2075 : uri,
2076 26 : status.at("fetched"),
2077 52 : status.at("fetched_ts"));
2078 : }
2079 : }
2080 49 : if (status.find("read_ts") != status.end() && status.at("read") != oldMs["read"]) {
2081 4 : if (oldMs["read_ts"].empty() || std::stol(oldMs["read_ts"]) <= std::stol(status.at("read_ts"))) {
2082 0 : stVec.emplace_back(libjami::Account::MessageStates::DISPLAYED,
2083 : uri,
2084 4 : status.at("read"),
2085 8 : status.at("read_ts"));
2086 : }
2087 : }
2088 : }
2089 44 : lk.unlock();
2090 :
2091 74 : for (const auto& [status, uri, commitId, ts] : stVec) {
2092 30 : pimpl_->updateStatus(uri, status, commitId, ts);
2093 : }
2094 44 : }
2095 :
2096 : void
2097 390 : Conversation::onMessageStatusChanged(
2098 : const std::function<void(const std::map<std::string, std::map<std::string, std::string>>&)>& cb)
2099 : {
2100 390 : std::unique_lock lk(pimpl_->messageStatusMtx_);
2101 390 : pimpl_->messageStatusCb_ = cb;
2102 390 : }
2103 :
2104 : #ifdef LIBJAMI_TEST
2105 : void
2106 514 : Conversation::onBootstrapStatus(const std::function<void(std::string, BootstrapStatus)>& cb)
2107 : {
2108 514 : pimpl_->bootstrapCbTest_ = cb;
2109 514 : }
2110 :
2111 : std::vector<libjami::SwarmMessage>
2112 0 : Conversation::loadMessagesSync(const LogOptions& options)
2113 : {
2114 0 : std::lock_guard lk(pimpl_->loadedHistory_.mutex);
2115 0 : auto result = pimpl_->loadMessages(options);
2116 0 : return result;
2117 0 : }
2118 :
2119 : void
2120 0 : Conversation::announce(const std::vector<std::map<std::string, std::string>>& commits, bool commitFromSelf)
2121 : {
2122 0 : pimpl_->announce(commits, commitFromSelf);
2123 0 : }
2124 :
2125 : void
2126 0 : Conversation::announce(const std::string& commitId, bool commitFromSelf)
2127 : {
2128 0 : pimpl_->announce(commitId, commitFromSelf);
2129 0 : }
2130 : #endif
2131 :
2132 : void
2133 606 : Conversation::checkBootstrapMember(const asio::error_code& ec, std::vector<std::map<std::string, std::string>> members)
2134 : {
2135 606 : if (ec == asio::error::operation_aborted)
2136 317 : return;
2137 583 : auto acc = pimpl_->account_.lock();
2138 583 : if (pimpl_->swarmManager_->isConnected() or not acc)
2139 14 : return;
2140 : // We bootstrap the DRT with devices who already wrote in the repository.
2141 : // However, in a conversation, a large number of devices may just watch
2142 : // the conversation, but never write any message.
2143 569 : std::unique_lock lock(pimpl_->membersMtx_);
2144 :
2145 569 : std::string uri;
2146 855 : while (!members.empty()) {
2147 294 : auto member = std::move(members.back());
2148 294 : members.pop_back();
2149 294 : uri = std::move(member.at("uri"));
2150 294 : if (uri != pimpl_->userId_ && pimpl_->checkedMembers_.find(uri) == pimpl_->checkedMembers_.end())
2151 8 : break;
2152 294 : }
2153 281 : auto fallbackFailed = [](auto sthis) {
2154 1124 : JAMI_LOG("{} Bootstrap: Fallback failed. Wait for remote connections.", sthis->pimpl_->toString());
2155 : #ifdef LIBJAMI_TEST
2156 281 : if (sthis->pimpl_->bootstrapCbTest_)
2157 8 : sthis->pimpl_->bootstrapCbTest_(sthis->id(), BootstrapStatus::FAILED);
2158 : #endif
2159 281 : };
2160 : // If members is empty, we finished the fallback un-successfully
2161 569 : if (members.empty() && uri.empty()) {
2162 280 : lock.unlock();
2163 280 : fallbackFailed(this);
2164 280 : return;
2165 : }
2166 :
2167 : // Fallback, check devices of a member (we didn't check yet) in the conversation
2168 289 : pimpl_->checkedMembers_.emplace(uri);
2169 289 : auto devices = std::make_shared<std::vector<NodeId>>();
2170 1156 : acc->forEachDevice(
2171 578 : dht::InfoHash(uri),
2172 291 : [w = weak(), devices](const std::shared_ptr<dht::crypto::PublicKey>& dev) {
2173 : // Test if already sent
2174 291 : if (auto sthis = w.lock()) {
2175 289 : if (!sthis->pimpl_->swarmManager_->getRoutingTable().hasKnownNode(dev->getLongId()))
2176 279 : devices->emplace_back(dev->getLongId());
2177 291 : }
2178 291 : },
2179 289 : [w = weak(), devices, members = std::move(members), uri, fallbackFailed = std::move(fallbackFailed)](bool ok) {
2180 289 : auto sthis = w.lock();
2181 289 : if (!sthis)
2182 2 : return;
2183 287 : auto checkNext = true;
2184 287 : if (ok && devices->size() != 0) {
2185 : #ifdef LIBJAMI_TEST
2186 279 : if (sthis->pimpl_->bootstrapCbTest_)
2187 6 : sthis->pimpl_->bootstrapCbTest_(sthis->id(), BootstrapStatus::FALLBACK);
2188 : #endif
2189 1116 : JAMI_LOG("{} Bootstrap: Fallback with member: {}", sthis->pimpl_->toString(), uri);
2190 279 : if (sthis->pimpl_->swarmManager_->setKnownNodes(*devices))
2191 1 : checkNext = false;
2192 : }
2193 287 : if (checkNext) {
2194 : // Check next member
2195 286 : sthis->pimpl_->fallbackTimer_->expires_at(std::chrono::steady_clock::now());
2196 572 : sthis->pimpl_->fallbackTimer_->async_wait(
2197 572 : std::bind(&Conversation::checkBootstrapMember, sthis, std::placeholders::_1, std::move(members)));
2198 : } else {
2199 : // In this case, all members are checked. Fallback failed
2200 1 : fallbackFailed(sthis);
2201 : }
2202 289 : });
2203 1143 : }
2204 :
2205 : void
2206 513 : Conversation::bootstrap(std::function<void()> onBootstrapped, const std::vector<DeviceId>& knownDevices)
2207 : {
2208 513 : std::lock_guard lock(pimpl_->bootstrapMtx_);
2209 513 : if (!pimpl_ || !pimpl_->repository_ || !pimpl_->swarmManager_)
2210 0 : return;
2211 : // Here, we bootstrap the DRT with devices who already wrote in the conversation
2212 : // If this doesn't work, it will attempt to fallback with checkBootstrapMember
2213 : // If it works, the callback onConnectionChanged will be called with ok=true
2214 513 : pimpl_->bootstrapCb_ = std::move(onBootstrapped);
2215 513 : std::vector<DeviceId> devices = knownDevices;
2216 1532 : for (const auto& [member, memberDevices] : pimpl_->repository_->devices()) {
2217 1019 : if (!isBanned(member))
2218 1019 : devices.insert(devices.end(), memberDevices.begin(), memberDevices.end());
2219 513 : }
2220 2052 : JAMI_DEBUG("{} Bootstrap with {} device(s)", pimpl_->toString(), devices.size());
2221 : // set callback
2222 321 : auto fallback = [](std::shared_ptr<Conversation> sthis, bool now = false) {
2223 : // Fallback
2224 321 : auto acc = sthis->pimpl_->account_.lock();
2225 321 : if (!acc)
2226 0 : return;
2227 321 : auto members = sthis->getMembers(false, false);
2228 321 : std::shuffle(members.begin(), members.end(), acc->rand);
2229 321 : if (now) {
2230 290 : sthis->pimpl_->fallbackTimer_->expires_at(std::chrono::steady_clock::now());
2231 : } else {
2232 31 : auto timeForBootstrap = std::min(static_cast<size_t>(8), members.size());
2233 93 : sthis->pimpl_->fallbackTimer_->expires_at(std::chrono::steady_clock::now() + 20s
2234 62 : - std::chrono::seconds(timeForBootstrap));
2235 124 : JAMI_DEBUG("{} Fallback in {} seconds", sthis->pimpl_->toString(), (20 - timeForBootstrap));
2236 : }
2237 642 : sthis->pimpl_->fallbackTimer_->async_wait(
2238 642 : std::bind(&Conversation::checkBootstrapMember, sthis, std::placeholders::_1, std::move(members)));
2239 321 : };
2240 :
2241 513 : pimpl_->swarmManager_->onConnectionChanged([w = weak(), fallback](bool ok) {
2242 : // This will call methods from accounts, so trigger on another thread.
2243 362 : dht::ThreadPool::io().run([w, ok, fallback = std::move(fallback)] {
2244 362 : auto sthis = w.lock();
2245 362 : if (!sthis)
2246 0 : return;
2247 362 : if (ok) {
2248 : // Bootstrap succeeded!
2249 : {
2250 331 : std::lock_guard lock(sthis->pimpl_->membersMtx_);
2251 331 : sthis->pimpl_->checkedMembers_.clear();
2252 331 : }
2253 331 : if (sthis->pimpl_->bootstrapCb_)
2254 330 : sthis->pimpl_->bootstrapCb_();
2255 : #ifdef LIBJAMI_TEST
2256 331 : if (sthis->pimpl_->bootstrapCbTest_)
2257 9 : sthis->pimpl_->bootstrapCbTest_(sthis->id(), BootstrapStatus::SUCCESS);
2258 : #endif
2259 331 : return;
2260 : }
2261 31 : fallback(sthis);
2262 362 : });
2263 362 : });
2264 : {
2265 513 : std::lock_guard lock(pimpl_->membersMtx_);
2266 513 : pimpl_->checkedMembers_.clear();
2267 513 : }
2268 : // If is shutdown, the conversation was re-added, causing no new nodes to be connected, but just a classic
2269 : // connectivity change
2270 513 : if (pimpl_->swarmManager_->isShutdown()) {
2271 24 : pimpl_->swarmManager_->restart();
2272 24 : pimpl_->swarmManager_->maintainBuckets();
2273 489 : } else if (!pimpl_->swarmManager_->setKnownNodes(devices)) {
2274 290 : fallback(shared_from_this(), true);
2275 : }
2276 513 : }
2277 :
2278 : std::vector<std::string>
2279 18 : Conversation::commitsEndedCalls()
2280 : {
2281 18 : pimpl_->loadActiveCalls();
2282 18 : pimpl_->loadHostedCalls();
2283 18 : auto commits = pimpl_->commitsEndedCalls();
2284 18 : if (!commits.empty()) {
2285 : // Announce to client
2286 0 : dht::ThreadPool::io().run([w = weak(), commits] {
2287 0 : if (auto sthis = w.lock())
2288 0 : sthis->pimpl_->announce(commits, true);
2289 0 : });
2290 : }
2291 18 : return commits;
2292 0 : }
2293 :
2294 : void
2295 390 : Conversation::onMembersChanged(OnMembersChanged&& cb)
2296 : {
2297 390 : pimpl_->onMembersChanged_ = std::move(cb);
2298 390 : pimpl_->repository_->onMembersChanged([w = weak()](const std::set<std::string>& memberUris) {
2299 1084 : if (auto sthis = w.lock())
2300 1083 : sthis->pimpl_->onMembersChanged_(memberUris);
2301 1084 : });
2302 390 : }
2303 :
2304 : void
2305 390 : Conversation::onNeedSocket(NeedSocketCb needSocket)
2306 : {
2307 780 : pimpl_->swarmManager_->needSocketCb_ = [needSocket = std::move(needSocket), w = weak()](const std::string& deviceId,
2308 : ChannelCb&& cb) {
2309 733 : if (auto sthis = w.lock())
2310 733 : needSocket(sthis->id(), deviceId, std::move(cb), "application/im-gitmessage-id");
2311 1513 : };
2312 390 : }
2313 :
2314 : void
2315 698 : Conversation::addSwarmChannel(std::shared_ptr<dhtnet::ChannelSocket> channel)
2316 : {
2317 698 : auto deviceId = channel->deviceId();
2318 : // Transmit avatar if necessary
2319 : // We do this here, because at this point we know both sides are connected and in
2320 : // the same conversation
2321 : // addSwarmChannel is a bit more complex, but it should be the best moment to do this.
2322 699 : auto cert = channel->peerCertificate();
2323 699 : if (!cert || !cert->issuer)
2324 0 : return;
2325 698 : auto member = cert->issuer->getId().toString();
2326 699 : pimpl_->swarmManager_->addChannel(std::move(channel));
2327 699 : dht::ThreadPool::io().run([member, deviceId, a = pimpl_->account_, w = weak_from_this()] {
2328 699 : auto sthis = w.lock();
2329 699 : if (auto account = a.lock()) {
2330 697 : account->sendProfile(sthis->id(), member, deviceId.toString());
2331 699 : }
2332 699 : });
2333 699 : }
2334 :
2335 : uint32_t
2336 4 : Conversation::countInteractions(const std::string& toId, const std::string& fromId, const std::string& authorUri) const
2337 : {
2338 4 : LogOptions options;
2339 4 : options.to = toId;
2340 4 : options.from = fromId;
2341 4 : options.authorUri = authorUri;
2342 4 : options.logIfNotFound = false;
2343 4 : options.fastLog = true;
2344 4 : History history;
2345 4 : std::lock_guard lk(history.mutex);
2346 4 : auto res = pimpl_->loadMessages(options, &history);
2347 8 : return res.size();
2348 4 : }
2349 :
2350 : void
2351 4 : Conversation::search(uint32_t req, const Filter& filter, const std::shared_ptr<std::atomic_int>& flag) const
2352 : {
2353 : // Because logging a conversation can take quite some time,
2354 : // do it asynchronously
2355 4 : dht::ThreadPool::io().run([w = weak(), req, filter, flag] {
2356 4 : if (auto sthis = w.lock()) {
2357 4 : History history;
2358 4 : std::vector<std::map<std::string, std::string>> commits {};
2359 : // std::regex_constants::ECMAScript is the default flag.
2360 4 : auto re = std::regex(filter.regexSearch,
2361 4 : filter.caseSensitive ? std::regex_constants::ECMAScript : std::regex_constants::icase);
2362 4 : sthis->pimpl_->repository_->log(
2363 20 : [&](const std::string& /*id*/, const GitAuthor& author, const GitCommit& commit) {
2364 20 : if (!filter.author.empty() && filter.author != sthis->uriFromDevice(author.email)) {
2365 : // Filter author
2366 0 : return CallbackResult::Skip;
2367 : }
2368 20 : auto commitTime = git_commit_time(commit.get());
2369 20 : if (filter.before && filter.before < commitTime) {
2370 : // Only get commits before this date
2371 0 : return CallbackResult::Skip;
2372 : }
2373 20 : if (filter.after && filter.after > commitTime) {
2374 : // Only get commits before this date
2375 0 : if (git_commit_parentcount(commit.get()) <= 1)
2376 0 : return CallbackResult::Break;
2377 : else
2378 0 : return CallbackResult::Skip; // Because we are sorting it with
2379 : // GIT_SORT_TOPOLOGICAL | GIT_SORT_TIME
2380 : }
2381 :
2382 20 : return CallbackResult::Ok; // Continue
2383 : },
2384 20 : [&](ConversationCommit&& cc) {
2385 20 : if (auto optMessage = sthis->pimpl_->repository_->convCommitToMap(cc))
2386 40 : sthis->pimpl_->addToHistory(history, {optMessage.value()}, false, false);
2387 20 : },
2388 20 : [&](const std::string& id, const GitAuthor&, ConversationCommit&) {
2389 20 : if (id == filter.lastId)
2390 0 : return true;
2391 20 : return false;
2392 : },
2393 : "",
2394 : false);
2395 : // Search on generated history
2396 24 : for (auto& message : history.messageList) {
2397 20 : auto contentType = message->type;
2398 20 : auto isSearchable = contentType == "text/plain" || contentType == "application/data-transfer+json";
2399 20 : if (filter.type.empty() && !isSearchable) {
2400 : // Not searchable, at least for now
2401 8 : continue;
2402 12 : } else if (contentType == filter.type || filter.type.empty()) {
2403 12 : if (isSearchable) {
2404 : // If it's a text match the body, else the display name
2405 48 : auto body = contentType == "text/plain" ? message->body.at("body")
2406 36 : : message->body.at("displayName");
2407 12 : std::smatch body_match;
2408 12 : if (std::regex_search(body, body_match, re)) {
2409 5 : auto commit = message->body;
2410 5 : commit["id"] = message->id;
2411 5 : commit["type"] = message->type;
2412 5 : commits.emplace_back(commit);
2413 5 : }
2414 12 : } else {
2415 : // Matching type, just add it to the results
2416 0 : commits.emplace_back(message->body);
2417 : }
2418 :
2419 12 : if (filter.maxResult != 0 && commits.size() == filter.maxResult)
2420 0 : break;
2421 : }
2422 20 : }
2423 :
2424 4 : if (commits.size() > 0)
2425 9 : emitSignal<libjami::ConversationSignal::MessagesFound>(req,
2426 3 : sthis->pimpl_->accountId_,
2427 6 : sthis->id(),
2428 3 : std::move(commits));
2429 : // If we're the latest thread, inform client that the search is finished
2430 4 : if ((*flag)-- == 1 /* decrement return the old value */) {
2431 4 : emitSignal<libjami::ConversationSignal::MessagesFound>(
2432 8 : req, sthis->pimpl_->accountId_, std::string {}, std::vector<std::map<std::string, std::string>> {});
2433 : }
2434 8 : }
2435 4 : });
2436 4 : }
2437 :
2438 : void
2439 14 : Conversation::hostConference(Json::Value&& message, OnDoneCb&& cb)
2440 : {
2441 14 : if (!message.isMember("confId")) {
2442 0 : JAMI_ERROR("{}Malformed commit: no confId", pimpl_->toString());
2443 0 : return;
2444 : }
2445 :
2446 14 : auto now = std::chrono::system_clock::now();
2447 14 : auto nowSecs = std::chrono::duration_cast<std::chrono::seconds>(now.time_since_epoch()).count();
2448 : {
2449 14 : std::lock_guard lk(pimpl_->activeCallsMtx_);
2450 14 : pimpl_->hostedCalls_[message["confId"].asString()] = nowSecs;
2451 14 : pimpl_->saveHostedCalls();
2452 14 : }
2453 :
2454 14 : sendMessage(std::move(message), "", {}, std::move(cb));
2455 : }
2456 :
2457 : bool
2458 20 : Conversation::isHosting(const std::string& confId) const
2459 : {
2460 20 : auto info = infos();
2461 20 : if (info["rdvDevice"] == pimpl_->deviceId_ && info["rdvHost"] == pimpl_->userId_)
2462 0 : return true; // We are the current device Host
2463 20 : std::lock_guard lk(pimpl_->activeCallsMtx_);
2464 20 : return pimpl_->hostedCalls_.find(confId) != pimpl_->hostedCalls_.end();
2465 20 : }
2466 :
2467 : void
2468 10 : Conversation::removeActiveConference(Json::Value&& message, OnDoneCb&& cb)
2469 : {
2470 10 : if (!message.isMember("confId")) {
2471 0 : JAMI_ERROR("{}Malformed commit: no confId", pimpl_->toString());
2472 0 : return;
2473 : }
2474 :
2475 10 : auto erased = false;
2476 : {
2477 10 : std::lock_guard lk(pimpl_->activeCallsMtx_);
2478 10 : erased = pimpl_->hostedCalls_.erase(message["confId"].asString());
2479 10 : }
2480 10 : if (erased) {
2481 10 : pimpl_->saveHostedCalls();
2482 10 : sendMessage(std::move(message), "", {}, std::move(cb));
2483 : } else
2484 0 : cb(false, "");
2485 : }
2486 :
2487 : std::vector<std::map<std::string, std::string>>
2488 38 : Conversation::currentCalls() const
2489 : {
2490 38 : std::lock_guard lk(pimpl_->activeCallsMtx_);
2491 76 : return pimpl_->activeCalls_;
2492 38 : }
2493 : } // namespace jami
|