Line data Source code
1 : /*
2 : * Copyright (C) 2004-2026 Savoir-faire Linux Inc.
3 : *
4 : * This program is free software: you can redistribute it and/or modify
5 : * it under the terms of the GNU General Public License as published by
6 : * the Free Software Foundation, either version 3 of the License, or
7 : * (at your option) any later version.
8 : *
9 : * This program is distributed in the hope that it will be useful,
10 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 : * GNU General Public License for more details.
13 : *
14 : * You should have received a copy of the GNU General Public License
15 : * along with this program. If not, see <https://www.gnu.org/licenses/>.
16 : */
17 :
18 : #include "conversation.h"
19 :
20 : #include "account_const.h"
21 : #include "jamiaccount.h"
22 : #include "client/ring_signal.h"
23 : #include "swarm/swarm_manager.h"
24 : #include "conversationrepository.h"
25 : #ifdef ENABLE_PLUGIN
26 : #include "manager.h"
27 : #include "plugin/jamipluginmanager.h"
28 : #include "plugin/streamdata.h"
29 : #endif
30 : #include "jami/conversation_interface.h"
31 : #include "jami/configurationmanager_interface.h"
32 :
33 : #include "fileutils.h"
34 : #include "json_utils.h"
35 : #include "logger.h"
36 :
37 : #include <opendht/thread_pool.h>
38 : #include <opendht/infohash.h>
39 : #include <fmt/compile.h>
40 : #include <asio/error_code.hpp>
41 :
42 : #include <algorithm>
43 : #include <memory>
44 : #include <charconv>
45 : #include <string_view>
46 : #include <tuple>
47 : #include <optional>
48 : #include <utility>
49 : #include <deque>
50 : #include <chrono>
51 : #include <ctime>
52 : #include <functional>
53 : #include <atomic>
54 : #include <regex>
55 :
56 : namespace jami {
57 :
58 : static const char* const LAST_MODIFIED = "lastModified";
59 :
60 13 : ConvInfo::ConvInfo(const Json::Value& json)
61 : {
62 13 : id = json[ConversationMapKeys::ID].asString();
63 13 : created = json[ConversationMapKeys::CREATED].asLargestUInt();
64 13 : removed = json[ConversationMapKeys::REMOVED].asLargestUInt();
65 13 : erased = json[ConversationMapKeys::ERASED].asLargestUInt();
66 33 : for (const auto& v : json[ConversationMapKeys::MEMBERS]) {
67 20 : members.emplace(v["uri"].asString());
68 : }
69 13 : lastDisplayed = json[ConversationMapKeys::LAST_DISPLAYED].asString();
70 13 : }
71 :
72 : Json::Value
73 25 : ConvInfo::toJson() const
74 : {
75 25 : Json::Value json;
76 25 : json[ConversationMapKeys::ID] = id;
77 25 : json[ConversationMapKeys::CREATED] = Json::Int64(created);
78 25 : if (removed) {
79 1 : json[ConversationMapKeys::REMOVED] = Json::Int64(removed);
80 : }
81 25 : if (erased) {
82 1 : json[ConversationMapKeys::ERASED] = Json::Int64(erased);
83 : }
84 64 : for (const auto& m : members) {
85 39 : Json::Value member;
86 39 : member["uri"] = m;
87 39 : json[ConversationMapKeys::MEMBERS].append(member);
88 39 : }
89 25 : json[ConversationMapKeys::LAST_DISPLAYED] = lastDisplayed;
90 25 : return json;
91 0 : }
92 :
93 : // ConversationRequest
94 262 : ConversationRequest::ConversationRequest(const Json::Value& json)
95 : {
96 262 : received = json[ConversationMapKeys::RECEIVED].asLargestUInt();
97 262 : declined = json[ConversationMapKeys::DECLINED].asLargestUInt();
98 262 : from = json[ConversationMapKeys::FROM].asString();
99 262 : conversationId = json[ConversationMapKeys::CONVERSATIONID].asString();
100 262 : auto& md = json[ConversationMapKeys::METADATAS];
101 527 : for (const auto& member : md.getMemberNames()) {
102 265 : metadatas.emplace(member, md[member].asString());
103 262 : }
104 262 : }
105 :
106 : Json::Value
107 2 : ConversationRequest::toJson() const
108 : {
109 2 : Json::Value json;
110 2 : json[ConversationMapKeys::CONVERSATIONID] = conversationId;
111 2 : json[ConversationMapKeys::FROM] = from;
112 2 : json[ConversationMapKeys::RECEIVED] = static_cast<uint32_t>(received);
113 2 : if (declined)
114 0 : json[ConversationMapKeys::DECLINED] = static_cast<uint32_t>(declined);
115 4 : for (const auto& [key, value] : metadatas) {
116 2 : json[ConversationMapKeys::METADATAS][key] = value;
117 : }
118 2 : return json;
119 0 : }
120 :
121 : std::map<std::string, std::string>
122 225 : ConversationRequest::toMap() const
123 : {
124 225 : auto result = metadatas;
125 225 : result[ConversationMapKeys::ID] = conversationId;
126 225 : result[ConversationMapKeys::FROM] = from;
127 225 : if (declined)
128 0 : result[ConversationMapKeys::DECLINED] = std::to_string(declined);
129 225 : result[ConversationMapKeys::RECEIVED] = std::to_string(received);
130 225 : return result;
131 0 : }
132 :
133 : using MessageList = std::list<std::shared_ptr<libjami::SwarmMessage>>;
134 :
135 : struct History
136 : {
137 : // While loading the history, we need to avoid:
138 : // - reloading history (can just be ignored)
139 : // - adding new commits (should wait for history to be loaded)
140 : std::mutex mutex {};
141 : std::condition_variable cv {};
142 : bool loading {false};
143 : MessageList messageList {};
144 : std::map<std::string, std::shared_ptr<libjami::SwarmMessage>> quickAccess {};
145 : std::map<std::string, std::list<std::shared_ptr<libjami::SwarmMessage>>> pendingEditions {};
146 : std::map<std::string, std::list<std::map<std::string, std::string>>> pendingReactions {};
147 : };
148 :
149 : class Conversation::Impl
150 : {
151 : private:
152 394 : Impl(std::unique_ptr<ConversationRepository>&& repository,
153 : const std::shared_ptr<JamiAccount>& account,
154 : std::vector<ConversationCommit>&& commits = {})
155 394 : : repository_(std::move(repository ? repository : throw std::logic_error("Invalid repository")))
156 389 : , account_(account)
157 389 : , accountId_(account->getAccountID())
158 389 : , userId_(account->getUsername())
159 389 : , deviceId_(account->currentDeviceId())
160 389 : , swarmManager_(std::make_shared<SwarmManager>(NodeId(deviceId_),
161 778 : Manager::instance().getSeededRandomEngine(),
162 389 : [account = account_](const DeviceId& deviceId) {
163 2626 : if (auto acc = account.lock()) {
164 2626 : return acc->isConnectedWith(deviceId);
165 2626 : }
166 0 : return false;
167 : }))
168 389 : , transferManager_(std::make_shared<TransferManager>(accountId_,
169 : "",
170 389 : repository_->id(),
171 389 : Manager::instance().getSeededRandomEngine()))
172 389 : , repoPath_(fileutils::get_data_dir() / accountId_ / "conversations" / repository_->id())
173 389 : , conversationDataPath_(fileutils::get_data_dir() / accountId_ / "conversation_data" / repository_->id())
174 389 : , fetchedPath_(conversationDataPath_ / ConversationDirectories::FETCHED)
175 389 : , sendingPath_(conversationDataPath_ / ConversationDirectories::SENDING)
176 389 : , preferencesPath_(conversationDataPath_ / ConversationDirectories::PREFERENCES)
177 389 : , statusPath_(conversationDataPath_ / ConversationDirectories::STATUS)
178 389 : , hostedCallsPath_(conversationDataPath_ / ConversationDirectories::HOSTED_CALLS)
179 389 : , activeCallsPath_(conversationDataPath_ / ConversationDirectories::ACTIVE_CALLS)
180 389 : , ioContext_(Manager::instance().ioContext())
181 389 : , fallbackTimer_(std::make_unique<asio::steady_timer>(*ioContext_))
182 2339 : , typers_(std::make_shared<Typers>(account, repository_->id()))
183 : {
184 389 : if (!commits.empty())
185 185 : initActiveCalls(repository_->convCommitsToMap(commits));
186 389 : loadStatus();
187 409 : }
188 :
189 190 : Impl(std::pair<std::unique_ptr<ConversationRepository>, std::vector<ConversationCommit>>&& repoAndCommits,
190 : const std::shared_ptr<JamiAccount>& account)
191 190 : : Impl(std::move(repoAndCommits.first), account, std::move(repoAndCommits.second))
192 185 : {}
193 :
194 : public:
195 186 : Impl(const std::shared_ptr<JamiAccount>& account, ConversationMode mode, const std::string& otherMember = "")
196 186 : : Impl(ConversationRepository::createConversation(account, mode, otherMember), account)
197 186 : {}
198 :
199 18 : Impl(const std::shared_ptr<JamiAccount>& account, const std::string& conversationId)
200 18 : : Impl(std::make_unique<ConversationRepository>(account, conversationId), account)
201 18 : {}
202 :
203 191 : Impl(const std::shared_ptr<JamiAccount>& account, const std::string& remoteDevice, const std::string& conversationId)
204 191 : : Impl(ConversationRepository::cloneConversation(account, remoteDevice, conversationId), account)
205 185 : {}
206 :
207 2979 : std::string toString() const
208 : {
209 11916 : return fmt::format(FMT_COMPILE("[Account {}] [Conversation {}]"), accountId_, repository_->id());
210 : }
211 : mutable std::string fmtStr_;
212 :
213 389 : ~Impl()
214 5835 : {
215 : try {
216 389 : if (fallbackTimer_)
217 389 : fallbackTimer_->cancel();
218 0 : } catch (const std::exception& e) {
219 0 : JAMI_ERROR("{:s} {:s}", toString(), e.what());
220 0 : }
221 389 : }
222 :
223 : /**
224 : * If, for whatever reason, the daemon is stopped while hosting a conference,
225 : * we need to announce the end of this call when restarting.
226 : * To avoid to keep active calls forever.
227 : */
228 : std::vector<std::string> commitsEndedCalls();
229 : bool isAdmin() const;
230 :
231 283 : void announce(const std::string& commitId, bool commitFromSelf = false)
232 : {
233 283 : std::vector<std::string> vec;
234 283 : if (!commitId.empty())
235 281 : vec.emplace_back(commitId);
236 283 : announce(vec, commitFromSelf);
237 283 : }
238 :
239 296 : void announce(const std::vector<std::string>& commits, bool commitFromSelf = false)
240 : {
241 296 : std::vector<ConversationCommit> convcommits;
242 296 : convcommits.reserve(commits.size());
243 603 : for (const auto& cid : commits) {
244 307 : if (auto commit = repository_->getCommit(cid)) {
245 307 : convcommits.emplace_back(*commit);
246 307 : }
247 : }
248 296 : announce(repository_->convCommitsToMap(convcommits), commitFromSelf);
249 296 : }
250 :
251 : /**
252 : * Initialize activeCalls_ from the list of commits in the repository
253 : * @param commits Commits in reverse chronological order (i.e. from newest to oldest)
254 : */
255 185 : void initActiveCalls(const std::vector<std::map<std::string, std::string>>& commits) const
256 : {
257 185 : std::unordered_set<std::string> invalidHostUris;
258 185 : std::unordered_set<std::string> invalidCallIds;
259 :
260 185 : std::lock_guard lk(activeCallsMtx_);
261 1160 : for (const auto& commit : commits) {
262 975 : if (commit.at("type") == "member") {
263 : // Each commit of type "member" has an "action" field whose value can be one
264 : // of the following: "add", "join", "remove", "ban", "unban"
265 : // In the case of "remove" and "ban", we need to add the member's URI to
266 : // invalidHostUris to ensure that any call they may have started in the past
267 : // is no longer considered active.
268 : // For the other actions, there's no harm in adding the member's URI anyway,
269 : // since it's not possible to start hosting a call before joining the swarm (or
270 : // before getting unbanned in the case of previously banned members).
271 769 : invalidHostUris.emplace(commit.at("uri"));
272 413 : } else if (commit.find("confId") != commit.end() && commit.find("uri") != commit.end()
273 413 : && commit.find("device") != commit.end()) {
274 : // The commit indicates either the end or the beginning of a call
275 : // (depending on whether there is a "duration" field or not).
276 1 : auto convId = repository_->id();
277 2 : auto confId = commit.at("confId");
278 2 : auto uri = commit.at("uri");
279 2 : auto device = commit.at("device");
280 :
281 3 : if (commit.find("duration") == commit.end() && invalidCallIds.find(confId) == invalidCallIds.end()
282 3 : && invalidHostUris.find(uri) == invalidHostUris.end()) {
283 1 : std::map<std::string, std::string> activeCall;
284 1 : activeCall["id"] = confId;
285 1 : activeCall["uri"] = uri;
286 1 : activeCall["device"] = device;
287 1 : activeCalls_.emplace_back(activeCall);
288 0 : fmt::print("swarm:{} new active call detected: {} (on device {}, account {})\n",
289 : convId,
290 : confId,
291 : device,
292 : uri);
293 1 : }
294 : // Even if the call was active, we still add its ID to invalidCallIds to make sure it
295 : // doesn't get added a second time. (This shouldn't happen normally, but in practice
296 : // there are sometimes multiple commits indicating the beginning of the same call.)
297 1 : invalidCallIds.emplace(confId);
298 1 : }
299 : }
300 185 : saveActiveCalls();
301 185 : emitSignal<libjami::ConfigurationSignal::ActiveCallsChanged>(accountId_, repository_->id(), activeCalls_);
302 185 : }
303 :
304 : /**
305 : * Update activeCalls_ via announced commits (in load or via new commits)
306 : * @param commit Commit to check
307 : * @param eraseOnly If we want to ignore added commits
308 : * @param emitSig If we want to emit to client
309 : * @note eraseOnly is used by loadMessages. This is a fail-safe, this SHOULD NOT happen
310 : */
311 77 : void updateActiveCalls(const std::map<std::string, std::string>& commit,
312 : bool eraseOnly = false,
313 : bool emitSig = true) const
314 : {
315 77 : if (!repository_)
316 0 : return;
317 77 : if (commit.at("type") == "member") {
318 : // In this case, we need to check if we are not removing a hosting member or device
319 18 : std::lock_guard lk(activeCallsMtx_);
320 18 : auto it = activeCalls_.begin();
321 18 : auto updateActives = false;
322 19 : while (it != activeCalls_.end()) {
323 1 : if (it->at("uri") == commit.at("uri") || it->at("device") == commit.at("uri")) {
324 4 : JAMI_DEBUG("Removing {:s} from the active calls, because {:s} left", it->at("id"), commit.at("uri"));
325 1 : it = activeCalls_.erase(it);
326 1 : updateActives = true;
327 : } else {
328 0 : ++it;
329 : }
330 : }
331 18 : if (updateActives) {
332 1 : saveActiveCalls();
333 1 : if (emitSig)
334 1 : emitSignal<libjami::ConfigurationSignal::ActiveCallsChanged>(accountId_,
335 1 : repository_->id(),
336 1 : activeCalls_);
337 : }
338 18 : return;
339 18 : }
340 : // Else, it's a call information
341 174 : if (commit.find("confId") != commit.end() && commit.find("uri") != commit.end()
342 174 : && commit.find("device") != commit.end()) {
343 56 : auto convId = repository_->id();
344 112 : auto confId = commit.at("confId");
345 112 : auto uri = commit.at("uri");
346 112 : auto device = commit.at("device");
347 56 : std::lock_guard lk(activeCallsMtx_);
348 56 : auto itActive = std::find_if(activeCalls_.begin(), activeCalls_.end(), [&](const auto& value) {
349 25 : return value.at("id") == confId && value.at("uri") == uri && value.at("device") == device;
350 : });
351 56 : if (commit.find("duration") == commit.end()) {
352 31 : if (itActive == activeCalls_.end() && !eraseOnly) {
353 124 : JAMI_DEBUG("swarm:{:s} new current call detected: {:s} on device {:s}, account {:s}",
354 : convId,
355 : confId,
356 : device,
357 : uri);
358 155 : activeCalls_.emplace_back(std::map<std::string, std::string> {
359 : {"id", confId},
360 : {"uri", uri},
361 : {"device", device},
362 124 : });
363 31 : saveActiveCalls();
364 31 : if (emitSig)
365 31 : emitSignal<libjami::ConfigurationSignal::ActiveCallsChanged>(accountId_,
366 31 : repository_->id(),
367 31 : activeCalls_);
368 : }
369 : } else {
370 25 : if (itActive != activeCalls_.end()) {
371 25 : itActive = activeCalls_.erase(itActive);
372 : // Unlikely, but we must ensure that no duplicate exists
373 25 : while (itActive != activeCalls_.end()) {
374 0 : itActive = std::find_if(itActive, activeCalls_.end(), [&](const auto& value) {
375 0 : return value.at("id") == confId && value.at("uri") == uri && value.at("device") == device;
376 : });
377 0 : if (itActive != activeCalls_.end()) {
378 0 : JAMI_ERROR("Duplicate call found. (This is a bug)");
379 0 : itActive = activeCalls_.erase(itActive);
380 : }
381 : }
382 :
383 25 : if (eraseOnly) {
384 0 : JAMI_WARNING("previous swarm:{:s} call finished detected: {:s} on device "
385 : "{:s}, account {:s}",
386 : convId,
387 : confId,
388 : device,
389 : uri);
390 : } else {
391 100 : JAMI_DEBUG("swarm:{:s} call finished: {:s} on device {:s}, account {:s}",
392 : convId,
393 : confId,
394 : device,
395 : uri);
396 : }
397 : }
398 25 : saveActiveCalls();
399 25 : if (emitSig)
400 25 : emitSignal<libjami::ConfigurationSignal::ActiveCallsChanged>(accountId_,
401 25 : repository_->id(),
402 25 : activeCalls_);
403 : }
404 56 : }
405 : }
406 :
407 1203 : void announce(const std::vector<std::map<std::string, std::string>>& commits, bool commitFromSelf = false)
408 : {
409 1203 : if (!repository_)
410 0 : return;
411 1203 : auto convId = repository_->id();
412 1203 : auto ok = !commits.empty();
413 3607 : auto lastId = ok ? commits.rbegin()->at(ConversationMapKeys::ID) : "";
414 1203 : addToHistory(loadedHistory_, commits, true, commitFromSelf);
415 1203 : if (ok) {
416 1201 : bool announceMember = false;
417 2448 : for (const auto& c : commits) {
418 : // Announce member events
419 1247 : if (c.at("type") == "member") {
420 922 : if (c.find("uri") != c.end() && c.find("action") != c.end()) {
421 922 : const auto& uri = c.at("uri");
422 922 : const auto& actionStr = c.at("action");
423 922 : auto action = -1;
424 922 : if (actionStr == "add")
425 442 : action = 0;
426 480 : else if (actionStr == "join")
427 461 : action = 1;
428 19 : else if (actionStr == "remove")
429 1 : action = 2;
430 18 : else if (actionStr == "ban")
431 17 : action = 3;
432 1 : else if (actionStr == "unban")
433 1 : action = 4;
434 922 : if (actionStr == "ban" || actionStr == "remove") {
435 : // In this case, a potential host was removed during a call.
436 18 : updateActiveCalls(c);
437 18 : typers_->removeTyper(uri);
438 : }
439 922 : if (action != -1) {
440 922 : announceMember = true;
441 922 : emitSignal<libjami::ConversationSignal::ConversationMemberEvent>(accountId_,
442 : convId,
443 : uri,
444 : action);
445 : }
446 : }
447 325 : } else if (c.at("type") == "application/call-history+json") {
448 59 : updateActiveCalls(c);
449 : }
450 : #ifdef ENABLE_PLUGIN
451 1247 : auto& pluginChatManager = Manager::instance().getJamiPluginManager().getChatServicesManager();
452 1247 : if (pluginChatManager.hasHandlers()) {
453 0 : auto cm = std::make_shared<JamiMessage>(accountId_, convId, c.at("author") != userId_, c, false);
454 0 : cm->isSwarm = true;
455 0 : pluginChatManager.publishMessage(std::move(cm));
456 0 : }
457 : #endif
458 : }
459 :
460 1201 : if (announceMember && onMembersChanged_) {
461 921 : onMembersChanged_(repository_->memberUris("", {}));
462 : }
463 : }
464 1203 : }
465 :
466 389 : void loadStatus()
467 : {
468 : try {
469 : // read file
470 776 : auto file = fileutils::loadFile(statusPath_);
471 : // load values
472 2 : msgpack::object_handle oh = msgpack::unpack((const char*) file.data(), file.size());
473 2 : std::lock_guard lk {messageStatusMtx_};
474 2 : oh.get().convert(messagesStatus_);
475 389 : } catch (const std::exception& e) {
476 387 : }
477 389 : }
478 1112 : void saveStatus()
479 : {
480 1112 : std::ofstream file(statusPath_, std::ios::trunc | std::ios::binary);
481 1112 : msgpack::pack(file, messagesStatus_);
482 1112 : }
483 :
484 18 : void loadActiveCalls() const
485 : {
486 : try {
487 : // read file
488 29 : auto file = fileutils::loadFile(activeCallsPath_);
489 : // load values
490 7 : msgpack::object_handle oh = msgpack::unpack((const char*) file.data(), file.size());
491 7 : std::lock_guard lk {activeCallsMtx_};
492 7 : oh.get().convert(activeCalls_);
493 18 : } catch (const std::exception& e) {
494 11 : return;
495 11 : }
496 : }
497 :
498 260 : void saveActiveCalls() const
499 : {
500 260 : std::ofstream file(activeCallsPath_, std::ios::trunc | std::ios::binary);
501 260 : msgpack::pack(file, activeCalls_);
502 260 : }
503 :
504 18 : void loadHostedCalls() const
505 : {
506 : try {
507 : // read file
508 30 : auto file = fileutils::loadFile(hostedCallsPath_);
509 : // load values
510 6 : msgpack::object_handle oh = msgpack::unpack((const char*) file.data(), file.size());
511 6 : std::lock_guard lk {activeCallsMtx_};
512 6 : oh.get().convert(hostedCalls_);
513 18 : } catch (const std::exception& e) {
514 12 : return;
515 12 : }
516 : }
517 :
518 43 : void saveHostedCalls() const
519 : {
520 43 : std::ofstream file(hostedCallsPath_, std::ios::trunc | std::ios::binary);
521 43 : msgpack::pack(file, hostedCalls_);
522 43 : }
523 :
524 : void voteUnban(const std::string& contactUri, const std::string_view type, const OnDoneCb& cb);
525 :
526 : std::vector<std::map<std::string, std::string>> getMembers(bool includeInvited,
527 : bool includeLeft,
528 : bool includeBanned) const;
529 :
530 6477 : std::string_view bannedType(const std::string& uri) const
531 : {
532 0 : auto crt = fmt::format("{}.crt", uri);
533 12958 : auto bannedMember = repoPath_ / MemberPath::BANNED / MemberPath::MEMBERS / crt;
534 6478 : if (std::filesystem::is_regular_file(bannedMember))
535 16 : return "members"sv;
536 12926 : auto bannedAdmin = repoPath_ / MemberPath::BANNED / MemberPath::ADMINS / crt;
537 6463 : if (std::filesystem::is_regular_file(bannedAdmin))
538 0 : return "admins"sv;
539 12926 : auto bannedInvited = repoPath_ / MemberPath::BANNED / MemberPath::INVITED / uri;
540 6463 : if (std::filesystem::is_regular_file(bannedInvited))
541 0 : return "invited"sv;
542 12925 : auto bannedDevice = repoPath_ / MemberPath::BANNED / MemberPath::DEVICES / crt;
543 6463 : if (std::filesystem::is_regular_file(bannedDevice))
544 0 : return "devices"sv;
545 6463 : return {};
546 6478 : }
547 :
548 4545 : std::shared_ptr<dhtnet::ChannelSocket> gitSocket(const DeviceId& deviceId) const
549 : {
550 4545 : auto deviceSockets = gitSocketList_.find(deviceId);
551 9090 : return (deviceSockets != gitSocketList_.end()) ? deviceSockets->second : nullptr;
552 : }
553 :
554 2039 : void addGitSocket(const DeviceId& deviceId, const std::shared_ptr<dhtnet::ChannelSocket>& socket)
555 : {
556 2039 : gitSocketList_[deviceId] = socket;
557 2039 : }
558 814 : void removeGitSocket(const DeviceId& deviceId)
559 : {
560 814 : auto deviceSockets = gitSocketList_.find(deviceId);
561 814 : if (deviceSockets != gitSocketList_.end())
562 435 : gitSocketList_.erase(deviceSockets);
563 814 : }
564 :
565 : /**
566 : * Remove all git sockets and all DRT nodes associated with the given peer.
567 : * This is used when a swarm member is banned to ensure that we stop syncing
568 : * with them or sending them message notifications.
569 : */
570 : void disconnectFromPeer(const std::string& peerUri);
571 :
572 : std::vector<std::map<std::string, std::string>> getMembers(bool includeInvited, bool includeLeft) const;
573 :
574 : std::mutex membersMtx_ {};
575 : std::set<std::string> checkedMembers_; // Store members we tried
576 : std::function<void()> bootstrapCb_;
577 : std::mutex bootstrapMtx_ {};
578 : #ifdef LIBJAMI_TEST
579 : std::function<void(std::string, BootstrapStatus)> bootstrapCbTest_;
580 : #endif
581 :
582 : std::mutex writeMtx_ {};
583 : const std::unique_ptr<ConversationRepository> repository_;
584 : const std::weak_ptr<JamiAccount> account_;
585 : const std::string accountId_;
586 : const std::string userId_;
587 : const std::string deviceId_;
588 : const std::shared_ptr<SwarmManager> swarmManager_;
589 : std::atomic_bool isRemoving_ {false};
590 : std::vector<std::map<std::string, std::string>> loadMessages(const LogOptions& options);
591 : std::vector<libjami::SwarmMessage> loadMessages2(const LogOptions& options, History* optHistory = nullptr);
592 : void pull(const std::string& deviceId);
593 :
594 : // Avoid multiple fetch/merges at the same time.
595 : std::mutex pullcbsMtx_ {};
596 : // store current remote in fetch
597 : std::map<std::string, std::deque<std::pair<std::string, OnPullCb>>> fetchingRemotes_ {};
598 : const std::shared_ptr<TransferManager> transferManager_ {};
599 : const std::filesystem::path repoPath_ {};
600 : const std::filesystem::path conversationDataPath_ {};
601 : const std::filesystem::path fetchedPath_ {};
602 :
603 : // Manage last message displayed and status
604 : const std::filesystem::path sendingPath_ {};
605 : const std::filesystem::path preferencesPath_ {};
606 : const std::filesystem::path statusPath_ {};
607 :
608 : OnMembersChanged onMembersChanged_ {};
609 :
610 : // Manage hosted calls on this device
611 : std::filesystem::path hostedCallsPath_ {};
612 : mutable std::map<std::string, uint64_t /* start time */> hostedCalls_ {};
613 : // Manage active calls for this conversation (can be hosted by other devices)
614 : std::filesystem::path activeCallsPath_ {};
615 : mutable std::mutex activeCallsMtx_ {};
616 : mutable std::vector<std::map<std::string, std::string>> activeCalls_ {};
617 :
618 : GitSocketList gitSocketList_ {};
619 :
620 : // Bootstrap
621 : const std::shared_ptr<asio::io_context> ioContext_;
622 : const std::unique_ptr<asio::steady_timer> fallbackTimer_;
623 :
624 : /**
625 : * Loaded history represents the linearized history to show for clients
626 : */
627 : History loadedHistory_ {};
628 : std::vector<std::shared_ptr<libjami::SwarmMessage>> addToHistory(
629 : History& history,
630 : const std::vector<std::map<std::string, std::string>>& commits,
631 : bool messageReceived = false,
632 : bool commitFromSelf = false);
633 :
634 : void handleReaction(History& history, const std::shared_ptr<libjami::SwarmMessage>& sharedCommit) const;
635 : void handleEdition(History& history,
636 : const std::shared_ptr<libjami::SwarmMessage>& sharedCommit,
637 : bool messageReceived) const;
638 : bool handleMessage(History& history,
639 : const std::shared_ptr<libjami::SwarmMessage>& sharedCommit,
640 : bool messageReceived) const;
641 : void rectifyStatus(const std::shared_ptr<libjami::SwarmMessage>& message, History& history) const;
642 : /**
643 : * {uri, {
644 : * {"fetch", "commitId"},
645 : * {"fetched_ts", "timestamp"},
646 : * {"read", "commitId"},
647 : * {"read_ts", "timestamp"}
648 : * }
649 : * }
650 : */
651 : mutable std::mutex messageStatusMtx_;
652 : std::function<void(const std::map<std::string, std::map<std::string, std::string>>&)> messageStatusCb_ {};
653 : std::map<std::string, std::map<std::string, std::string>> messagesStatus_ {};
654 : /**
655 : * Status: 0 = commited, 1 = fetched, 2 = read
656 : * This cache the curent status to add in the messages
657 : */
658 : // Note: only store int32_t cause it's easy to pass to dbus this way
659 : // memberToStatus serves as a cache for loading messages
660 : std::map<std::string, int32_t> memberToStatus;
661 :
662 : // futureStatus is used to store the status for receiving messages
663 : // (because we're not sure to fetch the commit before receiving a status change for this)
664 : std::map<std::string, std::map<std::string, int32_t>> futureStatus;
665 : // Update internal structures regarding status
666 : void updateStatus(const std::string& uri,
667 : libjami::Account::MessageStates status,
668 : const std::string& commitId,
669 : const std::string& ts,
670 : bool emit = false);
671 :
672 : std::shared_ptr<Typers> typers_;
673 : };
674 :
675 : bool
676 17 : Conversation::Impl::isAdmin() const
677 : {
678 17 : auto adminsPath = repoPath_ / MemberPath::ADMINS;
679 34 : return std::filesystem::is_regular_file(fileutils::getFullPath(adminsPath, userId_ + ".crt"));
680 17 : }
681 :
682 : void
683 17 : Conversation::Impl::disconnectFromPeer(const std::string& peerUri)
684 : {
685 : // Remove nodes from swarmManager
686 17 : const auto nodes = swarmManager_->getAllNodes();
687 17 : std::vector<NodeId> toRemove;
688 38 : for (const auto node : nodes)
689 21 : if (peerUri == repository_->uriFromDevice(node.toString()))
690 11 : toRemove.emplace_back(node);
691 17 : swarmManager_->deleteNode(toRemove);
692 :
693 : // Remove git sockets with this member
694 38 : for (auto it = gitSocketList_.begin(); it != gitSocketList_.end();) {
695 21 : if (peerUri == repository_->uriFromDevice(it->first.toString()))
696 11 : it = gitSocketList_.erase(it);
697 : else
698 10 : ++it;
699 : }
700 17 : }
701 :
702 : std::vector<std::map<std::string, std::string>>
703 428 : Conversation::Impl::getMembers(bool includeInvited, bool includeLeft, bool includeBanned) const
704 : {
705 428 : std::vector<std::map<std::string, std::string>> result;
706 428 : auto members = repository_->members();
707 428 : std::lock_guard lk(messageStatusMtx_);
708 1312 : for (const auto& member : members) {
709 884 : if (member.role == MemberRole::BANNED && !includeBanned) {
710 174 : continue;
711 : }
712 884 : if (member.role == MemberRole::INVITED && !includeInvited)
713 174 : continue;
714 710 : if (member.role == MemberRole::LEFT && !includeLeft)
715 0 : continue;
716 710 : auto mm = member.map();
717 710 : auto it = messagesStatus_.find(member.uri);
718 710 : if (it != messagesStatus_.end()) {
719 340 : auto readIt = it->second.find("read");
720 340 : if (readIt != it->second.end())
721 281 : mm[ConversationMapKeys::LAST_DISPLAYED] = readIt->second;
722 : }
723 710 : result.emplace_back(std::move(mm));
724 710 : }
725 856 : return result;
726 428 : }
727 :
728 : std::vector<std::string>
729 18 : Conversation::Impl::commitsEndedCalls()
730 : {
731 : // Handle current calls
732 18 : std::vector<std::string> commits {};
733 18 : std::unique_lock lk(writeMtx_);
734 18 : std::unique_lock lkA(activeCallsMtx_);
735 18 : for (const auto& hostedCall : hostedCalls_) {
736 : // In this case, this means that we left
737 : // the conference while still hosting it, so activeCalls
738 : // will not be correctly updated
739 : // We don't need to send notifications there, as peers will sync with presence
740 0 : Json::Value value;
741 0 : value["uri"] = userId_;
742 0 : value["device"] = deviceId_;
743 0 : value["confId"] = hostedCall.first;
744 0 : value["type"] = "application/call-history+json";
745 0 : auto now = std::chrono::system_clock::now();
746 0 : auto nowConverted = std::chrono::duration_cast<std::chrono::seconds>(now.time_since_epoch()).count();
747 0 : value["duration"] = std::to_string((nowConverted - hostedCall.second) * 1000);
748 0 : auto itActive = std::find_if(activeCalls_.begin(),
749 : activeCalls_.end(),
750 0 : [this, confId = hostedCall.first](const auto& value) {
751 0 : return value.at("id") == confId && value.at("uri") == userId_
752 0 : && value.at("device") == deviceId_;
753 : });
754 0 : if (itActive != activeCalls_.end())
755 0 : activeCalls_.erase(itActive);
756 0 : commits.emplace_back(repository_->commitMessage(json::toString(value)));
757 :
758 0 : JAMI_DEBUG("Removing hosted conference... {:s}", hostedCall.first);
759 0 : }
760 18 : hostedCalls_.clear();
761 18 : saveActiveCalls();
762 18 : saveHostedCalls();
763 36 : return commits;
764 18 : }
765 :
766 : std::vector<std::map<std::string, std::string>>
767 0 : Conversation::Impl::loadMessages(const LogOptions& options)
768 : {
769 0 : if (!repository_)
770 0 : return {};
771 0 : std::vector<ConversationCommit> commits;
772 0 : auto startLogging = options.from == "";
773 0 : auto breakLogging = false;
774 0 : repository_->log(
775 0 : [&](const auto& id, const auto& author, const auto& commit) {
776 0 : if (!commits.empty()) {
777 : // Set linearized parent
778 0 : commits.rbegin()->linearized_parent = id;
779 : }
780 0 : if (options.skipMerge && git_commit_parentcount(commit.get()) > 1) {
781 0 : return CallbackResult::Skip;
782 : }
783 0 : if ((options.nbOfCommits != 0 && commits.size() == options.nbOfCommits))
784 0 : return CallbackResult::Break; // Stop logging
785 0 : if (breakLogging)
786 0 : return CallbackResult::Break; // Stop logging
787 0 : if (id == options.to) {
788 0 : if (options.includeTo)
789 0 : breakLogging = true; // For the next commit
790 : else
791 0 : return CallbackResult::Break; // Stop logging
792 : }
793 :
794 0 : if (!startLogging && options.from != "" && options.from == id)
795 0 : startLogging = true;
796 0 : if (!startLogging)
797 0 : return CallbackResult::Skip; // Start logging after this one
798 :
799 0 : if (options.fastLog) {
800 0 : if (options.authorUri != "") {
801 0 : if (options.authorUri == repository_->uriFromDevice(author.email)) {
802 0 : return CallbackResult::Break; // Found author, stop
803 : }
804 : }
805 : // Used to only count commit
806 0 : commits.emplace(commits.end(), ConversationCommit {});
807 0 : return CallbackResult::Skip;
808 : }
809 :
810 0 : return CallbackResult::Ok; // Continue
811 : },
812 0 : [&](auto&& cc) { commits.emplace(commits.end(), std::forward<decltype(cc)>(cc)); },
813 0 : [](auto, auto, auto) { return false; },
814 0 : options.from,
815 0 : options.logIfNotFound);
816 0 : return repository_->convCommitsToMap(commits);
817 0 : }
818 :
819 : std::vector<libjami::SwarmMessage>
820 1513 : Conversation::Impl::loadMessages2(const LogOptions& options, History* optHistory)
821 : {
822 1513 : auto history = optHistory ? optHistory : &loadedHistory_;
823 :
824 : // history->mutex is locked by the caller
825 1513 : if (!repository_ || history->loading) {
826 0 : return {};
827 : }
828 1513 : history->loading = true;
829 :
830 : // By convention, if options.nbOfCommits is zero, then we
831 : // don't impose a limit on the number of commits returned.
832 1513 : bool limitNbOfCommits = options.nbOfCommits > 0;
833 :
834 1513 : auto startLogging = options.from == "";
835 1513 : auto breakLogging = false;
836 1513 : auto currentHistorySize = loadedHistory_.messageList.size();
837 1513 : std::vector<std::string> replies;
838 1513 : std::vector<std::shared_ptr<libjami::SwarmMessage>> msgList;
839 3026 : repository_->log(
840 : /* preCondition */
841 8135 : [&](const auto& id, const auto& author, const auto& commit) {
842 8135 : if (options.skipMerge && git_commit_parentcount(commit.get()) > 1) {
843 2 : return CallbackResult::Skip;
844 : }
845 8132 : if (id == options.to) {
846 686 : if (options.includeTo)
847 0 : breakLogging = true; // For the next commit
848 : }
849 8130 : if (replies.empty()) { // This avoid load until
850 : // NOTE: in the future, we may want to add "Reply-Body" in commit to avoid to load
851 : // until this commit
852 16266 : if ((limitNbOfCommits
853 8133 : && (loadedHistory_.messageList.size() - currentHistorySize) == options.nbOfCommits))
854 5 : return CallbackResult::Break; // Stop logging
855 8128 : if (breakLogging)
856 0 : return CallbackResult::Break; // Stop logging
857 8128 : if (id == options.to && !options.includeTo) {
858 685 : return CallbackResult::Break; // Stop logging
859 : }
860 : }
861 :
862 7442 : if (!startLogging && options.from != "" && options.from == id)
863 1108 : startLogging = true;
864 7447 : if (!startLogging)
865 24 : return CallbackResult::Skip; // Start logging after this one
866 :
867 7423 : if (options.fastLog) {
868 6748 : if (options.authorUri != "") {
869 1 : if (options.authorUri == repository_->uriFromDevice(author.email)) {
870 1 : return CallbackResult::Break; // Found author, stop
871 : }
872 : }
873 : }
874 :
875 7418 : return CallbackResult::Ok; // Continue
876 : },
877 : /* emplaceCb */
878 7423 : [&](auto&& cc) {
879 7423 : if (limitNbOfCommits && (msgList.size() == options.nbOfCommits))
880 273 : return;
881 7151 : auto optMessage = repository_->convCommitToMap(cc);
882 7154 : if (!optMessage.has_value())
883 1 : return;
884 7147 : auto message = optMessage.value();
885 7151 : if (message.find("reply-to") != message.end()) {
886 1 : auto it = std::find(replies.begin(), replies.end(), message.at("reply-to"));
887 1 : if (it == replies.end()) {
888 1 : replies.emplace_back(message.at("reply-to"));
889 : }
890 : }
891 7151 : auto it = std::find(replies.begin(), replies.end(), message.at("id"));
892 7146 : if (it != replies.end()) {
893 1 : replies.erase(it);
894 : }
895 7148 : std::shared_ptr<libjami::SwarmMessage> firstMsg;
896 7148 : if ((history == &loadedHistory_) && msgList.empty() && !loadedHistory_.messageList.empty()) {
897 0 : firstMsg = *loadedHistory_.messageList.rbegin();
898 : }
899 21443 : auto added = addToHistory(*history, {message}, false, false);
900 7149 : if (!added.empty() && firstMsg) {
901 0 : emitSignal<libjami::ConversationSignal::SwarmMessageUpdated>(accountId_, repository_->id(), *firstMsg);
902 : }
903 7142 : msgList.insert(msgList.end(), added.begin(), added.end());
904 7152 : },
905 : /* postCondition */
906 7426 : [&](auto, auto, auto) {
907 : // Stop logging if there was a limit set on the number of commits
908 : // to return and we reached it. This isn't strictly necessary since
909 : // the check at the beginning of `emplaceCb` ensures that we won't
910 : // return too many messages, but it prevents us from needlessly
911 : // iterating over a (potentially) large number of commits.
912 7426 : return limitNbOfCommits && (msgList.size() == options.nbOfCommits);
913 : },
914 1513 : options.from,
915 1513 : options.logIfNotFound);
916 :
917 1513 : history->loading = false;
918 1513 : history->cv.notify_all();
919 :
920 : // Convert for client (remove ptr)
921 1513 : std::vector<libjami::SwarmMessage> ret;
922 1513 : ret.reserve(msgList.size());
923 8658 : for (const auto& msg : msgList) {
924 7144 : ret.emplace_back(*msg);
925 : }
926 1513 : return ret;
927 1513 : }
928 :
929 : void
930 3 : Conversation::Impl::handleReaction(History& history, const std::shared_ptr<libjami::SwarmMessage>& sharedCommit) const
931 : {
932 3 : auto it = history.quickAccess.find(sharedCommit->body.at("react-to"));
933 3 : auto peditIt = history.pendingEditions.find(sharedCommit->id);
934 3 : if (peditIt != history.pendingEditions.end()) {
935 0 : auto oldBody = sharedCommit->body;
936 0 : sharedCommit->body["body"] = peditIt->second.front()->body["body"];
937 0 : if (sharedCommit->body.at("body").empty())
938 0 : return;
939 0 : history.pendingEditions.erase(peditIt);
940 0 : }
941 3 : if (it != history.quickAccess.end()) {
942 3 : it->second->reactions.emplace_back(sharedCommit->body);
943 3 : emitSignal<libjami::ConversationSignal::ReactionAdded>(accountId_,
944 3 : repository_->id(),
945 3 : it->second->id,
946 3 : sharedCommit->body);
947 : } else {
948 0 : history.pendingReactions[sharedCommit->body.at("react-to")].emplace_back(sharedCommit->body);
949 : }
950 : }
951 :
952 : void
953 8 : Conversation::Impl::handleEdition(History& history,
954 : const std::shared_ptr<libjami::SwarmMessage>& sharedCommit,
955 : bool messageReceived) const
956 : {
957 16 : auto editId = sharedCommit->body.at("edit");
958 8 : auto it = history.quickAccess.find(editId);
959 8 : if (it != history.quickAccess.end()) {
960 6 : auto baseCommit = it->second;
961 6 : if (baseCommit) {
962 6 : auto itReact = baseCommit->body.find("react-to");
963 6 : std::string toReplace = (baseCommit->type == "application/data-transfer+json") ? "tid" : "body";
964 6 : auto body = sharedCommit->body.at(toReplace);
965 : // Edit reaction
966 6 : if (itReact != baseCommit->body.end()) {
967 1 : baseCommit->body[toReplace] = body; // Replace body if pending
968 1 : it = history.quickAccess.find(itReact->second);
969 1 : auto itPending = history.pendingReactions.find(itReact->second);
970 1 : if (it != history.quickAccess.end()) {
971 1 : baseCommit = it->second; // Base commit
972 1 : auto itPreviousReact = std::find_if(baseCommit->reactions.begin(),
973 1 : baseCommit->reactions.end(),
974 1 : [&](const auto& reaction) {
975 1 : return reaction.at("id") == editId;
976 : });
977 1 : if (itPreviousReact != baseCommit->reactions.end()) {
978 1 : (*itPreviousReact)[toReplace] = body;
979 1 : if (body.empty()) {
980 1 : baseCommit->reactions.erase(itPreviousReact);
981 2 : emitSignal<libjami::ConversationSignal::ReactionRemoved>(accountId_,
982 1 : repository_->id(),
983 1 : baseCommit->id,
984 : editId);
985 : }
986 : }
987 0 : } else if (itPending != history.pendingReactions.end()) {
988 : // Else edit if pending
989 0 : auto itReaction = std::find_if(itPending->second.begin(),
990 0 : itPending->second.end(),
991 0 : [&](const auto& reaction) { return reaction.at("id") == editId; });
992 0 : if (itReaction != itPending->second.end()) {
993 0 : (*itReaction)[toReplace] = body;
994 0 : if (body.empty())
995 0 : itPending->second.erase(itReaction);
996 : }
997 : } else {
998 : // Add to pending edtions
999 0 : messageReceived ? history.pendingEditions[editId].emplace_front(sharedCommit)
1000 0 : : history.pendingEditions[editId].emplace_back(sharedCommit);
1001 : }
1002 : } else {
1003 : // Normal message
1004 5 : it->second->editions.emplace(it->second->editions.begin(), it->second->body);
1005 5 : it->second->body[toReplace] = sharedCommit->body[toReplace];
1006 5 : if (toReplace == "tid") {
1007 : // Avoid to replace fileId in client
1008 1 : it->second->body["fileId"] = "";
1009 : }
1010 : // Remove reactions
1011 5 : if (sharedCommit->body.at(toReplace).empty())
1012 2 : it->second->reactions.clear();
1013 5 : emitSignal<libjami::ConversationSignal::SwarmMessageUpdated>(accountId_, repository_->id(), *it->second);
1014 : }
1015 6 : }
1016 6 : } else {
1017 2 : messageReceived ? history.pendingEditions[editId].emplace_front(sharedCommit)
1018 2 : : history.pendingEditions[editId].emplace_back(sharedCommit);
1019 : }
1020 8 : }
1021 :
1022 : bool
1023 8372 : Conversation::Impl::handleMessage(History& history,
1024 : const std::shared_ptr<libjami::SwarmMessage>& sharedCommit,
1025 : bool messageReceived) const
1026 : {
1027 8372 : if (messageReceived) {
1028 : // For a received message, we place it at the beginning of the list
1029 1209 : if (!history.messageList.empty())
1030 965 : sharedCommit->linearizedParent = (*history.messageList.begin())->id;
1031 1209 : history.messageList.emplace_front(sharedCommit);
1032 : } else {
1033 : // For a loaded message, we load from newest to oldest
1034 : // So we change the parent of the last message.
1035 7163 : if (!history.messageList.empty())
1036 5653 : (*history.messageList.rbegin())->linearizedParent = sharedCommit->id;
1037 7160 : history.messageList.emplace_back(sharedCommit);
1038 : }
1039 : // Handle pending reactions/editions
1040 8369 : auto reactIt = history.pendingReactions.find(sharedCommit->id);
1041 8366 : if (reactIt != history.pendingReactions.end()) {
1042 0 : for (const auto& commitBody : reactIt->second)
1043 0 : sharedCommit->reactions.emplace_back(commitBody);
1044 0 : history.pendingReactions.erase(reactIt);
1045 : }
1046 8361 : auto peditIt = history.pendingEditions.find(sharedCommit->id);
1047 8363 : if (peditIt != history.pendingEditions.end()) {
1048 0 : auto oldBody = sharedCommit->body;
1049 0 : if (sharedCommit->type == "application/data-transfer+json") {
1050 0 : sharedCommit->body["tid"] = peditIt->second.front()->body["tid"];
1051 0 : sharedCommit->body["fileId"] = "";
1052 : } else {
1053 0 : sharedCommit->body["body"] = peditIt->second.front()->body["body"];
1054 : }
1055 0 : peditIt->second.pop_front();
1056 0 : for (const auto& commit : peditIt->second) {
1057 0 : sharedCommit->editions.emplace_back(commit->body);
1058 : }
1059 0 : sharedCommit->editions.emplace_back(oldBody);
1060 0 : history.pendingEditions.erase(peditIt);
1061 0 : }
1062 : // Announce to client
1063 8363 : if (messageReceived)
1064 1209 : emitSignal<libjami::ConversationSignal::SwarmMessageReceived>(accountId_, repository_->id(), *sharedCommit);
1065 8366 : return !messageReceived;
1066 : }
1067 :
1068 : void
1069 8376 : Conversation::Impl::rectifyStatus(const std::shared_ptr<libjami::SwarmMessage>& message, History& history) const
1070 : {
1071 8376 : auto parentIt = history.quickAccess.find(message->linearizedParent);
1072 8376 : auto currentMessage = message;
1073 :
1074 20380 : while (parentIt != history.quickAccess.end()) {
1075 11999 : const auto& parent = parentIt->second;
1076 13177 : for (const auto& [peer, value] : message->status) {
1077 12076 : auto parentStatusIt = parent->status.find(peer);
1078 12073 : if (parentStatusIt == parent->status.end() || parentStatusIt->second < value) {
1079 1178 : parent->status[peer] = value;
1080 2356 : emitSignal<libjami::ConfigurationSignal::AccountMessageStatusChanged>(accountId_,
1081 1178 : repository_->id(),
1082 : peer,
1083 1178 : parent->id,
1084 : value);
1085 10898 : } else if (parentStatusIt->second >= value) {
1086 10898 : break;
1087 : }
1088 : }
1089 11998 : currentMessage = parent;
1090 11999 : parentIt = history.quickAccess.find(parent->linearizedParent);
1091 : }
1092 8374 : }
1093 :
1094 : std::vector<std::shared_ptr<libjami::SwarmMessage>>
1095 8375 : Conversation::Impl::addToHistory(History& history,
1096 : const std::vector<std::map<std::string, std::string>>& commits,
1097 : bool messageReceived,
1098 : bool commitFromSelf)
1099 : {
1100 : //
1101 : // NOTE: This function makes the following assumptions on its arguments:
1102 : // - The messages in "history" are in reverse chronological order (newest message
1103 : // first, oldest message last).
1104 : // - If messageReceived is true, then the commits in "commits" are assumed to be in
1105 : // chronological order (oldest to newest) and to be newer than the ones in "history".
1106 : // They are therefore inserted at the beginning of the message list.
1107 : // - If messageReceived is false, then the commits in "commits" are assumed to be in
1108 : // reverse chronological order (newest to oldest) and to be older than the ones in
1109 : // "history". They are therefore appended at the end of the message list.
1110 : //
1111 8375 : auto acc = account_.lock();
1112 8373 : if (!acc)
1113 0 : return {};
1114 8370 : auto username = acc->getUsername();
1115 8374 : if (messageReceived && (&history == &loadedHistory_ && history.loading)) {
1116 0 : std::unique_lock lk(history.mutex);
1117 0 : history.cv.wait(lk, [&] { return !history.loading; });
1118 0 : }
1119 :
1120 : // Only set messages' status on history for client
1121 8373 : bool needToSetMessageStatus = !commitFromSelf && &history == &loadedHistory_;
1122 :
1123 8373 : std::vector<std::shared_ptr<libjami::SwarmMessage>> sharedCommits;
1124 16792 : for (const auto& commit : commits) {
1125 16829 : auto commitId = commit.at("id");
1126 8417 : if (history.quickAccess.find(commitId) != history.quickAccess.end())
1127 2 : continue; // Already present
1128 8416 : auto typeIt = commit.find("type");
1129 : // Nothing to show for the client, skip
1130 8415 : if (typeIt != commit.end() && typeIt->second == "merge")
1131 35 : continue;
1132 :
1133 8383 : auto sharedCommit = std::make_shared<libjami::SwarmMessage>();
1134 8382 : sharedCommit->fromMapStringString(commit);
1135 :
1136 8384 : if (needToSetMessageStatus) {
1137 919 : std::lock_guard lk(messageStatusMtx_);
1138 : // Check if we already have status information for the commit.
1139 919 : auto itFuture = futureStatus.find(sharedCommit->id);
1140 919 : if (itFuture != futureStatus.end()) {
1141 10 : sharedCommit->status = std::move(itFuture->second);
1142 10 : futureStatus.erase(itFuture);
1143 : }
1144 919 : }
1145 8384 : sharedCommits.emplace_back(sharedCommit);
1146 8416 : }
1147 :
1148 8374 : if (needToSetMessageStatus) {
1149 915 : constexpr int32_t SENDING = static_cast<int32_t>(libjami::Account::MessageStates::SENDING);
1150 915 : constexpr int32_t SENT = static_cast<int32_t>(libjami::Account::MessageStates::SENT);
1151 915 : constexpr int32_t DISPLAYED = static_cast<int32_t>(libjami::Account::MessageStates::DISPLAYED);
1152 :
1153 915 : std::lock_guard lk(messageStatusMtx_);
1154 13505 : for (const auto& member : repository_->members()) {
1155 : // For each member, we iterate over the commits to add in reverse chronological
1156 : // order (i.e. from newest to oldest) and set their status from the point of view
1157 : // of that member (as best we can given the information we have).
1158 : //
1159 : // The key assumption made in order to compute the status is that it can never decrease
1160 : // (with respect to the ordering SENDING < SENT < DISPLAYED) as we go back in time. We
1161 : // therefore start by setting the "status" variable below to the lowest possible value,
1162 : // and increase it when we encounter a commit for which it is justified to do so.
1163 : //
1164 : // If messageReceived is true, then the commits we are adding are the most recent in the
1165 : // conversation history, so the lowest possible value is SENDING.
1166 : //
1167 : // If messageReceived is false, then the commits we are adding are older than the ones
1168 : // that are already in the history, so the lowest possible value is the status of the
1169 : // oldest message in the history so far, which is stored in memberToStatus.
1170 12588 : auto status = SENDING;
1171 12588 : if (!messageReceived) {
1172 12 : auto cache = memberToStatus[member.uri];
1173 12 : if (cache > status)
1174 9 : status = cache;
1175 : }
1176 12588 : auto& messagesStatus = messagesStatus_[member.uri];
1177 :
1178 25188 : for (auto it = sharedCommits.rbegin(); it != sharedCommits.rend(); it++) {
1179 12601 : auto sharedCommit = *it;
1180 12600 : auto previousStatus = status;
1181 12600 : auto& commitStatus = sharedCommit->status[member.uri];
1182 :
1183 : // Compute status for the current commit.
1184 12600 : if (status < SENT && messagesStatus["fetched"] == sharedCommit->id) {
1185 10 : status = SENT;
1186 : }
1187 12596 : if (messagesStatus["read"] == sharedCommit->id) {
1188 1 : status = DISPLAYED;
1189 : }
1190 12598 : if (member.uri == sharedCommit->body.at("author")) {
1191 918 : status = DISPLAYED;
1192 : }
1193 12599 : if (status < commitStatus) {
1194 0 : status = commitStatus;
1195 : }
1196 :
1197 : // Store computed value.
1198 12599 : commitStatus = status;
1199 :
1200 : // Update messagesStatus_ if needed.
1201 12599 : if (previousStatus == SENDING && status >= SENT) {
1202 915 : messagesStatus["fetched"] = sharedCommit->id;
1203 : }
1204 12599 : if (previousStatus <= SENT && status == DISPLAYED) {
1205 906 : messagesStatus["read"] = sharedCommit->id;
1206 : }
1207 12599 : }
1208 :
1209 12590 : if (!messageReceived) {
1210 : // Update memberToStatus with the status of the last (i.e. oldest) added commit.
1211 12 : memberToStatus[member.uri] = status;
1212 : }
1213 915 : }
1214 915 : }
1215 :
1216 8374 : std::vector<std::shared_ptr<libjami::SwarmMessage>> messages;
1217 16748 : for (const auto& sharedCommit : sharedCommits) {
1218 8376 : history.quickAccess[sharedCommit->id] = sharedCommit;
1219 :
1220 8380 : auto reactToIt = sharedCommit->body.find("react-to");
1221 8380 : auto editIt = sharedCommit->body.find("edit");
1222 8380 : if (reactToIt != sharedCommit->body.end() && !reactToIt->second.empty()) {
1223 3 : handleReaction(history, sharedCommit);
1224 8371 : } else if (editIt != sharedCommit->body.end() && !editIt->second.empty()) {
1225 8 : handleEdition(history, sharedCommit, messageReceived);
1226 8365 : } else if (handleMessage(history, sharedCommit, messageReceived)) {
1227 7153 : messages.emplace_back(sharedCommit);
1228 : }
1229 8374 : rectifyStatus(sharedCommit, history);
1230 : }
1231 :
1232 8372 : return messages;
1233 8373 : }
1234 :
1235 186 : Conversation::Conversation(const std::shared_ptr<JamiAccount>& account,
1236 : ConversationMode mode,
1237 186 : const std::string& otherMember)
1238 186 : : pimpl_ {new Impl {account, mode, otherMember}}
1239 186 : {}
1240 :
1241 18 : Conversation::Conversation(const std::shared_ptr<JamiAccount>& account, const std::string& conversationId)
1242 18 : : pimpl_ {new Impl {account, conversationId}}
1243 18 : {}
1244 :
1245 191 : Conversation::Conversation(const std::shared_ptr<JamiAccount>& account,
1246 : const std::string& remoteDevice,
1247 191 : const std::string& conversationId)
1248 191 : : pimpl_ {new Impl {account, remoteDevice, conversationId}}
1249 191 : {}
1250 :
1251 389 : Conversation::~Conversation() {}
1252 :
1253 : std::string
1254 4829 : Conversation::id() const
1255 : {
1256 4829 : return pimpl_->repository_ ? pimpl_->repository_->id() : "";
1257 : }
1258 :
1259 : void
1260 138 : Conversation::addMember(const std::string& contactUri, const OnDoneCb& cb)
1261 : {
1262 : try {
1263 138 : if (mode() == ConversationMode::ONE_TO_ONE) {
1264 : // Only authorize to add left members
1265 1 : auto initialMembers = getInitialMembers();
1266 1 : auto it = std::find(initialMembers.begin(), initialMembers.end(), contactUri);
1267 1 : if (it == initialMembers.end()) {
1268 1 : JAMI_WARN("Unable to add new member in one to one conversation");
1269 1 : cb(false, "");
1270 1 : return;
1271 : }
1272 1 : }
1273 0 : } catch (const std::exception& e) {
1274 0 : JAMI_WARN("Unable to get mode: %s", e.what());
1275 0 : cb(false, "");
1276 0 : return;
1277 0 : }
1278 137 : if (isMember(contactUri, true)) {
1279 0 : JAMI_WARN("Unable to add member %s because it's already a member", contactUri.c_str());
1280 0 : cb(false, "");
1281 0 : return;
1282 : }
1283 137 : if (isBanned(contactUri)) {
1284 2 : if (pimpl_->isAdmin()) {
1285 1 : dht::ThreadPool::io().run([w = weak(), contactUri = std::move(contactUri), cb = std::move(cb)] {
1286 1 : if (auto sthis = w.lock()) {
1287 1 : auto members = sthis->pimpl_->repository_->members();
1288 1 : auto type = sthis->pimpl_->bannedType(contactUri);
1289 1 : if (type.empty()) {
1290 0 : cb(false, {});
1291 0 : return;
1292 : }
1293 1 : sthis->pimpl_->voteUnban(contactUri, type, cb);
1294 2 : }
1295 : });
1296 : } else {
1297 1 : JAMI_WARN("Unable to add member %s because this member is blocked", contactUri.c_str());
1298 1 : cb(false, "");
1299 : }
1300 2 : return;
1301 : }
1302 :
1303 135 : dht::ThreadPool::io().run([w = weak(), contactUri = std::move(contactUri), cb = std::move(cb)] {
1304 135 : if (auto sthis = w.lock()) {
1305 : // Add member files and commit
1306 135 : std::unique_lock lk(sthis->pimpl_->writeMtx_);
1307 135 : auto commit = sthis->pimpl_->repository_->addMember(contactUri);
1308 135 : sthis->pimpl_->announce(commit, true);
1309 135 : lk.unlock();
1310 135 : if (cb)
1311 135 : cb(!commit.empty(), commit);
1312 270 : }
1313 135 : });
1314 : }
1315 :
1316 : std::shared_ptr<dhtnet::ChannelSocket>
1317 4545 : Conversation::gitSocket(const DeviceId& deviceId) const
1318 : {
1319 4545 : return pimpl_->gitSocket(deviceId);
1320 : }
1321 :
1322 : void
1323 2039 : Conversation::addGitSocket(const DeviceId& deviceId, const std::shared_ptr<dhtnet::ChannelSocket>& socket)
1324 : {
1325 2039 : pimpl_->addGitSocket(deviceId, socket);
1326 2039 : }
1327 :
1328 : void
1329 814 : Conversation::removeGitSocket(const DeviceId& deviceId)
1330 : {
1331 814 : pimpl_->removeGitSocket(deviceId);
1332 814 : }
1333 :
1334 : void
1335 380 : Conversation::shutdownConnections()
1336 : {
1337 380 : pimpl_->fallbackTimer_->cancel();
1338 380 : pimpl_->gitSocketList_.clear();
1339 380 : if (pimpl_->swarmManager_)
1340 380 : pimpl_->swarmManager_->shutdown();
1341 380 : std::lock_guard lk(pimpl_->membersMtx_);
1342 380 : pimpl_->checkedMembers_.clear();
1343 380 : }
1344 :
1345 : void
1346 0 : Conversation::connectivityChanged()
1347 : {
1348 0 : if (pimpl_->swarmManager_)
1349 0 : pimpl_->swarmManager_->maintainBuckets();
1350 0 : }
1351 :
1352 : std::vector<jami::DeviceId>
1353 0 : Conversation::getDeviceIdList() const
1354 : {
1355 0 : return pimpl_->swarmManager_->getAllNodes();
1356 : }
1357 :
1358 : std::shared_ptr<Typers>
1359 9 : Conversation::typers() const
1360 : {
1361 9 : return pimpl_->typers_;
1362 : }
1363 :
1364 : void
1365 0 : Conversation::announce(const std::vector<std::map<std::string, std::string>>& commits, bool commitFromSelf)
1366 : {
1367 0 : pimpl_->announce(commits, commitFromSelf);
1368 0 : }
1369 :
1370 : bool
1371 0 : Conversation::hasSwarmChannel(const std::string& deviceId)
1372 : {
1373 0 : if (!pimpl_->swarmManager_)
1374 0 : return false;
1375 0 : return pimpl_->swarmManager_->isConnectedWith(DeviceId(deviceId));
1376 : }
1377 :
1378 : void
1379 1 : Conversation::Impl::voteUnban(const std::string& contactUri, const std::string_view type, const OnDoneCb& cb)
1380 : {
1381 : // Check if admin
1382 1 : if (!isAdmin()) {
1383 0 : JAMI_WARN("You're not an admin of this repo. Unable to unblock %s", contactUri.c_str());
1384 0 : cb(false, {});
1385 0 : return;
1386 : }
1387 :
1388 : // Vote for removal
1389 1 : std::unique_lock lk(writeMtx_);
1390 1 : auto voteCommit = repository_->voteUnban(contactUri, type);
1391 1 : if (voteCommit.empty()) {
1392 0 : JAMI_WARN("Unbanning %s failed", contactUri.c_str());
1393 0 : cb(false, "");
1394 0 : return;
1395 : }
1396 :
1397 1 : auto lastId = voteCommit;
1398 1 : std::vector<std::string> commits;
1399 1 : commits.emplace_back(voteCommit);
1400 :
1401 : // If admin, check vote
1402 2 : auto resolveCommit = repository_->resolveVote(contactUri, type, "unban");
1403 1 : if (!resolveCommit.empty()) {
1404 1 : commits.emplace_back(resolveCommit);
1405 1 : lastId = resolveCommit;
1406 1 : JAMI_WARN("Vote solved for unbanning %s.", contactUri.c_str());
1407 : }
1408 1 : announce(commits, true);
1409 1 : lk.unlock();
1410 1 : if (cb)
1411 1 : cb(!lastId.empty(), lastId);
1412 1 : }
1413 :
1414 : void
1415 14 : Conversation::removeMember(const std::string& contactUri, bool isDevice, const OnDoneCb& cb)
1416 : {
1417 42 : dht::ThreadPool::io().run(
1418 28 : [w = weak(), contactUri = std::move(contactUri), isDevice = std::move(isDevice), cb = std::move(cb)] {
1419 14 : if (auto sthis = w.lock()) {
1420 : // Check if admin
1421 14 : if (!sthis->pimpl_->isAdmin()) {
1422 1 : JAMI_WARN("You're not an admin of this repo. Unable to block %s", contactUri.c_str());
1423 1 : cb(false, {});
1424 2 : return;
1425 : }
1426 :
1427 : // Get current user type
1428 13 : std::string type;
1429 13 : if (isDevice) {
1430 0 : type = "devices";
1431 : } else {
1432 13 : auto members = sthis->pimpl_->repository_->members();
1433 29 : for (const auto& member : members) {
1434 29 : if (member.uri == contactUri) {
1435 13 : if (member.role == MemberRole::INVITED) {
1436 2 : type = "invited";
1437 11 : } else if (member.role == MemberRole::ADMIN) {
1438 1 : type = "admins";
1439 10 : } else if (member.role == MemberRole::MEMBER) {
1440 10 : type = "members";
1441 : }
1442 13 : break;
1443 : }
1444 : }
1445 13 : if (type.empty()) {
1446 0 : cb(false, {});
1447 0 : return;
1448 : }
1449 13 : }
1450 :
1451 : // Vote for removal
1452 13 : std::unique_lock lk(sthis->pimpl_->writeMtx_);
1453 13 : auto voteCommit = sthis->pimpl_->repository_->voteKick(contactUri, type);
1454 13 : if (voteCommit.empty()) {
1455 1 : JAMI_WARN("Kicking %s failed", contactUri.c_str());
1456 1 : cb(false, "");
1457 1 : return;
1458 : }
1459 :
1460 12 : auto lastId = voteCommit;
1461 12 : std::vector<std::string> commits;
1462 12 : commits.emplace_back(voteCommit);
1463 :
1464 : // If admin, check vote
1465 24 : auto resolveCommit = sthis->pimpl_->repository_->resolveVote(contactUri, type, "ban");
1466 12 : if (!resolveCommit.empty()) {
1467 12 : commits.emplace_back(resolveCommit);
1468 12 : lastId = resolveCommit;
1469 12 : JAMI_WARN("Vote solved for %s. %s banned", contactUri.c_str(), isDevice ? "Device" : "Member");
1470 12 : sthis->pimpl_->disconnectFromPeer(contactUri);
1471 : }
1472 :
1473 12 : sthis->pimpl_->announce(commits, true);
1474 12 : lk.unlock();
1475 12 : cb(!lastId.empty(), lastId);
1476 29 : }
1477 : });
1478 14 : }
1479 :
1480 : std::vector<std::map<std::string, std::string>>
1481 428 : Conversation::getMembers(bool includeInvited, bool includeLeft, bool includeBanned) const
1482 : {
1483 428 : return pimpl_->getMembers(includeInvited, includeLeft, includeBanned);
1484 : }
1485 :
1486 : std::set<std::string>
1487 2023 : Conversation::memberUris(std::string_view filter, const std::set<MemberRole>& filteredRoles) const
1488 : {
1489 2023 : return pimpl_->repository_->memberUris(filter, filteredRoles);
1490 : }
1491 :
1492 : std::vector<NodeId>
1493 1748 : Conversation::peersToSyncWith() const
1494 : {
1495 1748 : auto s = pimpl_->swarmManager_->getConnectedNodes();
1496 13218 : for (const auto& [deviceId, _] : pimpl_->gitSocketList_)
1497 11469 : if (std::find(s.cbegin(), s.cend(), deviceId) == s.cend())
1498 4956 : s.emplace_back(deviceId);
1499 1748 : return s;
1500 0 : }
1501 :
1502 : bool
1503 1748 : Conversation::isBootstrapped() const
1504 : {
1505 1748 : return pimpl_->swarmManager_->isConnected();
1506 : }
1507 :
1508 : std::string
1509 13384 : Conversation::uriFromDevice(const std::string& deviceId) const
1510 : {
1511 13384 : return pimpl_->repository_->uriFromDevice(deviceId);
1512 : }
1513 :
1514 : void
1515 0 : Conversation::monitor()
1516 : {
1517 0 : pimpl_->swarmManager_->getRoutingTable().printRoutingTable();
1518 0 : }
1519 :
1520 : std::string
1521 185 : Conversation::join()
1522 : {
1523 185 : return pimpl_->repository_->join();
1524 : }
1525 :
1526 : bool
1527 3310 : Conversation::isMember(const std::string& uri, bool includeInvited) const
1528 : {
1529 3310 : auto uriCrt = uri + ".crt"sv;
1530 6620 : if (std::filesystem::is_regular_file(pimpl_->repoPath_ / MemberPath::ADMINS / uriCrt)
1531 6620 : || std::filesystem::is_regular_file(pimpl_->repoPath_ / MemberPath::MEMBERS / uriCrt)) {
1532 2239 : return true;
1533 : }
1534 1071 : if (includeInvited) {
1535 886 : if (std::filesystem::is_regular_file(pimpl_->repoPath_ / MemberPath::INVITED / uri)) {
1536 609 : return true;
1537 : }
1538 277 : if (mode() == ConversationMode::ONE_TO_ONE) {
1539 3 : for (const auto& member : getInitialMembers()) {
1540 2 : if (member == uri)
1541 0 : return true;
1542 1 : }
1543 : }
1544 : }
1545 462 : return false;
1546 3310 : }
1547 :
1548 : bool
1549 6476 : Conversation::isBanned(const std::string& uri) const
1550 : {
1551 6476 : return !pimpl_->bannedType(uri).empty();
1552 : }
1553 :
1554 : void
1555 0 : Conversation::sendMessage(
1556 : std::string&& message, const std::string& type, const std::string& replyTo, OnCommitCb&& onCommit, OnDoneCb&& cb)
1557 : {
1558 0 : Json::Value json;
1559 0 : json["body"] = std::move(message);
1560 0 : json["type"] = type;
1561 0 : sendMessage(std::move(json), replyTo, std::move(onCommit), std::move(cb));
1562 0 : }
1563 :
1564 : void
1565 140 : Conversation::sendMessage(Json::Value&& value, const std::string& replyTo, OnCommitCb&& onCommit, OnDoneCb&& cb)
1566 : {
1567 140 : if (!replyTo.empty()) {
1568 2 : auto commit = pimpl_->repository_->getCommit(replyTo);
1569 2 : if (commit == std::nullopt) {
1570 1 : JAMI_ERR("Replying to invalid commit %s", replyTo.c_str());
1571 1 : return;
1572 : }
1573 1 : value["reply-to"] = replyTo;
1574 2 : }
1575 556 : dht::ThreadPool::io().run(
1576 417 : [w = weak(), value = std::move(value), onCommit = std::move(onCommit), cb = std::move(cb)] {
1577 139 : if (auto sthis = w.lock()) {
1578 139 : std::unique_lock lk(sthis->pimpl_->writeMtx_);
1579 139 : auto commit = sthis->pimpl_->repository_->commitMessage(json::toString(value));
1580 139 : lk.unlock();
1581 139 : if (onCommit)
1582 13 : onCommit(commit);
1583 139 : sthis->pimpl_->announce(commit, true);
1584 139 : if (cb)
1585 139 : cb(!commit.empty(), commit);
1586 278 : }
1587 139 : });
1588 : }
1589 :
1590 : void
1591 0 : Conversation::sendMessages(std::vector<Json::Value>&& messages, OnMultiDoneCb&& cb)
1592 : {
1593 0 : dht::ThreadPool::io().run([w = weak(), messages = std::move(messages), cb = std::move(cb)] {
1594 0 : if (auto sthis = w.lock()) {
1595 0 : std::vector<std::string> commits;
1596 0 : commits.reserve(messages.size());
1597 0 : std::unique_lock lk(sthis->pimpl_->writeMtx_);
1598 0 : for (const auto& message : messages) {
1599 0 : auto commit = sthis->pimpl_->repository_->commitMessage(json::toString(message));
1600 0 : commits.emplace_back(std::move(commit));
1601 0 : }
1602 0 : lk.unlock();
1603 0 : sthis->pimpl_->announce(commits, true);
1604 0 : if (cb)
1605 0 : cb(commits);
1606 0 : }
1607 0 : });
1608 0 : }
1609 :
1610 : std::optional<std::map<std::string, std::string>>
1611 12226 : Conversation::getCommit(const std::string& commitId) const
1612 : {
1613 12226 : auto commit = pimpl_->repository_->getCommit(commitId);
1614 12226 : if (commit == std::nullopt)
1615 1628 : return std::nullopt;
1616 10598 : return pimpl_->repository_->convCommitToMap(*commit);
1617 12225 : }
1618 :
1619 : void
1620 0 : Conversation::loadMessages(OnLoadMessages cb, const LogOptions& options)
1621 : {
1622 0 : if (!cb)
1623 0 : return;
1624 0 : dht::ThreadPool::io().run([w = weak(), cb = std::move(cb), options] {
1625 0 : if (auto sthis = w.lock()) {
1626 0 : cb(sthis->pimpl_->loadMessages(options));
1627 0 : }
1628 0 : });
1629 : }
1630 :
1631 : void
1632 2 : Conversation::loadMessages2(const OnLoadMessages2& cb, const LogOptions& options)
1633 : {
1634 2 : if (!cb)
1635 0 : return;
1636 2 : dht::ThreadPool::io().run([w = weak(), cb = std::move(cb), options] {
1637 2 : if (auto sthis = w.lock()) {
1638 2 : std::unique_lock lk(sthis->pimpl_->loadedHistory_.mutex);
1639 2 : auto result = sthis->pimpl_->loadMessages2(options);
1640 2 : lk.unlock();
1641 2 : cb(std::move(result));
1642 4 : }
1643 2 : });
1644 : }
1645 :
1646 : void
1647 0 : Conversation::clearCache()
1648 : {
1649 0 : std::lock_guard lk(pimpl_->loadedHistory_.mutex);
1650 0 : pimpl_->loadedHistory_.messageList.clear();
1651 0 : pimpl_->loadedHistory_.quickAccess.clear();
1652 0 : pimpl_->loadedHistory_.pendingEditions.clear();
1653 0 : pimpl_->loadedHistory_.pendingReactions.clear();
1654 : {
1655 0 : std::lock_guard lk(pimpl_->messageStatusMtx_);
1656 0 : pimpl_->memberToStatus.clear();
1657 0 : }
1658 0 : }
1659 :
1660 : std::string
1661 2199 : Conversation::lastCommitId() const
1662 : {
1663 : {
1664 2199 : std::lock_guard lk(pimpl_->loadedHistory_.mutex);
1665 2198 : if (!pimpl_->loadedHistory_.messageList.empty())
1666 3608 : return (*pimpl_->loadedHistory_.messageList.begin())->id;
1667 2199 : }
1668 395 : LogOptions options;
1669 395 : options.nbOfCommits = 1;
1670 395 : options.skipMerge = true;
1671 395 : History optHistory;
1672 395 : std::scoped_lock lock(pimpl_->writeMtx_, optHistory.mutex);
1673 395 : auto res = pimpl_->loadMessages2(options, &optHistory);
1674 395 : if (res.empty())
1675 2 : return {};
1676 786 : return (*optHistory.messageList.begin())->id;
1677 395 : }
1678 :
1679 : bool
1680 1854 : Conversation::pull(const std::string& deviceId, OnPullCb&& cb, std::string commitId)
1681 : {
1682 1854 : std::lock_guard lk(pimpl_->pullcbsMtx_);
1683 5561 : auto [it, notInProgress] = pimpl_->fetchingRemotes_.emplace(deviceId,
1684 3706 : std::deque<std::pair<std::string, OnPullCb>>());
1685 1854 : auto& pullcbs = it->second;
1686 1854 : auto itPull = std::find_if(pullcbs.begin(), pullcbs.end(), [&](const auto& elem) {
1687 1 : return std::get<0>(elem) == commitId;
1688 3708 : });
1689 1854 : if (itPull != pullcbs.end()) {
1690 4 : JAMI_DEBUG("{} Ignoring request to pull from {:s} with commit {:s}: pull already in progress",
1691 : pimpl_->toString(),
1692 : deviceId,
1693 : commitId);
1694 1 : cb(false);
1695 1 : return false;
1696 : }
1697 7411 : JAMI_DEBUG("{} [device {}] Pulling '{:s}'", pimpl_->toString(), deviceId, commitId);
1698 1853 : pullcbs.emplace_back(std::move(commitId), std::move(cb));
1699 1853 : if (notInProgress)
1700 1845 : dht::ThreadPool::io().run([w = weak(), deviceId] {
1701 1845 : if (auto sthis_ = w.lock())
1702 1845 : sthis_->pimpl_->pull(deviceId);
1703 1845 : });
1704 1853 : return true;
1705 1854 : }
1706 :
1707 : void
1708 1845 : Conversation::Impl::pull(const std::string& deviceId)
1709 : {
1710 1845 : auto& repo = repository_;
1711 :
1712 1845 : std::string commitId;
1713 1845 : OnPullCb cb;
1714 : while (true) {
1715 : {
1716 3698 : std::lock_guard lk(pullcbsMtx_);
1717 3698 : auto it = fetchingRemotes_.find(deviceId);
1718 3698 : if (it == fetchingRemotes_.end()) {
1719 0 : JAMI_ERROR("Could not find device {:s} in fetchingRemotes", deviceId);
1720 0 : break;
1721 : }
1722 3698 : auto& pullcbs = it->second;
1723 3698 : if (pullcbs.empty()) {
1724 1845 : fetchingRemotes_.erase(it);
1725 1845 : break;
1726 : }
1727 1853 : auto& elem = pullcbs.front();
1728 1853 : commitId = std::move(std::get<0>(elem));
1729 1853 : cb = std::move(std::get<1>(elem));
1730 1853 : pullcbs.pop_front();
1731 3698 : }
1732 : // If recently fetched, the commit can already be there, so no need to do complex operations
1733 1853 : if (commitId != "" && repo->getCommit(commitId, false) != std::nullopt) {
1734 53 : cb(true);
1735 79 : continue;
1736 : }
1737 : // Pull from remote
1738 1800 : auto fetched = repo->fetch(deviceId);
1739 1800 : if (!fetched) {
1740 26 : cb(false);
1741 26 : continue;
1742 : }
1743 :
1744 1774 : auto oldHead = repo->getHead();
1745 :
1746 1774 : std::unique_lock lk(writeMtx_);
1747 : auto commits = repo->mergeHistory(deviceId,
1748 1779 : [this](const std::string& peerUri) { this->disconnectFromPeer(peerUri); });
1749 1774 : if (!commits.empty()) {
1750 907 : announce(commits);
1751 : }
1752 1774 : lk.unlock();
1753 :
1754 1774 : bool commitFound = false;
1755 1774 : if (commitId != "") {
1756 : // If `commitId` is non-empty, then we were attempting to pull a specific commit.
1757 : // We need to check if we actually got it; the fact that the fetch above was
1758 : // successful doesn't guarantee that we did.
1759 1413 : for (const auto& commit : commits) {
1760 914 : if (commit.at("id") == commitId) {
1761 900 : commitFound = true;
1762 900 : break;
1763 : }
1764 : }
1765 : } else {
1766 375 : commitFound = true;
1767 : }
1768 1774 : if (!commitFound)
1769 1996 : JAMI_WARNING("Successfully fetched from device {} but didn't receive expected commit {}",
1770 : deviceId,
1771 : commitId);
1772 : // WARNING: If its argument is `true`, this callback will attempt to send a message notification
1773 : // for commit `commitId` to other members of the swarm. It's important that we only
1774 : // send these notifications if we actually have the commit. Otherwise, we can end up
1775 : // in a situation where the members of the swarm keep sending notifications to each
1776 : // other for a commit that none of them have (note that we are unable to rule this out, as
1777 : // nothing prevents a malicious user from intentionally sending a notification with
1778 : // a fake commit ID).
1779 1774 : if (cb)
1780 1774 : cb(commitFound);
1781 :
1782 : // Announce if profile changed
1783 1774 : if (!commits.empty()) {
1784 1814 : auto diffStats = repo->diffStats("HEAD", oldHead);
1785 907 : auto changedFiles = repo->changedFiles(diffStats);
1786 907 : if (find(changedFiles.begin(), changedFiles.end(), "profile.vcf") != changedFiles.end()) {
1787 5 : emitSignal<libjami::ConversationSignal::ConversationProfileUpdated>(accountId_,
1788 5 : repo->id(),
1789 10 : repo->infos());
1790 : }
1791 907 : }
1792 3627 : }
1793 1845 : }
1794 :
1795 : void
1796 1854 : Conversation::sync(const std::string& member, const std::string& deviceId, OnPullCb&& cb, std::string commitId)
1797 : {
1798 1854 : pull(deviceId, std::move(cb), commitId);
1799 1854 : dht::ThreadPool::io().run([member, deviceId, w = weak_from_this()] {
1800 1854 : auto sthis = w.lock();
1801 : // For waiting request, downloadFile
1802 1855 : for (const auto& wr : sthis->dataTransfer()->waitingRequests()) {
1803 1 : sthis->downloadFile(wr.interactionId, wr.fileId, wr.path, member, deviceId);
1804 1854 : }
1805 1853 : });
1806 1854 : }
1807 :
1808 : std::map<std::string, std::string>
1809 268 : Conversation::generateInvitation() const
1810 : {
1811 : // Invite the new member to the conversation
1812 268 : Json::Value root;
1813 268 : auto& metadata = root[ConversationMapKeys::METADATAS];
1814 542 : for (const auto& [k, v] : infos()) {
1815 274 : if (v.size() >= 64000) {
1816 0 : JAMI_WARNING("Cutting invite because the SIP message will be too long");
1817 0 : continue;
1818 0 : }
1819 274 : metadata[k] = v;
1820 268 : }
1821 268 : root[ConversationMapKeys::CONVERSATIONID] = id();
1822 804 : return {{"application/invite+json", json::toString(root)}};
1823 268 : }
1824 :
1825 : std::string
1826 7 : Conversation::leave()
1827 : {
1828 7 : setRemovingFlag();
1829 7 : std::lock_guard lk(pimpl_->writeMtx_);
1830 14 : return pimpl_->repository_->leave();
1831 7 : }
1832 :
1833 : void
1834 10 : Conversation::setRemovingFlag()
1835 : {
1836 10 : pimpl_->isRemoving_ = true;
1837 10 : }
1838 :
1839 : bool
1840 3825 : Conversation::isRemoving()
1841 : {
1842 3825 : return pimpl_->isRemoving_;
1843 : }
1844 :
1845 : void
1846 21 : Conversation::erase()
1847 : {
1848 21 : if (pimpl_->conversationDataPath_ != "")
1849 21 : dhtnet::fileutils::removeAll(pimpl_->conversationDataPath_, true);
1850 21 : if (!pimpl_->repository_)
1851 0 : return;
1852 21 : std::lock_guard lk(pimpl_->writeMtx_);
1853 21 : pimpl_->repository_->erase();
1854 21 : }
1855 :
1856 : ConversationMode
1857 4754 : Conversation::mode() const
1858 : {
1859 4754 : return pimpl_->repository_->mode();
1860 : }
1861 :
1862 : std::vector<std::string>
1863 30 : Conversation::getInitialMembers() const
1864 : {
1865 30 : return pimpl_->repository_->getInitialMembers();
1866 : }
1867 :
1868 : bool
1869 0 : Conversation::isInitialMember(const std::string& uri) const
1870 : {
1871 0 : auto members = getInitialMembers();
1872 0 : return std::find(members.begin(), members.end(), uri) != members.end();
1873 0 : }
1874 :
1875 : void
1876 9 : Conversation::updateInfos(const std::map<std::string, std::string>& map, const OnDoneCb& cb)
1877 : {
1878 9 : dht::ThreadPool::io().run([w = weak(), map = std::move(map), cb = std::move(cb)] {
1879 9 : if (auto sthis = w.lock()) {
1880 9 : auto& repo = sthis->pimpl_->repository_;
1881 9 : std::unique_lock lk(sthis->pimpl_->writeMtx_);
1882 9 : auto commit = repo->updateInfos(map);
1883 9 : sthis->pimpl_->announce(commit, true);
1884 9 : lk.unlock();
1885 9 : if (cb)
1886 9 : cb(!commit.empty(), commit);
1887 9 : emitSignal<libjami::ConversationSignal::ConversationProfileUpdated>(sthis->pimpl_->accountId_,
1888 9 : repo->id(),
1889 18 : repo->infos());
1890 18 : }
1891 9 : });
1892 9 : }
1893 :
1894 : std::map<std::string, std::string>
1895 315 : Conversation::infos() const
1896 : {
1897 315 : return pimpl_->repository_->infos();
1898 : }
1899 :
1900 : void
1901 8 : Conversation::updatePreferences(const std::map<std::string, std::string>& map)
1902 : {
1903 8 : const auto& filePath = pimpl_->preferencesPath_;
1904 8 : auto prefs = map;
1905 8 : auto itLast = prefs.find(LAST_MODIFIED);
1906 8 : if (itLast != prefs.end()) {
1907 3 : std::error_code ec;
1908 3 : if (std::filesystem::is_regular_file(filePath, ec)) {
1909 1 : auto lastModified = fileutils::lastWriteTimeInSeconds(filePath);
1910 : try {
1911 1 : if (lastModified >= to_int<uint64_t>(itLast->second))
1912 0 : return;
1913 0 : } catch (...) {
1914 0 : return;
1915 0 : }
1916 : }
1917 3 : prefs.erase(itLast);
1918 : }
1919 :
1920 8 : std::ofstream file(filePath, std::ios::trunc | std::ios::binary);
1921 8 : msgpack::pack(file, prefs);
1922 8 : emitSignal<libjami::ConversationSignal::ConversationPreferencesUpdated>(pimpl_->accountId_, id(), std::move(prefs));
1923 8 : }
1924 :
1925 : std::map<std::string, std::string>
1926 108 : Conversation::preferences(bool includeLastModified) const
1927 : {
1928 : try {
1929 108 : std::map<std::string, std::string> preferences;
1930 108 : const auto& filePath = pimpl_->preferencesPath_;
1931 206 : auto file = fileutils::loadFile(filePath);
1932 10 : msgpack::object_handle oh = msgpack::unpack((const char*) file.data(), file.size());
1933 10 : oh.get().convert(preferences);
1934 10 : if (includeLastModified)
1935 7 : preferences[LAST_MODIFIED] = std::to_string(fileutils::lastWriteTimeInSeconds(filePath));
1936 10 : return preferences;
1937 206 : } catch (const std::exception& e) {
1938 98 : }
1939 98 : return {};
1940 : }
1941 :
1942 : std::vector<uint8_t>
1943 0 : Conversation::vCard() const
1944 : {
1945 : try {
1946 0 : return fileutils::loadFile(pimpl_->repoPath_ / "profile.vcf");
1947 0 : } catch (...) {
1948 0 : }
1949 0 : return {};
1950 : }
1951 :
1952 : std::shared_ptr<TransferManager>
1953 1960 : Conversation::dataTransfer() const
1954 : {
1955 1960 : return pimpl_->transferManager_;
1956 : }
1957 :
1958 : bool
1959 13 : Conversation::onFileChannelRequest(const std::string& member,
1960 : const std::string& fileId,
1961 : std::filesystem::path& path,
1962 : std::string& sha3sum) const
1963 : {
1964 13 : if (!isMember(member))
1965 0 : return false;
1966 :
1967 13 : auto sep = fileId.find('_');
1968 13 : if (sep == std::string::npos)
1969 0 : return false;
1970 :
1971 13 : auto interactionId = fileId.substr(0, sep);
1972 13 : auto commit = getCommit(interactionId);
1973 26 : if (commit == std::nullopt || commit->find("type") == commit->end() || commit->find("tid") == commit->end()
1974 26 : || commit->find("sha3sum") == commit->end() || commit->at("type") != "application/data-transfer+json") {
1975 0 : JAMI_WARNING("[Account {:s}] {} requested invalid file transfer commit {}",
1976 : pimpl_->accountId_,
1977 : member,
1978 : interactionId);
1979 0 : return false;
1980 : }
1981 :
1982 13 : path = dataTransfer()->path(fileId);
1983 13 : sha3sum = commit->at("sha3sum");
1984 :
1985 13 : return true;
1986 13 : }
1987 :
1988 : bool
1989 14 : Conversation::downloadFile(const std::string& interactionId,
1990 : const std::string& fileId,
1991 : const std::string& path,
1992 : const std::string&,
1993 : const std::string& deviceId)
1994 : {
1995 14 : auto commit = getCommit(interactionId);
1996 14 : if (commit == std::nullopt || commit->at("type") != "application/data-transfer+json") {
1997 0 : JAMI_ERROR("Commit doesn't exists or is not a file transfer {} (Conversation: {}) ", interactionId, id());
1998 0 : return false;
1999 : }
2000 14 : auto tid = commit->find("tid");
2001 14 : auto sha3sum = commit->find("sha3sum");
2002 14 : auto size_str = commit->find("totalSize");
2003 :
2004 14 : if (tid == commit->end() || sha3sum == commit->end() || size_str == commit->end()) {
2005 0 : JAMI_ERROR("Invalid file transfer commit (missing tid, size or sha3)");
2006 0 : return false;
2007 : }
2008 :
2009 14 : auto totalSize = to_int<ssize_t>(size_str->second, (ssize_t) -1);
2010 14 : if (totalSize < 0) {
2011 0 : JAMI_ERROR("Invalid file size {}", totalSize);
2012 0 : return false;
2013 : }
2014 :
2015 : // Be sure to not lock conversation
2016 28 : dht::ThreadPool().io().run(
2017 14 : [w = weak(), deviceId, fileId, interactionId, sha3sum = sha3sum->second, path, totalSize] {
2018 14 : if (auto shared = w.lock()) {
2019 14 : std::filesystem::path filePath(path);
2020 14 : if (filePath.empty()) {
2021 0 : filePath = shared->dataTransfer()->path(fileId);
2022 : }
2023 :
2024 14 : if (fileutils::size(filePath) == totalSize) {
2025 1 : if (fileutils::sha3File(filePath) == sha3sum) {
2026 4 : JAMI_WARNING("Ignoring request to download existing file: {}", filePath);
2027 1 : return;
2028 : }
2029 : }
2030 :
2031 13 : std::filesystem::path tempFilePath(filePath);
2032 13 : tempFilePath += ".tmp";
2033 13 : auto start = fileutils::size(tempFilePath);
2034 13 : if (start < 0)
2035 11 : start = 0;
2036 13 : size_t end = 0;
2037 :
2038 13 : auto acc = shared->pimpl_->account_.lock();
2039 13 : if (!acc)
2040 0 : return;
2041 13 : shared->dataTransfer()->waitForTransfer(fileId, interactionId, sha3sum, path, totalSize);
2042 13 : acc->askForFileChannel(shared->id(), deviceId, interactionId, fileId, start, end);
2043 28 : }
2044 : });
2045 14 : return true;
2046 14 : }
2047 :
2048 : void
2049 1114 : Conversation::hasFetched(const std::string& deviceId, const std::string& commitId)
2050 : {
2051 1114 : dht::ThreadPool::io().run([w = weak(), deviceId, commitId]() {
2052 1114 : auto sthis = w.lock();
2053 1114 : if (!sthis)
2054 0 : return;
2055 : // Update fetched for Uri
2056 1113 : auto uri = sthis->uriFromDevice(deviceId);
2057 1114 : if (uri.empty() || uri == sthis->pimpl_->userId_)
2058 45 : return;
2059 : // When a user fetches a commit, the message is sent for this person
2060 1069 : sthis->pimpl_->updateStatus(uri,
2061 : libjami::Account::MessageStates::SENT,
2062 1069 : commitId,
2063 2138 : std::to_string(std::time(nullptr)),
2064 : true);
2065 1159 : });
2066 1114 : }
2067 :
2068 : void
2069 1114 : Conversation::Impl::updateStatus(const std::string& uri,
2070 : libjami::Account::MessageStates st,
2071 : const std::string& commitId,
2072 : const std::string& ts,
2073 : bool emit)
2074 : {
2075 : // This method can be called if peer send us a status or if another device sync. Emit will be true if a peer
2076 : // send us a status and will emit to other connected devices.
2077 1114 : LogOptions options;
2078 1114 : std::map<std::string, std::map<std::string, std::string>> newStatus;
2079 : {
2080 : // Update internal structures.
2081 1114 : std::lock_guard lk(messageStatusMtx_);
2082 1114 : auto& status = messagesStatus_[uri];
2083 1114 : auto& oldStatus = status[st == libjami::Account::MessageStates::SENT ? "fetched" : "read"];
2084 1114 : if (oldStatus == commitId)
2085 2 : return; // Nothing to do
2086 1112 : options.to = oldStatus;
2087 1112 : options.from = commitId;
2088 1112 : oldStatus = commitId;
2089 1112 : status[st == libjami::Account::MessageStates::SENT ? "fetched_ts" : "read_ts"] = ts;
2090 1112 : saveStatus();
2091 1112 : if (emit)
2092 1083 : newStatus[uri].insert(status.begin(), status.end());
2093 1114 : }
2094 1112 : if (emit && messageStatusCb_) {
2095 1083 : messageStatusCb_(newStatus);
2096 : }
2097 : // Update messages status for all commit between the old and new one
2098 1112 : options.logIfNotFound = false;
2099 1112 : options.fastLog = true;
2100 1112 : History optHistory;
2101 1112 : std::unique_lock lk(optHistory.mutex); // Avoid to announce messages while updating status.
2102 1112 : auto res = loadMessages2(options, &optHistory);
2103 1112 : std::unique_lock mlk(messageStatusMtx_);
2104 1112 : std::vector<std::pair<std::string, int32_t>> statusToUpdate;
2105 1112 : if (res.size() == 0) {
2106 : // In this case, commit is not received yet, so we cache it
2107 7 : futureStatus[commitId][uri] = static_cast<int32_t>(st);
2108 : }
2109 7853 : for (const auto& [cid, _] : optHistory.quickAccess) {
2110 6741 : auto message = loadedHistory_.quickAccess.find(cid);
2111 6741 : if (message != loadedHistory_.quickAccess.end()) {
2112 : // Update message and emit to client,
2113 2731 : if (static_cast<int32_t>(st) > message->second->status[uri]) {
2114 2727 : message->second->status[uri] = static_cast<int32_t>(st);
2115 2727 : statusToUpdate.emplace_back(cid, static_cast<int32_t>(st));
2116 : }
2117 : } else {
2118 : // In this case, commit is not loaded by client, so we cache it
2119 : // No need to emit to client, they will get a correct status on load.
2120 4010 : futureStatus[cid][uri] = static_cast<int32_t>(st);
2121 : }
2122 : }
2123 1112 : mlk.unlock();
2124 1112 : lk.unlock();
2125 3839 : for (const auto& [cid, status] : statusToUpdate)
2126 5454 : emitSignal<libjami::ConfigurationSignal::AccountMessageStatusChanged>(accountId_,
2127 2727 : repository_->id(),
2128 : uri,
2129 : cid,
2130 : static_cast<int>(status));
2131 1116 : }
2132 :
2133 : bool
2134 17 : Conversation::setMessageDisplayed(const std::string& uri, const std::string& interactionId)
2135 : {
2136 17 : std::lock_guard lk(pimpl_->messageStatusMtx_);
2137 17 : if (pimpl_->messagesStatus_[uri]["read"] == interactionId)
2138 1 : return false; // Nothing to do
2139 16 : dht::ThreadPool::io().run([w = weak(), uri, interactionId]() {
2140 16 : auto sthis = w.lock();
2141 16 : if (!sthis)
2142 0 : return;
2143 16 : sthis->pimpl_->updateStatus(uri,
2144 : libjami::Account::MessageStates::DISPLAYED,
2145 16 : interactionId,
2146 32 : std::to_string(std::time(nullptr)),
2147 : true);
2148 16 : });
2149 16 : return true;
2150 17 : }
2151 :
2152 : std::map<std::string, std::map<std::string, std::string>>
2153 90 : Conversation::messageStatus() const
2154 : {
2155 90 : std::lock_guard lk(pimpl_->messageStatusMtx_);
2156 180 : return pimpl_->messagesStatus_;
2157 90 : }
2158 :
2159 : void
2160 46 : Conversation::updateMessageStatus(const std::map<std::string, std::map<std::string, std::string>>& messageStatus)
2161 : {
2162 46 : std::unique_lock lk(pimpl_->messageStatusMtx_);
2163 46 : std::vector<std::tuple<libjami::Account::MessageStates, std::string, std::string, std::string>> stVec;
2164 98 : for (const auto& [uri, status] : messageStatus) {
2165 52 : auto& oldMs = pimpl_->messagesStatus_[uri];
2166 52 : if (status.find("fetched_ts") != status.end() && status.at("fetched") != oldMs["fetched"]) {
2167 26 : if (oldMs["fetched_ts"].empty() || std::stol(oldMs["fetched_ts"]) <= std::stol(status.at("fetched_ts"))) {
2168 0 : stVec.emplace_back(libjami::Account::MessageStates::SENT,
2169 : uri,
2170 26 : status.at("fetched"),
2171 52 : status.at("fetched_ts"));
2172 : }
2173 : }
2174 52 : if (status.find("read_ts") != status.end() && status.at("read") != oldMs["read"]) {
2175 3 : if (oldMs["read_ts"].empty() || std::stol(oldMs["read_ts"]) <= std::stol(status.at("read_ts"))) {
2176 0 : stVec.emplace_back(libjami::Account::MessageStates::DISPLAYED,
2177 : uri,
2178 3 : status.at("read"),
2179 6 : status.at("read_ts"));
2180 : }
2181 : }
2182 : }
2183 46 : lk.unlock();
2184 :
2185 75 : for (const auto& [status, uri, commitId, ts] : stVec) {
2186 29 : pimpl_->updateStatus(uri, status, commitId, ts);
2187 : }
2188 46 : }
2189 :
2190 : void
2191 389 : Conversation::onMessageStatusChanged(
2192 : const std::function<void(const std::map<std::string, std::map<std::string, std::string>>&)>& cb)
2193 : {
2194 389 : std::unique_lock lk(pimpl_->messageStatusMtx_);
2195 389 : pimpl_->messageStatusCb_ = cb;
2196 389 : }
2197 :
2198 : #ifdef LIBJAMI_TEST
2199 : void
2200 515 : Conversation::onBootstrapStatus(const std::function<void(std::string, BootstrapStatus)>& cb)
2201 : {
2202 515 : pimpl_->bootstrapCbTest_ = cb;
2203 515 : }
2204 : #endif
2205 :
2206 : void
2207 625 : Conversation::checkBootstrapMember(const asio::error_code& ec, std::vector<std::map<std::string, std::string>> members)
2208 : {
2209 625 : if (ec == asio::error::operation_aborted)
2210 329 : return;
2211 595 : auto acc = pimpl_->account_.lock();
2212 595 : if (pimpl_->swarmManager_->isConnected() or not acc)
2213 9 : return;
2214 : // We bootstrap the DRT with devices who already wrote in the repository.
2215 : // However, in a conversation, a large number of devices may just watch
2216 : // the conversation, but never write any message.
2217 586 : std::unique_lock lock(pimpl_->membersMtx_);
2218 :
2219 586 : std::string uri;
2220 880 : while (!members.empty()) {
2221 302 : auto member = std::move(members.back());
2222 302 : members.pop_back();
2223 302 : uri = std::move(member.at("uri"));
2224 302 : if (uri != pimpl_->userId_ && pimpl_->checkedMembers_.find(uri) == pimpl_->checkedMembers_.end())
2225 8 : break;
2226 302 : }
2227 290 : auto fallbackFailed = [](auto sthis) {
2228 1160 : JAMI_LOG("{} Bootstrap: Fallback failed. Wait for remote connections.", sthis->pimpl_->toString());
2229 : #ifdef LIBJAMI_TEST
2230 290 : if (sthis->pimpl_->bootstrapCbTest_)
2231 8 : sthis->pimpl_->bootstrapCbTest_(sthis->id(), BootstrapStatus::FAILED);
2232 : #endif
2233 290 : };
2234 : // If members is empty, we finished the fallback un-successfully
2235 586 : if (members.empty() && uri.empty()) {
2236 290 : lock.unlock();
2237 290 : fallbackFailed(this);
2238 290 : return;
2239 : }
2240 :
2241 : // Fallback, check devices of a member (we didn't check yet) in the conversation
2242 296 : pimpl_->checkedMembers_.emplace(uri);
2243 296 : auto devices = std::make_shared<std::vector<NodeId>>();
2244 1184 : acc->forEachDevice(
2245 592 : dht::InfoHash(uri),
2246 298 : [w = weak(), devices](const std::shared_ptr<dht::crypto::PublicKey>& dev) {
2247 : // Test if already sent
2248 298 : if (auto sthis = w.lock()) {
2249 296 : if (!sthis->pimpl_->swarmManager_->getRoutingTable().hasKnownNode(dev->getLongId()))
2250 284 : devices->emplace_back(dev->getLongId());
2251 298 : }
2252 298 : },
2253 296 : [w = weak(), devices, members = std::move(members), uri, fallbackFailed = std::move(fallbackFailed)](bool ok) {
2254 296 : auto sthis = w.lock();
2255 296 : if (!sthis)
2256 2 : return;
2257 294 : auto checkNext = true;
2258 294 : if (ok && devices->size() != 0) {
2259 : #ifdef LIBJAMI_TEST
2260 284 : if (sthis->pimpl_->bootstrapCbTest_)
2261 6 : sthis->pimpl_->bootstrapCbTest_(sthis->id(), BootstrapStatus::FALLBACK);
2262 : #endif
2263 1136 : JAMI_LOG("{} Bootstrap: Fallback with member: {}", sthis->pimpl_->toString(), uri);
2264 284 : if (sthis->pimpl_->swarmManager_->setKnownNodes(*devices))
2265 0 : checkNext = false;
2266 : }
2267 294 : if (checkNext) {
2268 : // Check next member
2269 294 : sthis->pimpl_->fallbackTimer_->expires_at(std::chrono::steady_clock::now());
2270 588 : sthis->pimpl_->fallbackTimer_->async_wait(
2271 588 : std::bind(&Conversation::checkBootstrapMember, sthis, std::placeholders::_1, std::move(members)));
2272 : } else {
2273 : // In this case, all members are checked. Fallback failed
2274 0 : fallbackFailed(sthis);
2275 : }
2276 296 : });
2277 1175 : }
2278 :
2279 : void
2280 514 : Conversation::bootstrap(std::function<void()> onBootstrapped, const std::vector<DeviceId>& knownDevices)
2281 : {
2282 514 : std::lock_guard lock(pimpl_->bootstrapMtx_);
2283 514 : if (!pimpl_ || !pimpl_->repository_ || !pimpl_->swarmManager_)
2284 0 : return;
2285 : // Here, we bootstrap the DRT with devices who already wrote in the conversation
2286 : // If this doesn't work, it will attempt to fallback with checkBootstrapMember
2287 : // If it works, the callback onConnectionChanged will be called with ok=true
2288 514 : pimpl_->bootstrapCb_ = std::move(onBootstrapped);
2289 514 : std::vector<DeviceId> devices = knownDevices;
2290 1533 : for (const auto& [member, memberDevices] : pimpl_->repository_->devices()) {
2291 1019 : if (!isBanned(member))
2292 1019 : devices.insert(devices.end(), memberDevices.begin(), memberDevices.end());
2293 514 : }
2294 2056 : JAMI_DEBUG("{} Bootstrap with {} device(s)", pimpl_->toString(), devices.size());
2295 : // set callback
2296 331 : auto fallback = [](std::shared_ptr<Conversation> sthis, bool now = false) {
2297 : // Fallback
2298 331 : auto acc = sthis->pimpl_->account_.lock();
2299 331 : if (!acc)
2300 0 : return;
2301 331 : auto members = sthis->getMembers(false, false);
2302 331 : std::shuffle(members.begin(), members.end(), acc->rand);
2303 331 : if (now) {
2304 294 : sthis->pimpl_->fallbackTimer_->expires_at(std::chrono::steady_clock::now());
2305 : } else {
2306 37 : auto timeForBootstrap = std::min(static_cast<size_t>(8), members.size());
2307 111 : sthis->pimpl_->fallbackTimer_->expires_at(std::chrono::steady_clock::now() + 20s
2308 74 : - std::chrono::seconds(timeForBootstrap));
2309 148 : JAMI_DEBUG("{} Fallback in {} seconds", sthis->pimpl_->toString(), (20 - timeForBootstrap));
2310 : }
2311 662 : sthis->pimpl_->fallbackTimer_->async_wait(
2312 662 : std::bind(&Conversation::checkBootstrapMember, sthis, std::placeholders::_1, std::move(members)));
2313 331 : };
2314 :
2315 514 : pimpl_->swarmManager_->onConnectionChanged([w = weak(), fallback](bool ok) {
2316 : // This will call methods from accounts, so trigger on another thread.
2317 379 : dht::ThreadPool::io().run([w, ok, fallback = std::move(fallback)] {
2318 379 : auto sthis = w.lock();
2319 379 : if (!sthis)
2320 0 : return;
2321 379 : if (ok) {
2322 : // Bootstrap succeeded!
2323 : {
2324 342 : std::lock_guard lock(sthis->pimpl_->membersMtx_);
2325 342 : sthis->pimpl_->checkedMembers_.clear();
2326 342 : }
2327 342 : if (sthis->pimpl_->bootstrapCb_)
2328 342 : sthis->pimpl_->bootstrapCb_();
2329 : #ifdef LIBJAMI_TEST
2330 342 : if (sthis->pimpl_->bootstrapCbTest_)
2331 9 : sthis->pimpl_->bootstrapCbTest_(sthis->id(), BootstrapStatus::SUCCESS);
2332 : #endif
2333 342 : return;
2334 : }
2335 37 : fallback(sthis);
2336 379 : });
2337 379 : });
2338 : {
2339 514 : std::lock_guard lock(pimpl_->membersMtx_);
2340 514 : pimpl_->checkedMembers_.clear();
2341 514 : }
2342 : // If is shutdown, the conversation was re-added, causing no new nodes to be connected, but just a classic
2343 : // connectivity change
2344 514 : if (pimpl_->swarmManager_->isShutdown()) {
2345 22 : pimpl_->swarmManager_->restart();
2346 22 : pimpl_->swarmManager_->maintainBuckets();
2347 492 : } else if (!pimpl_->swarmManager_->setKnownNodes(devices)) {
2348 294 : fallback(shared_from_this(), true);
2349 : }
2350 514 : }
2351 :
2352 : std::vector<std::string>
2353 18 : Conversation::commitsEndedCalls()
2354 : {
2355 18 : pimpl_->loadActiveCalls();
2356 18 : pimpl_->loadHostedCalls();
2357 18 : auto commits = pimpl_->commitsEndedCalls();
2358 18 : if (!commits.empty()) {
2359 : // Announce to client
2360 0 : dht::ThreadPool::io().run([w = weak(), commits] {
2361 0 : if (auto sthis = w.lock())
2362 0 : sthis->pimpl_->announce(commits, true);
2363 0 : });
2364 : }
2365 18 : return commits;
2366 0 : }
2367 :
2368 : void
2369 389 : Conversation::onMembersChanged(OnMembersChanged&& cb)
2370 : {
2371 389 : pimpl_->onMembersChanged_ = std::move(cb);
2372 389 : pimpl_->repository_->onMembersChanged([w = weak()](const std::set<std::string>& memberUris) {
2373 1083 : if (auto sthis = w.lock())
2374 1083 : sthis->pimpl_->onMembersChanged_(memberUris);
2375 1084 : });
2376 389 : }
2377 :
2378 : void
2379 389 : Conversation::onNeedSocket(NeedSocketCb needSocket)
2380 : {
2381 778 : pimpl_->swarmManager_->needSocketCb_ = [needSocket = std::move(needSocket), w = weak()](const std::string& deviceId,
2382 : ChannelCb&& cb) {
2383 694 : if (auto sthis = w.lock())
2384 693 : needSocket(sthis->id(), deviceId, std::move(cb), "application/im-gitmessage-id");
2385 1472 : };
2386 389 : }
2387 :
2388 : void
2389 653 : Conversation::addSwarmChannel(std::shared_ptr<dhtnet::ChannelSocket> channel)
2390 : {
2391 653 : auto deviceId = channel->deviceId();
2392 : // Transmit avatar if necessary
2393 : // We do this here, because at this point we know both sides are connected and in
2394 : // the same conversation
2395 : // addSwarmChannel is a bit more complex, but it should be the best moment to do this.
2396 653 : auto cert = channel->peerCertificate();
2397 653 : if (!cert || !cert->issuer)
2398 0 : return;
2399 653 : auto member = cert->issuer->getId().toString();
2400 652 : pimpl_->swarmManager_->addChannel(std::move(channel));
2401 653 : dht::ThreadPool::io().run([member, deviceId, a = pimpl_->account_, w = weak_from_this()] {
2402 653 : auto sthis = w.lock();
2403 653 : if (auto account = a.lock()) {
2404 653 : account->sendProfile(sthis->id(), member, deviceId.toString());
2405 653 : }
2406 653 : });
2407 653 : }
2408 :
2409 : uint32_t
2410 4 : Conversation::countInteractions(const std::string& toId, const std::string& fromId, const std::string& authorUri) const
2411 : {
2412 4 : LogOptions options;
2413 4 : options.to = toId;
2414 4 : options.from = fromId;
2415 4 : options.authorUri = authorUri;
2416 4 : options.logIfNotFound = false;
2417 4 : options.fastLog = true;
2418 4 : History history;
2419 4 : std::lock_guard lk(history.mutex);
2420 4 : auto res = pimpl_->loadMessages2(options, &history);
2421 8 : return res.size();
2422 4 : }
2423 :
2424 : void
2425 4 : Conversation::search(uint32_t req, const Filter& filter, const std::shared_ptr<std::atomic_int>& flag) const
2426 : {
2427 : // Because logging a conversation can take quite some time,
2428 : // do it asynchronously
2429 4 : dht::ThreadPool::io().run([w = weak(), req, filter, flag] {
2430 4 : if (auto sthis = w.lock()) {
2431 4 : History history;
2432 4 : std::vector<std::map<std::string, std::string>> commits {};
2433 : // std::regex_constants::ECMAScript is the default flag.
2434 4 : auto re = std::regex(filter.regexSearch,
2435 4 : filter.caseSensitive ? std::regex_constants::ECMAScript : std::regex_constants::icase);
2436 4 : sthis->pimpl_->repository_->log(
2437 20 : [&](const auto& id, const auto& author, auto& commit) {
2438 20 : if (!filter.author.empty() && filter.author != sthis->uriFromDevice(author.email)) {
2439 : // Filter author
2440 0 : return CallbackResult::Skip;
2441 : }
2442 20 : auto commitTime = git_commit_time(commit.get());
2443 20 : if (filter.before && filter.before < commitTime) {
2444 : // Only get commits before this date
2445 0 : return CallbackResult::Skip;
2446 : }
2447 20 : if (filter.after && filter.after > commitTime) {
2448 : // Only get commits before this date
2449 0 : if (git_commit_parentcount(commit.get()) <= 1)
2450 0 : return CallbackResult::Break;
2451 : else
2452 0 : return CallbackResult::Skip; // Because we are sorting it with
2453 : // GIT_SORT_TOPOLOGICAL | GIT_SORT_TIME
2454 : }
2455 :
2456 20 : return CallbackResult::Ok; // Continue
2457 : },
2458 20 : [&](auto&& cc) {
2459 40 : if (auto optMessage = sthis->pimpl_->repository_->convCommitToMap(cc))
2460 40 : sthis->pimpl_->addToHistory(history, {optMessage.value()}, false, false);
2461 20 : },
2462 20 : [&](auto id, auto, auto) {
2463 20 : if (id == filter.lastId)
2464 0 : return true;
2465 20 : return false;
2466 : },
2467 : "",
2468 : false);
2469 : // Search on generated history
2470 24 : for (auto& message : history.messageList) {
2471 20 : auto contentType = message->type;
2472 20 : auto isSearchable = contentType == "text/plain" || contentType == "application/data-transfer+json";
2473 20 : if (filter.type.empty() && !isSearchable) {
2474 : // Not searchable, at least for now
2475 8 : continue;
2476 12 : } else if (contentType == filter.type || filter.type.empty()) {
2477 12 : if (isSearchable) {
2478 : // If it's a text match the body, else the display name
2479 48 : auto body = contentType == "text/plain" ? message->body.at("body")
2480 36 : : message->body.at("displayName");
2481 12 : std::smatch body_match;
2482 12 : if (std::regex_search(body, body_match, re)) {
2483 5 : auto commit = message->body;
2484 5 : commit["id"] = message->id;
2485 5 : commit["type"] = message->type;
2486 5 : commits.emplace_back(commit);
2487 5 : }
2488 12 : } else {
2489 : // Matching type, just add it to the results
2490 0 : commits.emplace_back(message->body);
2491 : }
2492 :
2493 12 : if (filter.maxResult != 0 && commits.size() == filter.maxResult)
2494 0 : break;
2495 : }
2496 20 : }
2497 :
2498 4 : if (commits.size() > 0)
2499 9 : emitSignal<libjami::ConversationSignal::MessagesFound>(req,
2500 3 : sthis->pimpl_->accountId_,
2501 6 : sthis->id(),
2502 3 : std::move(commits));
2503 : // If we're the latest thread, inform client that the search is finished
2504 4 : if ((*flag)-- == 1 /* decrement return the old value */) {
2505 4 : emitSignal<libjami::ConversationSignal::MessagesFound>(
2506 8 : req, sthis->pimpl_->accountId_, std::string {}, std::vector<std::map<std::string, std::string>> {});
2507 : }
2508 8 : }
2509 4 : });
2510 4 : }
2511 :
2512 : void
2513 14 : Conversation::hostConference(Json::Value&& message, OnDoneCb&& cb)
2514 : {
2515 14 : if (!message.isMember("confId")) {
2516 0 : JAMI_ERROR("{}Malformed commit: no confId", pimpl_->toString());
2517 0 : return;
2518 : }
2519 :
2520 14 : auto now = std::chrono::system_clock::now();
2521 14 : auto nowSecs = std::chrono::duration_cast<std::chrono::seconds>(now.time_since_epoch()).count();
2522 : {
2523 14 : std::lock_guard lk(pimpl_->activeCallsMtx_);
2524 14 : pimpl_->hostedCalls_[message["confId"].asString()] = nowSecs;
2525 14 : pimpl_->saveHostedCalls();
2526 14 : }
2527 :
2528 14 : sendMessage(std::move(message), "", {}, std::move(cb));
2529 : }
2530 :
2531 : bool
2532 20 : Conversation::isHosting(const std::string& confId) const
2533 : {
2534 20 : auto info = infos();
2535 20 : if (info["rdvDevice"] == pimpl_->deviceId_ && info["rdvHost"] == pimpl_->userId_)
2536 0 : return true; // We are the current device Host
2537 20 : std::lock_guard lk(pimpl_->activeCallsMtx_);
2538 20 : return pimpl_->hostedCalls_.find(confId) != pimpl_->hostedCalls_.end();
2539 20 : }
2540 :
2541 : void
2542 11 : Conversation::removeActiveConference(Json::Value&& message, OnDoneCb&& cb)
2543 : {
2544 11 : if (!message.isMember("confId")) {
2545 0 : JAMI_ERROR("{}Malformed commit: no confId", pimpl_->toString());
2546 0 : return;
2547 : }
2548 :
2549 11 : auto erased = false;
2550 : {
2551 11 : std::lock_guard lk(pimpl_->activeCallsMtx_);
2552 11 : erased = pimpl_->hostedCalls_.erase(message["confId"].asString());
2553 11 : }
2554 11 : if (erased) {
2555 11 : pimpl_->saveHostedCalls();
2556 11 : sendMessage(std::move(message), "", {}, std::move(cb));
2557 : } else
2558 0 : cb(false, "");
2559 : }
2560 :
2561 : std::vector<std::map<std::string, std::string>>
2562 39 : Conversation::currentCalls() const
2563 : {
2564 39 : std::lock_guard lk(pimpl_->activeCallsMtx_);
2565 78 : return pimpl_->activeCalls_;
2566 39 : }
2567 : } // namespace jami
|