Line data Source code
1 : /*
2 : * Copyright (C) 2004-2026 Savoir-faire Linux Inc.
3 : *
4 : * This program is free software: you can redistribute it and/or modify
5 : * it under the terms of the GNU General Public License as published by
6 : * the Free Software Foundation, either version 3 of the License, or
7 : * (at your option) any later version.
8 : *
9 : * This program is distributed in the hope that it will be useful,
10 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 : * GNU General Public License for more details.
13 : *
14 : * You should have received a copy of the GNU General Public License
15 : * along with this program. If not, see <https://www.gnu.org/licenses/>.
16 : */
17 :
18 : #include "conversation.h"
19 :
20 : #include "account_const.h"
21 : #include "jamiaccount.h"
22 : #include "client/jami_signal.h"
23 : #include "swarm/swarm_manager.h"
24 : #include "conversationrepository.h"
25 : #ifdef ENABLE_PLUGIN
26 : #include "manager.h"
27 : #include "plugin/jamipluginmanager.h"
28 : #include "plugin/streamdata.h"
29 : #endif
30 : #include "jami/conversation_interface.h"
31 : #include "jami/configurationmanager_interface.h"
32 :
33 : #include "fileutils.h"
34 : #include "json_utils.h"
35 : #include "logger.h"
36 : #include "presence_manager.h"
37 :
38 : #include <opendht/thread_pool.h>
39 : #include <opendht/infohash.h>
40 : #include <fmt/compile.h>
41 : #include <asio/error_code.hpp>
42 :
43 : #include <algorithm>
44 : #include <memory>
45 : #include <charconv>
46 : #include <string_view>
47 : #include <tuple>
48 : #include <optional>
49 : #include <utility>
50 : #include <deque>
51 : #include <chrono>
52 : #include <ctime>
53 : #include <functional>
54 : #include <atomic>
55 : #include <regex>
56 :
57 : namespace jami {
58 :
59 : static const char* const LAST_MODIFIED = "lastModified";
60 :
61 13 : ConvInfo::ConvInfo(const Json::Value& json)
62 : {
63 13 : id = json[ConversationMapKeys::ID].asString();
64 13 : created = json[ConversationMapKeys::CREATED].asLargestUInt();
65 13 : removed = json[ConversationMapKeys::REMOVED].asLargestUInt();
66 13 : erased = json[ConversationMapKeys::ERASED].asLargestUInt();
67 33 : for (const auto& v : json[ConversationMapKeys::MEMBERS]) {
68 20 : members.emplace(v["uri"].asString());
69 : }
70 13 : lastDisplayed = json[ConversationMapKeys::LAST_DISPLAYED].asString();
71 13 : }
72 :
73 : Json::Value
74 26 : ConvInfo::toJson() const
75 : {
76 26 : Json::Value json;
77 26 : json[ConversationMapKeys::ID] = id;
78 26 : json[ConversationMapKeys::CREATED] = Json::Int64(created);
79 26 : if (removed) {
80 1 : json[ConversationMapKeys::REMOVED] = Json::Int64(removed);
81 : }
82 26 : if (erased) {
83 1 : json[ConversationMapKeys::ERASED] = Json::Int64(erased);
84 : }
85 67 : for (const auto& m : members) {
86 41 : Json::Value member;
87 41 : member["uri"] = m;
88 41 : json[ConversationMapKeys::MEMBERS].append(member);
89 41 : }
90 26 : json[ConversationMapKeys::LAST_DISPLAYED] = lastDisplayed;
91 26 : return json;
92 0 : }
93 :
94 : // ConversationRequest
95 266 : ConversationRequest::ConversationRequest(const Json::Value& json)
96 : {
97 266 : received = json[ConversationMapKeys::RECEIVED].asLargestUInt();
98 266 : declined = json[ConversationMapKeys::DECLINED].asLargestUInt();
99 266 : from = json[ConversationMapKeys::FROM].asString();
100 266 : conversationId = json[ConversationMapKeys::CONVERSATIONID].asString();
101 266 : auto& md = json[ConversationMapKeys::METADATAS];
102 535 : for (const auto& member : md.getMemberNames()) {
103 269 : metadatas.emplace(member, md[member].asString());
104 266 : }
105 266 : }
106 :
107 : Json::Value
108 2 : ConversationRequest::toJson() const
109 : {
110 2 : Json::Value json;
111 2 : json[ConversationMapKeys::CONVERSATIONID] = conversationId;
112 2 : json[ConversationMapKeys::FROM] = from;
113 2 : json[ConversationMapKeys::RECEIVED] = static_cast<uint32_t>(received);
114 2 : if (declined)
115 0 : json[ConversationMapKeys::DECLINED] = static_cast<uint32_t>(declined);
116 4 : for (const auto& [key, value] : metadatas) {
117 2 : json[ConversationMapKeys::METADATAS][key] = value;
118 : }
119 2 : return json;
120 0 : }
121 :
122 : std::map<std::string, std::string>
123 237 : ConversationRequest::toMap() const
124 : {
125 237 : auto result = metadatas;
126 474 : result[ConversationMapKeys::ID] = conversationId;
127 474 : result[ConversationMapKeys::FROM] = from;
128 237 : if (declined)
129 0 : result[ConversationMapKeys::DECLINED] = std::to_string(declined);
130 711 : result[ConversationMapKeys::RECEIVED] = std::to_string(received);
131 237 : return result;
132 0 : }
133 :
134 : using MessageList = std::list<std::shared_ptr<libjami::SwarmMessage>>;
135 :
136 : struct History
137 : {
138 : // While loading the history, we need to avoid:
139 : // - reloading history (can just be ignored)
140 : // - adding new commits (should wait for history to be loaded)
141 : std::mutex mutex {};
142 : std::condition_variable cv {};
143 : bool loading {false};
144 : MessageList messageList {};
145 : std::map<std::string, std::shared_ptr<libjami::SwarmMessage>> quickAccess {};
146 : std::map<std::string, std::list<std::shared_ptr<libjami::SwarmMessage>>> pendingEditions {};
147 : std::map<std::string, std::list<std::map<std::string, std::string>>> pendingReactions {};
148 : };
149 :
150 : class Conversation::Impl
151 : {
152 : private:
153 396 : Impl(std::unique_ptr<ConversationRepository>&& repository,
154 : const std::shared_ptr<JamiAccount>& account,
155 : std::vector<ConversationCommit>&& commits = {})
156 396 : : repository_(std::move(repository ? repository : throw std::logic_error("Invalid repository")))
157 390 : , account_(account)
158 390 : , accountId_(account->getAccountID())
159 390 : , userId_(account->getUsername())
160 780 : , deviceId_(account->currentDeviceId())
161 390 : , swarmManager_(std::make_shared<SwarmManager>(NodeId(deviceId_),
162 390 : Manager::instance().getSeededRandomEngine(),
163 780 : [account = account_](const DeviceId& deviceId) {
164 282 : if (auto acc = account.lock()) {
165 282 : return acc->isConnectedWith(deviceId);
166 282 : }
167 0 : return false;
168 : }))
169 390 : , transferManager_(std::make_shared<TransferManager>(accountId_,
170 : "",
171 390 : repository_->id(),
172 390 : Manager::instance().getSeededRandomEngine()))
173 390 : , repoPath_(fileutils::get_data_dir() / accountId_ / "conversations" / repository_->id())
174 390 : , conversationDataPath_(fileutils::get_data_dir() / accountId_ / "conversation_data" / repository_->id())
175 390 : , fetchedPath_(conversationDataPath_ / ConversationDirectories::FETCHED)
176 390 : , sendingPath_(conversationDataPath_ / ConversationDirectories::SENDING)
177 390 : , preferencesPath_(conversationDataPath_ / ConversationDirectories::PREFERENCES)
178 390 : , statusPath_(conversationDataPath_ / ConversationDirectories::STATUS)
179 390 : , hostedCallsPath_(conversationDataPath_ / ConversationDirectories::HOSTED_CALLS)
180 390 : , activeCallsPath_(conversationDataPath_ / ConversationDirectories::ACTIVE_CALLS)
181 390 : , ioContext_(Manager::instance().ioContext())
182 2346 : , typers_(std::make_shared<Typers>(account, repository_->id()))
183 : {
184 390 : if (!commits.empty())
185 186 : initActiveCalls(repository_->convCommitsToMap(commits));
186 390 : loadStatus();
187 390 : setupMemberCallback();
188 408 : }
189 :
190 192 : Impl(std::pair<std::unique_ptr<ConversationRepository>, std::vector<ConversationCommit>>&& repoAndCommits,
191 : const std::shared_ptr<JamiAccount>& account)
192 192 : : Impl(std::move(repoAndCommits.first), account, std::move(repoAndCommits.second))
193 186 : {}
194 :
195 : public:
196 186 : Impl(const std::shared_ptr<JamiAccount>& account, ConversationMode mode, const std::string& otherMember = "")
197 186 : : Impl(ConversationRepository::createConversation(account, mode, otherMember), account)
198 186 : {}
199 :
200 18 : Impl(const std::shared_ptr<JamiAccount>& account, const std::string& conversationId)
201 18 : : Impl(std::make_unique<ConversationRepository>(account, conversationId), account)
202 18 : {}
203 :
204 193 : Impl(const std::shared_ptr<JamiAccount>& account, const std::string& remoteDevice, const std::string& conversationId)
205 193 : : Impl(ConversationRepository::cloneConversation(account, remoteDevice, conversationId), account)
206 186 : {}
207 :
208 3113 : std::string toString() const
209 : {
210 12453 : return fmt::format(FMT_COMPILE("[Account {}] [Conversation {}]"), accountId_, repository_->id());
211 : }
212 :
213 0 : std::vector<std::map<std::string, std::string>> getConnectivity() const
214 : {
215 0 : return swarmManager_->getRoutingTableInfo();
216 : }
217 :
218 : mutable std::string fmtStr_;
219 :
220 390 : ~Impl() { stopTracking(); }
221 :
222 : /**
223 : * If, for whatever reason, the daemon is stopped while hosting a conference,
224 : * we need to announce the end of this call when restarting.
225 : * To avoid to keep active calls forever.
226 : */
227 : std::vector<std::string> commitsEndedCalls();
228 : bool isAdmin() const;
229 :
230 282 : void announce(const std::string& commitId, bool commitFromSelf = false)
231 : {
232 282 : std::vector<std::string> vec;
233 282 : if (!commitId.empty())
234 280 : vec.emplace_back(commitId);
235 282 : announce(vec, commitFromSelf);
236 282 : }
237 :
238 294 : void announce(const std::vector<std::string>& commits, bool commitFromSelf = false)
239 : {
240 294 : std::vector<ConversationCommit> convcommits;
241 294 : convcommits.reserve(commits.size());
242 598 : for (const auto& cid : commits) {
243 304 : if (auto commit = repository_->getCommit(cid)) {
244 304 : convcommits.emplace_back(*commit);
245 304 : }
246 : }
247 294 : announce(repository_->convCommitsToMap(convcommits), commitFromSelf);
248 294 : }
249 :
250 : /**
251 : * Initialize activeCalls_ from the list of commits in the repository
252 : * @param commits Commits in reverse chronological order (i.e. from newest to oldest)
253 : */
254 186 : void initActiveCalls(const std::vector<std::map<std::string, std::string>>& commits) const
255 : {
256 186 : std::unordered_set<std::string> invalidHostUris;
257 186 : std::unordered_set<std::string> invalidCallIds;
258 :
259 186 : std::lock_guard lk(activeCallsMtx_);
260 1165 : for (const auto& commit : commits) {
261 1958 : if (commit.at(CommitKey::TYPE) == CommitType::MEMBER) {
262 : // Each commit of type MEMBER has an "action" field whose value can be one
263 : // of the following: "add", "join", "remove", "ban", "unban"
264 : // In the case of "remove" and "ban", we need to add the member's URI to
265 : // invalidHostUris to ensure that any call they may have started in the past
266 : // is no longer considered active.
267 : // For the other actions, there's no harm in adding the member's URI anyway,
268 : // since it's not possible to start hosting a call before joining the swarm (or
269 : // before getting unbanned in the case of previously banned members).
270 1544 : invalidHostUris.emplace(commit.at(CommitKey::URI));
271 624 : } else if (commit.find(CommitKey::CONF_ID) != commit.end() && commit.find(CommitKey::URI) != commit.end()
272 417 : && commit.find(CommitKey::DEVICE) != commit.end()) {
273 : // The commit indicates either the end or the beginning of a call
274 : // (depending on whether there is a "duration" field or not).
275 1 : auto convId = repository_->id();
276 2 : auto confId = commit.at(CommitKey::CONF_ID);
277 2 : auto uri = commit.at(CommitKey::URI);
278 1 : auto device = commit.at(CommitKey::DEVICE);
279 :
280 3 : if (commit.find(CommitKey::DURATION) == commit.end()
281 1 : && invalidCallIds.find(confId) == invalidCallIds.end()
282 3 : && invalidHostUris.find(uri) == invalidHostUris.end()) {
283 1 : std::map<std::string, std::string> activeCall;
284 2 : activeCall["id"] = confId;
285 2 : activeCall["uri"] = uri;
286 1 : activeCall["device"] = device;
287 1 : activeCalls_.emplace_back(activeCall);
288 1 : fmt::print("swarm:{} new active call detected: {} (on device {}, account {})\n",
289 : convId,
290 : confId,
291 : device,
292 : uri);
293 1 : }
294 : // Even if the call was active, we still add its ID to invalidCallIds to make sure it
295 : // doesn't get added a second time. (This shouldn't happen normally, but in practice
296 : // there are sometimes multiple commits indicating the beginning of the same call.)
297 1 : invalidCallIds.emplace(confId);
298 1 : }
299 : }
300 186 : saveActiveCalls();
301 186 : emitSignal<libjami::ConfigurationSignal::ActiveCallsChanged>(accountId_, repository_->id(), activeCalls_);
302 186 : }
303 :
304 : /**
305 : * Update activeCalls_ via announced commits (in load or via new commits)
306 : * @param commit Commit to check
307 : * @param eraseOnly If we want to ignore added commits
308 : * @param emitSig If we want to emit to client
309 : * @note eraseOnly is used by loadMessages. This is a fail-safe, this SHOULD NOT happen
310 : */
311 75 : void updateActiveCalls(const std::map<std::string, std::string>& commit,
312 : bool eraseOnly = false,
313 : bool emitSig = true) const
314 : {
315 75 : if (!repository_)
316 0 : return;
317 150 : if (commit.at(CommitKey::TYPE) == CommitType::MEMBER) {
318 : // In this case, we need to check if we are not removing a hosting member or device
319 17 : std::lock_guard lk(activeCallsMtx_);
320 17 : auto it = activeCalls_.begin();
321 17 : auto updateActives = false;
322 17 : while (it != activeCalls_.end()) {
323 0 : if (it->at("uri") == commit.at(CommitKey::URI) || it->at("device") == commit.at(CommitKey::URI)) {
324 0 : JAMI_DEBUG("Removing {:s} from the active calls, because {:s} left",
325 : it->at("id"),
326 : commit.at(CommitKey::URI));
327 0 : it = activeCalls_.erase(it);
328 0 : updateActives = true;
329 : } else {
330 0 : ++it;
331 : }
332 : }
333 17 : if (updateActives) {
334 0 : saveActiveCalls();
335 0 : if (emitSig)
336 0 : emitSignal<libjami::ConfigurationSignal::ActiveCallsChanged>(accountId_,
337 0 : repository_->id(),
338 0 : activeCalls_);
339 : }
340 17 : return;
341 17 : }
342 : // Else, it's a call information
343 339 : if (commit.find(CommitKey::CONF_ID) != commit.end() && commit.find(CommitKey::URI) != commit.end()
344 281 : && commit.find(CommitKey::DEVICE) != commit.end()) {
345 55 : auto convId = repository_->id();
346 110 : auto confId = commit.at(CommitKey::CONF_ID);
347 110 : auto uri = commit.at(CommitKey::URI);
348 55 : auto device = commit.at(CommitKey::DEVICE);
349 55 : std::lock_guard lk(activeCallsMtx_);
350 55 : auto itActive = std::find_if(activeCalls_.begin(), activeCalls_.end(), [&](const auto& value) {
351 168 : return value.at("id") == confId && value.at("uri") == uri && value.at("device") == device;
352 : });
353 165 : if (commit.find(CommitKey::DURATION) == commit.end()) {
354 31 : if (itActive == activeCalls_.end() && !eraseOnly) {
355 124 : JAMI_DEBUG("swarm:{:s} new current call detected: {:s} on device {:s}, account {:s}",
356 : convId,
357 : confId,
358 : device,
359 : uri);
360 155 : activeCalls_.emplace_back(std::map<std::string, std::string> {
361 : {"id", confId},
362 : {"uri", uri},
363 : {"device", device},
364 124 : });
365 31 : saveActiveCalls();
366 31 : if (emitSig)
367 31 : emitSignal<libjami::ConfigurationSignal::ActiveCallsChanged>(accountId_,
368 31 : repository_->id(),
369 31 : activeCalls_);
370 : }
371 : } else {
372 24 : if (itActive != activeCalls_.end()) {
373 24 : itActive = activeCalls_.erase(itActive);
374 : // Unlikely, but we must ensure that no duplicate exists
375 48 : while (itActive != activeCalls_.end()) {
376 0 : itActive = std::find_if(itActive, activeCalls_.end(), [&](const auto& value) {
377 0 : return value.at("id") == confId && value.at("uri") == uri && value.at("device") == device;
378 : });
379 0 : if (itActive != activeCalls_.end()) {
380 0 : JAMI_ERROR("Duplicate call found. (This is a bug)");
381 0 : itActive = activeCalls_.erase(itActive);
382 : }
383 : }
384 :
385 24 : if (eraseOnly) {
386 0 : JAMI_WARNING("previous swarm:{:s} call finished detected: {:s} on device "
387 : "{:s}, account {:s}",
388 : convId,
389 : confId,
390 : device,
391 : uri);
392 : } else {
393 96 : JAMI_DEBUG("swarm:{:s} call finished: {:s} on device {:s}, account {:s}",
394 : convId,
395 : confId,
396 : device,
397 : uri);
398 : }
399 : }
400 24 : saveActiveCalls();
401 24 : if (emitSig)
402 24 : emitSignal<libjami::ConfigurationSignal::ActiveCallsChanged>(accountId_,
403 24 : repository_->id(),
404 24 : activeCalls_);
405 : }
406 55 : }
407 31 : }
408 :
409 1195 : void announce(const std::vector<std::map<std::string, std::string>>& commits, bool commitFromSelf = false)
410 : {
411 1195 : if (!repository_)
412 0 : return;
413 1195 : auto convId = repository_->id();
414 1195 : auto ok = !commits.empty();
415 2392 : auto lastId = ok ? commits.rbegin()->at(ConversationMapKeys::ID) : "";
416 1195 : addToHistory(loadedHistory_, commits, true, commitFromSelf);
417 1195 : if (ok) {
418 1193 : bool announceMember = false;
419 2420 : for (const auto& c : commits) {
420 : // Announce member events
421 2454 : if (c.at(CommitKey::TYPE) == CommitType::MEMBER) {
422 4585 : if (c.find(CommitKey::URI) != c.end() && c.find(CommitKey::ACTION) != c.end()) {
423 1834 : const auto& uri = c.at(CommitKey::URI);
424 917 : const auto& actionStr = c.at(CommitKey::ACTION);
425 917 : auto action = -1;
426 917 : if (actionStr == CommitAction::ADD)
427 442 : action = 0;
428 475 : else if (actionStr == CommitAction::JOIN)
429 457 : action = 1;
430 18 : else if (actionStr == CommitAction::REMOVE)
431 1 : action = 2;
432 17 : else if (actionStr == CommitAction::BAN)
433 16 : action = 3;
434 1 : else if (actionStr == CommitAction::UNBAN)
435 1 : action = 4;
436 917 : if (actionStr == CommitAction::BAN || actionStr == CommitAction::REMOVE) {
437 : // In this case, a potential host was removed during a call.
438 17 : updateActiveCalls(c);
439 17 : typers_->removeTyper(uri);
440 : }
441 917 : if (action != -1) {
442 917 : announceMember = true;
443 917 : emitSignal<libjami::ConversationSignal::ConversationMemberEvent>(accountId_,
444 : convId,
445 : uri,
446 : action);
447 : }
448 : }
449 620 : } else if (c.at(CommitKey::TYPE) == CommitType::CALL_HISTORY) {
450 58 : updateActiveCalls(c);
451 : }
452 : #ifdef ENABLE_PLUGIN
453 1227 : auto& pluginChatManager = Manager::instance().getJamiPluginManager().getChatServicesManager();
454 1227 : if (pluginChatManager.hasHandlers()) {
455 0 : auto cm = std::make_shared<JamiMessage>(accountId_, convId, c.at("author") != userId_, c, false);
456 0 : cm->isSwarm = true;
457 0 : pluginChatManager.publishMessage(std::move(cm));
458 0 : }
459 : #endif
460 : }
461 :
462 1193 : if (announceMember && onMembersChanged_) {
463 917 : onMembersChanged_(repository_->memberUris("", {}));
464 : }
465 : }
466 1195 : }
467 :
468 390 : void loadStatus()
469 : {
470 : try {
471 : // read file
472 777 : auto file = fileutils::loadFile(statusPath_);
473 : // load values
474 3 : msgpack::object_handle oh = msgpack::unpack((const char*) file.data(), file.size());
475 3 : std::lock_guard lk {messageStatusMtx_};
476 3 : oh.get().convert(messagesStatus_);
477 390 : } catch (const std::exception& e) {
478 387 : }
479 390 : }
480 1108 : void saveStatus()
481 : {
482 1108 : std::ofstream file(statusPath_, std::ios::trunc | std::ios::binary);
483 1108 : msgpack::pack(file, messagesStatus_);
484 1108 : }
485 :
486 18 : void loadActiveCalls() const
487 : {
488 : try {
489 : // read file
490 29 : auto file = fileutils::loadFile(activeCallsPath_);
491 : // load values
492 7 : msgpack::object_handle oh = msgpack::unpack((const char*) file.data(), file.size());
493 7 : std::lock_guard lk {activeCallsMtx_};
494 7 : oh.get().convert(activeCalls_);
495 18 : } catch (const std::exception& e) {
496 11 : return;
497 11 : }
498 : }
499 :
500 259 : void saveActiveCalls() const
501 : {
502 259 : std::ofstream file(activeCallsPath_, std::ios::trunc | std::ios::binary);
503 259 : msgpack::pack(file, activeCalls_);
504 259 : }
505 :
506 18 : void loadHostedCalls() const
507 : {
508 : try {
509 : // read file
510 30 : auto file = fileutils::loadFile(hostedCallsPath_);
511 : // load values
512 6 : msgpack::object_handle oh = msgpack::unpack((const char*) file.data(), file.size());
513 6 : std::lock_guard lk {activeCallsMtx_};
514 6 : oh.get().convert(hostedCalls_);
515 18 : } catch (const std::exception& e) {
516 12 : return;
517 12 : }
518 : }
519 :
520 42 : void saveHostedCalls() const
521 : {
522 42 : std::ofstream file(hostedCallsPath_, std::ios::trunc | std::ios::binary);
523 42 : msgpack::pack(file, hostedCalls_);
524 42 : }
525 :
526 : void voteUnban(const std::string& contactUri, const std::string_view type, const OnDoneCb& cb);
527 :
528 : std::vector<std::map<std::string, std::string>> getMembers(bool includeInvited,
529 : bool includeLeft,
530 : bool includeBanned) const;
531 :
532 : std::vector<std::map<std::string, std::string>> getTrackedMembers() const;
533 :
534 6817 : std::string_view memberBanType(const std::string& uri) const
535 : {
536 6817 : auto crt = fmt::format("{}.crt", uri);
537 6817 : auto bannedMember = repoPath_ / MemberPath::BANNED / MemberPath::MEMBERS / crt;
538 6814 : if (std::filesystem::is_regular_file(bannedMember))
539 17 : return "members"sv;
540 6801 : auto bannedAdmin = repoPath_ / MemberPath::BANNED / MemberPath::ADMINS / crt;
541 6798 : if (std::filesystem::is_regular_file(bannedAdmin))
542 0 : return "admins"sv;
543 6800 : auto bannedInvited = repoPath_ / MemberPath::BANNED / MemberPath::INVITED / uri;
544 6795 : if (std::filesystem::is_regular_file(bannedInvited))
545 0 : return "invited"sv;
546 6799 : return {};
547 6816 : }
548 :
549 7525 : bool isDeviceBanned(const std::string& deviceId) const
550 : {
551 7525 : auto crt = fmt::format("{}.crt", deviceId);
552 7526 : auto bannedDevice = repoPath_ / MemberPath::BANNED / MemberPath::DEVICES / crt;
553 15048 : return std::filesystem::is_regular_file(bannedDevice);
554 7527 : }
555 :
556 5071 : std::shared_ptr<dhtnet::ChannelSocket> gitSocket(const DeviceId& deviceId) const
557 : {
558 5071 : std::lock_guard lk(gitSocketMtx_);
559 5071 : auto deviceSockets = gitSocketList_.find(deviceId);
560 10142 : return (deviceSockets != gitSocketList_.end()) ? deviceSockets->second : nullptr;
561 5071 : }
562 :
563 2283 : void addGitSocket(const DeviceId& deviceId, const std::shared_ptr<dhtnet::ChannelSocket>& socket)
564 : {
565 2283 : std::lock_guard lk(gitSocketMtx_);
566 2284 : gitSocketList_[deviceId] = socket;
567 2284 : }
568 939 : void removeGitSocket(const DeviceId& deviceId)
569 : {
570 939 : std::lock_guard lk(gitSocketMtx_);
571 939 : auto deviceSockets = gitSocketList_.find(deviceId);
572 939 : if (deviceSockets != gitSocketList_.end())
573 490 : gitSocketList_.erase(deviceSockets);
574 939 : }
575 :
576 : void disconnectFromDevice(const DeviceId& deviceId);
577 :
578 : /**
579 : * Remove all git sockets and all DRT nodes associated with the given peer.
580 : * This is used when a swarm member is banned to ensure that we stop syncing
581 : * with them or sending them message notifications.
582 : */
583 : void disconnectFromPeer(const std::string& peerUri);
584 :
585 : std::vector<std::map<std::string, std::string>> getMembers(bool includeInvited, bool includeLeft) const;
586 :
587 : std::function<void()> bootstrapCb_;
588 : std::mutex bootstrapMtx_ {};
589 : #ifdef LIBJAMI_TEST
590 : std::function<void(std::string, BootstrapStatus)> bootstrapCbTest_;
591 : #endif
592 :
593 : std::mutex writeMtx_ {};
594 : const std::unique_ptr<ConversationRepository> repository_;
595 : const std::weak_ptr<JamiAccount> account_;
596 : const std::string accountId_;
597 : const std::string userId_;
598 : const std::string deviceId_;
599 : const std::shared_ptr<SwarmManager> swarmManager_;
600 : std::atomic_bool isRemoving_ {false};
601 : std::vector<libjami::SwarmMessage> loadMessages(const LogOptions& options, History* optHistory = nullptr);
602 : void pull(const std::string& deviceId);
603 :
604 : // Avoid multiple fetch/merges at the same time.
605 : std::mutex pullcbsMtx_ {};
606 : // store current remote in fetch
607 : std::map<std::string, std::deque<std::pair<std::string, OnPullCb>>> fetchingRemotes_ {};
608 : const std::shared_ptr<TransferManager> transferManager_ {};
609 : const std::filesystem::path repoPath_ {};
610 : const std::filesystem::path conversationDataPath_ {};
611 : const std::filesystem::path fetchedPath_ {};
612 :
613 : // Manage last message displayed and status
614 : const std::filesystem::path sendingPath_ {};
615 : const std::filesystem::path preferencesPath_ {};
616 : const std::filesystem::path statusPath_ {};
617 :
618 : OnMembersChanged onMembersChanged_ {};
619 : struct TrackedMember
620 : {
621 : std::set<DeviceId> devices;
622 : std::set<DeviceId> failedDevices;
623 : };
624 : std::map<std::string, TrackedMember> trackedMembers_;
625 : mutable std::mutex trackedMembersMtx_;
626 : bool isTracking_ {false};
627 : void setupMemberCallback();
628 : void startTracking(std::weak_ptr<Conversation> w);
629 : void stopTracking();
630 : void rotateTrackedMembers(const std::string& memberUri = "", const DeviceId& deviceId = {});
631 : void monitorConnection(std::weak_ptr<Conversation> w);
632 : void onConnectionFailed(const DeviceId& deviceId, const std::string& memberUri = "");
633 :
634 : uint64_t presenceDeviceListenerToken_ {0};
635 :
636 : // Manage hosted calls on this device
637 : std::filesystem::path hostedCallsPath_ {};
638 : mutable std::map<std::string, uint64_t /* start time */> hostedCalls_ {};
639 : // Manage active calls for this conversation (can be hosted by other devices)
640 : std::filesystem::path activeCallsPath_ {};
641 : mutable std::mutex activeCallsMtx_ {};
642 : mutable std::vector<std::map<std::string, std::string>> activeCalls_ {};
643 :
644 : mutable std::mutex gitSocketMtx_ {};
645 : GitSocketList gitSocketList_ {};
646 :
647 : // Bootstrap
648 : const std::shared_ptr<asio::io_context> ioContext_;
649 :
650 : /**
651 : * Loaded history represents the linearized history to show for clients
652 : */
653 : History loadedHistory_ {};
654 : std::vector<std::shared_ptr<libjami::SwarmMessage>> addToHistory(
655 : History& history,
656 : const std::vector<std::map<std::string, std::string>>& commits,
657 : bool messageReceived = false,
658 : bool commitFromSelf = false);
659 :
660 : void handleReaction(History& history, const std::shared_ptr<libjami::SwarmMessage>& sharedCommit) const;
661 : void handleEdition(History& history,
662 : const std::shared_ptr<libjami::SwarmMessage>& sharedCommit,
663 : bool messageReceived) const;
664 : bool handleMessage(History& history,
665 : const std::shared_ptr<libjami::SwarmMessage>& sharedCommit,
666 : bool messageReceived) const;
667 : void rectifyStatus(const std::shared_ptr<libjami::SwarmMessage>& message, History& history) const;
668 : /**
669 : * {uri, {
670 : * {"fetch", "commitId"},
671 : * {"fetched_ts", "timestamp"},
672 : * {"read", "commitId"},
673 : * {"read_ts", "timestamp"}
674 : * }
675 : * }
676 : */
677 : mutable std::mutex messageStatusMtx_;
678 : std::function<void(const std::map<std::string, std::map<std::string, std::string>>&)> messageStatusCb_ {};
679 : std::map<std::string, std::map<std::string, std::string>> messagesStatus_ {};
680 : /**
681 : * Status: 0 = commited, 1 = fetched, 2 = read
682 : * This cache the curent status to add in the messages
683 : */
684 : // Note: only store int32_t cause it's easy to pass to dbus this way
685 : // memberToStatus serves as a cache for loading messages
686 : std::map<std::string, int32_t> memberToStatus;
687 :
688 : // futureStatus is used to store the status for receiving messages
689 : // (because we're not sure to fetch the commit before receiving a status change for this)
690 : std::map<std::string, std::map<std::string, int32_t>> futureStatus;
691 : // Update internal structures regarding status
692 : void updateStatus(const std::string& uri,
693 : libjami::Account::MessageStates status,
694 : const std::string& commitId,
695 : const std::string& ts,
696 : bool emit = false);
697 :
698 : std::shared_ptr<Typers> typers_;
699 : };
700 :
701 : void
702 390 : Conversation::Impl::setupMemberCallback()
703 : {
704 390 : repository_->onMembersChanged([this](const std::set<std::string>& memberUris) {
705 : {
706 1078 : std::lock_guard lk(trackedMembersMtx_);
707 1078 : if (isTracking_) {
708 230 : if (auto acc = account_.lock()) {
709 500 : for (auto it = trackedMembers_.begin(); it != trackedMembers_.end();) {
710 270 : if (memberUris.find(it->first) == memberUris.end()) {
711 8 : acc->presenceManager()->untrackBuddy(it->first);
712 8 : it = trackedMembers_.erase(it);
713 : } else {
714 262 : ++it;
715 : }
716 : }
717 230 : }
718 : }
719 1078 : }
720 :
721 1078 : if (onMembersChanged_)
722 1078 : onMembersChanged_(memberUris);
723 1078 : });
724 390 : }
725 :
726 : void
727 538 : Conversation::Impl::startTracking(std::weak_ptr<Conversation> w)
728 : {
729 538 : auto acc = account_.lock();
730 538 : if (!acc)
731 0 : return;
732 :
733 : {
734 538 : std::lock_guard lk(trackedMembersMtx_);
735 538 : if (isTracking_)
736 137 : return;
737 401 : isTracking_ = true;
738 802 : presenceDeviceListenerToken_ = acc->presenceManager()->addDeviceListener(
739 802 : [w](const std::string& uri, const DeviceId& deviceId, bool online) {
740 694 : if (auto sthis = w.lock()) {
741 694 : if (online && sthis->isMember(uri)) {
742 1647 : sthis->addKnownDevices({deviceId}, uri);
743 : }
744 694 : }
745 694 : });
746 538 : }
747 :
748 802 : rotateTrackedMembers();
749 538 : }
750 :
751 : void
752 707 : Conversation::Impl::stopTracking()
753 : {
754 : // Collect data under trackedMembersMtx_, then release it before calling
755 : // into PresenceManager to avoid lock-ordering inversion with the
756 : // PresenceManager mutex (which is held when notifyListeners calls
757 : // addKnownDevices, which in turn acquires trackedMembersMtx_).
758 707 : uint64_t token = 0;
759 707 : std::vector<std::string> urisToUntrack;
760 : {
761 707 : std::lock_guard lk(trackedMembersMtx_);
762 707 : if (!isTracking_)
763 306 : return;
764 401 : isTracking_ = false;
765 401 : token = presenceDeviceListenerToken_;
766 401 : presenceDeviceListenerToken_ = 0;
767 1162 : for (const auto& [uri, _] : trackedMembers_) {
768 761 : urisToUntrack.push_back(uri);
769 : }
770 401 : trackedMembers_.clear();
771 707 : }
772 :
773 401 : auto acc = account_.lock();
774 401 : if (!acc)
775 181 : return;
776 :
777 220 : if (token) {
778 220 : acc->presenceManager()->removeDeviceListener(token);
779 : }
780 :
781 708 : for (const auto& uri : urisToUntrack) {
782 487 : acc->presenceManager()->untrackBuddy(uri);
783 : }
784 888 : }
785 :
786 : void
787 676 : Conversation::Impl::rotateTrackedMembers(const std::string& memberUri, const DeviceId& deviceId)
788 : {
789 676 : auto acc = account_.lock();
790 676 : if (!acc)
791 0 : return;
792 :
793 676 : std::lock_guard lk(trackedMembersMtx_);
794 676 : if (!isTracking_)
795 221 : return;
796 :
797 455 : if (!memberUri.empty()) {
798 54 : if (auto it = trackedMembers_.find(memberUri); it != trackedMembers_.end()) {
799 204 : JAMI_WARNING("{} [device {}] Rotating tracked members after connection failure", toString(), deviceId);
800 51 : auto& info = it->second;
801 51 : info.failedDevices.insert(deviceId);
802 51 : if (std::includes(info.failedDevices.begin(),
803 : info.failedDevices.end(),
804 : info.devices.begin(),
805 : info.devices.end())) {
806 28 : acc->presenceManager()->untrackBuddy(it->first);
807 28 : trackedMembers_.erase(it);
808 : }
809 : } else {
810 3 : return;
811 : }
812 : }
813 :
814 452 : auto members = repository_->members();
815 452 : size_t N = members.size();
816 452 : size_t K = std::min(N, 3 + (size_t) std::log2(N));
817 :
818 : // If we are already trying enough devices, don't rotate
819 452 : auto activeDevices = swarmManager_->getActiveNodesCount();
820 452 : if (activeDevices >= 2 * K)
821 0 : return;
822 :
823 1808 : JAMI_WARNING("{} Refreshing tracked members: {}/{} active ({}/{} devices)",
824 : toString(),
825 : trackedMembers_.size(),
826 : K,
827 : activeDevices,
828 : 2 * K);
829 :
830 : // Add new members if we have space
831 452 : if (trackedMembers_.size() < K) {
832 429 : std::vector<std::string> candidates;
833 429 : candidates.reserve(N - trackedMembers_.size());
834 1486 : for (const auto& m : members) {
835 1057 : if (m.uri != memberUri && trackedMembers_.find(m.uri) == trackedMembers_.end()) {
836 998 : candidates.push_back(m.uri);
837 : }
838 : }
839 429 : if (!candidates.empty()) {
840 404 : std::vector<std::string> chosen;
841 404 : std::sample(candidates.begin(),
842 : candidates.end(),
843 : std::back_inserter(chosen),
844 404 : K - trackedMembers_.size(),
845 404 : Manager::instance().getSeededRandomEngine());
846 1201 : for (const auto& uri : chosen) {
847 797 : acc->presenceManager()->trackBuddy(uri);
848 797 : trackedMembers_.emplace(uri, TrackedMember {});
849 : }
850 404 : }
851 429 : }
852 900 : }
853 :
854 : void
855 275 : Conversation::Impl::onConnectionFailed(const DeviceId& deviceId, const std::string& memberUri)
856 : {
857 275 : rotateTrackedMembers(memberUri, deviceId);
858 275 : }
859 :
860 : void
861 512 : Conversation::Impl::monitorConnection(std::weak_ptr<Conversation> w)
862 : {
863 512 : if (!swarmManager_->isConnected()) {
864 500 : startTracking(w);
865 : }
866 :
867 512 : swarmManager_->onConnectionChanged([w](bool ok) {
868 355 : dht::ThreadPool::io().run([w, ok] {
869 355 : if (auto sthis = w.lock()) {
870 : // Check the current connection state rather than relying on the
871 : // captured `ok` value, because the thread pool does not guarantee
872 : // execution order. Two rapid state changes (connected then
873 : // disconnected) could otherwise cause stopTracking to run after
874 : // startTracking, leaving the conversation untracked while the DRT
875 : // has no connected nodes.
876 355 : if (sthis->pimpl_->swarmManager_->isConnected()) {
877 317 : sthis->pimpl_->stopTracking();
878 : } else {
879 38 : sthis->pimpl_->startTracking(w);
880 : }
881 355 : if (ok) {
882 314 : if (sthis->pimpl_->bootstrapCb_)
883 314 : sthis->pimpl_->bootstrapCb_();
884 : }
885 : #ifdef LIBJAMI_TEST
886 355 : if (sthis->pimpl_->bootstrapCbTest_)
887 26 : sthis->pimpl_->bootstrapCbTest_(sthis->id(),
888 13 : ok ? BootstrapStatus::SUCCESS : BootstrapStatus::FAILED);
889 : #endif
890 355 : }
891 355 : });
892 355 : });
893 512 : }
894 :
895 : bool
896 16 : Conversation::Impl::isAdmin() const
897 : {
898 16 : auto adminsPath = repoPath_ / MemberPath::ADMINS;
899 32 : return std::filesystem::is_regular_file(fileutils::getFullPath(adminsPath, userId_ + ".crt"));
900 16 : }
901 :
902 : void
903 11 : Conversation::Impl::disconnectFromDevice(const DeviceId& deviceId)
904 : {
905 22 : swarmManager_->deleteNode({deviceId});
906 :
907 11 : std::shared_ptr<dhtnet::ChannelSocket> socket;
908 : {
909 11 : std::lock_guard lk(gitSocketMtx_);
910 11 : if (auto it = gitSocketList_.find(deviceId); it != gitSocketList_.end()) {
911 11 : socket = std::move(it->second);
912 11 : gitSocketList_.erase(it);
913 : }
914 11 : }
915 11 : if (socket)
916 11 : socket->shutdown();
917 11 : }
918 :
919 : void
920 16 : Conversation::Impl::disconnectFromPeer(const std::string& peerUri)
921 : {
922 16 : std::set<DeviceId> devicesToRemove;
923 :
924 16 : const auto nodes = swarmManager_->getAllNodes();
925 30 : for (const auto node : nodes)
926 14 : if (peerUri == repository_->uriFromDevice(node.toString()))
927 6 : devicesToRemove.emplace(node);
928 :
929 : {
930 16 : std::lock_guard lk(gitSocketMtx_);
931 38 : for (const auto& [deviceId, socket] : gitSocketList_) {
932 22 : auto cert = socket ? socket->peerCertificate() : nullptr;
933 44 : if ((cert && cert->issuer && cert->issuer->getId().toString() == peerUri)
934 44 : || peerUri == repository_->uriFromDevice(deviceId.toString())) {
935 11 : devicesToRemove.emplace(deviceId);
936 : }
937 22 : }
938 16 : }
939 :
940 27 : for (const auto& deviceId : devicesToRemove)
941 11 : disconnectFromDevice(deviceId);
942 16 : }
943 :
944 : std::vector<std::map<std::string, std::string>>
945 104 : Conversation::Impl::getMembers(bool includeInvited, bool includeLeft, bool includeBanned) const
946 : {
947 104 : std::vector<std::map<std::string, std::string>> result;
948 104 : auto members = repository_->members();
949 104 : std::lock_guard lk(messageStatusMtx_);
950 313 : for (const auto& member : members) {
951 209 : if (member.role == MemberRole::BANNED && !includeBanned) {
952 0 : continue;
953 : }
954 209 : if (member.role == MemberRole::INVITED && !includeInvited)
955 0 : continue;
956 209 : if (member.role == MemberRole::LEFT && !includeLeft)
957 0 : continue;
958 209 : auto mm = member.map();
959 209 : auto it = messagesStatus_.find(member.uri);
960 209 : if (it != messagesStatus_.end()) {
961 282 : auto readIt = it->second.find("read");
962 141 : if (readIt != it->second.end())
963 261 : mm[ConversationMapKeys::LAST_DISPLAYED] = readIt->second;
964 : }
965 209 : result.emplace_back(std::move(mm));
966 209 : }
967 208 : return result;
968 104 : }
969 :
970 : std::vector<std::map<std::string, std::string>>
971 0 : Conversation::Impl::getTrackedMembers() const
972 : {
973 0 : std::vector<std::map<std::string, std::string>> result;
974 0 : std::lock_guard lk(trackedMembersMtx_);
975 0 : for (const auto& [uri, member] : trackedMembers_) {
976 0 : std::map<std::string, std::string> map;
977 0 : map["uri"] = uri;
978 0 : std::string devicesStr;
979 0 : for (const auto& dev : member.devices) {
980 0 : if (!devicesStr.empty())
981 0 : devicesStr += ";";
982 0 : devicesStr += dev.toString();
983 : }
984 0 : map["devices"] = devicesStr;
985 0 : result.emplace_back(std::move(map));
986 0 : }
987 0 : return result;
988 0 : }
989 :
990 : std::vector<std::string>
991 18 : Conversation::Impl::commitsEndedCalls()
992 : {
993 : // Handle current calls
994 18 : std::vector<std::string> commits {};
995 18 : std::unique_lock lk(writeMtx_);
996 18 : std::unique_lock lkA(activeCallsMtx_);
997 18 : for (const auto& hostedCall : hostedCalls_) {
998 : // In this case, this means that we left
999 : // the conference while still hosting it, so activeCalls
1000 : // will not be correctly updated
1001 : // We don't need to send notifications there, as peers will sync with presence
1002 0 : auto confId = hostedCall.first;
1003 0 : auto now = std::chrono::system_clock::now();
1004 0 : uint64_t nowConverted = std::chrono::duration_cast<std::chrono::seconds>(now.time_since_epoch()).count();
1005 0 : auto duration = (nowConverted > hostedCall.second) ? (nowConverted - hostedCall.second) * 1000 : 0;
1006 0 : auto commitMessage = CommitMessage::conferenceHostingEnd(confId, deviceId_, userId_, duration);
1007 :
1008 0 : auto itActive = std::find_if(activeCalls_.begin(),
1009 : activeCalls_.end(),
1010 0 : [this, confId = hostedCall.first](const auto& value) {
1011 0 : return value.at("id") == confId && value.at("uri") == userId_
1012 0 : && value.at("device") == deviceId_;
1013 : });
1014 0 : if (itActive != activeCalls_.end())
1015 0 : activeCalls_.erase(itActive);
1016 0 : commits.emplace_back(repository_->commitMessage(commitMessage.toString()));
1017 0 : JAMI_DEBUG("Removing hosted conference... {:s}", hostedCall.first);
1018 0 : }
1019 18 : hostedCalls_.clear();
1020 18 : saveActiveCalls();
1021 18 : saveHostedCalls();
1022 36 : return commits;
1023 18 : }
1024 :
1025 : std::vector<libjami::SwarmMessage>
1026 1650 : Conversation::Impl::loadMessages(const LogOptions& options, History* optHistory)
1027 : {
1028 1650 : auto history = optHistory ? optHistory : &loadedHistory_;
1029 :
1030 : // history->mutex is locked by the caller
1031 1650 : if (!repository_ || history->loading) {
1032 0 : return {};
1033 : }
1034 1650 : history->loading = true;
1035 :
1036 : // By convention, if options.nbOfCommits is zero, then we
1037 : // don't impose a limit on the number of commits returned.
1038 1650 : bool limitNbOfCommits = options.nbOfCommits > 0;
1039 :
1040 1650 : auto startLogging = options.from == "";
1041 1650 : auto breakLogging = false;
1042 1650 : auto currentHistorySize = loadedHistory_.messageList.size();
1043 1650 : std::vector<std::string> replies;
1044 1650 : std::vector<std::shared_ptr<libjami::SwarmMessage>> msgList;
1045 3300 : repository_->log(
1046 : /* preCondition */
1047 3300 : [&](const auto& id, const auto& author, const auto& commit) {
1048 9392 : if (options.skipMerge && git_commit_parentcount(commit.get()) > 1) {
1049 3 : return CallbackResult::Skip;
1050 : }
1051 9391 : if (id == options.to) {
1052 672 : if (options.includeTo)
1053 0 : breakLogging = true; // For the next commit
1054 : }
1055 9390 : if (replies.empty()) { // This avoid load until
1056 : // NOTE: in the future, we may want to add "Reply-Body" in commit to avoid to load
1057 : // until this commit
1058 9391 : if ((limitNbOfCommits
1059 9391 : && (loadedHistory_.messageList.size() - currentHistorySize) == options.nbOfCommits))
1060 3 : return CallbackResult::Break; // Stop logging
1061 9388 : if (breakLogging)
1062 0 : return CallbackResult::Break; // Stop logging
1063 9388 : if (id == options.to && !options.includeTo) {
1064 671 : return CallbackResult::Break; // Stop logging
1065 : }
1066 : }
1067 :
1068 8718 : if (!startLogging && options.from != "" && options.from == id)
1069 1104 : startLogging = true;
1070 8717 : if (!startLogging)
1071 25 : return CallbackResult::Skip; // Start logging after this one
1072 :
1073 8692 : if (options.fastLog) {
1074 7720 : if (options.authorUri != "") {
1075 1 : if (options.authorUri == repository_->uriFromDevice(author.email)) {
1076 1 : return CallbackResult::Break; // Found author, stop
1077 : }
1078 : }
1079 : }
1080 :
1081 8684 : return CallbackResult::Ok; // Continue
1082 : },
1083 : /* emplaceCb */
1084 3300 : [&](auto&& cc) {
1085 8696 : if (limitNbOfCommits && (msgList.size() == options.nbOfCommits))
1086 430 : return;
1087 8267 : auto optMessage = repository_->convCommitToMap(cc);
1088 8256 : if (!optMessage.has_value())
1089 1 : return;
1090 8255 : auto message = optMessage.value();
1091 24790 : if (message.find(CommitKey::REPLY_TO) != message.end()) {
1092 1 : auto it = std::find(replies.begin(), replies.end(), message.at(CommitKey::REPLY_TO));
1093 1 : if (it == replies.end()) {
1094 3 : replies.emplace_back(message.at(CommitKey::REPLY_TO));
1095 : }
1096 : }
1097 8257 : auto it = std::find(replies.begin(), replies.end(), message.at("id"));
1098 8259 : if (it != replies.end()) {
1099 1 : replies.erase(it);
1100 : }
1101 8265 : std::shared_ptr<libjami::SwarmMessage> firstMsg;
1102 8265 : if ((history == &loadedHistory_) && msgList.empty() && !loadedHistory_.messageList.empty()) {
1103 0 : firstMsg = *loadedHistory_.messageList.rbegin();
1104 : }
1105 24793 : auto added = addToHistory(*history, {message}, false, false);
1106 8267 : if (!added.empty() && firstMsg) {
1107 0 : emitSignal<libjami::ConversationSignal::SwarmMessageUpdated>(accountId_, repository_->id(), *firstMsg);
1108 : }
1109 8269 : msgList.insert(msgList.end(), added.begin(), added.end());
1110 16531 : },
1111 : /* postCondition */
1112 0 : [&](auto, auto, auto) {
1113 : // Stop logging if there was a limit set on the number of commits
1114 : // to return and we reached it. This isn't strictly necessary since
1115 : // the check at the beginning of `emplaceCb` ensures that we won't
1116 : // return too many messages, but it prevents us from needlessly
1117 : // iterating over a (potentially) large number of commits.
1118 8687 : return limitNbOfCommits && (msgList.size() == options.nbOfCommits);
1119 : },
1120 1650 : options.from,
1121 1650 : options.logIfNotFound);
1122 :
1123 1650 : history->loading = false;
1124 1650 : history->cv.notify_all();
1125 :
1126 : // Convert for client (remove ptr)
1127 1650 : std::vector<libjami::SwarmMessage> ret;
1128 1650 : ret.reserve(msgList.size());
1129 9903 : for (const auto& msg : msgList) {
1130 8257 : ret.emplace_back(*msg);
1131 : }
1132 1650 : return ret;
1133 1650 : }
1134 :
1135 : void
1136 3 : Conversation::Impl::handleReaction(History& history, const std::shared_ptr<libjami::SwarmMessage>& sharedCommit) const
1137 : {
1138 6 : auto it = history.quickAccess.find(sharedCommit->body.at(CommitKey::REACT_TO));
1139 3 : auto peditIt = history.pendingEditions.find(sharedCommit->id);
1140 3 : if (peditIt != history.pendingEditions.end()) {
1141 0 : auto oldBody = sharedCommit->body;
1142 0 : sharedCommit->body[CommitKey::BODY] = peditIt->second.front()->body[CommitKey::BODY];
1143 0 : if (sharedCommit->body.at(CommitKey::BODY).empty())
1144 0 : return;
1145 0 : history.pendingEditions.erase(peditIt);
1146 0 : }
1147 3 : if (it != history.quickAccess.end()) {
1148 3 : it->second->reactions.emplace_back(sharedCommit->body);
1149 3 : emitSignal<libjami::ConversationSignal::ReactionAdded>(accountId_,
1150 3 : repository_->id(),
1151 3 : it->second->id,
1152 3 : sharedCommit->body);
1153 : } else {
1154 0 : history.pendingReactions[sharedCommit->body.at(CommitKey::REACT_TO)].emplace_back(sharedCommit->body);
1155 : }
1156 : }
1157 :
1158 : void
1159 9 : Conversation::Impl::handleEdition(History& history,
1160 : const std::shared_ptr<libjami::SwarmMessage>& sharedCommit,
1161 : bool messageReceived) const
1162 : {
1163 18 : auto editId = sharedCommit->body.at(CommitKey::EDIT);
1164 9 : auto it = history.quickAccess.find(editId);
1165 9 : if (it != history.quickAccess.end()) {
1166 7 : auto baseCommit = it->second;
1167 7 : if (baseCommit) {
1168 21 : auto itReact = baseCommit->body.find(CommitKey::REACT_TO);
1169 7 : std::string toReplace = (baseCommit->type == CommitType::DATA_TRANSFER) ? CommitKey::TID : CommitKey::BODY;
1170 7 : auto body = sharedCommit->body.at(toReplace);
1171 : // Edit reaction
1172 7 : if (itReact != baseCommit->body.end()) {
1173 1 : baseCommit->body[toReplace] = body; // Replace body if pending
1174 1 : it = history.quickAccess.find(itReact->second);
1175 1 : auto itPending = history.pendingReactions.find(itReact->second);
1176 1 : if (it != history.quickAccess.end()) {
1177 1 : baseCommit = it->second; // Base commit
1178 1 : auto itPreviousReact = std::find_if(baseCommit->reactions.begin(),
1179 1 : baseCommit->reactions.end(),
1180 1 : [&](const auto& reaction) {
1181 3 : return reaction.at("id") == editId;
1182 : });
1183 1 : if (itPreviousReact != baseCommit->reactions.end()) {
1184 1 : (*itPreviousReact)[toReplace] = body;
1185 1 : if (body.empty()) {
1186 1 : baseCommit->reactions.erase(itPreviousReact);
1187 2 : emitSignal<libjami::ConversationSignal::ReactionRemoved>(accountId_,
1188 1 : repository_->id(),
1189 1 : baseCommit->id,
1190 : editId);
1191 : }
1192 : }
1193 0 : } else if (itPending != history.pendingReactions.end()) {
1194 : // Else edit if pending
1195 0 : auto itReaction = std::find_if(itPending->second.begin(),
1196 0 : itPending->second.end(),
1197 0 : [&](const auto& reaction) { return reaction.at("id") == editId; });
1198 0 : if (itReaction != itPending->second.end()) {
1199 0 : (*itReaction)[toReplace] = body;
1200 0 : if (body.empty())
1201 0 : itPending->second.erase(itReaction);
1202 : }
1203 : } else {
1204 : // Add to pending edtions
1205 0 : messageReceived ? history.pendingEditions[editId].emplace_front(sharedCommit)
1206 0 : : history.pendingEditions[editId].emplace_back(sharedCommit);
1207 : }
1208 : } else {
1209 : // Normal message
1210 6 : it->second->editions.emplace(it->second->editions.begin(), it->second->body);
1211 6 : it->second->body[toReplace] = sharedCommit->body[toReplace];
1212 6 : if (toReplace == CommitKey::TID) {
1213 : // Avoid to replace fileId in client
1214 6 : it->second->body["fileId"] = "";
1215 : }
1216 : // Remove reactions
1217 6 : if (sharedCommit->body.at(toReplace).empty())
1218 3 : it->second->reactions.clear();
1219 6 : emitSignal<libjami::ConversationSignal::SwarmMessageUpdated>(accountId_, repository_->id(), *it->second);
1220 : }
1221 7 : }
1222 7 : } else {
1223 2 : messageReceived ? history.pendingEditions[editId].emplace_front(sharedCommit)
1224 2 : : history.pendingEditions[editId].emplace_back(sharedCommit);
1225 : }
1226 9 : }
1227 :
1228 : bool
1229 9478 : Conversation::Impl::handleMessage(History& history,
1230 : const std::shared_ptr<libjami::SwarmMessage>& sharedCommit,
1231 : bool messageReceived) const
1232 : {
1233 9478 : if (messageReceived) {
1234 : // For a received message, we place it at the beginning of the list
1235 1200 : if (!history.messageList.empty())
1236 958 : sharedCommit->linearizedParent = (*history.messageList.begin())->id;
1237 1200 : history.messageList.emplace_front(sharedCommit);
1238 : } else {
1239 : // For a loaded message, we load from newest to oldest
1240 : // So we change the parent of the last message.
1241 8278 : if (!history.messageList.empty())
1242 6636 : (*history.messageList.rbegin())->linearizedParent = sharedCommit->id;
1243 8265 : history.messageList.emplace_back(sharedCommit);
1244 : }
1245 : // Handle pending reactions/editions
1246 9475 : auto reactIt = history.pendingReactions.find(sharedCommit->id);
1247 9478 : if (reactIt != history.pendingReactions.end()) {
1248 0 : for (const auto& commitBody : reactIt->second)
1249 0 : sharedCommit->reactions.emplace_back(commitBody);
1250 0 : history.pendingReactions.erase(reactIt);
1251 : }
1252 9479 : auto peditIt = history.pendingEditions.find(sharedCommit->id);
1253 9475 : if (peditIt != history.pendingEditions.end()) {
1254 0 : auto oldBody = sharedCommit->body;
1255 0 : if (sharedCommit->type == CommitType::DATA_TRANSFER) {
1256 0 : sharedCommit->body[CommitKey::TID] = peditIt->second.front()->body[CommitKey::TID];
1257 0 : sharedCommit->body["fileId"] = "";
1258 : } else {
1259 0 : sharedCommit->body[CommitKey::BODY] = peditIt->second.front()->body[CommitKey::BODY];
1260 : }
1261 0 : peditIt->second.pop_front();
1262 0 : for (const auto& commit : peditIt->second) {
1263 0 : sharedCommit->editions.emplace_back(commit->body);
1264 : }
1265 0 : sharedCommit->editions.emplace_back(oldBody);
1266 0 : history.pendingEditions.erase(peditIt);
1267 0 : }
1268 : // Announce to client
1269 9476 : if (messageReceived)
1270 1200 : emitSignal<libjami::ConversationSignal::SwarmMessageReceived>(accountId_, repository_->id(), *sharedCommit);
1271 9481 : return !messageReceived;
1272 : }
1273 :
1274 : void
1275 9491 : Conversation::Impl::rectifyStatus(const std::shared_ptr<libjami::SwarmMessage>& message, History& history) const
1276 : {
1277 9491 : auto parentIt = history.quickAccess.find(message->linearizedParent);
1278 9480 : auto currentMessage = message;
1279 :
1280 21472 : while (parentIt != history.quickAccess.end()) {
1281 11989 : const auto& parent = parentIt->second;
1282 13016 : for (const auto& [peer, value] : message->status) {
1283 11928 : auto parentStatusIt = parent->status.find(peer);
1284 11916 : if (parentStatusIt == parent->status.end() || parentStatusIt->second < value) {
1285 1026 : parent->status[peer] = value;
1286 2051 : emitSignal<libjami::ConfigurationSignal::AccountMessageStatusChanged>(accountId_,
1287 1025 : repository_->id(),
1288 : peer,
1289 1026 : parent->id,
1290 : value);
1291 10894 : } else if (parentStatusIt->second >= value) {
1292 10896 : break;
1293 : }
1294 : }
1295 11988 : currentMessage = parent;
1296 11992 : parentIt = history.quickAccess.find(parent->linearizedParent);
1297 : }
1298 9489 : }
1299 :
1300 : std::vector<std::shared_ptr<libjami::SwarmMessage>>
1301 9486 : Conversation::Impl::addToHistory(History& history,
1302 : const std::vector<std::map<std::string, std::string>>& commits,
1303 : bool messageReceived,
1304 : bool commitFromSelf)
1305 : {
1306 : //
1307 : // NOTE: This function makes the following assumptions on its arguments:
1308 : // - The messages in "history" are in reverse chronological order (newest message
1309 : // first, oldest message last).
1310 : // - If messageReceived is true, then the commits in "commits" are assumed to be in
1311 : // chronological order (oldest to newest) and to be newer than the ones in "history".
1312 : // They are therefore inserted at the beginning of the message list.
1313 : // - If messageReceived is false, then the commits in "commits" are assumed to be in
1314 : // reverse chronological order (newest to oldest) and to be older than the ones in
1315 : // "history". They are therefore appended at the end of the message list.
1316 : //
1317 9486 : auto acc = account_.lock();
1318 9480 : if (!acc)
1319 0 : return {};
1320 9476 : auto username = acc->getUsername();
1321 9478 : if (messageReceived && (&history == &loadedHistory_ && history.loading)) {
1322 0 : std::unique_lock lk(history.mutex);
1323 0 : history.cv.wait(lk, [&] { return !history.loading; });
1324 0 : }
1325 :
1326 : // Only set messages' status on history for client
1327 9478 : bool needToSetMessageStatus = !commitFromSelf && &history == &loadedHistory_;
1328 :
1329 9478 : std::vector<std::shared_ptr<libjami::SwarmMessage>> sharedCommits;
1330 18970 : for (const auto& commit : commits) {
1331 9515 : auto commitId = commit.at("id");
1332 9504 : if (history.quickAccess.find(commitId) != history.quickAccess.end())
1333 1 : continue; // Already present
1334 9510 : auto typeIt = commit.find(CommitKey::TYPE);
1335 : // Nothing to show for the client, skip
1336 9496 : if (typeIt != commit.end() && typeIt->second == CommitType::MERGE)
1337 21 : continue;
1338 :
1339 9476 : auto sharedCommit = std::make_shared<libjami::SwarmMessage>();
1340 9489 : sharedCommit->fromMapStringString(commit);
1341 :
1342 9493 : if (needToSetMessageStatus) {
1343 914 : std::lock_guard lk(messageStatusMtx_);
1344 : // Check if we already have status information for the commit.
1345 914 : auto itFuture = futureStatus.find(sharedCommit->id);
1346 914 : if (itFuture != futureStatus.end()) {
1347 9 : sharedCommit->status = std::move(itFuture->second);
1348 9 : futureStatus.erase(itFuture);
1349 : }
1350 914 : }
1351 9493 : sharedCommits.emplace_back(sharedCommit);
1352 9514 : }
1353 :
1354 9470 : if (needToSetMessageStatus) {
1355 908 : constexpr int32_t SENDING = static_cast<int32_t>(libjami::Account::MessageStates::SENDING);
1356 908 : constexpr int32_t SENT = static_cast<int32_t>(libjami::Account::MessageStates::SENT);
1357 908 : constexpr int32_t DISPLAYED = static_cast<int32_t>(libjami::Account::MessageStates::DISPLAYED);
1358 :
1359 908 : std::lock_guard lk(messageStatusMtx_);
1360 13477 : for (const auto& member : repository_->members()) {
1361 : // For each member, we iterate over the commits to add in reverse chronological
1362 : // order (i.e. from newest to oldest) and set their status from the point of view
1363 : // of that member (as best we can given the information we have).
1364 : //
1365 : // The key assumption made in order to compute the status is that it can never decrease
1366 : // (with respect to the ordering SENDING < SENT < DISPLAYED) as we go back in time. We
1367 : // therefore start by setting the "status" variable below to the lowest possible value,
1368 : // and increase it when we encounter a commit for which it is justified to do so.
1369 : //
1370 : // If messageReceived is true, then the commits we are adding are the most recent in the
1371 : // conversation history, so the lowest possible value is SENDING.
1372 : //
1373 : // If messageReceived is false, then the commits we are adding are older than the ones
1374 : // that are already in the history, so the lowest possible value is the status of the
1375 : // oldest message in the history so far, which is stored in memberToStatus.
1376 12573 : auto status = SENDING;
1377 12573 : if (!messageReceived) {
1378 12 : auto cache = memberToStatus[member.uri];
1379 12 : if (cache > status)
1380 9 : status = cache;
1381 : }
1382 12573 : auto& messagesStatus = messagesStatus_[member.uri];
1383 :
1384 25135 : for (auto it = sharedCommits.rbegin(); it != sharedCommits.rend(); it++) {
1385 12581 : auto sharedCommit = *it;
1386 12585 : auto previousStatus = status;
1387 12585 : auto& commitStatus = sharedCommit->status[member.uri];
1388 :
1389 : // Compute status for the current commit.
1390 37713 : if (status < SENT && messagesStatus["fetched"] == sharedCommit->id) {
1391 9 : status = SENT;
1392 : }
1393 37716 : if (messagesStatus["read"] == sharedCommit->id) {
1394 1 : status = DISPLAYED;
1395 : }
1396 37711 : if (member.uri == sharedCommit->body.at("author")) {
1397 913 : status = DISPLAYED;
1398 : }
1399 12566 : if (status < commitStatus) {
1400 0 : status = commitStatus;
1401 : }
1402 :
1403 : // Store computed value.
1404 12566 : commitStatus = status;
1405 :
1406 : // Update messagesStatus_ if needed.
1407 12566 : if (previousStatus == SENDING && status >= SENT) {
1408 2727 : messagesStatus["fetched"] = sharedCommit->id;
1409 : }
1410 12566 : if (previousStatus <= SENT && status == DISPLAYED) {
1411 2703 : messagesStatus["read"] = sharedCommit->id;
1412 : }
1413 12566 : }
1414 :
1415 12569 : if (!messageReceived) {
1416 : // Update memberToStatus with the status of the last (i.e. oldest) added commit.
1417 12 : memberToStatus[member.uri] = status;
1418 : }
1419 907 : }
1420 909 : }
1421 :
1422 9471 : std::vector<std::shared_ptr<libjami::SwarmMessage>> messages;
1423 18963 : for (const auto& sharedCommit : sharedCommits) {
1424 9490 : history.quickAccess[sharedCommit->id] = sharedCommit;
1425 :
1426 18990 : auto reactToIt = sharedCommit->body.find(CommitKey::REACT_TO);
1427 18973 : auto editIt = sharedCommit->body.find(CommitKey::EDIT);
1428 9480 : if (reactToIt != sharedCommit->body.end() && !reactToIt->second.empty()) {
1429 3 : handleReaction(history, sharedCommit);
1430 9488 : } else if (editIt != sharedCommit->body.end() && !editIt->second.empty()) {
1431 9 : handleEdition(history, sharedCommit, messageReceived);
1432 9478 : } else if (handleMessage(history, sharedCommit, messageReceived)) {
1433 8277 : messages.emplace_back(sharedCommit);
1434 : }
1435 9493 : rectifyStatus(sharedCommit, history);
1436 : }
1437 :
1438 9483 : return messages;
1439 9482 : }
1440 :
1441 186 : Conversation::Conversation(const std::shared_ptr<JamiAccount>& account,
1442 : ConversationMode mode,
1443 186 : const std::string& otherMember)
1444 186 : : pimpl_ {new Impl {account, mode, otherMember}}
1445 186 : {}
1446 :
1447 18 : Conversation::Conversation(const std::shared_ptr<JamiAccount>& account, const std::string& conversationId)
1448 18 : : pimpl_ {new Impl {account, conversationId}}
1449 18 : {}
1450 :
1451 193 : Conversation::Conversation(const std::shared_ptr<JamiAccount>& account,
1452 : const std::string& remoteDevice,
1453 193 : const std::string& conversationId)
1454 193 : : pimpl_ {new Impl {account, remoteDevice, conversationId}}
1455 193 : {}
1456 :
1457 390 : Conversation::~Conversation() {}
1458 :
1459 : std::string
1460 5871 : Conversation::id() const
1461 : {
1462 5871 : return pimpl_->repository_ ? pimpl_->repository_->id() : "";
1463 : }
1464 :
1465 : void
1466 138 : Conversation::addMember(const std::string& contactUri, const OnDoneCb& cb)
1467 : {
1468 : try {
1469 138 : if (mode() == ConversationMode::ONE_TO_ONE) {
1470 : // Only authorize to add left members
1471 1 : auto initialMembers = getInitialMembers();
1472 1 : auto it = std::find(initialMembers.begin(), initialMembers.end(), contactUri);
1473 1 : if (it == initialMembers.end()) {
1474 4 : JAMI_WARNING("Unable to add new member in one to one conversation");
1475 1 : cb(false, "");
1476 1 : return;
1477 : }
1478 1 : }
1479 0 : } catch (const std::exception& e) {
1480 0 : JAMI_WARNING("Unable to get mode: {}", e.what());
1481 0 : cb(false, "");
1482 0 : return;
1483 0 : }
1484 137 : if (isMember(contactUri, true)) {
1485 0 : JAMI_WARNING("Unable to add member {} because it's already a member", contactUri);
1486 0 : cb(false, "");
1487 0 : return;
1488 : }
1489 137 : if (isMemberBanned(contactUri)) {
1490 2 : if (pimpl_->isAdmin()) {
1491 1 : dht::ThreadPool::io().run([w = weak(), contactUri = std::move(contactUri), cb = std::move(cb)] {
1492 1 : if (auto sthis = w.lock()) {
1493 1 : auto members = sthis->pimpl_->repository_->members();
1494 1 : auto type = sthis->pimpl_->memberBanType(contactUri);
1495 1 : if (type.empty()) {
1496 0 : cb(false, {});
1497 0 : return;
1498 : }
1499 1 : sthis->pimpl_->voteUnban(contactUri, type, cb);
1500 2 : }
1501 : });
1502 : } else {
1503 4 : JAMI_WARNING("Unable to add member {} because this member is blocked", contactUri);
1504 2 : cb(false, "");
1505 : }
1506 2 : return;
1507 : }
1508 :
1509 135 : dht::ThreadPool::io().run([w = weak(), contactUri = std::move(contactUri), cb = std::move(cb)] {
1510 135 : if (auto sthis = w.lock()) {
1511 : // Add member files and commit
1512 135 : std::unique_lock lk(sthis->pimpl_->writeMtx_);
1513 135 : auto commit = sthis->pimpl_->repository_->addMember(contactUri);
1514 135 : if (not commit.empty())
1515 135 : sthis->pimpl_->announce(commit, true);
1516 135 : lk.unlock();
1517 135 : if (cb)
1518 135 : cb(!commit.empty(), commit);
1519 270 : }
1520 135 : });
1521 : }
1522 :
1523 : std::shared_ptr<dhtnet::ChannelSocket>
1524 5071 : Conversation::gitSocket(const DeviceId& deviceId) const
1525 : {
1526 5071 : return pimpl_->gitSocket(deviceId);
1527 : }
1528 :
1529 : void
1530 2284 : Conversation::addGitSocket(const DeviceId& deviceId, const std::shared_ptr<dhtnet::ChannelSocket>& socket)
1531 : {
1532 2284 : pimpl_->addGitSocket(deviceId, socket);
1533 2284 : }
1534 :
1535 : void
1536 939 : Conversation::removeGitSocket(const DeviceId& deviceId)
1537 : {
1538 939 : pimpl_->removeGitSocket(deviceId);
1539 939 : }
1540 :
1541 : void
1542 380 : Conversation::shutdownConnections()
1543 : {
1544 : {
1545 380 : std::lock_guard lk(pimpl_->gitSocketMtx_);
1546 380 : pimpl_->gitSocketList_.clear();
1547 380 : }
1548 380 : if (pimpl_->swarmManager_)
1549 380 : pimpl_->swarmManager_->shutdown();
1550 380 : }
1551 :
1552 : void
1553 0 : Conversation::connectivityChanged()
1554 : {
1555 0 : if (pimpl_->swarmManager_)
1556 0 : pimpl_->swarmManager_->maintainBuckets();
1557 0 : }
1558 :
1559 : std::vector<jami::DeviceId>
1560 0 : Conversation::getDeviceIdList() const
1561 : {
1562 0 : return pimpl_->swarmManager_->getAllNodes();
1563 : }
1564 :
1565 : std::shared_ptr<Typers>
1566 9 : Conversation::typers() const
1567 : {
1568 9 : return pimpl_->typers_;
1569 : }
1570 :
1571 : std::vector<std::map<std::string, std::string>>
1572 0 : Conversation::getConnectivity() const
1573 : {
1574 0 : return pimpl_->getConnectivity();
1575 : }
1576 :
1577 : void
1578 1 : Conversation::Impl::voteUnban(const std::string& contactUri, const std::string_view type, const OnDoneCb& cb)
1579 : {
1580 : // Check if admin
1581 1 : if (!isAdmin()) {
1582 0 : JAMI_WARNING("You're not an admin of this repo. Unable to unblock {}", contactUri);
1583 0 : cb(false, {});
1584 0 : return;
1585 : }
1586 :
1587 : // Vote for removal
1588 1 : std::unique_lock lk(writeMtx_);
1589 1 : auto voteCommit = repository_->voteUnban(contactUri, type);
1590 1 : if (voteCommit.empty()) {
1591 0 : JAMI_WARNING("Unbanning {} failed", contactUri);
1592 0 : cb(false, "");
1593 0 : return;
1594 : }
1595 :
1596 1 : auto lastId = voteCommit;
1597 1 : std::vector<std::string> commits;
1598 1 : commits.emplace_back(voteCommit);
1599 :
1600 : // If admin, check vote
1601 2 : auto resolveCommit = repository_->resolveVote(contactUri, type, CommitAction::UNBAN);
1602 1 : if (!resolveCommit.empty()) {
1603 1 : commits.emplace_back(resolveCommit);
1604 1 : lastId = resolveCommit;
1605 4 : JAMI_WARNING("Vote solved for unbanning {}.", contactUri);
1606 : }
1607 1 : announce(commits, true);
1608 1 : lk.unlock();
1609 1 : if (cb)
1610 1 : cb(!lastId.empty(), lastId);
1611 1 : }
1612 :
1613 : void
1614 13 : Conversation::removeMember(const std::string& contactUri, bool isDevice, const OnDoneCb& cb)
1615 : {
1616 26 : dht::ThreadPool::io().run(
1617 26 : [w = weak(), contactUri = std::move(contactUri), isDevice = std::move(isDevice), cb = std::move(cb)] {
1618 13 : if (auto sthis = w.lock()) {
1619 : // Check if admin
1620 13 : if (!sthis->pimpl_->isAdmin()) {
1621 4 : JAMI_WARNING("You're not an admin of this repo. Unable to block {}", contactUri);
1622 1 : cb(false, {});
1623 2 : return;
1624 : }
1625 :
1626 : // Get current user type
1627 12 : std::string type;
1628 12 : if (isDevice) {
1629 0 : type = "devices";
1630 : } else {
1631 12 : auto members = sthis->pimpl_->repository_->members();
1632 26 : for (const auto& member : members) {
1633 26 : if (member.uri == contactUri) {
1634 12 : if (member.role == MemberRole::INVITED) {
1635 2 : type = "invited";
1636 10 : } else if (member.role == MemberRole::ADMIN) {
1637 1 : type = "admins";
1638 9 : } else if (member.role == MemberRole::MEMBER) {
1639 9 : type = "members";
1640 : }
1641 12 : break;
1642 : }
1643 : }
1644 12 : if (type.empty()) {
1645 0 : cb(false, {});
1646 0 : return;
1647 : }
1648 12 : }
1649 :
1650 : // Vote for removal
1651 12 : std::unique_lock lk(sthis->pimpl_->writeMtx_);
1652 12 : auto voteCommit = sthis->pimpl_->repository_->voteKick(contactUri, type);
1653 12 : if (voteCommit.empty()) {
1654 4 : JAMI_WARNING("Kicking {} failed", contactUri);
1655 2 : cb(false, "");
1656 1 : return;
1657 : }
1658 :
1659 11 : auto lastId = voteCommit;
1660 11 : std::vector<std::string> commits;
1661 11 : commits.emplace_back(voteCommit);
1662 :
1663 : // If admin, check vote
1664 22 : auto resolveCommit = sthis->pimpl_->repository_->resolveVote(contactUri, type, CommitAction::BAN);
1665 11 : if (!resolveCommit.empty()) {
1666 11 : commits.emplace_back(resolveCommit);
1667 11 : lastId = resolveCommit;
1668 44 : JAMI_WARNING("Vote solved for {}. {} banned", contactUri, isDevice ? "Device" : "Member");
1669 11 : if (isDevice)
1670 0 : sthis->pimpl_->disconnectFromDevice(DeviceId(contactUri));
1671 : else
1672 11 : sthis->pimpl_->disconnectFromPeer(contactUri);
1673 : }
1674 :
1675 11 : sthis->pimpl_->announce(commits, true);
1676 11 : lk.unlock();
1677 11 : cb(!lastId.empty(), lastId);
1678 27 : }
1679 : });
1680 13 : }
1681 :
1682 : std::vector<std::map<std::string, std::string>>
1683 104 : Conversation::getMembers(bool includeInvited, bool includeLeft, bool includeBanned) const
1684 : {
1685 104 : return pimpl_->getMembers(includeInvited, includeLeft, includeBanned);
1686 : }
1687 :
1688 : std::vector<std::map<std::string, std::string>>
1689 0 : Conversation::getTrackedMembers() const
1690 : {
1691 0 : return pimpl_->getTrackedMembers();
1692 : }
1693 :
1694 : std::set<std::string>
1695 2555 : Conversation::memberUris(std::string_view filter, const std::set<MemberRole>& filteredRoles) const
1696 : {
1697 2555 : return pimpl_->repository_->memberUris(filter, filteredRoles);
1698 : }
1699 :
1700 : std::vector<NodeId>
1701 1765 : Conversation::peersToSyncWith() const
1702 : {
1703 1765 : auto s = pimpl_->swarmManager_->getConnectedNodes();
1704 1765 : std::lock_guard lk(pimpl_->gitSocketMtx_);
1705 16991 : for (const auto& [deviceId, _] : pimpl_->gitSocketList_)
1706 15226 : if (std::find(s.cbegin(), s.cend(), deviceId) == s.cend())
1707 1307 : s.emplace_back(deviceId);
1708 3530 : return s;
1709 1765 : }
1710 :
1711 : bool
1712 1765 : Conversation::isBootstrapped() const
1713 : {
1714 1765 : return pimpl_->swarmManager_->isConnected();
1715 : }
1716 :
1717 : std::string
1718 16925 : Conversation::uriFromDevice(const std::string& deviceId) const
1719 : {
1720 16925 : return pimpl_->repository_->uriFromDevice(deviceId);
1721 : }
1722 :
1723 : void
1724 0 : Conversation::monitor()
1725 : {
1726 0 : pimpl_->swarmManager_->getRoutingTable().printRoutingTable();
1727 0 : }
1728 :
1729 : std::string
1730 186 : Conversation::join()
1731 : {
1732 186 : return pimpl_->repository_->join();
1733 : }
1734 :
1735 : bool
1736 9550 : Conversation::isMember(const std::string& uri, bool includeInvited) const
1737 : {
1738 9550 : auto uriCrt = uri + ".crt"sv;
1739 19099 : if (std::filesystem::is_regular_file(pimpl_->repoPath_ / MemberPath::ADMINS / uriCrt)
1740 19093 : || std::filesystem::is_regular_file(pimpl_->repoPath_ / MemberPath::MEMBERS / uriCrt)) {
1741 7547 : return true;
1742 : }
1743 1999 : if (includeInvited) {
1744 1417 : if (std::filesystem::is_regular_file(pimpl_->repoPath_ / MemberPath::INVITED / uri)) {
1745 1137 : return true;
1746 : }
1747 279 : if (mode() == ConversationMode::ONE_TO_ONE) {
1748 3 : for (const auto& member : getInitialMembers()) {
1749 2 : if (member == uri)
1750 0 : return true;
1751 1 : }
1752 : }
1753 : }
1754 861 : return false;
1755 9545 : }
1756 :
1757 : bool
1758 6817 : Conversation::isMemberBanned(const std::string& uri) const
1759 : {
1760 6817 : return !pimpl_->memberBanType(uri).empty();
1761 : }
1762 :
1763 : bool
1764 7525 : Conversation::isDeviceBanned(const std::string& deviceId) const
1765 : {
1766 7525 : return pimpl_->isDeviceBanned(deviceId);
1767 : }
1768 :
1769 : bool
1770 6680 : Conversation::isPeerAuthorized(const std::string& uri, const std::string& deviceId, bool includeInvited) const
1771 : {
1772 6680 : return !isMemberBanned(uri) && !isDeviceBanned(deviceId) && isMember(uri, includeInvited);
1773 : }
1774 :
1775 : void
1776 139 : Conversation::createCommit(CommitMessage&& message, OnCommitCb&& onCommit, OnDoneCb&& cb)
1777 : {
1778 139 : if (!message.replyTo.empty()) {
1779 2 : if (!pimpl_->repository_->hasCommit(message.replyTo)) {
1780 4 : JAMI_ERROR("Replying to invalid commit {}", message.replyTo);
1781 1 : return;
1782 : }
1783 : }
1784 276 : dht::ThreadPool::io().run(
1785 276 : [w = weak(), message = std::move(message), onCommit = std::move(onCommit), cb = std::move(cb)] {
1786 138 : if (auto sthis = w.lock()) {
1787 138 : std::unique_lock lk(sthis->pimpl_->writeMtx_);
1788 138 : auto commit = sthis->pimpl_->repository_->commitMessage(message.toString());
1789 138 : lk.unlock();
1790 138 : if (onCommit)
1791 14 : onCommit(commit);
1792 138 : sthis->pimpl_->announce(commit, true);
1793 138 : if (cb)
1794 138 : cb(!commit.empty(), commit);
1795 276 : }
1796 138 : });
1797 : }
1798 :
1799 : bool
1800 14911 : Conversation::hasCommit(const std::string& commitId) const
1801 : {
1802 14911 : return pimpl_->repository_->hasCommit(commitId);
1803 : }
1804 :
1805 : std::optional<ConversationCommit>
1806 35 : Conversation::getCommit(const std::string& commitId) const
1807 : {
1808 35 : return pimpl_->repository_->getCommit(commitId);
1809 : }
1810 :
1811 : void
1812 2 : Conversation::loadMessages(const OnLoadMessages& cb, const LogOptions& options)
1813 : {
1814 2 : if (!cb)
1815 0 : return;
1816 2 : dht::ThreadPool::io().run([w = weak(), cb = std::move(cb), options] {
1817 2 : if (auto sthis = w.lock()) {
1818 2 : std::unique_lock lk(sthis->pimpl_->loadedHistory_.mutex);
1819 2 : auto result = sthis->pimpl_->loadMessages(options);
1820 2 : lk.unlock();
1821 2 : cb(std::move(result));
1822 4 : }
1823 2 : });
1824 : }
1825 :
1826 : void
1827 0 : Conversation::clearCache()
1828 : {
1829 0 : std::lock_guard lk(pimpl_->loadedHistory_.mutex);
1830 0 : pimpl_->loadedHistory_.messageList.clear();
1831 0 : pimpl_->loadedHistory_.quickAccess.clear();
1832 0 : pimpl_->loadedHistory_.pendingEditions.clear();
1833 0 : pimpl_->loadedHistory_.pendingReactions.clear();
1834 : {
1835 0 : std::lock_guard lk(pimpl_->messageStatusMtx_);
1836 0 : pimpl_->memberToStatus.clear();
1837 0 : }
1838 0 : }
1839 :
1840 : std::string
1841 2369 : Conversation::lastCommitId() const
1842 : {
1843 : {
1844 2369 : std::lock_guard lk(pimpl_->loadedHistory_.mutex);
1845 2370 : if (!pimpl_->loadedHistory_.messageList.empty())
1846 1834 : return (*pimpl_->loadedHistory_.messageList.begin())->id;
1847 2369 : }
1848 536 : LogOptions options;
1849 536 : options.nbOfCommits = 1;
1850 536 : options.skipMerge = true;
1851 536 : History optHistory;
1852 536 : std::scoped_lock lock(pimpl_->writeMtx_, optHistory.mutex);
1853 536 : auto res = pimpl_->loadMessages(options, &optHistory);
1854 536 : if (res.empty())
1855 2 : return {};
1856 534 : return (*optHistory.messageList.begin())->id;
1857 536 : }
1858 :
1859 : bool
1860 2099 : Conversation::pull(const std::string& deviceId, OnPullCb&& cb, std::string commitId)
1861 : {
1862 2099 : std::lock_guard lk(pimpl_->pullcbsMtx_);
1863 6296 : auto [it, notInProgress] = pimpl_->fetchingRemotes_.emplace(deviceId,
1864 4197 : std::deque<std::pair<std::string, OnPullCb>>());
1865 2098 : auto& pullcbs = it->second;
1866 0 : auto itPull = std::find_if(pullcbs.begin(), pullcbs.end(), [&](const auto& elem) {
1867 0 : return std::get<0>(elem) == commitId;
1868 4197 : });
1869 2099 : if (itPull != pullcbs.end()) {
1870 0 : JAMI_DEBUG("{} Ignoring request to pull from {:s} with commit {:s}: pull already in progress",
1871 : pimpl_->toString(),
1872 : deviceId,
1873 : commitId);
1874 0 : cb(false);
1875 0 : return false;
1876 : }
1877 8394 : JAMI_DEBUG("{} [device {}] Pulling '{:s}'", pimpl_->toString(), deviceId, commitId);
1878 2099 : pullcbs.emplace_back(std::move(commitId), std::move(cb));
1879 2099 : if (notInProgress)
1880 2097 : dht::ThreadPool::io().run([w = weak(), deviceId] {
1881 2097 : if (auto sthis_ = w.lock())
1882 2097 : sthis_->pimpl_->pull(deviceId);
1883 2096 : });
1884 2099 : return true;
1885 2099 : }
1886 :
1887 : void
1888 2097 : Conversation::Impl::pull(const std::string& deviceId)
1889 : {
1890 2097 : auto& repo = repository_;
1891 :
1892 2097 : std::string commitId;
1893 2097 : OnPullCb cb;
1894 : while (true) {
1895 : {
1896 4195 : std::lock_guard lk(pullcbsMtx_);
1897 4195 : auto it = fetchingRemotes_.find(deviceId);
1898 4196 : if (it == fetchingRemotes_.end()) {
1899 0 : JAMI_ERROR("Could not find device {:s} in fetchingRemotes", deviceId);
1900 0 : break;
1901 : }
1902 4196 : auto& pullcbs = it->second;
1903 4196 : if (pullcbs.empty()) {
1904 2097 : fetchingRemotes_.erase(it);
1905 2097 : break;
1906 : }
1907 2099 : auto& elem = pullcbs.front();
1908 2099 : commitId = std::move(std::get<0>(elem));
1909 2099 : cb = std::move(std::get<1>(elem));
1910 2099 : pullcbs.pop_front();
1911 4196 : }
1912 : // If recently fetched, the commit can already be there, so no need to do complex operations
1913 2099 : if (commitId != "" && repo->hasCommit(commitId)) {
1914 109 : cb(true);
1915 131 : continue;
1916 : }
1917 : // Pull from remote
1918 1990 : auto fetched = repo->fetch(deviceId);
1919 1990 : if (!fetched) {
1920 22 : cb(false);
1921 22 : continue;
1922 : }
1923 :
1924 1968 : auto oldHead = repo->getHead();
1925 :
1926 1968 : std::unique_lock lk(writeMtx_);
1927 : auto commits = repo->mergeHistory(deviceId,
1928 1973 : [this](const std::string& peerUri) { this->disconnectFromPeer(peerUri); });
1929 1968 : if (!commits.empty()) {
1930 901 : announce(commits);
1931 : }
1932 1968 : lk.unlock();
1933 :
1934 1968 : bool commitFound = false;
1935 1968 : if (commitId != "") {
1936 : // If `commitId` is non-empty, then we were attempting to pull a specific commit.
1937 : // We need to check if we actually got it; the fact that the fetch above was
1938 : // successful doesn't guarantee that we did.
1939 1443 : for (const auto& commit : commits) {
1940 1800 : if (commit.at("id") == commitId) {
1941 892 : commitFound = true;
1942 892 : break;
1943 : }
1944 : }
1945 : } else {
1946 532 : commitFound = true;
1947 : }
1948 1967 : if (!commitFound)
1949 2173 : JAMI_WARNING("Successfully fetched from device {} but didn't receive expected commit {}",
1950 : deviceId,
1951 : commitId);
1952 : // WARNING: If its argument is `true`, this callback will attempt to send a message notification
1953 : // for commit `commitId` to other members of the swarm. It's important that we only
1954 : // send these notifications if we actually have the commit. Otherwise, we can end up
1955 : // in a situation where the members of the swarm keep sending notifications to each
1956 : // other for a commit that none of them have (note that we are unable to rule this out, as
1957 : // nothing prevents a malicious user from intentionally sending a notification with
1958 : // a fake commit ID).
1959 1968 : if (cb)
1960 1968 : cb(commitFound);
1961 :
1962 : // Announce if profile changed
1963 1968 : if (!commits.empty()) {
1964 1802 : auto diffStats = repo->diffStats("HEAD", oldHead);
1965 901 : auto changedFiles = repo->changedFiles(diffStats);
1966 901 : if (find(changedFiles.begin(), changedFiles.end(), "profile.vcf") != changedFiles.end()) {
1967 5 : emitSignal<libjami::ConversationSignal::ConversationProfileUpdated>(accountId_,
1968 5 : repo->id(),
1969 10 : repo->infos());
1970 : }
1971 901 : }
1972 4066 : }
1973 2097 : }
1974 :
1975 : void
1976 2097 : Conversation::sync(const std::string& member, const std::string& deviceId, OnPullCb&& cb, std::string commitId)
1977 : {
1978 2097 : pull(deviceId, std::move(cb), commitId);
1979 2099 : dht::ThreadPool::io().run([member, deviceId, w = weak_from_this()] {
1980 2099 : auto sthis = w.lock();
1981 : // For waiting request, downloadFile
1982 2100 : for (const auto& wr : sthis->dataTransfer()->waitingRequests()) {
1983 1 : sthis->downloadFile(wr.interactionId, wr.fileId, wr.path, member, deviceId);
1984 2099 : }
1985 2099 : });
1986 2099 : }
1987 :
1988 : std::map<std::string, std::string>
1989 269 : Conversation::generateInvitation() const
1990 : {
1991 : // Invite the new member to the conversation
1992 269 : Json::Value root;
1993 269 : auto& metadata = root[ConversationMapKeys::METADATAS];
1994 544 : for (const auto& [k, v] : infos()) {
1995 275 : if (v.size() >= 64000) {
1996 0 : JAMI_WARNING("Cutting invite because the SIP message will be too long");
1997 0 : continue;
1998 0 : }
1999 275 : metadata[k] = v;
2000 269 : }
2001 269 : root[ConversationMapKeys::CONVERSATIONID] = id();
2002 1076 : return {{"application/invite+json", json::toString(root)}};
2003 538 : }
2004 :
2005 : std::string
2006 7 : Conversation::leave()
2007 : {
2008 7 : setRemovingFlag();
2009 7 : std::lock_guard lk(pimpl_->writeMtx_);
2010 14 : return pimpl_->repository_->leave();
2011 7 : }
2012 :
2013 : void
2014 10 : Conversation::setRemovingFlag()
2015 : {
2016 10 : pimpl_->isRemoving_ = true;
2017 10 : }
2018 :
2019 : bool
2020 4161 : Conversation::isRemoving()
2021 : {
2022 4161 : return pimpl_->isRemoving_;
2023 : }
2024 :
2025 : void
2026 21 : Conversation::erase()
2027 : {
2028 21 : if (pimpl_->conversationDataPath_ != "")
2029 21 : dhtnet::fileutils::removeAll(pimpl_->conversationDataPath_, true);
2030 21 : if (!pimpl_->repository_)
2031 0 : return;
2032 21 : std::lock_guard lk(pimpl_->writeMtx_);
2033 21 : pimpl_->repository_->erase();
2034 21 : }
2035 :
2036 : ConversationMode
2037 1877 : Conversation::mode() const
2038 : {
2039 1877 : return pimpl_->repository_->mode();
2040 : }
2041 :
2042 : std::vector<std::string>
2043 30 : Conversation::getInitialMembers() const
2044 : {
2045 30 : return pimpl_->repository_->getInitialMembers();
2046 : }
2047 :
2048 : bool
2049 0 : Conversation::isInitialMember(const std::string& uri) const
2050 : {
2051 0 : auto members = getInitialMembers();
2052 0 : return std::find(members.begin(), members.end(), uri) != members.end();
2053 0 : }
2054 :
2055 : void
2056 9 : Conversation::updateInfos(const std::map<std::string, std::string>& map, const OnDoneCb& cb)
2057 : {
2058 9 : dht::ThreadPool::io().run([w = weak(), map = std::move(map), cb = std::move(cb)] {
2059 9 : if (auto sthis = w.lock()) {
2060 9 : auto& repo = sthis->pimpl_->repository_;
2061 9 : std::unique_lock lk(sthis->pimpl_->writeMtx_);
2062 9 : auto commit = repo->updateInfos(map);
2063 9 : sthis->pimpl_->announce(commit, true);
2064 9 : lk.unlock();
2065 9 : if (cb)
2066 9 : cb(!commit.empty(), commit);
2067 9 : emitSignal<libjami::ConversationSignal::ConversationProfileUpdated>(sthis->pimpl_->accountId_,
2068 9 : repo->id(),
2069 18 : repo->infos());
2070 18 : }
2071 9 : });
2072 9 : }
2073 :
2074 : std::map<std::string, std::string>
2075 315 : Conversation::infos() const
2076 : {
2077 315 : return pimpl_->repository_->infos();
2078 : }
2079 :
2080 : void
2081 8 : Conversation::updatePreferences(const std::map<std::string, std::string>& map)
2082 : {
2083 8 : const auto& filePath = pimpl_->preferencesPath_;
2084 8 : auto prefs = map;
2085 8 : auto itLast = prefs.find(LAST_MODIFIED);
2086 8 : if (itLast != prefs.end()) {
2087 3 : std::error_code ec;
2088 3 : if (std::filesystem::is_regular_file(filePath, ec)) {
2089 1 : auto lastModified = fileutils::lastWriteTimeInSeconds(filePath);
2090 : try {
2091 1 : if (lastModified >= to_int<uint64_t>(itLast->second))
2092 0 : return;
2093 0 : } catch (...) {
2094 0 : return;
2095 0 : }
2096 : }
2097 3 : prefs.erase(itLast);
2098 : }
2099 :
2100 8 : std::ofstream file(filePath, std::ios::trunc | std::ios::binary);
2101 8 : msgpack::pack(file, prefs);
2102 8 : emitSignal<libjami::ConversationSignal::ConversationPreferencesUpdated>(pimpl_->accountId_, id(), std::move(prefs));
2103 8 : }
2104 :
2105 : std::map<std::string, std::string>
2106 106 : Conversation::preferences(bool includeLastModified) const
2107 : {
2108 : try {
2109 106 : std::map<std::string, std::string> preferences;
2110 106 : const auto& filePath = pimpl_->preferencesPath_;
2111 201 : auto file = fileutils::loadFile(filePath);
2112 11 : msgpack::object_handle oh = msgpack::unpack((const char*) file.data(), file.size());
2113 11 : oh.get().convert(preferences);
2114 11 : if (includeLastModified)
2115 24 : preferences[LAST_MODIFIED] = std::to_string(fileutils::lastWriteTimeInSeconds(filePath));
2116 11 : return preferences;
2117 200 : } catch (const std::exception& e) {
2118 95 : }
2119 94 : return {};
2120 : }
2121 :
2122 : std::vector<uint8_t>
2123 0 : Conversation::vCard() const
2124 : {
2125 : try {
2126 0 : return fileutils::loadFile(pimpl_->repoPath_ / "profile.vcf");
2127 0 : } catch (...) {
2128 0 : }
2129 0 : return {};
2130 : }
2131 :
2132 : std::shared_ptr<TransferManager>
2133 2213 : Conversation::dataTransfer() const
2134 : {
2135 2213 : return pimpl_->transferManager_;
2136 : }
2137 :
2138 : bool
2139 13 : Conversation::onFileChannelRequest(const std::string& member,
2140 : const std::string& fileId,
2141 : std::filesystem::path& path,
2142 : std::string& sha3sum) const
2143 : {
2144 13 : if (!isMember(member))
2145 0 : return false;
2146 :
2147 13 : auto sep = fileId.find('_');
2148 13 : if (sep == std::string::npos)
2149 0 : return false;
2150 :
2151 13 : auto interactionId = fileId.substr(0, sep);
2152 13 : auto commit = getCommit(interactionId);
2153 26 : if (commit == std::nullopt || commit->commitMsg.tid.empty() || commit->commitMsg.sha3sum.empty()
2154 26 : || commit->commitMsg.type != CommitType::DATA_TRANSFER) {
2155 0 : JAMI_WARNING("[Account {:s}] {} requested invalid file transfer commit {}",
2156 : pimpl_->accountId_,
2157 : member,
2158 : interactionId);
2159 0 : return false;
2160 : }
2161 :
2162 13 : path = dataTransfer()->path(fileId);
2163 13 : sha3sum = commit->commitMsg.sha3sum;
2164 :
2165 13 : return true;
2166 13 : }
2167 :
2168 : bool
2169 14 : Conversation::downloadFile(const std::string& interactionId,
2170 : const std::string& fileId,
2171 : const std::string& path,
2172 : const std::string&,
2173 : const std::string& deviceId)
2174 : {
2175 14 : auto commit = getCommit(interactionId);
2176 14 : if (commit == std::nullopt || commit->commitMsg.type != CommitType::DATA_TRANSFER) {
2177 0 : JAMI_ERROR("Commit doesn't exists or is not a file transfer {} (Conversation: {}) ", interactionId, id());
2178 0 : return false;
2179 : }
2180 14 : auto tid = commit->commitMsg.tid;
2181 14 : auto sha3sum = commit->commitMsg.sha3sum;
2182 14 : auto totalSize = commit->commitMsg.totalSize;
2183 :
2184 14 : if (tid.empty() || sha3sum.empty() || totalSize < 0) {
2185 0 : JAMI_ERROR("Invalid file transfer commit (missing tid, size or sha3)");
2186 0 : return false;
2187 : }
2188 :
2189 : // Be sure to not lock conversation
2190 14 : dht::ThreadPool().io().run([w = weak(), deviceId, fileId, interactionId, sha3sum, path, totalSize] {
2191 14 : if (auto shared = w.lock()) {
2192 14 : std::filesystem::path filePath(path);
2193 14 : if (filePath.empty()) {
2194 0 : filePath = shared->dataTransfer()->path(fileId);
2195 : }
2196 :
2197 14 : std::error_code ec;
2198 14 : if (std::filesystem::file_size(filePath, ec) == static_cast<size_t>(totalSize)) {
2199 1 : if (fileutils::sha3File(filePath) == sha3sum) {
2200 4 : JAMI_WARNING("Ignoring request to download existing file: {}", filePath);
2201 1 : return;
2202 : }
2203 : }
2204 :
2205 13 : std::filesystem::path tempFilePath(filePath);
2206 13 : tempFilePath += ".tmp";
2207 13 : auto start = std::filesystem::file_size(tempFilePath, ec);
2208 13 : if (ec || start == static_cast<decltype(start)>(-1)) {
2209 11 : start = 0;
2210 : }
2211 13 : size_t end = 0;
2212 :
2213 13 : auto acc = shared->pimpl_->account_.lock();
2214 13 : if (!acc)
2215 0 : return;
2216 13 : shared->dataTransfer()->waitForTransfer(fileId, interactionId, sha3sum, path, totalSize);
2217 13 : acc->askForFileChannel(shared->id(), deviceId, interactionId, fileId, start, end);
2218 28 : }
2219 : });
2220 14 : return true;
2221 14 : }
2222 :
2223 : void
2224 1110 : Conversation::hasFetched(const std::string& deviceId, const std::string& commitId)
2225 : {
2226 1110 : dht::ThreadPool::io().run([w = weak(), deviceId, commitId]() {
2227 1110 : auto sthis = w.lock();
2228 1110 : if (!sthis)
2229 0 : return;
2230 : // Update fetched for Uri
2231 1110 : auto uri = sthis->uriFromDevice(deviceId);
2232 1110 : if (uri.empty() || uri == sthis->pimpl_->userId_)
2233 44 : return;
2234 : // When a user fetches a commit, the message is sent for this person
2235 1066 : sthis->pimpl_->updateStatus(uri,
2236 : libjami::Account::MessageStates::SENT,
2237 1066 : commitId,
2238 2132 : std::to_string(std::time(nullptr)),
2239 : true);
2240 1154 : });
2241 1110 : }
2242 :
2243 : void
2244 1110 : Conversation::Impl::updateStatus(const std::string& uri,
2245 : libjami::Account::MessageStates st,
2246 : const std::string& commitId,
2247 : const std::string& ts,
2248 : bool emit)
2249 : {
2250 : // This method can be called if peer send us a status or if another device sync. Emit will be true if a peer
2251 : // send us a status and will emit to other connected devices.
2252 1110 : LogOptions options;
2253 1110 : std::map<std::string, std::map<std::string, std::string>> newStatus;
2254 : {
2255 : // Update internal structures.
2256 1110 : std::lock_guard lk(messageStatusMtx_);
2257 1110 : auto& status = messagesStatus_[uri];
2258 1110 : auto& oldStatus = status[st == libjami::Account::MessageStates::SENT ? "fetched" : "read"];
2259 1110 : if (oldStatus == commitId)
2260 2 : return; // Nothing to do
2261 1108 : options.to = oldStatus;
2262 1108 : options.from = commitId;
2263 1108 : oldStatus = commitId;
2264 1108 : status[st == libjami::Account::MessageStates::SENT ? "fetched_ts" : "read_ts"] = ts;
2265 1108 : saveStatus();
2266 1108 : if (emit)
2267 1079 : newStatus[uri].insert(status.begin(), status.end());
2268 1110 : }
2269 1108 : if (emit && messageStatusCb_) {
2270 1079 : messageStatusCb_(newStatus);
2271 : }
2272 : // Update messages status for all commit between the old and new one
2273 1108 : options.logIfNotFound = false;
2274 1108 : options.fastLog = true;
2275 1108 : History optHistory;
2276 1108 : std::unique_lock lk(optHistory.mutex); // Avoid to announce messages while updating status.
2277 1108 : auto res = loadMessages(options, &optHistory);
2278 1108 : std::unique_lock mlk(messageStatusMtx_);
2279 1108 : std::vector<std::pair<std::string, int32_t>> statusToUpdate;
2280 1108 : if (res.size() == 0) {
2281 : // In this case, commit is not received yet, so we cache it
2282 7 : futureStatus[commitId][uri] = static_cast<int32_t>(st);
2283 : }
2284 8828 : for (const auto& [cid, _] : optHistory.quickAccess) {
2285 7720 : auto message = loadedHistory_.quickAccess.find(cid);
2286 7720 : if (message != loadedHistory_.quickAccess.end()) {
2287 : // Update message and emit to client,
2288 2987 : if (static_cast<int32_t>(st) > message->second->status[uri]) {
2289 2984 : message->second->status[uri] = static_cast<int32_t>(st);
2290 2984 : statusToUpdate.emplace_back(cid, static_cast<int32_t>(st));
2291 : }
2292 : } else {
2293 : // In this case, commit is not loaded by client, so we cache it
2294 : // No need to emit to client, they will get a correct status on load.
2295 4733 : futureStatus[cid][uri] = static_cast<int32_t>(st);
2296 : }
2297 : }
2298 1108 : mlk.unlock();
2299 1108 : lk.unlock();
2300 4092 : for (const auto& [cid, status] : statusToUpdate)
2301 5968 : emitSignal<libjami::ConfigurationSignal::AccountMessageStatusChanged>(accountId_,
2302 2984 : repository_->id(),
2303 : uri,
2304 : cid,
2305 : static_cast<int>(status));
2306 1112 : }
2307 :
2308 : bool
2309 17 : Conversation::setMessageDisplayed(const std::string& uri, const std::string& interactionId)
2310 : {
2311 17 : std::lock_guard lk(pimpl_->messageStatusMtx_);
2312 51 : if (pimpl_->messagesStatus_[uri]["read"] == interactionId)
2313 2 : return false; // Nothing to do
2314 15 : dht::ThreadPool::io().run([w = weak(), uri, interactionId]() {
2315 15 : auto sthis = w.lock();
2316 15 : if (!sthis)
2317 0 : return;
2318 15 : sthis->pimpl_->updateStatus(uri,
2319 : libjami::Account::MessageStates::DISPLAYED,
2320 15 : interactionId,
2321 30 : std::to_string(std::time(nullptr)),
2322 : true);
2323 15 : });
2324 15 : return true;
2325 17 : }
2326 :
2327 : std::map<std::string, std::map<std::string, std::string>>
2328 90 : Conversation::messageStatus() const
2329 : {
2330 90 : std::lock_guard lk(pimpl_->messageStatusMtx_);
2331 180 : return pimpl_->messagesStatus_;
2332 90 : }
2333 :
2334 : void
2335 47 : Conversation::updateMessageStatus(const std::map<std::string, std::map<std::string, std::string>>& messageStatus)
2336 : {
2337 47 : std::unique_lock lk(pimpl_->messageStatusMtx_);
2338 47 : std::vector<std::tuple<libjami::Account::MessageStates, std::string, std::string, std::string>> stVec;
2339 101 : for (const auto& [uri, status] : messageStatus) {
2340 54 : auto& oldMs = pimpl_->messagesStatus_[uri];
2341 346 : if (status.find("fetched_ts") != status.end() && status.at("fetched") != oldMs["fetched"]) {
2342 107 : if (oldMs["fetched_ts"].empty() || std::stol(oldMs["fetched_ts"]) <= std::stol(status.at("fetched_ts"))) {
2343 0 : stVec.emplace_back(libjami::Account::MessageStates::SENT,
2344 : uri,
2345 25 : status.at("fetched"),
2346 75 : status.at("fetched_ts"));
2347 : }
2348 : }
2349 194 : if (status.find("read_ts") != status.end() && status.at("read") != oldMs["read"]) {
2350 12 : if (oldMs["read_ts"].empty() || std::stol(oldMs["read_ts"]) <= std::stol(status.at("read_ts"))) {
2351 0 : stVec.emplace_back(libjami::Account::MessageStates::DISPLAYED,
2352 : uri,
2353 4 : status.at("read"),
2354 12 : status.at("read_ts"));
2355 : }
2356 : }
2357 : }
2358 47 : lk.unlock();
2359 :
2360 76 : for (const auto& [status, uri, commitId, ts] : stVec) {
2361 29 : pimpl_->updateStatus(uri, status, commitId, ts);
2362 : }
2363 47 : }
2364 :
2365 : void
2366 390 : Conversation::onMessageStatusChanged(
2367 : const std::function<void(const std::map<std::string, std::map<std::string, std::string>>&)>& cb)
2368 : {
2369 390 : std::unique_lock lk(pimpl_->messageStatusMtx_);
2370 390 : pimpl_->messageStatusCb_ = cb;
2371 390 : }
2372 :
2373 : #ifdef LIBJAMI_TEST
2374 : void
2375 513 : Conversation::onBootstrapStatus(const std::function<void(std::string, BootstrapStatus)>& cb)
2376 : {
2377 513 : std::lock_guard lock(pimpl_->bootstrapMtx_);
2378 513 : pimpl_->bootstrapCbTest_ = cb;
2379 513 : }
2380 :
2381 : std::vector<libjami::SwarmMessage>
2382 0 : Conversation::loadMessagesSync(const LogOptions& options)
2383 : {
2384 0 : std::lock_guard lk(pimpl_->loadedHistory_.mutex);
2385 0 : auto result = pimpl_->loadMessages(options);
2386 0 : return result;
2387 0 : }
2388 :
2389 : void
2390 0 : Conversation::announce(const std::vector<std::map<std::string, std::string>>& commits, bool commitFromSelf)
2391 : {
2392 0 : pimpl_->announce(commits, commitFromSelf);
2393 0 : }
2394 :
2395 : void
2396 0 : Conversation::announce(const std::string& commitId, bool commitFromSelf)
2397 : {
2398 0 : pimpl_->announce(commitId, commitFromSelf);
2399 0 : }
2400 : #endif
2401 :
2402 : void
2403 512 : Conversation::bootstrap(std::function<void()> onBootstrapped, const std::vector<DeviceId>& knownDevices)
2404 : {
2405 512 : std::lock_guard lock(pimpl_->bootstrapMtx_);
2406 512 : if (!pimpl_ || !pimpl_->repository_ || !pimpl_->swarmManager_)
2407 0 : return;
2408 : // Bootstrap the DRT from currently known devices. Since the per-device
2409 : // presence monitoring (monitorConnection/startTracking below), callers do
2410 : // not pass any device list here: candidates are injected as members get
2411 : // online, through addKnownDevices() with devices reported by the
2412 : // PresenceManager (i.e. announced on the DHT), and rotated on connection
2413 : // failure. The knownDevices parameter remains for callers/tests that
2414 : // already hold a list of live devices.
2415 : // If a connection succeeds, onConnectionChanged will be called with ok=true
2416 512 : pimpl_->bootstrapCb_ = std::move(onBootstrapped);
2417 512 : std::vector<DeviceId> devices = knownDevices;
2418 2048 : JAMI_DEBUG("{} Bootstrap with {} device(s)", pimpl_->toString(), devices.size());
2419 :
2420 512 : if (!devices.empty()) {
2421 0 : pimpl_->swarmManager_->setKnownNodes(devices);
2422 : }
2423 :
2424 512 : pimpl_->monitorConnection(weak_from_this());
2425 :
2426 : // If is shutdown, the conversation was re-added, causing no new nodes to be connected, but just a classic
2427 : // connectivity change
2428 512 : if (pimpl_->swarmManager_->isShutdown()) {
2429 22 : pimpl_->swarmManager_->restart();
2430 22 : pimpl_->swarmManager_->maintainBuckets();
2431 : }
2432 512 : }
2433 :
2434 : void
2435 1569 : Conversation::addKnownDevices(const std::vector<DeviceId>& devices, const std::string& memberUri)
2436 : {
2437 1569 : if (devices.empty())
2438 892 : return;
2439 677 : if (!memberUri.empty()) {
2440 : // JAMI_WARNING("{} Adding {} known devices for member {}", pimpl_->toString(), devices.size(), memberUri);
2441 677 : std::lock_guard lk(pimpl_->trackedMembersMtx_);
2442 677 : auto it = pimpl_->trackedMembers_.find(memberUri);
2443 677 : if (it != pimpl_->trackedMembers_.end()) {
2444 670 : it->second.devices.insert(devices.begin(), devices.end());
2445 : }
2446 677 : } else {
2447 0 : JAMI_ERROR("{} Adding {} known devices without member URI", pimpl_->toString(), devices.size());
2448 : }
2449 677 : pimpl_->swarmManager_->setKnownNodes(devices);
2450 : }
2451 :
2452 : void
2453 559 : Conversation::connectNode(const DeviceId& deviceId)
2454 : {
2455 559 : pimpl_->swarmManager_->connectNode(deviceId);
2456 559 : }
2457 :
2458 : std::vector<std::string>
2459 18 : Conversation::commitsEndedCalls()
2460 : {
2461 18 : pimpl_->loadActiveCalls();
2462 18 : pimpl_->loadHostedCalls();
2463 18 : auto commits = pimpl_->commitsEndedCalls();
2464 18 : if (!commits.empty()) {
2465 : // Announce to client
2466 0 : dht::ThreadPool::io().run([w = weak(), commits] {
2467 0 : if (auto sthis = w.lock())
2468 0 : sthis->pimpl_->announce(commits, true);
2469 0 : });
2470 : }
2471 18 : return commits;
2472 0 : }
2473 :
2474 : void
2475 390 : Conversation::onMembersChanged(OnMembersChanged&& cb)
2476 : {
2477 390 : pimpl_->onMembersChanged_ = std::move(cb);
2478 390 : }
2479 :
2480 : void
2481 390 : Conversation::onNeedSocket(NeedSocketCb needSocket)
2482 : {
2483 780 : pimpl_->swarmManager_->needSocketCb_ = [needSocket = std::move(needSocket),
2484 : w = weak()](const std::string& deviceId, ChannelCb&& cb, bool noNewSocket) {
2485 1029 : if (auto sthis = w.lock()) {
2486 1030 : auto wrappedCb = [cb = std::move(cb), w, deviceId](const std::shared_ptr<dhtnet::ChannelSocket>& socket) {
2487 1027 : if (auto sthis = w.lock()) {
2488 1025 : if (!socket) {
2489 271 : if (auto acc = sthis->pimpl_->account_.lock()) {
2490 274 : if (auto cert = acc->certStore().getCertificate(deviceId)) {
2491 275 : sthis->pimpl_->onConnectionFailed(DeviceId(deviceId), cert->issuer->getId().toString());
2492 : } else {
2493 0 : JAMI_WARNING("{} Unable to get member URI from device ID {}",
2494 : sthis->pimpl_->toString(),
2495 : deviceId);
2496 275 : }
2497 : } else {
2498 0 : return false;
2499 275 : }
2500 : }
2501 1030 : }
2502 1030 : return cb(socket);
2503 1030 : };
2504 2060 : needSocket(sthis->id(), deviceId, std::move(wrappedCb), "application/im-gitmessage-id", noNewSocket);
2505 2058 : }
2506 1420 : };
2507 390 : }
2508 :
2509 : void
2510 933 : Conversation::addSwarmChannel(std::shared_ptr<dhtnet::ChannelSocket> channel)
2511 : {
2512 933 : auto deviceId = channel->deviceId();
2513 : // Transmit avatar if necessary
2514 : // We do this here, because at this point we know both sides are connected and in
2515 : // the same conversation
2516 : // addSwarmChannel is a bit more complex, but it should be the best moment to do this.
2517 932 : auto cert = channel->peerCertificate();
2518 932 : if (!cert || !cert->issuer)
2519 0 : return;
2520 932 : auto member = cert->issuer->getId().toString();
2521 933 : pimpl_->swarmManager_->addChannel(std::move(channel));
2522 933 : dht::ThreadPool::io().run([member, deviceId, a = pimpl_->account_, w = weak_from_this()] {
2523 933 : auto sthis = w.lock();
2524 933 : if (auto account = a.lock()) {
2525 933 : account->sendProfile(sthis->id(), member, deviceId.toString());
2526 933 : }
2527 933 : });
2528 933 : }
2529 :
2530 : uint32_t
2531 4 : Conversation::countInteractions(const std::string& toId, const std::string& fromId, const std::string& authorUri) const
2532 : {
2533 4 : LogOptions options;
2534 4 : options.to = toId;
2535 4 : options.from = fromId;
2536 4 : options.authorUri = authorUri;
2537 4 : options.logIfNotFound = false;
2538 4 : options.fastLog = true;
2539 4 : History history;
2540 4 : std::lock_guard lk(history.mutex);
2541 4 : auto res = pimpl_->loadMessages(options, &history);
2542 8 : return res.size();
2543 4 : }
2544 :
2545 : void
2546 4 : Conversation::search(uint32_t req, const Filter& filter, const std::shared_ptr<std::atomic_int>& flag) const
2547 : {
2548 : // Because logging a conversation can take quite some time,
2549 : // do it asynchronously
2550 4 : dht::ThreadPool::io().run([w = weak(), req, filter, flag] {
2551 4 : if (auto sthis = w.lock()) {
2552 4 : History history;
2553 4 : std::vector<std::map<std::string, std::string>> commits {};
2554 : // std::regex_constants::ECMAScript is the default flag.
2555 4 : auto re = std::regex(filter.regexSearch,
2556 4 : filter.caseSensitive ? std::regex_constants::ECMAScript : std::regex_constants::icase);
2557 12 : sthis->pimpl_->repository_->log(
2558 8 : [&](const std::string& /*id*/, const GitAuthor& author, const GitCommit& commit) {
2559 20 : if (!filter.author.empty() && filter.author != sthis->uriFromDevice(author.email)) {
2560 : // Filter author
2561 0 : return CallbackResult::Skip;
2562 : }
2563 20 : auto commitTime = git_commit_time(commit.get());
2564 20 : if (filter.before && filter.before < commitTime) {
2565 : // Only get commits before this date
2566 0 : return CallbackResult::Skip;
2567 : }
2568 20 : if (filter.after && filter.after > commitTime) {
2569 : // Only get commits before this date
2570 0 : if (git_commit_parentcount(commit.get()) <= 1)
2571 0 : return CallbackResult::Break;
2572 : else
2573 0 : return CallbackResult::Skip; // Because we are sorting it with
2574 : // GIT_SORT_TOPOLOGICAL | GIT_SORT_TIME
2575 : }
2576 :
2577 20 : return CallbackResult::Ok; // Continue
2578 : },
2579 8 : [&](ConversationCommit&& cc) {
2580 20 : if (auto optMessage = sthis->pimpl_->repository_->convCommitToMap(cc))
2581 80 : sthis->pimpl_->addToHistory(history, {optMessage.value()}, false, false);
2582 40 : },
2583 8 : [&](const std::string& id, const GitAuthor&, ConversationCommit&) {
2584 20 : if (id == filter.lastId)
2585 0 : return true;
2586 20 : return false;
2587 : },
2588 : "",
2589 : false);
2590 : // Search on generated history
2591 24 : for (auto& message : history.messageList) {
2592 20 : auto contentType = message->type;
2593 20 : auto isSearchable = contentType == CommitType::TEXT || contentType == CommitType::DATA_TRANSFER;
2594 20 : if (filter.type.empty() && !isSearchable) {
2595 : // Not searchable, at least for now
2596 8 : continue;
2597 12 : } else if (contentType == filter.type || filter.type.empty()) {
2598 12 : if (isSearchable) {
2599 : // If it's a text match the body, else the display name
2600 60 : auto body = contentType == CommitType::TEXT ? message->body.at(CommitKey::BODY)
2601 36 : : message->body.at(CommitKey::DISPLAY_NAME);
2602 12 : std::smatch body_match;
2603 12 : if (std::regex_search(body, body_match, re)) {
2604 5 : auto commit = message->body;
2605 10 : commit["id"] = message->id;
2606 10 : commit["type"] = message->type;
2607 5 : commits.emplace_back(commit);
2608 5 : }
2609 12 : } else {
2610 : // Matching type, just add it to the results
2611 0 : commits.emplace_back(message->body);
2612 : }
2613 :
2614 12 : if (filter.maxResult != 0 && commits.size() == filter.maxResult)
2615 0 : break;
2616 : }
2617 20 : }
2618 :
2619 4 : if (commits.size() > 0)
2620 9 : emitSignal<libjami::ConversationSignal::MessagesFound>(req,
2621 3 : sthis->pimpl_->accountId_,
2622 6 : sthis->id(),
2623 3 : std::move(commits));
2624 : // If we're the latest thread, inform client that the search is finished
2625 4 : if ((*flag)-- == 1 /* decrement return the old value */) {
2626 8 : emitSignal<libjami::ConversationSignal::MessagesFound>(
2627 8 : req, sthis->pimpl_->accountId_, std::string {}, std::vector<std::map<std::string, std::string>> {});
2628 : }
2629 8 : }
2630 4 : });
2631 4 : }
2632 :
2633 : void
2634 14 : Conversation::hostConference(CommitMessage&& message, OnDoneCb&& cb)
2635 : {
2636 14 : if (message.confId.empty()) {
2637 0 : JAMI_ERROR("{}Malformed commit: no confId", pimpl_->toString());
2638 0 : return;
2639 : }
2640 :
2641 14 : auto now = std::chrono::system_clock::now();
2642 14 : auto nowSecs = std::chrono::duration_cast<std::chrono::seconds>(now.time_since_epoch()).count();
2643 : {
2644 14 : std::lock_guard lk(pimpl_->activeCallsMtx_);
2645 14 : pimpl_->hostedCalls_[message.confId] = nowSecs;
2646 14 : pimpl_->saveHostedCalls();
2647 14 : }
2648 :
2649 14 : createCommit(std::move(message), {}, std::move(cb));
2650 : }
2651 :
2652 : bool
2653 20 : Conversation::isHosting(const std::string& confId) const
2654 : {
2655 20 : auto info = infos();
2656 64 : if (info["rdvDevice"] == pimpl_->deviceId_ && info["rdvHost"] == pimpl_->userId_)
2657 0 : return true; // We are the current device Host
2658 20 : std::lock_guard lk(pimpl_->activeCallsMtx_);
2659 20 : return pimpl_->hostedCalls_.find(confId) != pimpl_->hostedCalls_.end();
2660 20 : }
2661 :
2662 : void
2663 10 : Conversation::removeActiveConference(CommitMessage&& message, OnDoneCb&& cb)
2664 : {
2665 10 : if (message.confId.empty()) {
2666 0 : JAMI_ERROR("{}Malformed commit: no confId", pimpl_->toString());
2667 0 : return;
2668 : }
2669 :
2670 10 : auto erased = false;
2671 : {
2672 10 : std::lock_guard lk(pimpl_->activeCallsMtx_);
2673 10 : erased = pimpl_->hostedCalls_.erase(message.confId);
2674 10 : }
2675 10 : if (erased) {
2676 10 : pimpl_->saveHostedCalls();
2677 10 : createCommit(std::move(message), {}, std::move(cb));
2678 : } else
2679 0 : cb(false, "");
2680 : }
2681 :
2682 : std::vector<std::map<std::string, std::string>>
2683 38 : Conversation::currentCalls() const
2684 : {
2685 38 : std::lock_guard lk(pimpl_->activeCallsMtx_);
2686 76 : return pimpl_->activeCalls_;
2687 38 : }
2688 : } // namespace jami
|