Line data Source code
1 : /*
2 : * Copyright (C) 2004-2026 Savoir-faire Linux Inc.
3 : *
4 : * This program is free software: you can redistribute it and/or modify
5 : * it under the terms of the GNU General Public License as published by
6 : * the Free Software Foundation, either version 3 of the License, or
7 : * (at your option) any later version.
8 : *
9 : * This program is distributed in the hope that it will be useful,
10 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 : * GNU General Public License for more details.
13 : *
14 : * You should have received a copy of the GNU General Public License
15 : * along with this program. If not, see <https://www.gnu.org/licenses/>.
16 : */
17 :
18 : #include "conversation_module.h"
19 :
20 : #include "account_const.h"
21 : #include "call.h"
22 : #include "client/jami_signal.h"
23 : #include "fileutils.h"
24 : #include "jamidht/account_manager.h"
25 : #include "jamidht/jamiaccount.h"
26 : #include "manager.h"
27 : #include "sip/sipcall.h"
28 : #include "vcard.h"
29 : #include "json_utils.h"
30 :
31 : #include <opendht/thread_pool.h>
32 : #include <dhtnet/certstore.h>
33 :
34 : #include <algorithm>
35 : #include <fstream>
36 :
37 : namespace jami {
38 :
39 : using ConvInfoMap = std::map<std::string, ConvInfo>;
40 :
41 : struct PendingConversationFetch
42 : {
43 : bool ready {false};
44 : bool cloning {false};
45 : std::string deviceId {};
46 : std::string removeId {};
47 : std::map<std::string, std::string> preferences {};
48 : std::map<std::string, std::map<std::string, std::string>> status {};
49 : std::set<std::string> connectingTo {};
50 : std::shared_ptr<dhtnet::ChannelSocket> socket {};
51 : };
52 :
53 : constexpr std::chrono::seconds MAX_FALLBACK {12 * 3600s};
54 :
55 : struct SyncedConversation
56 : {
57 : std::mutex mtx;
58 : std::unique_ptr<asio::steady_timer> fallbackClone;
59 : std::chrono::seconds fallbackTimer {5s};
60 : ConvInfo info;
61 : std::unique_ptr<PendingConversationFetch> pending;
62 : std::shared_ptr<Conversation> conversation;
63 :
64 377 : SyncedConversation(const std::string& convId)
65 377 : : info {convId}
66 : {
67 377 : fallbackClone = std::make_unique<asio::steady_timer>(*Manager::instance().ioContext());
68 377 : }
69 28 : SyncedConversation(const ConvInfo& info)
70 28 : : info {info}
71 : {
72 28 : fallbackClone = std::make_unique<asio::steady_timer>(*Manager::instance().ioContext());
73 28 : }
74 :
75 2545 : bool startFetch(const std::string& deviceId, bool checkIfConv = false)
76 : {
77 : // conversation mtx must be locked
78 2545 : if (checkIfConv && conversation)
79 9 : return false; // Already a conversation
80 2536 : if (pending) {
81 391 : if (pending->ready)
82 5 : return false; // Already doing stuff
83 : // if (pending->deviceId == deviceId)
84 : // return false; // Already fetching
85 385 : if (pending->connectingTo.find(deviceId) != pending->connectingTo.end())
86 214 : return false; // Already connecting to this device
87 : } else {
88 2145 : pending = std::make_unique<PendingConversationFetch>();
89 2146 : pending->connectingTo.insert(deviceId);
90 2146 : return true;
91 : }
92 172 : return true;
93 : }
94 :
95 1469 : void stopFetch(const std::string& deviceId)
96 : {
97 : // conversation mtx must be locked
98 1469 : if (!pending)
99 97 : return;
100 1372 : pending->connectingTo.erase(deviceId);
101 1371 : if (pending->connectingTo.empty())
102 1299 : pending.reset();
103 : }
104 :
105 114 : std::vector<std::map<std::string, std::string>> getMembers(bool includeLeft, bool includeBanned) const
106 : {
107 : // conversation mtx must be locked
108 114 : if (conversation)
109 100 : return conversation->getMembers(true, includeLeft, includeBanned);
110 : // If we're cloning, we can return the initial members
111 14 : std::vector<std::map<std::string, std::string>> result;
112 14 : result.reserve(info.members.size());
113 42 : for (const auto& uri : info.members) {
114 56 : result.emplace_back(std::map<std::string, std::string> {{"uri", uri}});
115 : }
116 14 : return result;
117 14 : }
118 : };
119 :
120 : class ConversationModule::Impl : public std::enable_shared_from_this<Impl>
121 : {
122 : public:
123 : Impl(std::shared_ptr<JamiAccount>&& account,
124 : std::shared_ptr<AccountManager>&& accountManager,
125 : NeedsSyncingCb&& needsSyncingCb,
126 : SengMsgCb&& sendMsgCb,
127 : NeedSocketCb&& onNeedSocket,
128 : NeedSocketCb&& onNeedSwarmSocket,
129 : OneToOneRecvCb&& oneToOneRecvCb);
130 :
131 : template<typename S, typename T>
132 114 : inline auto withConv(const S& convId, T&& cb) const
133 : {
134 228 : if (auto conv = getConversation(convId)) {
135 114 : std::lock_guard lk(conv->mtx);
136 114 : return cb(*conv);
137 114 : } else {
138 0 : JAMI_WARNING("Conversation {} not found", convId);
139 : }
140 0 : return decltype(cb(std::declval<SyncedConversation&>()))();
141 : }
142 : template<typename S, typename T>
143 1786 : inline auto withConversation(const S& convId, T&& cb)
144 : {
145 3573 : if (auto conv = getConversation(convId)) {
146 1779 : std::lock_guard lk(conv->mtx);
147 1780 : if (conv->conversation)
148 1757 : return cb(*conv->conversation);
149 1780 : } else {
150 28 : JAMI_WARNING("Conversation {} not found", convId);
151 : }
152 29 : return decltype(cb(std::declval<Conversation&>()))();
153 : }
154 :
155 : // Retrieving recent commits
156 : /**
157 : * Clone a conversation (initial) from device
158 : * @param deviceId
159 : * @param convId
160 : */
161 : void cloneConversation(const std::string& deviceId, const std::string& peer, const std::string& convId);
162 : void cloneConversation(const std::string& deviceId,
163 : const std::string& peer,
164 : const std::shared_ptr<SyncedConversation>& conv);
165 :
166 : /**
167 : * Pull remote device
168 : * @param peer Contact URI
169 : * @param deviceId Contact's device
170 : * @param conversationId
171 : * @param commitId (optional)
172 : */
173 : void fetchNewCommits(const std::string& peer,
174 : const std::string& deviceId,
175 : const std::string& conversationId,
176 : const std::string& commitId = "");
177 : /**
178 : * Handle events to receive new commits
179 : */
180 : void handlePendingConversation(const std::string& conversationId, const std::string& deviceId);
181 :
182 : // Requests
183 : std::optional<ConversationRequest> getRequest(const std::string& id) const;
184 :
185 : // Conversations
186 : /**
187 : * Get members
188 : * @param conversationId
189 : * @param includeBanned
190 : * @return a map of members with their role and details
191 : */
192 : std::vector<std::map<std::string, std::string>> getConversationMembers(const std::string& conversationId,
193 : bool includeBanned = false) const;
194 : void setConversationMembers(const std::string& convId, const std::set<std::string>& members);
195 :
196 : /**
197 : * Remove a repository and all files
198 : * @param convId
199 : * @param sync If we send an update to other account's devices
200 : * @param force True if ignore the removing flag
201 : */
202 : void removeRepository(const std::string& convId, bool sync, bool force = false);
203 : void removeRepositoryImpl(SyncedConversation& conv, bool sync, bool force = false);
204 : /**
205 : * Remove a conversation
206 : * @param conversationId
207 : */
208 : bool removeConversation(const std::string& conversationId, bool forceRemove = false);
209 : bool removeConversationImpl(SyncedConversation& conv, bool forceRemove = false);
210 :
211 : /**
212 : * Send a message notification to all members
213 : * @param conversation
214 : * @param commit
215 : * @param sync If we send an update to other account's devices
216 : * @param deviceId If we need to filter a specific device
217 : */
218 : void sendMessageNotification(const std::string& conversationId,
219 : bool sync,
220 : const std::string& commitId = "",
221 : const std::string& deviceId = "");
222 : void sendMessageNotification(Conversation& conversation,
223 : bool sync,
224 : const std::string& commitId = "",
225 : const std::string& deviceId = "");
226 :
227 : /**
228 : * @return if a convId is a valid conversation (repository cloned & usable)
229 : */
230 507 : bool isConversation(const std::string& convId) const
231 : {
232 507 : std::lock_guard lk(conversationsMtx_);
233 507 : auto c = conversations_.find(convId);
234 1014 : return c != conversations_.end() && c->second;
235 507 : }
236 :
237 2678 : void addConvInfo(const ConvInfo& info)
238 : {
239 2678 : std::lock_guard lk(convInfosMtx_);
240 2678 : convInfos_[info.id] = info;
241 2678 : saveConvInfos();
242 2678 : }
243 :
244 : std::string getOneToOneConversation(const std::string& uri) const noexcept;
245 :
246 : bool updateConvForContact(const std::string& uri, const std::string& oldConv, const std::string& newConv);
247 :
248 114 : std::shared_ptr<SyncedConversation> getConversation(std::string_view convId) const
249 : {
250 114 : std::lock_guard lk(conversationsMtx_);
251 114 : auto c = conversations_.find(convId);
252 228 : return c != conversations_.end() ? c->second : nullptr;
253 114 : }
254 28716 : std::shared_ptr<SyncedConversation> getConversation(std::string_view convId)
255 : {
256 28716 : std::lock_guard lk(conversationsMtx_);
257 28722 : auto c = conversations_.find(convId);
258 57422 : return c != conversations_.end() ? c->second : nullptr;
259 28719 : }
260 385 : std::shared_ptr<SyncedConversation> startConversation(const std::string& convId)
261 : {
262 385 : std::lock_guard lk(conversationsMtx_);
263 385 : auto& c = conversations_[convId];
264 385 : if (!c)
265 359 : c = std::make_shared<SyncedConversation>(convId);
266 770 : return c;
267 385 : }
268 113 : std::shared_ptr<SyncedConversation> startConversation(const ConvInfo& info)
269 : {
270 113 : std::lock_guard lk(conversationsMtx_);
271 113 : auto& c = conversations_[info.id];
272 113 : if (!c)
273 14 : c = std::make_shared<SyncedConversation>(info);
274 226 : return c;
275 113 : }
276 2490 : std::vector<std::shared_ptr<SyncedConversation>> getSyncedConversations() const
277 : {
278 2490 : std::lock_guard lk(conversationsMtx_);
279 2489 : std::vector<std::shared_ptr<SyncedConversation>> result;
280 2490 : result.reserve(conversations_.size());
281 3846 : for (const auto& [_, c] : conversations_)
282 1359 : result.emplace_back(c);
283 4979 : return result;
284 2489 : }
285 414 : std::vector<std::shared_ptr<Conversation>> getConversations() const
286 : {
287 414 : auto conversations = getSyncedConversations();
288 413 : std::vector<std::shared_ptr<Conversation>> result;
289 414 : result.reserve(conversations.size());
290 661 : for (const auto& sc : conversations) {
291 248 : std::lock_guard lk(sc->mtx);
292 248 : if (sc->conversation)
293 175 : result.emplace_back(sc->conversation);
294 248 : }
295 825 : return result;
296 413 : }
297 :
298 : // Message send/load
299 : void sendMessage(const std::string& conversationId,
300 : Json::Value&& value,
301 : const std::string& replyTo = "",
302 : bool announce = true,
303 : OnCommitCb&& onCommit = {},
304 : OnDoneCb&& cb = {});
305 :
306 : void sendMessage(const std::string& conversationId,
307 : std::string message,
308 : const std::string& replyTo = "",
309 : const std::string& type = "text/plain",
310 : bool announce = true,
311 : OnCommitCb&& onCommit = {},
312 : OnDoneCb&& cb = {});
313 :
314 : void editMessage(const std::string& conversationId, const std::string& newBody, const std::string& editedId);
315 :
316 : void bootstrapCb(std::string convId);
317 :
318 : // The following methods modify what is stored on the disk
319 : /**
320 : * @note convInfosMtx_ should be locked
321 : */
322 3358 : void saveConvInfos() const { ConversationModule::saveConvInfos(accountId_, convInfos_); }
323 : /**
324 : * @note conversationsRequestsMtx_ should be locked
325 : */
326 498 : void saveConvRequests() const { ConversationModule::saveConvRequests(accountId_, conversationsRequests_); }
327 : void declineOtherConversationWith(const std::string& uri);
328 226 : bool addConversationRequest(const std::string& id, const ConversationRequest& req)
329 : {
330 : // conversationsRequestsMtx_ MUST BE LOCKED
331 226 : if (isConversation(id))
332 0 : return false;
333 226 : auto it = conversationsRequests_.find(id);
334 226 : if (it != conversationsRequests_.end()) {
335 : // We only remove requests (if accepted) or change .declined
336 24 : if (!req.declined)
337 20 : return false;
338 202 : } else if (req.isOneToOne()) {
339 : // Check that we're not adding a second one to one trust request
340 : // NOTE: If a new one to one request is received, we can decline the previous one.
341 67 : declineOtherConversationWith(req.from);
342 : }
343 824 : JAMI_DEBUG("[Account {}] [Conversation {}] Adding conversation request from {}", accountId_, id, req.from);
344 206 : conversationsRequests_[id] = req;
345 206 : saveConvRequests();
346 206 : return true;
347 : }
348 282 : void rmConversationRequest(const std::string& id)
349 : {
350 : // conversationsRequestsMtx_ MUST BE LOCKED
351 282 : auto it = conversationsRequests_.find(id);
352 282 : if (it != conversationsRequests_.end()) {
353 173 : auto& md = syncingMetadatas_[id];
354 173 : md = it->second.metadatas;
355 173 : md["syncing"] = "true";
356 173 : md["created"] = std::to_string(it->second.received);
357 : }
358 282 : saveMetadata();
359 282 : conversationsRequests_.erase(id);
360 282 : saveConvRequests();
361 282 : }
362 :
363 : std::weak_ptr<JamiAccount> account_;
364 : std::shared_ptr<AccountManager> accountManager_;
365 : const std::string accountId_ {};
366 : NeedsSyncingCb needsSyncingCb_;
367 : SengMsgCb sendMsgCb_;
368 : NeedSocketCb onNeedSocket_;
369 : NeedSocketCb onNeedSwarmSocket_;
370 : OneToOneRecvCb oneToOneRecvCb_;
371 :
372 : std::string deviceId_ {};
373 : std::string username_ {};
374 :
375 : // Requests
376 : mutable std::mutex conversationsRequestsMtx_;
377 : std::map<std::string, ConversationRequest> conversationsRequests_;
378 :
379 : // Conversations
380 : mutable std::mutex conversationsMtx_ {};
381 : std::map<std::string, std::shared_ptr<SyncedConversation>, std::less<>> conversations_;
382 :
383 : // The following information are stored on the disk
384 : mutable std::mutex convInfosMtx_; // Note, should be locked after conversationsMtx_ if needed
385 : std::map<std::string, ConvInfo> convInfos_;
386 :
387 : // When sending a new message, we need to send the notification to some peers of the
388 : // conversation However, the conversation may be not bootstrapped, so the list will be empty.
389 : // notSyncedNotification_ will store the notifiaction to announce until we have peers to sync
390 : // with.
391 : std::mutex notSyncedNotificationMtx_;
392 : std::map<std::string, std::string> notSyncedNotification_;
393 :
394 3708 : std::weak_ptr<Impl> weak() { return std::static_pointer_cast<Impl>(shared_from_this()); }
395 :
396 : std::mutex refreshMtx_;
397 : std::map<std::string, uint64_t> refreshMessage;
398 : std::atomic_int syncCnt {0};
399 :
400 : #ifdef LIBJAMI_TEST
401 : std::function<void(std::string, Conversation::BootstrapStatus)> bootstrapCbTest_;
402 : #endif
403 :
404 : void fixStructures(std::shared_ptr<JamiAccount> account,
405 : const std::vector<std::tuple<std::string, std::string, std::string>>& updateContactConv,
406 : const std::set<std::string>& toRm);
407 :
408 : void cloneConversationFrom(const std::shared_ptr<SyncedConversation> conv,
409 : const std::string& deviceId,
410 : const std::string& oldConvId = "");
411 : void bootstrap(const std::string& convId);
412 : void fallbackClone(const asio::error_code& ec, const std::string& conversationId);
413 :
414 : void cloneConversationFrom(const ConversationRequest& request);
415 :
416 : void cloneConversationFrom(const std::string& conversationId,
417 : const std::string& uri,
418 : const std::string& oldConvId = "");
419 :
420 : // While syncing, we do not want to lose metadata (avatar/title and mode)
421 : std::map<std::string, std::map<std::string, std::string>> syncingMetadatas_;
422 484 : void saveMetadata()
423 : {
424 484 : auto path = fileutils::get_data_dir() / accountId_;
425 968 : std::lock_guard lock(dhtnet::fileutils::getFileLock(path / "syncingMetadatas"));
426 968 : std::ofstream file(path / "syncingMetadatas", std::ios::trunc | std::ios::binary);
427 484 : msgpack::pack(file, syncingMetadatas_);
428 484 : }
429 :
430 668 : void loadMetadata()
431 : {
432 : try {
433 : // read file
434 668 : auto path = fileutils::get_data_dir() / accountId_;
435 1336 : std::lock_guard lock(dhtnet::fileutils::getFileLock(path / "syncingMetadatas"));
436 1336 : auto file = fileutils::loadFile("syncingMetadatas", path);
437 : // load values
438 0 : msgpack::unpacked result;
439 0 : msgpack::unpack(result, (const char*) file.data(), file.size(), 0);
440 0 : result.get().convert(syncingMetadatas_);
441 2004 : } catch (const std::exception& e) {
442 2672 : JAMI_WARNING("[Account {}] [ConversationModule] unable to load syncing metadata: {}", accountId_, e.what());
443 668 : }
444 668 : }
445 : };
446 :
447 668 : ConversationModule::Impl::Impl(std::shared_ptr<JamiAccount>&& account,
448 : std::shared_ptr<AccountManager>&& accountManager,
449 : NeedsSyncingCb&& needsSyncingCb,
450 : SengMsgCb&& sendMsgCb,
451 : NeedSocketCb&& onNeedSocket,
452 : NeedSocketCb&& onNeedSwarmSocket,
453 668 : OneToOneRecvCb&& oneToOneRecvCb)
454 668 : : account_(account)
455 668 : , accountManager_(accountManager)
456 668 : , accountId_(account->getAccountID())
457 668 : , needsSyncingCb_(needsSyncingCb)
458 668 : , sendMsgCb_(sendMsgCb)
459 668 : , onNeedSocket_(onNeedSocket)
460 668 : , onNeedSwarmSocket_(onNeedSwarmSocket)
461 1336 : , oneToOneRecvCb_(oneToOneRecvCb)
462 : {
463 668 : if (auto accm = account->accountManager())
464 668 : if (const auto* info = accm->getInfo()) {
465 668 : deviceId_ = info->deviceId;
466 668 : username_ = info->accountId;
467 668 : }
468 668 : conversationsRequests_ = convRequests(accountId_);
469 668 : loadMetadata();
470 668 : }
471 :
472 : void
473 11 : ConversationModule::Impl::cloneConversation(const std::string& deviceId,
474 : const std::string& peerUri,
475 : const std::string& convId)
476 : {
477 44 : JAMI_DEBUG("[Account {}] [Conversation {}] [device {}] Cloning conversation", accountId_, convId, deviceId);
478 :
479 11 : auto conv = startConversation(convId);
480 11 : std::unique_lock lk(conv->mtx);
481 11 : cloneConversation(deviceId, peerUri, conv);
482 11 : }
483 :
484 : void
485 71 : ConversationModule::Impl::cloneConversation(const std::string& deviceId,
486 : const std::string& peerUri,
487 : const std::shared_ptr<SyncedConversation>& conv)
488 : {
489 : // conv->mtx must be locked
490 71 : if (!conv->conversation) {
491 : // Note: here we don't return and connect to all members
492 : // the first that will successfully connect will be used for
493 : // cloning.
494 : // This avoid the case when we try to clone from convInfos + sync message
495 : // at the same time.
496 71 : if (!conv->startFetch(deviceId, true)) {
497 120 : JAMI_WARNING("[Account {}] [Conversation {}] [device {}] Already fetching conversation",
498 : accountId_,
499 : conv->info.id,
500 : deviceId);
501 30 : addConvInfo(conv->info);
502 30 : return;
503 : }
504 82 : onNeedSocket_(
505 41 : conv->info.id,
506 : deviceId,
507 41 : [w = weak(), conv, deviceId](const auto& channel) {
508 41 : std::lock_guard lk(conv->mtx);
509 41 : if (conv->pending && !conv->pending->ready) {
510 41 : if (channel) {
511 41 : conv->pending->ready = true;
512 41 : conv->pending->deviceId = channel->deviceId().toString();
513 41 : conv->pending->socket = channel;
514 41 : if (!conv->pending->cloning) {
515 41 : conv->pending->cloning = true;
516 82 : dht::ThreadPool::io().run([w, convId = conv->info.id, deviceId = conv->pending->deviceId]() {
517 82 : if (auto sthis = w.lock())
518 41 : sthis->handlePendingConversation(convId, deviceId);
519 : });
520 : }
521 41 : return true;
522 : } else {
523 0 : conv->stopFetch(deviceId);
524 : }
525 : }
526 0 : return false;
527 41 : },
528 : MIME_TYPE_GIT);
529 :
530 164 : JAMI_LOG("[Account {}] [Conversation {}] [device {}] Requesting device", accountId_, conv->info.id, deviceId);
531 41 : conv->info.members.emplace(username_);
532 41 : conv->info.members.emplace(peerUri);
533 41 : addConvInfo(conv->info);
534 : } else {
535 0 : JAMI_DEBUG("[Account {}] [Conversation {}] Conversation already cloned", accountId_, conv->info.id);
536 : }
537 : }
538 :
539 : void
540 12856 : ConversationModule::Impl::fetchNewCommits(const std::string& peer,
541 : const std::string& deviceId,
542 : const std::string& conversationId,
543 : const std::string& commitId)
544 : {
545 12856 : auto conv = getConversation(conversationId);
546 : {
547 12853 : bool needReclone = false;
548 12853 : std::unique_lock lkInfos(convInfosMtx_);
549 :
550 12853 : auto itConvInfo = convInfos_.find(conversationId);
551 12852 : if (itConvInfo != convInfos_.end() && itConvInfo->second.isRemoved()) {
552 4 : if (!conv)
553 0 : return;
554 :
555 4 : const bool isOneToOne = (itConvInfo->second.mode == ConversationMode::ONE_TO_ONE);
556 4 : auto contactInfo = accountManager_->getContactInfo(peer);
557 4 : const bool shouldReadd = isOneToOne && contactInfo && contactInfo->confirmed && contactInfo->isActive()
558 0 : && !contactInfo->isBanned() && contactInfo->added > itConvInfo->second.removed
559 8 : && contactInfo->conversationId != conversationId;
560 :
561 4 : if (shouldReadd) {
562 0 : if (conv) {
563 0 : std::unique_lock lkSynced(conv->mtx);
564 :
565 0 : if (!conv->conversation) {
566 0 : conv->info.created = std::time(nullptr);
567 0 : conv->info.erased = 0;
568 0 : convInfos_[conversationId] = conv->info;
569 0 : saveConvInfos();
570 0 : needReclone = true;
571 : }
572 0 : }
573 :
574 0 : lkInfos.unlock();
575 :
576 0 : if (needReclone && conv) {
577 : {
578 0 : std::unique_lock lkSynced(conv->mtx);
579 0 : cloneConversation(deviceId, peer, conv);
580 0 : }
581 : }
582 :
583 0 : return;
584 : }
585 :
586 16 : JAMI_WARNING("[Account {:s}] [Conversation {}] Received a commit, but conversation is removed",
587 : accountId_,
588 : conversationId);
589 4 : return;
590 4 : }
591 12851 : }
592 12849 : std::optional<ConversationRequest> oldReq;
593 : {
594 12849 : std::lock_guard lk(conversationsRequestsMtx_);
595 12852 : oldReq = getRequest(conversationId);
596 12850 : if (oldReq != std::nullopt && oldReq->declined) {
597 0 : JAMI_DEBUG("[Account {}] [Conversation {}] Received a request for a conversation already declined.",
598 : accountId_,
599 : conversationId);
600 0 : return;
601 : }
602 12849 : }
603 51399 : JAMI_DEBUG("[Account {:s}] [Conversation {}] [device {}] fetching '{:s}'",
604 : accountId_,
605 : conversationId,
606 : deviceId,
607 : commitId);
608 :
609 12849 : const bool shouldRequestInvite = username_ != peer;
610 12852 : if (!conv) {
611 148 : if (oldReq == std::nullopt && shouldRequestInvite) {
612 : // We didn't find a conversation or a request with the given ID.
613 : // This suggests that someone tried to send us an invitation but
614 : // that we didn't receive it, so we ask for a new one.
615 548 : JAMI_WARNING("[Account {}] [Conversation {}] Unable to find conversation, asking for an invite",
616 : accountId_,
617 : conversationId);
618 274 : sendMsgCb_(peer, {}, std::map<std::string, std::string> {{MIME_TYPE_INVITE, conversationId}}, 0);
619 : }
620 148 : return;
621 : }
622 12702 : std::unique_lock lk(conv->mtx);
623 :
624 12704 : if (conv->conversation) {
625 : // Check if we already have the commit
626 12660 : if (not commitId.empty() && conv->conversation->hasCommit(commitId)) {
627 10547 : return;
628 : }
629 2296 : if (conv->conversation->isRemoving()) {
630 0 : JAMI_WARNING("[Account {}] [Conversation {}] conversaton is being removed", accountId_, conversationId);
631 0 : return;
632 : }
633 2296 : if (!conv->conversation->isMember(peer, true)) {
634 8 : JAMI_WARNING("[Account {}] [Conversation {}] {} is not a member", accountId_, conversationId, peer);
635 2 : return;
636 : }
637 2292 : if (conv->conversation->isBanned(deviceId)) {
638 0 : JAMI_WARNING("[Account {}] [Conversation {}] device {} is banned", accountId_, conversationId, deviceId);
639 0 : return;
640 : }
641 :
642 : // Retrieve current last message
643 2294 : auto lastMessageId = conv->conversation->lastCommitId();
644 2293 : if (lastMessageId.empty()) {
645 4 : JAMI_ERROR("[Account {}] [Conversation {}] No message detected. This is a bug", accountId_, conversationId);
646 1 : return;
647 : }
648 :
649 2291 : if (!conv->startFetch(deviceId)) {
650 716 : JAMI_WARNING("[Account {}] [Conversation {}] Already fetching", accountId_, conversationId);
651 179 : return;
652 : }
653 :
654 2114 : syncCnt.fetch_add(1);
655 6339 : onNeedSocket_(
656 : conversationId,
657 : deviceId,
658 4225 : [w = weak(), conv, conversationId, peer = std::move(peer), deviceId, commitId = std::move(commitId)](
659 : const auto& channel) {
660 3512 : auto sthis = w.lock();
661 3513 : auto acc = sthis ? sthis->account_.lock() : nullptr;
662 3512 : std::unique_lock lk(conv->mtx);
663 3512 : auto conversation = conv->conversation;
664 3513 : if (!channel || !acc || !conversation) {
665 1462 : conv->stopFetch(deviceId);
666 1462 : if (sthis)
667 1462 : sthis->syncCnt.fetch_sub(1);
668 1462 : return false;
669 : }
670 2050 : conversation->addGitSocket(channel->deviceId(), channel);
671 2051 : lk.unlock();
672 8202 : conversation->sync(
673 2051 : peer,
674 2051 : deviceId,
675 4102 : [w, conv, conversationId = std::move(conversationId), peer, deviceId, commitId](bool ok) {
676 2051 : auto shared = w.lock();
677 2051 : if (!shared)
678 0 : return;
679 2051 : if (!ok) {
680 2656 : JAMI_WARNING("[Account {}] [Conversation {}] Unable to fetch new commit from "
681 : "{}, other peer may be disconnected",
682 : shared->accountId_,
683 : conversationId,
684 : deviceId);
685 2656 : JAMI_LOG("[Account {}] [Conversation {}] Relaunch sync with {}",
686 : shared->accountId_,
687 : conversationId,
688 : deviceId);
689 : }
690 :
691 : {
692 2051 : std::lock_guard lk(conv->mtx);
693 2051 : conv->pending.reset();
694 : // Notify peers that a new commit is there (DRT)
695 2051 : if (not commitId.empty() && ok) {
696 999 : shared->sendMessageNotification(*conv->conversation, false, commitId, deviceId);
697 : }
698 2051 : }
699 4102 : if (shared->syncCnt.fetch_sub(1) == 1) {
700 271 : emitSignal<libjami::ConversationSignal::ConversationSyncFinished>(shared->accountId_);
701 : }
702 2051 : },
703 2050 : commitId);
704 2051 : return true;
705 3513 : },
706 : "");
707 2294 : } else {
708 43 : if (oldReq != std::nullopt)
709 0 : return;
710 43 : if (conv->pending)
711 28 : return;
712 15 : bool clone = !conv->info.isRemoved();
713 15 : if (clone) {
714 15 : cloneConversation(deviceId, peer, conv);
715 15 : return;
716 : }
717 0 : if (!shouldRequestInvite)
718 0 : return;
719 0 : lk.unlock();
720 0 : JAMI_WARNING("[Account {}] [Conversation {}] Unable to find conversation, asking for an invite",
721 : accountId_,
722 : conversationId);
723 0 : sendMsgCb_(peer, {}, std::map<std::string, std::string> {{MIME_TYPE_INVITE, conversationId}}, 0);
724 : }
725 34179 : }
726 :
727 : // Clone and store conversation
728 : void
729 192 : ConversationModule::Impl::handlePendingConversation(const std::string& conversationId, const std::string& deviceId)
730 : {
731 192 : auto acc = account_.lock();
732 192 : if (!acc)
733 0 : return;
734 192 : std::vector<DeviceId> kd;
735 : {
736 192 : std::unique_lock lk(conversationsMtx_);
737 192 : const auto& devices = accountManager_->getKnownDevices();
738 192 : kd.reserve(devices.size());
739 1424 : for (const auto& [id, _] : devices)
740 1232 : kd.emplace_back(id);
741 192 : }
742 192 : auto conv = getConversation(conversationId);
743 192 : if (!conv)
744 0 : return;
745 192 : std::unique_lock lk(conv->mtx, std::defer_lock);
746 378 : auto erasePending = [&] {
747 378 : std::string toRm;
748 378 : if (conv->pending && !conv->pending->removeId.empty())
749 0 : toRm = std::move(conv->pending->removeId);
750 378 : conv->pending.reset();
751 378 : lk.unlock();
752 378 : if (!toRm.empty())
753 0 : removeConversation(toRm);
754 378 : };
755 : try {
756 192 : auto conversation = std::make_shared<Conversation>(acc, deviceId, conversationId);
757 186 : conversation->onMembersChanged([w = weak_from_this(), conversationId](const auto& members) {
758 : // Delay in another thread to avoid deadlocks
759 2846 : dht::ThreadPool::io().run([w, conversationId, members = std::move(members)] {
760 2846 : if (auto sthis = w.lock())
761 1423 : sthis->setConversationMembers(conversationId, members);
762 : });
763 1423 : });
764 186 : conversation->onMessageStatusChanged([this, conversationId](const auto& status) {
765 512 : auto msg = std::make_shared<SyncMsg>();
766 1024 : msg->ms = {{conversationId, status}};
767 512 : needsSyncingCb_(std::move(msg));
768 512 : });
769 186 : conversation->onNeedSocket(onNeedSwarmSocket_);
770 186 : if (!conversation->isMember(username_, true)) {
771 0 : JAMI_ERROR("[Account {}] [Conversation {}] Conversation cloned but we do not seem to be a valid member",
772 : accountId_,
773 : conversationId);
774 0 : conversation->erase();
775 0 : lk.lock();
776 0 : erasePending();
777 0 : return;
778 : }
779 :
780 : // Make sure that the list of members stored in convInfos_ matches the
781 : // one from the conversation's repository.
782 : // (https://git.jami.net/savoirfairelinux/jami-daemon/-/issues/1026)
783 186 : setConversationMembers(conversationId, conversation->memberUris("", {}));
784 :
785 186 : lk.lock();
786 186 : if (conv->info.mode != conversation->mode()) {
787 52 : JAMI_ERROR(
788 : "[Account {}] [Conversation {}] Cloned conversation mode is {}, but {} was expected from invite.",
789 : accountId_,
790 : conversationId,
791 : static_cast<int>(conversation->mode()),
792 : static_cast<int>(conv->info.mode));
793 : // TODO: erase conversation after transition period
794 : // conversation->erase();
795 : // erasePending();
796 : // return;
797 13 : conv->info.mode = conversation->mode();
798 13 : addConvInfo(conv->info);
799 : }
800 :
801 186 : if (conv->pending && conv->pending->socket)
802 186 : conversation->addGitSocket(DeviceId(deviceId), std::move(conv->pending->socket));
803 186 : auto removeRepo = false;
804 : // Note: a removeContact while cloning. In this case, the conversation
805 : // must not be announced and removed.
806 186 : if (conv->info.isRemoved())
807 0 : removeRepo = true;
808 186 : std::map<std::string, std::string> preferences;
809 186 : std::map<std::string, std::map<std::string, std::string>> status;
810 186 : if (conv->pending) {
811 186 : preferences = std::move(conv->pending->preferences);
812 186 : status = std::move(conv->pending->status);
813 : }
814 186 : conv->conversation = conversation;
815 186 : if (removeRepo) {
816 0 : removeRepositoryImpl(*conv, false, true);
817 0 : erasePending();
818 0 : return;
819 : }
820 :
821 186 : auto commitId = conversation->join();
822 186 : if (!commitId.empty())
823 156 : sendMessageNotification(*conversation, false, commitId);
824 186 : erasePending(); // Will unlock
825 :
826 : #ifdef LIBJAMI_TEST
827 186 : conversation->onBootstrapStatus(bootstrapCbTest_);
828 : #endif
829 186 : auto id = conversation->id();
830 372 : conversation->bootstrap(
831 186 : [w = weak(), id = std::move(id)]() {
832 215 : if (auto sthis = w.lock())
833 215 : sthis->bootstrapCb(id);
834 215 : },
835 : kd);
836 :
837 186 : if (!preferences.empty())
838 1 : conversation->updatePreferences(preferences);
839 186 : if (!status.empty())
840 13 : conversation->updateMessageStatus(status);
841 186 : syncingMetadatas_.erase(conversationId);
842 186 : saveMetadata();
843 :
844 : // Inform user that the conversation is ready
845 186 : emitSignal<libjami::ConversationSignal::ConversationReady>(accountId_, conversationId);
846 186 : needsSyncingCb_({});
847 : // Download members profile on first sync
848 186 : auto isOneOne = conversation->mode() == ConversationMode::ONE_TO_ONE;
849 186 : auto askForProfile = isOneOne;
850 186 : if (!isOneOne) {
851 : // If not 1:1 only download profiles from self (to avoid non checked files)
852 127 : auto cert = acc->certStore().getCertificate(deviceId);
853 127 : askForProfile = cert && cert->issuer && cert->issuer->getId().toString() == username_;
854 127 : }
855 186 : if (askForProfile) {
856 129 : for (const auto& member : conversation->memberUris(username_)) {
857 57 : acc->askForProfile(conversationId, deviceId, member);
858 72 : }
859 : }
860 192 : } catch (const std::exception& e) {
861 24 : JAMI_WARNING(
862 : "[Account {}] [Conversation {}] Something went wrong when cloning conversation: {}. Re-clone in {}s",
863 : accountId_,
864 : conversationId,
865 : e.what(),
866 : conv->fallbackTimer.count());
867 6 : conv->fallbackClone->expires_at(std::chrono::steady_clock::now() + conv->fallbackTimer);
868 6 : conv->fallbackTimer *= 2;
869 6 : if (conv->fallbackTimer > MAX_FALLBACK)
870 0 : conv->fallbackTimer = MAX_FALLBACK;
871 12 : conv->fallbackClone->async_wait(std::bind(&ConversationModule::Impl::fallbackClone,
872 12 : shared_from_this(),
873 : std::placeholders::_1,
874 : conversationId));
875 6 : }
876 192 : lk.lock();
877 192 : erasePending();
878 192 : }
879 :
880 : std::optional<ConversationRequest>
881 13203 : ConversationModule::Impl::getRequest(const std::string& id) const
882 : {
883 : // ConversationsRequestsMtx MUST BE LOCKED
884 13203 : auto it = conversationsRequests_.find(id);
885 13197 : if (it != conversationsRequests_.end())
886 204 : return it->second;
887 12996 : return std::nullopt;
888 : }
889 :
890 : std::string
891 496 : ConversationModule::Impl::getOneToOneConversation(const std::string& uri) const noexcept
892 : {
893 496 : if (auto details = accountManager_->getContactInfo(uri)) {
894 : // If contact is removed there is no conversation
895 : // If banned, conversation is still on disk
896 279 : if (details->removed != 0 && details->banned == 0) {
897 : // Check if contact is removed
898 23 : if (details->removed > details->added)
899 23 : return {};
900 : }
901 256 : return details->conversationId;
902 496 : }
903 217 : return {};
904 : }
905 :
906 : bool
907 9 : ConversationModule::Impl::updateConvForContact(const std::string& uri,
908 : const std::string& oldConv,
909 : const std::string& newConv)
910 : {
911 9 : if (newConv != oldConv) {
912 9 : auto conversation = getOneToOneConversation(uri);
913 9 : if (conversation != oldConv) {
914 0 : JAMI_DEBUG("[Account {}] [Conversation {}] Old conversation is not found in details {} - found: {}",
915 : accountId_,
916 : newConv,
917 : oldConv,
918 : conversation);
919 0 : return false;
920 : }
921 9 : accountManager_->updateContactConversation(uri, newConv);
922 9 : return true;
923 9 : }
924 0 : return false;
925 : }
926 :
927 : void
928 67 : ConversationModule::Impl::declineOtherConversationWith(const std::string& uri)
929 : {
930 : // conversationsRequestsMtx_ MUST BE LOCKED
931 68 : for (auto& [id, request] : conversationsRequests_) {
932 1 : if (request.declined)
933 0 : continue; // Ignore already declined requests
934 1 : if (request.isOneToOne() && request.from == uri) {
935 4 : JAMI_WARNING("[Account {}] [Conversation {}] Decline conversation request from {}", accountId_, id, uri);
936 1 : request.declined = std::time(nullptr);
937 1 : syncingMetadatas_.erase(id);
938 1 : saveMetadata();
939 1 : emitSignal<libjami::ConversationSignal::ConversationRequestDeclined>(accountId_, id);
940 : }
941 : }
942 67 : }
943 :
944 : std::vector<std::map<std::string, std::string>>
945 105 : ConversationModule::Impl::getConversationMembers(const std::string& conversationId, bool includeBanned) const
946 : {
947 210 : return withConv(conversationId, [&](const auto& conv) { return conv.getMembers(true, includeBanned); });
948 : }
949 :
950 : void
951 13 : ConversationModule::Impl::removeRepository(const std::string& conversationId, bool sync, bool force)
952 : {
953 13 : auto conv = getConversation(conversationId);
954 13 : if (!conv)
955 0 : return;
956 13 : std::unique_lock lk(conv->mtx);
957 13 : removeRepositoryImpl(*conv, sync, force);
958 13 : }
959 :
960 : void
961 21 : ConversationModule::Impl::removeRepositoryImpl(SyncedConversation& conv, bool sync, bool force)
962 : {
963 21 : if (conv.conversation && (force || conv.conversation->isRemoving())) {
964 : // Stop fetch!
965 21 : conv.pending.reset();
966 :
967 84 : JAMI_LOG("[Account {}] [Conversation {}] Remove conversation", accountId_, conv.info.id);
968 : try {
969 21 : if (conv.conversation->mode() == ConversationMode::ONE_TO_ONE) {
970 42 : for (const auto& member : conv.conversation->getInitialMembers()) {
971 28 : if (member != username_) {
972 : // Note: this can happen while re-adding a contact.
973 : // In this case, check that we are removing the linked conversation.
974 14 : if (conv.info.id == getOneToOneConversation(member)) {
975 0 : accountManager_->removeContactConversation(member);
976 : }
977 : }
978 14 : }
979 : }
980 0 : } catch (const std::exception& e) {
981 0 : JAMI_ERR() << e.what();
982 0 : }
983 21 : conv.conversation->erase();
984 21 : conv.conversation.reset();
985 :
986 21 : if (!sync)
987 1 : return;
988 :
989 20 : conv.info.erased = std::time(nullptr);
990 20 : needsSyncingCb_({});
991 20 : addConvInfo(conv.info);
992 : }
993 : }
994 :
995 : bool
996 9 : ConversationModule::Impl::removeConversation(const std::string& conversationId, bool forceRemove)
997 : {
998 18 : return withConv(conversationId,
999 27 : [this, forceRemove](auto& conv) { return removeConversationImpl(conv, forceRemove); });
1000 : }
1001 :
1002 : bool
1003 9 : ConversationModule::Impl::removeConversationImpl(SyncedConversation& conv, bool forceRemove)
1004 : {
1005 9 : auto members = conv.getMembers(false, false);
1006 9 : auto isSyncing = !conv.conversation;
1007 9 : auto hasMembers = !isSyncing // If syncing there is no member to inform
1008 8 : && std::find_if(members.begin(),
1009 : members.end(),
1010 8 : [&](const auto& member) { return member.at("uri") == username_; })
1011 16 : != members.end() // We must be still a member
1012 17 : && members.size() != 1; // If there is only ourself
1013 9 : conv.info.removed = std::time(nullptr);
1014 9 : if (isSyncing)
1015 1 : conv.info.erased = std::time(nullptr);
1016 9 : if (conv.fallbackClone)
1017 9 : conv.fallbackClone->cancel();
1018 : // Sync now, because it can take some time to really removes the datas
1019 9 : needsSyncingCb_({});
1020 9 : addConvInfo(conv.info);
1021 9 : emitSignal<libjami::ConversationSignal::ConversationRemoved>(accountId_, conv.info.id);
1022 9 : if (isSyncing)
1023 1 : return true;
1024 :
1025 8 : if (forceRemove && conv.conversation->mode() == ConversationMode::ONE_TO_ONE) {
1026 : // skip waiting for sync
1027 1 : removeRepositoryImpl(conv, true);
1028 1 : return true;
1029 : }
1030 :
1031 7 : auto commitId = conv.conversation->leave();
1032 7 : if (hasMembers) {
1033 8 : JAMI_LOG("Wait that someone sync that user left conversation {}", conv.info.id);
1034 : // Commit that we left
1035 2 : if (!commitId.empty()) {
1036 : // Do not sync as it's synched by convInfos
1037 2 : sendMessageNotification(*conv.conversation, false, commitId);
1038 : } else {
1039 0 : JAMI_ERROR("Failed to send message to conversation {}", conv.info.id);
1040 : }
1041 : // In this case, we wait that another peer sync the conversation
1042 : // to definitely remove it from the device. This is to inform the
1043 : // peer that we left the conversation and never want to receive
1044 : // any messages
1045 2 : return true;
1046 : }
1047 :
1048 : // Else we are the last member, so we can remove
1049 5 : removeRepositoryImpl(conv, true);
1050 5 : return true;
1051 9 : }
1052 :
1053 : void
1054 489 : ConversationModule::Impl::sendMessageNotification(const std::string& conversationId,
1055 : bool sync,
1056 : const std::string& commitId,
1057 : const std::string& deviceId)
1058 : {
1059 489 : if (auto conv = getConversation(conversationId)) {
1060 489 : std::lock_guard lk(conv->mtx);
1061 490 : if (conv->conversation)
1062 489 : sendMessageNotification(*conv->conversation, sync, commitId, deviceId);
1063 980 : }
1064 490 : }
1065 :
1066 : void
1067 1784 : ConversationModule::Impl::sendMessageNotification(Conversation& conversation,
1068 : bool sync,
1069 : const std::string& commitId,
1070 : const std::string& deviceId)
1071 : {
1072 1784 : auto acc = account_.lock();
1073 1784 : if (!acc)
1074 0 : return;
1075 1784 : auto commit = commitId == "" ? conversation.lastCommitId() : commitId;
1076 1784 : Json::Value message;
1077 1784 : message["id"] = conversation.id();
1078 1783 : message["commit"] = commit;
1079 1784 : message["deviceId"] = deviceId_;
1080 1783 : const auto text = json::toString(message);
1081 :
1082 : // Send message notification will announce the new commit in 3 steps.
1083 5352 : const auto messageMap = std::map<std::string, std::string> {{MIME_TYPE_GIT, text}};
1084 :
1085 : // First, because our account can have several devices, announce to other devices
1086 1784 : if (sync) {
1087 : // Announce to our devices
1088 627 : std::lock_guard lk(refreshMtx_);
1089 627 : auto& refresh = refreshMessage[username_];
1090 627 : refresh = sendMsgCb_(username_, {}, messageMap, refresh);
1091 627 : }
1092 :
1093 : // Then, we announce to 2 random members in the conversation that aren't in the DRT
1094 : // This allow new devices without the ability to sync to their other devices to sync with us.
1095 : // Or they can also use an old backup.
1096 1784 : std::vector<std::string> nonConnectedMembers;
1097 1784 : std::vector<NodeId> devices;
1098 : {
1099 1784 : std::lock_guard lk(notSyncedNotificationMtx_);
1100 1784 : devices = conversation.peersToSyncWith();
1101 3568 : auto members = conversation.memberUris(username_, {MemberRole::BANNED});
1102 1784 : std::vector<std::string> connectedMembers;
1103 : // print all members
1104 14078 : for (const auto& device : devices) {
1105 12294 : auto cert = acc->certStore().getCertificate(device.toString());
1106 12285 : if (cert && cert->issuer)
1107 12286 : connectedMembers.emplace_back(cert->issuer->getId().toString());
1108 12288 : }
1109 1778 : std::sort(std::begin(connectedMembers), std::end(connectedMembers));
1110 1784 : std::set_difference(members.begin(),
1111 : members.end(),
1112 : connectedMembers.begin(),
1113 : connectedMembers.end(),
1114 : std::inserter(nonConnectedMembers, nonConnectedMembers.begin()));
1115 1784 : std::shuffle(nonConnectedMembers.begin(), nonConnectedMembers.end(), acc->rand);
1116 1784 : if (nonConnectedMembers.size() > 2)
1117 445 : nonConnectedMembers.resize(2);
1118 1784 : if (!conversation.isBootstrapped()) {
1119 1344 : JAMI_DEBUG("[Conversation {}] Not yet bootstrapped, save notification", conversation.id());
1120 : // Because we can get some git channels but not bootstrapped, we should keep this
1121 : // to refresh when bootstrapped.
1122 336 : notSyncedNotification_[conversation.id()] = commit;
1123 : }
1124 1784 : }
1125 :
1126 1784 : std::lock_guard lk(refreshMtx_);
1127 3454 : for (const auto& member : nonConnectedMembers) {
1128 1671 : auto& refresh = refreshMessage[member];
1129 1671 : refresh = sendMsgCb_(member, {}, messageMap, refresh);
1130 : }
1131 :
1132 : // Finally we send to devices that the DRT choose.
1133 14065 : for (const auto& device : devices) {
1134 12290 : auto deviceIdStr = device.toString();
1135 12285 : auto memberUri = conversation.uriFromDevice(deviceIdStr);
1136 12282 : if (memberUri.empty() || deviceIdStr == deviceId)
1137 992 : continue;
1138 11300 : auto& refresh = refreshMessage[deviceIdStr];
1139 11303 : refresh = sendMsgCb_(memberUri, device, messageMap, refresh);
1140 13287 : }
1141 1782 : }
1142 :
1143 : void
1144 91 : ConversationModule::Impl::sendMessage(const std::string& conversationId,
1145 : std::string message,
1146 : const std::string& replyTo,
1147 : const std::string& type,
1148 : bool announce,
1149 : OnCommitCb&& onCommit,
1150 : OnDoneCb&& cb)
1151 : {
1152 91 : Json::Value json;
1153 91 : json["body"] = std::move(message);
1154 91 : json["type"] = type;
1155 91 : sendMessage(conversationId, std::move(json), replyTo, announce, std::move(onCommit), std::move(cb));
1156 91 : }
1157 :
1158 : void
1159 117 : ConversationModule::Impl::sendMessage(const std::string& conversationId,
1160 : Json::Value&& value,
1161 : const std::string& replyTo,
1162 : bool announce,
1163 : OnCommitCb&& onCommit,
1164 : OnDoneCb&& cb)
1165 : {
1166 117 : if (auto conv = getConversation(conversationId)) {
1167 117 : std::lock_guard lk(conv->mtx);
1168 117 : if (conv->conversation)
1169 351 : conv->conversation->sendMessage(std::move(value),
1170 : replyTo,
1171 117 : std::move(onCommit),
1172 116 : [this,
1173 : conversationId,
1174 : announce,
1175 117 : cb = std::move(cb)](bool ok, const std::string& commitId) {
1176 116 : if (cb)
1177 5 : cb(ok, commitId);
1178 116 : if (!announce)
1179 0 : return;
1180 116 : if (ok)
1181 115 : sendMessageNotification(conversationId, true, commitId);
1182 : else
1183 1 : JAMI_ERR("Failed to send message to conversation %s",
1184 : conversationId.c_str());
1185 : });
1186 234 : }
1187 117 : }
1188 :
1189 : void
1190 8 : ConversationModule::Impl::editMessage(const std::string& conversationId,
1191 : const std::string& newBody,
1192 : const std::string& editedId)
1193 : {
1194 : // Check that editedId is a valid commit, from ourself and plain/text
1195 8 : auto validCommit = false;
1196 8 : std::string type, fileId;
1197 8 : if (auto conv = getConversation(conversationId)) {
1198 8 : std::lock_guard lk(conv->mtx);
1199 8 : if (conv->conversation) {
1200 8 : auto commit = conv->conversation->getCommit(editedId);
1201 8 : if (commit != std::nullopt) {
1202 7 : type = commit->at("type");
1203 7 : if (type == "application/data-transfer+json") {
1204 2 : fileId = getFileId(editedId, commit->at("tid"), commit->at("displayName"));
1205 : }
1206 21 : validCommit = commit->at("author") == username_
1207 21 : && (type == "text/plain" || type == "application/data-transfer+json");
1208 : }
1209 8 : }
1210 16 : }
1211 8 : if (!validCommit) {
1212 8 : JAMI_ERROR("Unable to edit commit {:s}", editedId);
1213 2 : return;
1214 : }
1215 : // Commit message edition
1216 6 : Json::Value json;
1217 6 : if (type == "application/data-transfer+json") {
1218 2 : json["tid"] = "";
1219 : // Remove file!
1220 4 : auto path = fileutils::get_data_dir() / accountId_ / "conversation_data" / conversationId / fileId;
1221 2 : dhtnet::fileutils::remove(path, true);
1222 2 : } else {
1223 4 : json["body"] = newBody;
1224 : }
1225 6 : json["edit"] = editedId;
1226 6 : json["type"] = type;
1227 6 : sendMessage(conversationId, std::move(json));
1228 10 : }
1229 :
1230 : void
1231 331 : ConversationModule::Impl::bootstrapCb(std::string convId)
1232 : {
1233 331 : std::string commitId;
1234 : {
1235 331 : std::lock_guard lk(notSyncedNotificationMtx_);
1236 331 : auto it = notSyncedNotification_.find(convId);
1237 331 : if (it != notSyncedNotification_.end()) {
1238 215 : commitId = it->second;
1239 215 : notSyncedNotification_.erase(it);
1240 : }
1241 331 : }
1242 1324 : JAMI_DEBUG("[Account {}] [Conversation {}] Resend last message notification", accountId_, convId);
1243 331 : dht::ThreadPool::io().run([w = weak(), convId, commitId = std::move(commitId)] {
1244 331 : if (auto sthis = w.lock())
1245 331 : sthis->sendMessageNotification(convId, true, commitId);
1246 331 : });
1247 331 : }
1248 :
1249 : void
1250 680 : ConversationModule::Impl::fixStructures(
1251 : std::shared_ptr<JamiAccount> acc,
1252 : const std::vector<std::tuple<std::string, std::string, std::string>>& updateContactConv,
1253 : const std::set<std::string>& toRm)
1254 : {
1255 681 : for (const auto& [uri, oldConv, newConv] : updateContactConv) {
1256 1 : updateConvForContact(uri, oldConv, newConv);
1257 : }
1258 : ////////////////////////////////////////////////////////////////
1259 : // Note: This is only to homogenize trust and convRequests
1260 680 : std::vector<std::string> invalidPendingRequests;
1261 : {
1262 680 : auto requests = acc->getTrustRequests();
1263 680 : std::lock_guard lk(conversationsRequestsMtx_);
1264 681 : for (const auto& request : requests) {
1265 1 : auto itConvId = request.find(libjami::Account::TrustRequest::CONVERSATIONID);
1266 1 : auto itConvFrom = request.find(libjami::Account::TrustRequest::FROM);
1267 1 : if (itConvId != request.end() && itConvFrom != request.end()) {
1268 : // Check if requests exists or is declined.
1269 1 : auto itReq = conversationsRequests_.find(itConvId->second);
1270 1 : auto declined = itReq == conversationsRequests_.end() || itReq->second.declined;
1271 1 : if (declined) {
1272 4 : JAMI_WARNING("Invalid trust request found: {:s}", itConvId->second);
1273 1 : invalidPendingRequests.emplace_back(itConvFrom->second);
1274 : }
1275 : }
1276 : }
1277 680 : auto requestRemoved = false;
1278 682 : for (auto it = conversationsRequests_.begin(); it != conversationsRequests_.end();) {
1279 2 : if (it->second.from == username_) {
1280 0 : JAMI_WARNING("Detected request from ourself, this makes no sense. Remove {}", it->first);
1281 0 : it = conversationsRequests_.erase(it);
1282 : } else {
1283 2 : ++it;
1284 : }
1285 : }
1286 680 : if (requestRemoved) {
1287 0 : saveConvRequests();
1288 : }
1289 680 : }
1290 681 : for (const auto& invalidPendingRequest : invalidPendingRequests)
1291 1 : acc->discardTrustRequest(invalidPendingRequest);
1292 :
1293 : ////////////////////////////////////////////////////////////////
1294 681 : for (const auto& conv : toRm) {
1295 4 : JAMI_ERROR("[Account {}] Remove conversation ({})", accountId_, conv);
1296 1 : removeConversation(conv, true);
1297 : }
1298 2720 : JAMI_DEBUG("[Account {}] Conversations loaded!", accountId_);
1299 680 : }
1300 :
1301 : void
1302 182 : ConversationModule::Impl::cloneConversationFrom(const std::shared_ptr<SyncedConversation> conv,
1303 : const std::string& deviceId,
1304 : const std::string& oldConvId)
1305 : {
1306 182 : std::lock_guard lk(conv->mtx);
1307 182 : const auto& conversationId = conv->info.id;
1308 182 : if (!conv->startFetch(deviceId, true)) {
1309 76 : JAMI_WARNING("[Account {}] [Conversation {}] Already fetching", accountId_, conversationId);
1310 19 : return;
1311 : }
1312 :
1313 163 : onNeedSocket_(
1314 : conversationId,
1315 : deviceId,
1316 163 : [wthis = weak_from_this(), conv, conversationId, oldConvId, deviceId](const auto& channel) {
1317 163 : std::lock_guard lk(conv->mtx);
1318 163 : if (conv->pending && !conv->pending->ready) {
1319 158 : conv->pending->removeId = oldConvId;
1320 158 : if (channel) {
1321 151 : conv->pending->ready = true;
1322 151 : conv->pending->deviceId = channel->deviceId().toString();
1323 151 : conv->pending->socket = channel;
1324 151 : if (!conv->pending->cloning) {
1325 151 : conv->pending->cloning = true;
1326 302 : dht::ThreadPool::io().run([wthis, conversationId, deviceId = conv->pending->deviceId]() {
1327 302 : if (auto sthis = wthis.lock())
1328 151 : sthis->handlePendingConversation(conversationId, deviceId);
1329 : });
1330 : }
1331 151 : return true;
1332 14 : } else if (auto sthis = wthis.lock()) {
1333 7 : conv->stopFetch(deviceId);
1334 28 : JAMI_WARNING("[Account {}] [Conversation {}] [device {}] Clone failed. Re-clone in {}s",
1335 : sthis->accountId_,
1336 : conversationId,
1337 : deviceId,
1338 : conv->fallbackTimer.count());
1339 7 : conv->fallbackClone->expires_at(std::chrono::steady_clock::now() + conv->fallbackTimer);
1340 7 : conv->fallbackTimer *= 2;
1341 7 : if (conv->fallbackTimer > MAX_FALLBACK)
1342 0 : conv->fallbackTimer = MAX_FALLBACK;
1343 14 : conv->fallbackClone->async_wait(std::bind(&ConversationModule::Impl::fallbackClone,
1344 : sthis,
1345 : std::placeholders::_1,
1346 7 : conversationId));
1347 : }
1348 : }
1349 12 : return false;
1350 163 : },
1351 : MIME_TYPE_GIT);
1352 182 : }
1353 :
1354 : void
1355 11 : ConversationModule::Impl::fallbackClone(const asio::error_code& ec, const std::string& conversationId)
1356 : {
1357 11 : if (ec == asio::error::operation_aborted)
1358 1 : return;
1359 10 : auto conv = getConversation(conversationId);
1360 10 : if (!conv || conv->conversation)
1361 0 : return;
1362 10 : auto members = getConversationMembers(conversationId);
1363 30 : for (const auto& member : members)
1364 20 : if (member.at("uri") != username_)
1365 10 : cloneConversationFrom(conversationId, member.at("uri"));
1366 10 : }
1367 :
1368 : void
1369 832 : ConversationModule::Impl::bootstrap(const std::string& convId)
1370 : {
1371 832 : std::vector<DeviceId> kd;
1372 : {
1373 832 : std::unique_lock lk(conversationsMtx_);
1374 832 : const auto& devices = accountManager_->getKnownDevices();
1375 832 : kd.reserve(devices.size());
1376 2728 : for (const auto& [id, _] : devices)
1377 1896 : kd.emplace_back(id);
1378 832 : }
1379 832 : std::vector<std::string> toClone;
1380 832 : std::vector<std::shared_ptr<Conversation>> conversations;
1381 832 : if (convId.empty()) {
1382 671 : std::vector<std::shared_ptr<SyncedConversation>> convs;
1383 : {
1384 671 : std::lock_guard lk(convInfosMtx_);
1385 710 : for (const auto& [conversationId, convInfo] : convInfos_) {
1386 39 : if (auto conv = getConversation(conversationId))
1387 39 : convs.emplace_back(std::move(conv));
1388 : }
1389 671 : }
1390 710 : for (const auto& conv : convs) {
1391 39 : std::lock_guard lk(conv->mtx);
1392 39 : if (!conv->conversation && !conv->info.isRemoved()) {
1393 : // we need to ask to clone requests when bootstrapping all conversations
1394 : // otherwise it can stay syncing
1395 2 : toClone.emplace_back(conv->info.id);
1396 37 : } else if (conv->conversation) {
1397 35 : conversations.emplace_back(conv->conversation);
1398 : }
1399 39 : }
1400 832 : } else if (auto conv = getConversation(convId)) {
1401 106 : std::lock_guard lk(conv->mtx);
1402 106 : if (conv->conversation)
1403 106 : conversations.emplace_back(conv->conversation);
1404 267 : }
1405 :
1406 973 : for (const auto& conversation : conversations) {
1407 : #ifdef LIBJAMI_TEST
1408 141 : conversation->onBootstrapStatus(bootstrapCbTest_);
1409 : #endif
1410 141 : conversation->bootstrap(
1411 39 : [w = weak(), id = conversation->id()] {
1412 39 : if (auto sthis = w.lock())
1413 39 : sthis->bootstrapCb(id);
1414 39 : },
1415 : kd);
1416 : }
1417 834 : for (const auto& cid : toClone) {
1418 2 : auto members = getConversationMembers(cid);
1419 6 : for (const auto& member : members) {
1420 4 : if (member.at("uri") != username_)
1421 2 : cloneConversationFrom(cid, member.at("uri"));
1422 : }
1423 2 : }
1424 832 : }
1425 :
1426 : void
1427 176 : ConversationModule::Impl::cloneConversationFrom(const ConversationRequest& request)
1428 : {
1429 176 : auto memberHash = dht::InfoHash(request.from);
1430 176 : if (!memberHash) {
1431 0 : JAMI_WARNING("Invalid member detected: {}", request.from);
1432 0 : return;
1433 : }
1434 176 : auto conv = startConversation(request.conversationId);
1435 176 : std::lock_guard lk(conv->mtx);
1436 176 : if (conv->info.created == 0) {
1437 173 : conv->info = {request.conversationId};
1438 173 : conv->info.created = request.received;
1439 173 : conv->info.members.emplace(username_);
1440 173 : conv->info.members.emplace(request.from);
1441 173 : conv->info.mode = request.mode();
1442 173 : addConvInfo(conv->info);
1443 : }
1444 176 : accountManager_->forEachDevice(memberHash, [w = weak(), conv](const auto& pk) {
1445 176 : auto sthis = w.lock();
1446 176 : auto deviceId = pk->getLongId().toString();
1447 176 : if (!sthis or deviceId == sthis->deviceId_)
1448 0 : return;
1449 176 : sthis->cloneConversationFrom(conv, deviceId);
1450 176 : });
1451 176 : }
1452 :
1453 : void
1454 12 : ConversationModule::Impl::cloneConversationFrom(const std::string& conversationId,
1455 : const std::string& uri,
1456 : const std::string& oldConvId)
1457 : {
1458 12 : auto memberHash = dht::InfoHash(uri);
1459 12 : if (!memberHash) {
1460 0 : JAMI_WARNING("Invalid member detected: {}", uri);
1461 0 : return;
1462 : }
1463 12 : auto conv = startConversation(conversationId);
1464 12 : accountManager_->forEachDevice(memberHash,
1465 3 : [w = weak(), conv, conversationId, oldConvId](
1466 : const std::shared_ptr<dht::crypto::PublicKey>& pk) {
1467 3 : auto sthis = w.lock();
1468 3 : auto deviceId = pk->getLongId().toString();
1469 3 : if (!sthis or deviceId == sthis->deviceId_)
1470 0 : return;
1471 3 : sthis->cloneConversationFrom(conv, deviceId, oldConvId);
1472 3 : });
1473 12 : }
1474 :
1475 : ////////////////////////////////////////////////////////////////
1476 :
1477 : void
1478 498 : ConversationModule::saveConvRequests(const std::string& accountId,
1479 : const std::map<std::string, ConversationRequest>& conversationsRequests)
1480 : {
1481 498 : auto path = fileutils::get_data_dir() / accountId;
1482 498 : saveConvRequestsToPath(path, conversationsRequests);
1483 498 : }
1484 :
1485 : void
1486 1271 : ConversationModule::saveConvRequestsToPath(const std::filesystem::path& path,
1487 : const std::map<std::string, ConversationRequest>& conversationsRequests)
1488 : {
1489 1271 : auto p = path / "convRequests";
1490 1271 : std::lock_guard lock(dhtnet::fileutils::getFileLock(p));
1491 1271 : std::ofstream file(p, std::ios::trunc | std::ios::binary);
1492 1271 : msgpack::pack(file, conversationsRequests);
1493 1271 : }
1494 :
1495 : void
1496 3357 : ConversationModule::saveConvInfos(const std::string& accountId, const ConvInfoMap& conversations)
1497 : {
1498 3357 : auto path = fileutils::get_data_dir() / accountId;
1499 3358 : saveConvInfosToPath(path, conversations);
1500 3358 : }
1501 :
1502 : void
1503 4130 : ConversationModule::saveConvInfosToPath(const std::filesystem::path& path, const ConvInfoMap& conversations)
1504 : {
1505 8260 : std::lock_guard lock(dhtnet::fileutils::getFileLock(path / "convInfo"));
1506 8259 : std::ofstream file(path / "convInfo", std::ios::trunc | std::ios::binary);
1507 4131 : msgpack::pack(file, conversations);
1508 4131 : }
1509 :
1510 : ////////////////////////////////////////////////////////////////
1511 :
1512 668 : ConversationModule::ConversationModule(std::shared_ptr<JamiAccount> account,
1513 : std::shared_ptr<AccountManager> accountManager,
1514 : NeedsSyncingCb&& needsSyncingCb,
1515 : SengMsgCb&& sendMsgCb,
1516 : NeedSocketCb&& onNeedSocket,
1517 : NeedSocketCb&& onNeedSwarmSocket,
1518 : OneToOneRecvCb&& oneToOneRecvCb,
1519 668 : bool autoLoadConversations)
1520 668 : : pimpl_ {std::make_unique<Impl>(std::move(account),
1521 668 : std::move(accountManager),
1522 668 : std::move(needsSyncingCb),
1523 668 : std::move(sendMsgCb),
1524 668 : std::move(onNeedSocket),
1525 668 : std::move(onNeedSwarmSocket),
1526 668 : std::move(oneToOneRecvCb))}
1527 : {
1528 668 : if (autoLoadConversations) {
1529 668 : loadConversations();
1530 : }
1531 668 : }
1532 :
1533 : void
1534 16 : ConversationModule::setAccountManager(std::shared_ptr<AccountManager> accountManager)
1535 : {
1536 16 : std::unique_lock lk(pimpl_->conversationsMtx_);
1537 16 : pimpl_->accountManager_ = accountManager;
1538 16 : }
1539 :
1540 : #ifdef LIBJAMI_TEST
1541 : void
1542 18 : ConversationModule::onBootstrapStatus(const std::function<void(std::string, Conversation::BootstrapStatus)>& cb)
1543 : {
1544 18 : pimpl_->bootstrapCbTest_ = cb;
1545 19 : for (auto& c : pimpl_->getConversations())
1546 19 : c->onBootstrapStatus(pimpl_->bootstrapCbTest_);
1547 18 : }
1548 : #endif
1549 :
1550 : void
1551 680 : ConversationModule::loadConversations()
1552 : {
1553 680 : auto acc = pimpl_->account_.lock();
1554 680 : if (!acc)
1555 0 : return;
1556 2720 : JAMI_LOG("[Account {}] Start loading conversations…", pimpl_->accountId_);
1557 1360 : auto conversationPath = fileutils::get_data_dir() / pimpl_->accountId_ / "conversations";
1558 :
1559 680 : std::unique_lock lk(pimpl_->conversationsMtx_);
1560 680 : auto contacts = pimpl_->accountManager_->getContacts(
1561 680 : true); // Avoid to lock configurationMtx while conv Mtx is locked
1562 680 : std::unique_lock ilk(pimpl_->convInfosMtx_);
1563 680 : pimpl_->convInfos_ = convInfos(pimpl_->accountId_);
1564 680 : pimpl_->conversations_.clear();
1565 :
1566 : struct Ctx
1567 : {
1568 : std::mutex cvMtx;
1569 : std::condition_variable cv;
1570 : std::mutex toRmMtx;
1571 : std::set<std::string> toRm;
1572 : std::mutex convMtx;
1573 : size_t convNb;
1574 : std::map<dht::InfoHash, Contact> contacts;
1575 : std::vector<std::tuple<std::string, std::string, std::string>> updateContactConv;
1576 : };
1577 : struct PendingConvCounter
1578 : {
1579 : std::shared_ptr<Ctx> ctx;
1580 18 : PendingConvCounter(std::shared_ptr<Ctx> c)
1581 18 : : ctx(std::move(c))
1582 : {
1583 18 : std::lock_guard lk {ctx->cvMtx};
1584 18 : ++ctx->convNb;
1585 18 : }
1586 18 : ~PendingConvCounter()
1587 : {
1588 18 : std::lock_guard lk {ctx->cvMtx};
1589 18 : --ctx->convNb;
1590 18 : ctx->cv.notify_all();
1591 18 : }
1592 : };
1593 680 : auto ctx = std::make_shared<Ctx>();
1594 680 : ctx->convNb = 0;
1595 680 : ctx->contacts = std::move(contacts);
1596 :
1597 680 : std::error_code ec;
1598 2058 : for (const auto& convIt : std::filesystem::directory_iterator(conversationPath, ec)) {
1599 : // ignore if not regular file or hidden
1600 18 : auto name = convIt.path().filename().string();
1601 18 : if (!convIt.is_directory() || name[0] == '.')
1602 0 : continue;
1603 36 : dht::ThreadPool::io().run(
1604 18 : [this, ctx, repository = std::move(name), acc, _ = std::make_shared<PendingConvCounter>(ctx)] {
1605 : try {
1606 18 : auto sconv = std::make_shared<SyncedConversation>(repository);
1607 18 : auto conv = std::make_shared<Conversation>(acc, repository);
1608 18 : conv->onMessageStatusChanged([this, repository](const auto& status) {
1609 6 : auto msg = std::make_shared<SyncMsg>();
1610 12 : msg->ms = {{repository, status}};
1611 6 : pimpl_->needsSyncingCb_(std::move(msg));
1612 6 : });
1613 18 : conv->onMembersChanged([w = pimpl_->weak_from_this(), repository](const auto& members) {
1614 : // Delay in another thread to avoid deadlocks
1615 8 : dht::ThreadPool::io().run([w, repository, members = std::move(members)] {
1616 8 : if (auto sthis = w.lock())
1617 4 : sthis->setConversationMembers(repository, members);
1618 : });
1619 4 : });
1620 18 : conv->onNeedSocket(pimpl_->onNeedSwarmSocket_);
1621 18 : auto members = conv->memberUris(acc->getUsername(), {});
1622 : // NOTE: The following if is here to protect against any incorrect state
1623 : // that can be introduced
1624 18 : if (conv->mode() == ConversationMode::ONE_TO_ONE && members.size() == 1) {
1625 : // If we got a 1:1 conversation, but not in the contact details, it's rather a
1626 : // duplicate or a weird state
1627 5 : auto otherUri = *members.begin();
1628 5 : auto itContact = ctx->contacts.find(dht::InfoHash(otherUri));
1629 5 : if (itContact == ctx->contacts.end()) {
1630 0 : JAMI_WARNING("Contact {} not found", otherUri);
1631 0 : return;
1632 : }
1633 5 : const std::string& convFromDetails = itContact->second.conversationId;
1634 5 : auto isRemoved = !itContact->second.isActive();
1635 5 : if (convFromDetails != repository) {
1636 3 : if (convFromDetails.empty()) {
1637 2 : if (isRemoved) {
1638 : // If details is empty, contact is removed and not banned.
1639 4 : JAMI_ERROR("Conversation {} detected for {} and should be removed",
1640 : repository,
1641 : otherUri);
1642 1 : std::lock_guard lkMtx {ctx->toRmMtx};
1643 1 : ctx->toRm.insert(repository);
1644 1 : } else {
1645 4 : JAMI_ERROR("No conversation detected for {} but one exists ({}). Update details",
1646 : otherUri,
1647 : repository);
1648 1 : std::lock_guard lkMtx {ctx->toRmMtx};
1649 2 : ctx->updateContactConv.emplace_back(
1650 2 : std::make_tuple(otherUri, convFromDetails, repository));
1651 1 : }
1652 : }
1653 : }
1654 5 : }
1655 : {
1656 18 : std::lock_guard lkMtx {ctx->convMtx};
1657 18 : auto convInfo = pimpl_->convInfos_.find(repository);
1658 18 : if (convInfo == pimpl_->convInfos_.end()) {
1659 12 : JAMI_ERROR("Missing conv info for {}. This is a bug!", repository);
1660 3 : sconv->info.created = std::time(nullptr);
1661 3 : sconv->info.lastDisplayed = conv->infos()[ConversationMapKeys::LAST_DISPLAYED];
1662 : } else {
1663 15 : sconv->info = convInfo->second;
1664 15 : if (convInfo->second.isRemoved()) {
1665 : // A conversation was removed, but repository still exists
1666 1 : conv->setRemovingFlag();
1667 1 : std::lock_guard lkMtx {ctx->toRmMtx};
1668 1 : ctx->toRm.insert(repository);
1669 1 : }
1670 : }
1671 : // Even if we found the conversation in convInfos_, unable to assume that the
1672 : // list of members stored in `convInfo` is correct
1673 : // (https://git.jami.net/savoirfairelinux/jami-daemon/-/issues/1025). For this
1674 : // reason, we always use the list we got from the conversation repository to set
1675 : // the value of `sconv->info.members`.
1676 18 : members.emplace(acc->getUsername());
1677 18 : sconv->info.members = std::move(members);
1678 : // convInfosMtx_ is already locked
1679 18 : pimpl_->convInfos_[repository] = sconv->info;
1680 18 : }
1681 18 : auto commits = conv->commitsEndedCalls();
1682 :
1683 18 : if (!commits.empty()) {
1684 : // Note: here, this means that some calls were actives while the
1685 : // daemon finished (can be a crash).
1686 : // Notify other in the conversation that the call is finished
1687 0 : pimpl_->sendMessageNotification(*conv, true, *commits.rbegin());
1688 : }
1689 18 : sconv->conversation = conv;
1690 18 : std::lock_guard lkMtx {ctx->convMtx};
1691 18 : pimpl_->conversations_.emplace(repository, std::move(sconv));
1692 18 : } catch (const std::logic_error& e) {
1693 0 : JAMI_WARNING("[Account {}] Conversations not loaded: {}", pimpl_->accountId_, e.what());
1694 0 : }
1695 : });
1696 698 : }
1697 680 : if (ec) {
1698 2652 : JAMI_ERROR("Failed to read conversations directory {}: {}", conversationPath, ec.message());
1699 : }
1700 :
1701 680 : std::unique_lock lkCv(ctx->cvMtx);
1702 1378 : ctx->cv.wait(lkCv, [&] { return ctx->convNb == 0; });
1703 :
1704 : // Prune any invalid conversations without members and
1705 : // set the removed flag if needed
1706 680 : std::set<std::string> removed;
1707 712 : for (auto itInfo = pimpl_->convInfos_.begin(); itInfo != pimpl_->convInfos_.end();) {
1708 32 : const auto& info = itInfo->second;
1709 32 : if (info.members.empty()) {
1710 0 : itInfo = pimpl_->convInfos_.erase(itInfo);
1711 0 : continue;
1712 : }
1713 32 : if (info.isRemoved())
1714 2 : removed.insert(info.id);
1715 32 : auto itConv = pimpl_->conversations_.find(info.id);
1716 32 : if (itConv == pimpl_->conversations_.end()) {
1717 : // convInfos_ can contain a conversation that is not yet cloned
1718 : // so we need to add it there.
1719 14 : itConv = pimpl_->conversations_.emplace(info.id, std::make_shared<SyncedConversation>(info)).first;
1720 : }
1721 32 : if (itConv != pimpl_->conversations_.end() && itConv->second && itConv->second->conversation && info.isRemoved())
1722 1 : itConv->second->conversation->setRemovingFlag();
1723 32 : if (!info.isRemoved() && itConv == pimpl_->conversations_.end()) {
1724 : // In this case, the conversation is not synced and we only know ourself
1725 0 : if (info.members.size() == 1 && *info.members.begin() == acc->getUsername()) {
1726 0 : JAMI_WARNING("[Account {:s}] Conversation {:s} seems not present/synced.", pimpl_->accountId_, info.id);
1727 0 : emitSignal<libjami::ConversationSignal::ConversationRemoved>(pimpl_->accountId_, info.id);
1728 0 : itInfo = pimpl_->convInfos_.erase(itInfo);
1729 0 : continue;
1730 0 : }
1731 : }
1732 32 : ++itInfo;
1733 : }
1734 : // On oldest version, removeConversation didn't update "appdata/contacts"
1735 : // causing a potential incorrect state between "appdata/contacts" and "appdata/convInfos"
1736 680 : if (!removed.empty())
1737 2 : acc->unlinkConversations(removed);
1738 :
1739 687 : for (const auto& [contactId, contact] : ctx->contacts) {
1740 7 : if (contact.conversationId.empty())
1741 7 : continue;
1742 :
1743 5 : if (pimpl_->convInfos_.find(contact.conversationId) != pimpl_->convInfos_.end())
1744 5 : continue;
1745 :
1746 0 : ConvInfo newInfo;
1747 0 : newInfo.id = contact.conversationId;
1748 0 : newInfo.created = std::time(nullptr);
1749 0 : newInfo.members.emplace(pimpl_->username_);
1750 0 : newInfo.members.emplace(contactId.toString());
1751 0 : pimpl_->conversations_.emplace(contact.conversationId, std::make_shared<SyncedConversation>(newInfo));
1752 0 : pimpl_->convInfos_.emplace(contact.conversationId, std::move(newInfo));
1753 0 : }
1754 :
1755 680 : pimpl_->saveConvInfos();
1756 :
1757 680 : ilk.unlock();
1758 680 : lk.unlock();
1759 :
1760 2720 : dht::ThreadPool::io().run(
1761 2040 : [w = pimpl_->weak(), acc, updateContactConv = std::move(ctx->updateContactConv), toRm = std::move(ctx->toRm)]() {
1762 : // Will lock account manager
1763 680 : if (auto shared = w.lock())
1764 680 : shared->fixStructures(acc, updateContactConv, toRm);
1765 680 : });
1766 680 : }
1767 :
1768 : void
1769 0 : ConversationModule::loadSingleConversation(const std::string& convId)
1770 : {
1771 0 : auto acc = pimpl_->account_.lock();
1772 0 : if (!acc)
1773 0 : return;
1774 0 : JAMI_LOG("[Account {}] Start loading conversation {}", pimpl_->accountId_, convId);
1775 :
1776 0 : std::unique_lock lk(pimpl_->conversationsMtx_);
1777 0 : std::unique_lock ilk(pimpl_->convInfosMtx_);
1778 : // Load convInfos to retrieve requests that have been accepted but not yet synchronized.
1779 0 : pimpl_->convInfos_ = convInfos(pimpl_->accountId_);
1780 0 : pimpl_->conversations_.clear();
1781 :
1782 : try {
1783 0 : auto sconv = std::make_shared<SyncedConversation>(convId);
1784 :
1785 0 : auto conv = std::make_shared<Conversation>(acc, convId);
1786 :
1787 0 : conv->onNeedSocket(pimpl_->onNeedSwarmSocket_);
1788 :
1789 0 : sconv->conversation = conv;
1790 0 : pimpl_->conversations_.emplace(convId, std::move(sconv));
1791 0 : } catch (const std::logic_error& e) {
1792 0 : JAMI_WARNING("[Account {}] Conversations not loaded: {}", pimpl_->accountId_, e.what());
1793 0 : }
1794 :
1795 : // Add all other conversations as dummy conversations to indicate their existence so
1796 : // isConversation could detect conversations correctly.
1797 0 : auto conversationsRepositoryIds = dhtnet::fileutils::readDirectory(fileutils::get_data_dir() / pimpl_->accountId_
1798 0 : / "conversations");
1799 0 : for (auto repositoryId : conversationsRepositoryIds) {
1800 0 : if (repositoryId != convId) {
1801 0 : auto conv = std::make_shared<SyncedConversation>(repositoryId);
1802 0 : pimpl_->conversations_.emplace(repositoryId, conv);
1803 0 : }
1804 0 : }
1805 :
1806 : // Add conversations from convInfos_ so isConversation could detect conversations correctly.
1807 : // This includes conversations that have been accepted but are not yet synchronized.
1808 0 : for (auto itInfo = pimpl_->convInfos_.begin(); itInfo != pimpl_->convInfos_.end();) {
1809 0 : const auto& info = itInfo->second;
1810 0 : if (info.members.empty()) {
1811 0 : itInfo = pimpl_->convInfos_.erase(itInfo);
1812 0 : continue;
1813 : }
1814 0 : auto itConv = pimpl_->conversations_.find(info.id);
1815 0 : if (itConv == pimpl_->conversations_.end()) {
1816 : // convInfos_ can contain a conversation that is not yet cloned
1817 : // so we need to add it there.
1818 0 : pimpl_->conversations_.emplace(info.id, std::make_shared<SyncedConversation>(info));
1819 : }
1820 0 : ++itInfo;
1821 : }
1822 :
1823 0 : ilk.unlock();
1824 0 : lk.unlock();
1825 0 : }
1826 :
1827 : void
1828 832 : ConversationModule::bootstrap(const std::string& convId)
1829 : {
1830 832 : pimpl_->bootstrap(convId);
1831 832 : }
1832 :
1833 : void
1834 0 : ConversationModule::monitor()
1835 : {
1836 0 : for (auto& conv : pimpl_->getConversations())
1837 0 : conv->monitor();
1838 0 : }
1839 :
1840 : void
1841 692 : ConversationModule::clearPendingFetch()
1842 : {
1843 : // Note: This is a workaround. convModule() is kept if account is disabled/re-enabled.
1844 : // iOS uses setAccountActive() a lot, and if for some reason the previous pending fetch
1845 : // is not erased (callback not called), it will block the new messages as it will not
1846 : // sync. The best way to debug this is to get logs from the last ICE connection for
1847 : // syncing the conversation. It may have been killed in some un-expected way avoiding to
1848 : // call the callbacks. This should never happen, but if it's the case, this will allow
1849 : // new messages to be synced correctly.
1850 721 : for (auto& conv : pimpl_->getSyncedConversations()) {
1851 29 : std::lock_guard lk(conv->mtx);
1852 29 : if (conv && conv->pending) {
1853 0 : JAMI_ERR("This is a bug, seems to still fetch to some device on initializing");
1854 0 : conv->pending.reset();
1855 : }
1856 721 : }
1857 692 : }
1858 :
1859 : void
1860 0 : ConversationModule::reloadRequests()
1861 : {
1862 0 : pimpl_->conversationsRequests_ = convRequests(pimpl_->accountId_);
1863 0 : }
1864 :
1865 : std::vector<std::string>
1866 12 : ConversationModule::getConversations() const
1867 : {
1868 12 : std::vector<std::string> result;
1869 12 : std::lock_guard lk(pimpl_->convInfosMtx_);
1870 12 : result.reserve(pimpl_->convInfos_.size());
1871 25 : for (const auto& [key, conv] : pimpl_->convInfos_) {
1872 13 : if (conv.isRemoved())
1873 3 : continue;
1874 10 : result.emplace_back(key);
1875 : }
1876 24 : return result;
1877 12 : }
1878 :
1879 : std::string
1880 473 : ConversationModule::getOneToOneConversation(const std::string& uri) const noexcept
1881 : {
1882 473 : return pimpl_->getOneToOneConversation(uri);
1883 : }
1884 :
1885 : bool
1886 8 : ConversationModule::updateConvForContact(const std::string& uri, const std::string& oldConv, const std::string& newConv)
1887 : {
1888 8 : return pimpl_->updateConvForContact(uri, oldConv, newConv);
1889 : }
1890 :
1891 : std::vector<std::map<std::string, std::string>>
1892 12 : ConversationModule::getConversationRequests() const
1893 : {
1894 12 : std::vector<std::map<std::string, std::string>> requests;
1895 12 : std::lock_guard lk(pimpl_->conversationsRequestsMtx_);
1896 12 : requests.reserve(pimpl_->conversationsRequests_.size());
1897 24 : for (const auto& [id, request] : pimpl_->conversationsRequests_) {
1898 12 : if (request.declined)
1899 6 : continue; // Do not add declined requests
1900 6 : requests.emplace_back(request.toMap());
1901 : }
1902 24 : return requests;
1903 12 : }
1904 :
1905 : void
1906 76 : ConversationModule::onTrustRequest(const std::string& uri,
1907 : const std::string& conversationId,
1908 : const std::vector<uint8_t>& payload,
1909 : time_t received)
1910 : {
1911 76 : std::unique_lock lk(pimpl_->conversationsRequestsMtx_);
1912 76 : ConversationRequest req;
1913 76 : req.from = uri;
1914 76 : req.conversationId = conversationId;
1915 76 : req.received = std::time(nullptr);
1916 152 : req.metadatas = ConversationRepository::infosFromVCard(
1917 228 : vCard::utils::toMap(std::string_view(reinterpret_cast<const char*>(payload.data()), payload.size())));
1918 76 : auto reqMap = req.toMap();
1919 :
1920 76 : auto contactInfo = pimpl_->accountManager_->getContactInfo(uri);
1921 76 : if (contactInfo && contactInfo->confirmed && !contactInfo->isBanned() && contactInfo->isActive()) {
1922 20 : JAMI_LOG("[Account {}] Contact {} is confirmed, cloning {}", pimpl_->accountId_, uri, conversationId);
1923 5 : lk.unlock();
1924 5 : updateConvForContact(uri, contactInfo->conversationId, conversationId);
1925 5 : pimpl_->cloneConversationFrom(req);
1926 5 : return;
1927 : }
1928 :
1929 71 : if (pimpl_->addConversationRequest(conversationId, std::move(req))) {
1930 63 : lk.unlock();
1931 63 : emitSignal<libjami::ConfigurationSignal::IncomingTrustRequest>(pimpl_->accountId_,
1932 : conversationId,
1933 : uri,
1934 : payload,
1935 : received);
1936 63 : emitSignal<libjami::ConversationSignal::ConversationRequestReceived>(pimpl_->accountId_, conversationId, reqMap);
1937 63 : pimpl_->needsSyncingCb_({});
1938 : } else {
1939 32 : JAMI_DEBUG("[Account {}] Received a request for a conversation already existing. Ignore", pimpl_->accountId_);
1940 : }
1941 91 : }
1942 :
1943 : void
1944 260 : ConversationModule::onConversationRequest(const std::string& from, const Json::Value& value)
1945 : {
1946 260 : ConversationRequest req(value);
1947 260 : auto isOneToOne = req.isOneToOne();
1948 260 : std::unique_lock lk(pimpl_->conversationsRequestsMtx_);
1949 1040 : JAMI_DEBUG("[Account {}] Receive a new conversation request for conversation {} from {}",
1950 : pimpl_->accountId_,
1951 : req.conversationId,
1952 : from);
1953 260 : auto convId = req.conversationId;
1954 :
1955 : // Already accepted request, do nothing
1956 260 : if (pimpl_->isConversation(convId))
1957 99 : return;
1958 161 : auto oldReq = pimpl_->getRequest(convId);
1959 161 : if (oldReq != std::nullopt) {
1960 100 : JAMI_DEBUG("[Account {}] Received a request for a conversation already existing. "
1961 : "Ignore. Declined: {}",
1962 : pimpl_->accountId_,
1963 : static_cast<int>(oldReq->declined));
1964 25 : return;
1965 : }
1966 136 : req.received = std::time(nullptr);
1967 136 : req.from = from;
1968 :
1969 136 : if (isOneToOne) {
1970 3 : auto contactInfo = pimpl_->accountManager_->getContactInfo(from);
1971 3 : if (contactInfo && contactInfo->confirmed && !contactInfo->isBanned() && contactInfo->isActive()) {
1972 8 : JAMI_LOG("[Account {}] Contact {} is confirmed, cloning {}", pimpl_->accountId_, from, convId);
1973 2 : lk.unlock();
1974 2 : updateConvForContact(from, contactInfo->conversationId, convId);
1975 2 : pimpl_->cloneConversationFrom(req);
1976 2 : return;
1977 : }
1978 3 : }
1979 :
1980 134 : auto reqMap = req.toMap();
1981 134 : if (pimpl_->addConversationRequest(convId, std::move(req))) {
1982 134 : lk.unlock();
1983 : // Note: no need to sync here because other connected devices should receive
1984 : // the same conversation request. Will sync when the conversation will be added
1985 134 : if (isOneToOne)
1986 1 : pimpl_->oneToOneRecvCb_(convId, from);
1987 134 : emitSignal<libjami::ConversationSignal::ConversationRequestReceived>(pimpl_->accountId_, convId, reqMap);
1988 : }
1989 539 : }
1990 :
1991 : std::string
1992 3 : ConversationModule::peerFromConversationRequest(const std::string& convId) const
1993 : {
1994 3 : std::lock_guard lk(pimpl_->conversationsRequestsMtx_);
1995 3 : auto it = pimpl_->conversationsRequests_.find(convId);
1996 3 : if (it != pimpl_->conversationsRequests_.end()) {
1997 3 : return it->second.from;
1998 : }
1999 0 : return {};
2000 3 : }
2001 :
2002 : void
2003 136 : ConversationModule::onNeedConversationRequest(const std::string& from, const std::string& conversationId)
2004 : {
2005 136 : pimpl_->withConversation(conversationId, [&](auto& conversation) {
2006 136 : if (!conversation.isMember(from, true)) {
2007 0 : JAMI_WARNING("{} is asking a new invite for {}, but not a member", from, conversationId);
2008 0 : return;
2009 : }
2010 544 : JAMI_LOG("{} is asking a new invite for {}", from, conversationId);
2011 136 : pimpl_->sendMsgCb_(from, {}, conversation.generateInvitation(), 0);
2012 : });
2013 136 : }
2014 :
2015 : void
2016 191 : ConversationModule::acceptConversationRequest(const std::string& conversationId, const std::string& deviceId)
2017 : {
2018 : // For all conversation members, try to open a git channel with this conversation ID
2019 191 : std::unique_lock lkCr(pimpl_->conversationsRequestsMtx_);
2020 191 : auto request = pimpl_->getRequest(conversationId);
2021 191 : if (request == std::nullopt) {
2022 22 : lkCr.unlock();
2023 22 : if (auto conv = pimpl_->getConversation(conversationId)) {
2024 10 : std::unique_lock lk(conv->mtx);
2025 10 : if (!conv->conversation) {
2026 3 : lk.unlock();
2027 3 : pimpl_->cloneConversationFrom(conv, deviceId);
2028 : }
2029 32 : }
2030 88 : JAMI_WARNING("[Account {}] [Conversation {}] [device {}] Request not found.",
2031 : pimpl_->accountId_,
2032 : conversationId,
2033 : deviceId);
2034 22 : return;
2035 : }
2036 169 : pimpl_->rmConversationRequest(conversationId);
2037 169 : lkCr.unlock();
2038 169 : pimpl_->accountManager_->acceptTrustRequest(request->from, true);
2039 169 : pimpl_->cloneConversationFrom(*request);
2040 213 : }
2041 :
2042 : void
2043 6 : ConversationModule::declineConversationRequest(const std::string& conversationId)
2044 : {
2045 6 : std::lock_guard lk(pimpl_->conversationsRequestsMtx_);
2046 6 : auto it = pimpl_->conversationsRequests_.find(conversationId);
2047 6 : if (it != pimpl_->conversationsRequests_.end()) {
2048 6 : it->second.declined = std::time(nullptr);
2049 6 : pimpl_->saveConvRequests();
2050 : }
2051 6 : pimpl_->syncingMetadatas_.erase(conversationId);
2052 6 : pimpl_->saveMetadata();
2053 6 : emitSignal<libjami::ConversationSignal::ConversationRequestDeclined>(pimpl_->accountId_, conversationId);
2054 6 : pimpl_->needsSyncingCb_({});
2055 6 : }
2056 :
2057 : std::string
2058 186 : ConversationModule::startConversation(ConversationMode mode, const dht::InfoHash& otherMember)
2059 : {
2060 186 : auto acc = pimpl_->account_.lock();
2061 186 : if (!acc)
2062 0 : return {};
2063 186 : std::vector<DeviceId> kd;
2064 1375 : for (const auto& [id, _] : acc->getKnownDevices())
2065 1375 : kd.emplace_back(id);
2066 : // Create the conversation object
2067 186 : std::shared_ptr<Conversation> conversation;
2068 : try {
2069 186 : conversation = std::make_shared<Conversation>(acc, mode, otherMember.toString());
2070 186 : auto conversationId = conversation->id();
2071 186 : conversation->onMessageStatusChanged([this, conversationId](const auto& status) {
2072 567 : auto msg = std::make_shared<SyncMsg>();
2073 1134 : msg->ms = {{conversationId, status}};
2074 567 : pimpl_->needsSyncingCb_(std::move(msg));
2075 567 : });
2076 186 : conversation->onMembersChanged([w = pimpl_->weak_from_this(), conversationId](const auto& members) {
2077 : // Delay in another thread to avoid deadlocks
2078 1154 : dht::ThreadPool::io().run([w, conversationId, members = std::move(members)] {
2079 1154 : if (auto sthis = w.lock())
2080 576 : sthis->setConversationMembers(conversationId, members);
2081 : });
2082 577 : });
2083 186 : conversation->onNeedSocket(pimpl_->onNeedSwarmSocket_);
2084 : #ifdef LIBJAMI_TEST
2085 186 : conversation->onBootstrapStatus(pimpl_->bootstrapCbTest_);
2086 : #endif
2087 372 : conversation->bootstrap(
2088 186 : [w = pimpl_->weak_from_this(), conversationId]() {
2089 77 : if (auto sthis = w.lock())
2090 77 : sthis->bootstrapCb(conversationId);
2091 77 : },
2092 : kd);
2093 186 : } catch (const std::exception& e) {
2094 0 : JAMI_ERROR("[Account {}] Error while generating a conversation {}", pimpl_->accountId_, e.what());
2095 0 : return {};
2096 0 : }
2097 186 : auto convId = conversation->id();
2098 186 : auto conv = pimpl_->startConversation(convId);
2099 186 : std::unique_lock lk(conv->mtx);
2100 186 : conv->info.created = std::time(nullptr);
2101 186 : conv->info.members.emplace(pimpl_->username_);
2102 186 : if (otherMember)
2103 69 : conv->info.members.emplace(otherMember.toString());
2104 186 : conv->conversation = conversation;
2105 186 : addConvInfo(conv->info);
2106 186 : lk.unlock();
2107 :
2108 186 : pimpl_->needsSyncingCb_({});
2109 186 : emitSignal<libjami::ConversationSignal::ConversationReady>(pimpl_->accountId_, convId);
2110 186 : return convId;
2111 186 : }
2112 :
2113 : void
2114 0 : ConversationModule::cloneConversationFrom(const std::string& conversationId,
2115 : const std::string& uri,
2116 : const std::string& oldConvId)
2117 : {
2118 0 : pimpl_->cloneConversationFrom(conversationId, uri, oldConvId);
2119 0 : }
2120 :
2121 : // Message send/load
2122 : void
2123 91 : ConversationModule::sendMessage(const std::string& conversationId,
2124 : std::string message,
2125 : const std::string& replyTo,
2126 : const std::string& type,
2127 : bool announce,
2128 : OnCommitCb&& onCommit,
2129 : OnDoneCb&& cb)
2130 : {
2131 91 : pimpl_->sendMessage(conversationId, std::move(message), replyTo, type, announce, std::move(onCommit), std::move(cb));
2132 91 : }
2133 :
2134 : void
2135 17 : ConversationModule::sendMessage(const std::string& conversationId,
2136 : Json::Value&& value,
2137 : const std::string& replyTo,
2138 : bool announce,
2139 : OnCommitCb&& onCommit,
2140 : OnDoneCb&& cb)
2141 : {
2142 17 : pimpl_->sendMessage(conversationId, std::move(value), replyTo, announce, std::move(onCommit), std::move(cb));
2143 17 : }
2144 :
2145 : void
2146 8 : ConversationModule::editMessage(const std::string& conversationId,
2147 : const std::string& newBody,
2148 : const std::string& editedId)
2149 : {
2150 8 : pimpl_->editMessage(conversationId, newBody, editedId);
2151 8 : }
2152 :
2153 : void
2154 3 : ConversationModule::reactToMessage(const std::string& conversationId,
2155 : const std::string& newBody,
2156 : const std::string& reactToId)
2157 : {
2158 : // Commit message edition
2159 3 : Json::Value json;
2160 3 : json["body"] = newBody;
2161 3 : json["react-to"] = reactToId;
2162 3 : json["type"] = "text/plain";
2163 3 : pimpl_->sendMessage(conversationId, std::move(json));
2164 3 : }
2165 :
2166 : void
2167 93 : ConversationModule::addCallHistoryMessage(const std::string& uri, uint64_t duration_ms, const std::string& reason)
2168 : {
2169 93 : auto finalUri = uri.substr(0, uri.find("@ring.dht"));
2170 93 : finalUri = finalUri.substr(0, uri.find("@jami.dht"));
2171 93 : auto convId = getOneToOneConversation(finalUri);
2172 93 : if (!convId.empty()) {
2173 3 : Json::Value value;
2174 3 : value["to"] = finalUri;
2175 3 : value["type"] = "application/call-history+json";
2176 3 : value["duration"] = std::to_string(duration_ms);
2177 3 : if (!reason.empty())
2178 3 : value["reason"] = reason;
2179 3 : sendMessage(convId, std::move(value));
2180 3 : }
2181 93 : }
2182 :
2183 : bool
2184 18 : ConversationModule::onMessageDisplayed(const std::string& peer,
2185 : const std::string& conversationId,
2186 : const std::string& interactionId)
2187 : {
2188 18 : if (auto conv = pimpl_->getConversation(conversationId)) {
2189 18 : std::unique_lock lk(conv->mtx);
2190 18 : if (auto conversation = conv->conversation) {
2191 18 : lk.unlock();
2192 18 : return conversation->setMessageDisplayed(peer, interactionId);
2193 18 : }
2194 36 : }
2195 0 : return false;
2196 : }
2197 :
2198 : std::map<std::string, std::map<std::string, std::map<std::string, std::string>>>
2199 196 : ConversationModule::convMessageStatus() const
2200 : {
2201 196 : std::map<std::string, std::map<std::string, std::map<std::string, std::string>>> messageStatus;
2202 284 : for (const auto& conv : pimpl_->getConversations()) {
2203 87 : auto d = conv->messageStatus();
2204 87 : if (!d.empty())
2205 29 : messageStatus[conv->id()] = std::move(d);
2206 284 : }
2207 198 : return messageStatus;
2208 0 : }
2209 :
2210 : void
2211 0 : ConversationModule::clearCache(const std::string& conversationId)
2212 : {
2213 0 : if (auto conv = pimpl_->getConversation(conversationId)) {
2214 0 : std::lock_guard lk(conv->mtx);
2215 0 : if (conv->conversation) {
2216 0 : conv->conversation->clearCache();
2217 : }
2218 0 : }
2219 0 : }
2220 :
2221 : uint32_t
2222 2 : ConversationModule::loadConversation(const std::string& conversationId, const std::string& fromMessage, size_t n)
2223 : {
2224 2 : auto acc = pimpl_->account_.lock();
2225 2 : if (auto conv = pimpl_->getConversation(conversationId)) {
2226 2 : std::lock_guard lk(conv->mtx);
2227 2 : if (conv->conversation) {
2228 2 : const uint32_t id = std::uniform_int_distribution<uint32_t> {1}(acc->rand);
2229 2 : LogOptions options;
2230 2 : options.from = fromMessage;
2231 2 : options.nbOfCommits = n;
2232 4 : conv->conversation->loadMessages(
2233 2 : [accountId = pimpl_->accountId_, conversationId, id](auto&& messages) {
2234 2 : emitSignal<libjami::ConversationSignal::SwarmLoaded>(id, accountId, conversationId, messages);
2235 2 : },
2236 : options);
2237 2 : return id;
2238 2 : }
2239 4 : }
2240 0 : return 0;
2241 2 : }
2242 :
2243 : uint32_t
2244 0 : ConversationModule::loadSwarmUntil(const std::string& conversationId,
2245 : const std::string& fromMessage,
2246 : const std::string& toMessage)
2247 : {
2248 0 : auto acc = pimpl_->account_.lock();
2249 0 : if (auto conv = pimpl_->getConversation(conversationId)) {
2250 0 : std::lock_guard lk(conv->mtx);
2251 0 : if (conv->conversation) {
2252 0 : const uint32_t id = std::uniform_int_distribution<uint32_t> {}(acc->rand);
2253 0 : LogOptions options;
2254 0 : options.from = fromMessage;
2255 0 : options.to = toMessage;
2256 0 : options.includeTo = true;
2257 0 : conv->conversation->loadMessages(
2258 0 : [accountId = pimpl_->accountId_, conversationId, id](auto&& messages) {
2259 0 : emitSignal<libjami::ConversationSignal::SwarmLoaded>(id, accountId, conversationId, messages);
2260 0 : },
2261 : options);
2262 0 : return id;
2263 0 : }
2264 0 : }
2265 0 : return 0;
2266 0 : }
2267 :
2268 : std::shared_ptr<TransferManager>
2269 85 : ConversationModule::dataTransfer(const std::string& conversationId) const
2270 : {
2271 167 : return pimpl_->withConversation(conversationId, [](auto& conversation) { return conversation.dataTransfer(); });
2272 : }
2273 :
2274 : bool
2275 14 : ConversationModule::onFileChannelRequest(const std::string& conversationId,
2276 : const std::string& member,
2277 : const std::string& fileId,
2278 : bool verifyShaSum) const
2279 : {
2280 14 : if (auto conv = pimpl_->getConversation(conversationId)) {
2281 14 : std::filesystem::path path;
2282 14 : std::string sha3sum;
2283 14 : std::unique_lock lk(conv->mtx);
2284 14 : if (!conv->conversation)
2285 0 : return false;
2286 14 : if (!conv->conversation->onFileChannelRequest(member, fileId, path, sha3sum))
2287 0 : return false;
2288 :
2289 : // Release the lock here to prevent the sha3 calculation from blocking other threads.
2290 14 : lk.unlock();
2291 14 : if (!std::filesystem::is_regular_file(path)) {
2292 4 : JAMI_WARNING("[Account {:s}] [Conversation {}] {:s} asked for non existing file {}",
2293 : pimpl_->accountId_,
2294 : conversationId,
2295 : member,
2296 : fileId);
2297 1 : return false;
2298 : }
2299 : // Check that our file is correct before sending
2300 13 : if (verifyShaSum && sha3sum != fileutils::sha3File(path)) {
2301 4 : JAMI_WARNING("[Account {:s}] [Conversation {}] {:s} asked for file {:s}, but our version is not "
2302 : "complete or corrupted",
2303 : pimpl_->accountId_,
2304 : conversationId,
2305 : member,
2306 : fileId);
2307 1 : return false;
2308 : }
2309 12 : return true;
2310 28 : }
2311 0 : return false;
2312 : }
2313 :
2314 : bool
2315 13 : ConversationModule::downloadFile(const std::string& conversationId,
2316 : const std::string& interactionId,
2317 : const std::string& fileId,
2318 : const std::string& path)
2319 : {
2320 13 : if (auto conv = pimpl_->getConversation(conversationId)) {
2321 13 : std::lock_guard lk(conv->mtx);
2322 13 : if (conv->conversation)
2323 13 : return conv->conversation->downloadFile(interactionId, fileId, path, "", "");
2324 26 : }
2325 0 : return false;
2326 : }
2327 :
2328 : void
2329 689 : ConversationModule::syncConversations(const std::string& peer, const std::string& deviceId)
2330 : {
2331 : // Sync conversations where peer is member
2332 689 : std::set<std::string> toFetch;
2333 690 : std::set<std::string> toClone;
2334 1358 : for (const auto& conv : pimpl_->getSyncedConversations()) {
2335 667 : std::lock_guard lk(conv->mtx);
2336 667 : if (conv->conversation) {
2337 652 : if (!conv->conversation->isRemoving() && conv->conversation->isMember(peer, false)) {
2338 463 : toFetch.emplace(conv->info.id);
2339 : }
2340 14 : } else if (!conv->info.isRemoved()
2341 36 : && std::find(conv->info.members.begin(), conv->info.members.end(), peer)
2342 36 : != conv->info.members.end()) {
2343 : // In this case the conversation was never cloned (can be after an import)
2344 11 : toClone.emplace(conv->info.id);
2345 : }
2346 1356 : }
2347 1154 : for (const auto& cid : toFetch)
2348 463 : pimpl_->fetchNewCommits(peer, deviceId, cid);
2349 702 : for (const auto& cid : toClone)
2350 11 : pimpl_->cloneConversation(deviceId, peer, cid);
2351 1382 : if (pimpl_->syncCnt.load() == 0)
2352 184 : emitSignal<libjami::ConversationSignal::ConversationSyncFinished>(pimpl_->accountId_);
2353 691 : }
2354 :
2355 : void
2356 159 : ConversationModule::onSyncData(const SyncMsg& msg, const std::string& peerId, const std::string& deviceId)
2357 : {
2358 159 : std::vector<std::string> toClone;
2359 272 : for (const auto& [key, convInfo] : msg.c) {
2360 113 : const auto& convId = convInfo.id;
2361 : {
2362 113 : std::lock_guard lk(pimpl_->conversationsRequestsMtx_);
2363 113 : pimpl_->rmConversationRequest(convId);
2364 113 : }
2365 :
2366 113 : auto conv = pimpl_->startConversation(convInfo);
2367 113 : std::unique_lock lk(conv->mtx);
2368 : // Skip outdated info
2369 113 : if (std::max(convInfo.created, convInfo.removed) < std::max(conv->info.created, conv->info.removed))
2370 3 : continue;
2371 110 : if (not convInfo.isRemoved()) {
2372 : // If multi devices, it can detect a conversation that was already
2373 : // removed, so just check if the convinfo contains a removed conv
2374 102 : if (conv->info.removed) {
2375 0 : if (conv->info.removed >= convInfo.created) {
2376 : // Only reclone if re-added, else the peer is not synced yet (could be
2377 : // offline before)
2378 0 : continue;
2379 : }
2380 0 : JAMI_DEBUG("Re-add previously removed conversation {:s}", convId);
2381 : }
2382 102 : conv->info = convInfo;
2383 102 : if (!conv->conversation) {
2384 45 : if (deviceId != "") {
2385 45 : pimpl_->cloneConversation(deviceId, peerId, conv);
2386 : } else {
2387 : // In this case, information is from JAMS
2388 : // JAMS does not store the conversation itself, so we
2389 : // must use information to clone the conversation
2390 0 : addConvInfo(convInfo);
2391 0 : toClone.emplace_back(convId);
2392 : }
2393 : }
2394 : } else {
2395 8 : if (conv->conversation && !conv->conversation->isRemoving()) {
2396 1 : emitSignal<libjami::ConversationSignal::ConversationRemoved>(pimpl_->accountId_, convId);
2397 1 : conv->conversation->setRemovingFlag();
2398 : }
2399 8 : auto update = false;
2400 8 : if (!conv->info.removed) {
2401 1 : update = true;
2402 1 : conv->info.removed = std::time(nullptr);
2403 1 : emitSignal<libjami::ConversationSignal::ConversationRemoved>(pimpl_->accountId_, convId);
2404 : }
2405 8 : if (convInfo.erased && !conv->info.erased) {
2406 1 : conv->info.erased = std::time(nullptr);
2407 1 : pimpl_->addConvInfo(conv->info);
2408 1 : pimpl_->removeRepositoryImpl(*conv, false);
2409 7 : } else if (update) {
2410 1 : pimpl_->addConvInfo(conv->info);
2411 : }
2412 : }
2413 116 : }
2414 :
2415 159 : for (const auto& cid : toClone) {
2416 0 : auto members = getConversationMembers(cid);
2417 0 : for (const auto& member : members) {
2418 0 : if (member.at("uri") != pimpl_->username_)
2419 0 : cloneConversationFrom(cid, member.at("uri"));
2420 : }
2421 0 : }
2422 :
2423 180 : for (const auto& [convId, req] : msg.cr) {
2424 21 : if (req.from == pimpl_->username_) {
2425 0 : JAMI_WARNING("Detected request from ourself, ignore {}.", convId);
2426 17 : continue;
2427 0 : }
2428 21 : std::unique_lock lk(pimpl_->conversationsRequestsMtx_);
2429 21 : if (pimpl_->isConversation(convId)) {
2430 : // Already handled request
2431 0 : pimpl_->rmConversationRequest(convId);
2432 0 : continue;
2433 : }
2434 :
2435 : // New request
2436 21 : if (!pimpl_->addConversationRequest(convId, req))
2437 12 : continue;
2438 :
2439 9 : if (req.declined != 0) {
2440 : // Request declined
2441 20 : JAMI_LOG("[Account {:s}] Declined request detected for conversation {:s} (device {:s})",
2442 : pimpl_->accountId_,
2443 : convId,
2444 : deviceId);
2445 5 : pimpl_->syncingMetadatas_.erase(convId);
2446 5 : pimpl_->saveMetadata();
2447 5 : lk.unlock();
2448 5 : emitSignal<libjami::ConversationSignal::ConversationRequestDeclined>(pimpl_->accountId_, convId);
2449 5 : continue;
2450 5 : }
2451 4 : lk.unlock();
2452 :
2453 16 : JAMI_LOG("[Account {:s}] New request detected for conversation {:s} (device {:s})",
2454 : pimpl_->accountId_,
2455 : convId,
2456 : deviceId);
2457 :
2458 4 : emitSignal<libjami::ConversationSignal::ConversationRequestReceived>(pimpl_->accountId_, convId, req.toMap());
2459 21 : }
2460 :
2461 : // Updates preferences for conversations
2462 163 : for (const auto& [convId, p] : msg.p) {
2463 4 : if (auto conv = pimpl_->getConversation(convId)) {
2464 4 : std::unique_lock lk(conv->mtx);
2465 4 : if (conv->conversation) {
2466 2 : auto conversation = conv->conversation;
2467 2 : lk.unlock();
2468 2 : conversation->updatePreferences(p);
2469 4 : } else if (conv->pending) {
2470 2 : conv->pending->preferences = p;
2471 : }
2472 8 : }
2473 : }
2474 :
2475 : // Updates displayed for conversations
2476 208 : for (const auto& [convId, ms] : msg.ms) {
2477 49 : if (auto conv = pimpl_->getConversation(convId)) {
2478 49 : std::unique_lock lk(conv->mtx);
2479 49 : if (conv->conversation) {
2480 31 : auto conversation = conv->conversation;
2481 31 : lk.unlock();
2482 31 : conversation->updateMessageStatus(ms);
2483 49 : } else if (conv->pending) {
2484 18 : conv->pending->status = ms;
2485 : }
2486 98 : }
2487 : }
2488 159 : }
2489 :
2490 : bool
2491 2 : ConversationModule::needsSyncingWith(const std::string& memberUri) const
2492 : {
2493 : // Check if a conversation needs to fetch remote or to be cloned
2494 2 : std::lock_guard lk(pimpl_->conversationsMtx_);
2495 2 : for (const auto& [key, ci] : pimpl_->conversations_) {
2496 1 : std::lock_guard lk(ci->mtx);
2497 1 : if (ci->conversation) {
2498 0 : if (ci->conversation->isRemoving() && ci->conversation->isMember(memberUri, false))
2499 0 : return true;
2500 1 : } else if (!ci->info.removed
2501 1 : && std::find(ci->info.members.begin(), ci->info.members.end(), memberUri) != ci->info.members.end()) {
2502 : // In this case the conversation was never cloned (can be after an import)
2503 1 : return true;
2504 : }
2505 1 : }
2506 1 : return false;
2507 2 : }
2508 :
2509 : void
2510 1117 : ConversationModule::setFetched(const std::string& conversationId,
2511 : const std::string& deviceId,
2512 : const std::string& commitId)
2513 : {
2514 1117 : if (auto conv = pimpl_->getConversation(conversationId)) {
2515 1117 : std::lock_guard lk(conv->mtx);
2516 1117 : if (conv->conversation) {
2517 1117 : bool remove = conv->conversation->isRemoving();
2518 1117 : conv->conversation->hasFetched(deviceId, commitId);
2519 1117 : if (remove)
2520 1 : pimpl_->removeRepositoryImpl(*conv, true);
2521 : }
2522 2234 : }
2523 1117 : }
2524 :
2525 : void
2526 12390 : ConversationModule::fetchNewCommits(const std::string& peer,
2527 : const std::string& deviceId,
2528 : const std::string& conversationId,
2529 : const std::string& commitId)
2530 : {
2531 12390 : pimpl_->fetchNewCommits(peer, deviceId, conversationId, commitId);
2532 12393 : }
2533 :
2534 : void
2535 138 : ConversationModule::addConversationMember(const std::string& conversationId,
2536 : const dht::InfoHash& contactUri,
2537 : bool sendRequest)
2538 : {
2539 138 : auto conv = pimpl_->getConversation(conversationId);
2540 138 : if (not conv || not conv->conversation) {
2541 0 : JAMI_ERROR("Conversation {:s} does not exist", conversationId);
2542 0 : return;
2543 : }
2544 138 : std::unique_lock lk(conv->mtx);
2545 :
2546 138 : auto contactUriStr = contactUri.toString();
2547 138 : if (conv->conversation->isMember(contactUriStr, true)) {
2548 0 : JAMI_DEBUG("{:s} is already a member of {:s}, resend invite", contactUriStr, conversationId);
2549 : // Note: This should not be necessary, but if for whatever reason the other side didn't
2550 : // join we should not forbid new invites
2551 0 : auto invite = conv->conversation->generateInvitation();
2552 0 : lk.unlock();
2553 0 : pimpl_->sendMsgCb_(contactUriStr, {}, std::move(invite), 0);
2554 0 : return;
2555 0 : }
2556 :
2557 138 : conv->conversation->addMember(contactUriStr,
2558 138 : [this, conv, conversationId, sendRequest, contactUriStr](bool ok,
2559 404 : const std::string& commitId) {
2560 138 : if (ok) {
2561 136 : std::unique_lock lk(conv->mtx);
2562 136 : pimpl_->sendMessageNotification(*conv->conversation,
2563 : true,
2564 : commitId); // For the other members
2565 136 : if (sendRequest) {
2566 132 : auto invite = conv->conversation->generateInvitation();
2567 132 : lk.unlock();
2568 132 : pimpl_->sendMsgCb_(contactUriStr, {}, std::move(invite), 0);
2569 132 : }
2570 136 : }
2571 138 : });
2572 138 : }
2573 :
2574 : void
2575 14 : ConversationModule::removeConversationMember(const std::string& conversationId,
2576 : const dht::InfoHash& contactUri,
2577 : bool isDevice)
2578 : {
2579 14 : auto contactUriStr = contactUri.toString();
2580 14 : if (auto conv = pimpl_->getConversation(conversationId)) {
2581 14 : std::lock_guard lk(conv->mtx);
2582 14 : if (conv->conversation)
2583 14 : return conv->conversation
2584 14 : ->removeMember(contactUriStr, isDevice, [this, conversationId](bool ok, const std::string& commitId) {
2585 14 : if (ok) {
2586 12 : pimpl_->sendMessageNotification(conversationId, true, commitId);
2587 : }
2588 28 : });
2589 28 : }
2590 14 : }
2591 :
2592 : std::vector<std::map<std::string, std::string>>
2593 93 : ConversationModule::getConversationMembers(const std::string& conversationId, bool includeBanned) const
2594 : {
2595 93 : return pimpl_->getConversationMembers(conversationId, includeBanned);
2596 : }
2597 :
2598 : uint32_t
2599 4 : ConversationModule::countInteractions(const std::string& convId,
2600 : const std::string& toId,
2601 : const std::string& fromId,
2602 : const std::string& authorUri) const
2603 : {
2604 4 : if (auto conv = pimpl_->getConversation(convId)) {
2605 4 : std::lock_guard lk(conv->mtx);
2606 4 : if (conv->conversation)
2607 4 : return conv->conversation->countInteractions(toId, fromId, authorUri);
2608 8 : }
2609 0 : return 0;
2610 : }
2611 :
2612 : void
2613 4 : ConversationModule::search(uint32_t req, const std::string& convId, const Filter& filter) const
2614 : {
2615 4 : if (convId.empty()) {
2616 0 : auto convs = pimpl_->getConversations();
2617 0 : if (convs.empty()) {
2618 0 : emitSignal<libjami::ConversationSignal::MessagesFound>(req,
2619 0 : pimpl_->accountId_,
2620 0 : std::string {},
2621 0 : std::vector<std::map<std::string, std::string>> {});
2622 0 : return;
2623 : }
2624 0 : auto finishedFlag = std::make_shared<std::atomic_int>(convs.size());
2625 0 : for (const auto& conv : convs) {
2626 0 : conv->search(req, filter, finishedFlag);
2627 : }
2628 4 : } else if (auto conv = pimpl_->getConversation(convId)) {
2629 4 : std::lock_guard lk(conv->mtx);
2630 4 : if (conv->conversation)
2631 4 : conv->conversation->search(req, filter, std::make_shared<std::atomic_int>(1));
2632 8 : }
2633 : }
2634 :
2635 : void
2636 9 : ConversationModule::updateConversationInfos(const std::string& conversationId,
2637 : const std::map<std::string, std::string>& infos,
2638 : bool sync)
2639 : {
2640 9 : auto conv = pimpl_->getConversation(conversationId);
2641 9 : if (not conv or not conv->conversation) {
2642 0 : JAMI_ERROR("Conversation {:s} does not exist", conversationId);
2643 0 : return;
2644 : }
2645 9 : std::lock_guard lk(conv->mtx);
2646 9 : conv->conversation->updateInfos(infos, [this, conversationId, sync](bool ok, const std::string& commitId) {
2647 9 : if (ok && sync) {
2648 8 : pimpl_->sendMessageNotification(conversationId, true, commitId);
2649 1 : } else if (sync)
2650 4 : JAMI_WARNING("Unable to update info on {:s}", conversationId);
2651 9 : });
2652 9 : }
2653 :
2654 : std::map<std::string, std::string>
2655 5 : ConversationModule::conversationInfos(const std::string& conversationId) const
2656 : {
2657 : {
2658 5 : std::lock_guard lk(pimpl_->conversationsRequestsMtx_);
2659 5 : auto itReq = pimpl_->conversationsRequests_.find(conversationId);
2660 5 : if (itReq != pimpl_->conversationsRequests_.end())
2661 1 : return itReq->second.metadatas;
2662 5 : }
2663 4 : if (auto conv = pimpl_->getConversation(conversationId)) {
2664 4 : std::lock_guard lk(conv->mtx);
2665 4 : std::map<std::string, std::string> md;
2666 : {
2667 4 : std::lock_guard lk(pimpl_->conversationsRequestsMtx_);
2668 4 : auto syncingMetadatasIt = pimpl_->syncingMetadatas_.find(conversationId);
2669 4 : if (syncingMetadatasIt != pimpl_->syncingMetadatas_.end()) {
2670 2 : if (conv->conversation) {
2671 0 : pimpl_->syncingMetadatas_.erase(syncingMetadatasIt);
2672 0 : pimpl_->saveMetadata();
2673 : } else {
2674 2 : md = syncingMetadatasIt->second;
2675 : }
2676 : }
2677 4 : }
2678 4 : if (conv->conversation)
2679 2 : return conv->conversation->infos();
2680 : else
2681 2 : return md;
2682 8 : }
2683 0 : JAMI_ERROR("Conversation {:s} does not exist", conversationId);
2684 0 : return {};
2685 : }
2686 :
2687 : void
2688 5 : ConversationModule::setConversationPreferences(const std::string& conversationId,
2689 : const std::map<std::string, std::string>& prefs)
2690 : {
2691 5 : if (auto conv = pimpl_->getConversation(conversationId)) {
2692 5 : std::unique_lock lk(conv->mtx);
2693 5 : if (not conv->conversation) {
2694 0 : JAMI_ERROR("Conversation {:s} does not exist", conversationId);
2695 0 : return;
2696 : }
2697 5 : auto conversation = conv->conversation;
2698 5 : lk.unlock();
2699 5 : conversation->updatePreferences(prefs);
2700 5 : auto msg = std::make_shared<SyncMsg>();
2701 10 : msg->p = {{conversationId, conversation->preferences(true)}};
2702 5 : pimpl_->needsSyncingCb_(std::move(msg));
2703 10 : }
2704 : }
2705 :
2706 : std::map<std::string, std::string>
2707 14 : ConversationModule::getConversationPreferences(const std::string& conversationId, bool includeCreated) const
2708 : {
2709 14 : if (auto conv = pimpl_->getConversation(conversationId)) {
2710 14 : std::lock_guard lk(conv->mtx);
2711 14 : if (conv->conversation)
2712 13 : return conv->conversation->preferences(includeCreated);
2713 28 : }
2714 1 : return {};
2715 : }
2716 :
2717 : std::map<std::string, std::map<std::string, std::string>>
2718 196 : ConversationModule::convPreferences() const
2719 : {
2720 196 : std::map<std::string, std::map<std::string, std::string>> p;
2721 285 : for (const auto& conv : pimpl_->getConversations()) {
2722 87 : auto prefs = conv->preferences(true);
2723 87 : if (!prefs.empty())
2724 3 : p[conv->id()] = std::move(prefs);
2725 285 : }
2726 197 : return p;
2727 0 : }
2728 :
2729 : std::vector<uint8_t>
2730 0 : ConversationModule::conversationVCard(const std::string& conversationId) const
2731 : {
2732 0 : if (auto conv = pimpl_->getConversation(conversationId)) {
2733 0 : std::lock_guard lk(conv->mtx);
2734 0 : if (conv->conversation)
2735 0 : return conv->conversation->vCard();
2736 0 : }
2737 0 : JAMI_ERROR("Conversation {:s} does not exist", conversationId);
2738 0 : return {};
2739 : }
2740 :
2741 : bool
2742 4054 : ConversationModule::isBanned(const std::string& convId, const std::string& uri) const
2743 : {
2744 : dhtnet::tls::TrustStore::PermissionStatus status;
2745 : {
2746 4054 : std::lock_guard lk(pimpl_->conversationsMtx_);
2747 4055 : status = pimpl_->accountManager_->getCertificateStatus(uri);
2748 4055 : }
2749 4055 : if (auto conv = pimpl_->getConversation(convId)) {
2750 4044 : std::lock_guard lk(conv->mtx);
2751 4044 : if (!conv->conversation)
2752 53 : return true;
2753 3991 : if (conv->conversation->mode() != ConversationMode::ONE_TO_ONE)
2754 3336 : return conv->conversation->isBanned(uri);
2755 : // If 1:1 we check the certificate status
2756 655 : return status == dhtnet::tls::TrustStore::PermissionStatus::BANNED;
2757 8099 : }
2758 10 : return true;
2759 : }
2760 :
2761 : void
2762 23 : ConversationModule::removeContact(const std::string& uri, bool banned)
2763 : {
2764 : // Remove linked conversation's requests
2765 : {
2766 23 : std::lock_guard lk(pimpl_->conversationsRequestsMtx_);
2767 23 : auto update = false;
2768 27 : for (auto it = pimpl_->conversationsRequests_.begin(); it != pimpl_->conversationsRequests_.end(); ++it) {
2769 4 : if (it->second.from == uri && !it->second.declined) {
2770 16 : JAMI_DEBUG("Declining conversation request {:s} from {:s}", it->first, uri);
2771 4 : pimpl_->syncingMetadatas_.erase(it->first);
2772 4 : pimpl_->saveMetadata();
2773 4 : emitSignal<libjami::ConversationSignal::ConversationRequestDeclined>(pimpl_->accountId_, it->first);
2774 4 : update = true;
2775 4 : it->second.declined = std::time(nullptr);
2776 : }
2777 : }
2778 23 : if (update) {
2779 4 : pimpl_->saveConvRequests();
2780 4 : pimpl_->needsSyncingCb_({});
2781 : }
2782 23 : }
2783 23 : if (banned) {
2784 7 : auto conversationId = getOneToOneConversation(uri);
2785 7 : pimpl_->withConversation(conversationId, [&](auto& conv) { conv.shutdownConnections(); });
2786 7 : return; // Keep the conversation in banned model but stop connections
2787 7 : }
2788 :
2789 : // Removed contacts should not be linked to any conversation
2790 16 : pimpl_->accountManager_->updateContactConversation(uri, "");
2791 :
2792 : // Remove all one-to-one conversations with the removed contact
2793 16 : auto isSelf = uri == pimpl_->username_;
2794 16 : std::vector<std::string> toRm;
2795 15 : auto removeConvInfo = [&](const auto& conv, const auto& members) {
2796 1 : if ((isSelf && members.size() == 1)
2797 16 : || (!isSelf && std::find(members.begin(), members.end(), uri) != members.end())) {
2798 : // Mark the conversation as removed if it wasn't already
2799 14 : if (!conv->info.isRemoved()) {
2800 14 : conv->info.removed = std::time(nullptr);
2801 14 : emitSignal<libjami::ConversationSignal::ConversationRemoved>(pimpl_->accountId_, conv->info.id);
2802 14 : pimpl_->addConvInfo(conv->info);
2803 14 : return true;
2804 : }
2805 : }
2806 1 : return false;
2807 16 : };
2808 : {
2809 16 : std::lock_guard lk(pimpl_->conversationsMtx_);
2810 31 : for (auto& [convId, conv] : pimpl_->conversations_) {
2811 15 : std::lock_guard lk(conv->mtx);
2812 15 : if (conv->conversation) {
2813 : try {
2814 : // Note it's important to check getUsername(), else
2815 : // removing self can remove all conversations
2816 14 : if (conv->conversation->mode() == ConversationMode::ONE_TO_ONE) {
2817 14 : auto initMembers = conv->conversation->getInitialMembers();
2818 14 : if (removeConvInfo(conv, initMembers))
2819 13 : toRm.emplace_back(convId);
2820 14 : }
2821 0 : } catch (const std::exception& e) {
2822 0 : JAMI_WARN("%s", e.what());
2823 0 : }
2824 : } else {
2825 1 : removeConvInfo(conv, conv->info.members);
2826 : }
2827 15 : }
2828 16 : }
2829 29 : for (const auto& id : toRm)
2830 13 : pimpl_->removeRepository(id, true, true);
2831 16 : }
2832 :
2833 : bool
2834 9 : ConversationModule::removeConversation(const std::string& conversationId)
2835 : {
2836 9 : auto conversation = pimpl_->getConversation(conversationId);
2837 9 : std::string existingConvId;
2838 :
2839 9 : if (!conversation) {
2840 4 : JAMI_LOG("Conversation {} not found", conversationId);
2841 1 : return false;
2842 : }
2843 :
2844 8 : std::shared_ptr<Conversation> conv;
2845 : {
2846 8 : std::lock_guard lk(conversation->mtx);
2847 8 : conv = conversation->conversation;
2848 8 : }
2849 :
2850 2 : auto sendNotification = [&](const std::string& convId) {
2851 2 : if (auto convObj = pimpl_->getConversation(convId)) {
2852 2 : std::lock_guard lk(convObj->mtx);
2853 2 : if (convObj->conversation) {
2854 2 : auto commitId = convObj->conversation->lastCommitId();
2855 2 : if (!commitId.empty()) {
2856 2 : pimpl_->sendMessageNotification(*convObj->conversation, true, commitId);
2857 : }
2858 2 : }
2859 4 : }
2860 2 : };
2861 :
2862 2 : auto handleNewConversation = [&](const std::string& uri) {
2863 2 : std::string newConvId = startConversation(ConversationMode::ONE_TO_ONE, dht::InfoHash(uri));
2864 2 : pimpl_->accountManager_->updateContactConversation(uri, newConvId, true);
2865 2 : sendNotification(newConvId);
2866 2 : return newConvId;
2867 0 : };
2868 :
2869 8 : if (!conv) {
2870 1 : auto contacts = pimpl_->accountManager_->getContacts(false);
2871 2 : for (const auto& contact : contacts) {
2872 1 : if (contact.second.conversationId == conversationId) {
2873 1 : const std::string& uri = contact.first.toString();
2874 1 : handleNewConversation(uri);
2875 1 : }
2876 : }
2877 :
2878 1 : return pimpl_->removeConversation(conversationId);
2879 1 : }
2880 :
2881 7 : if (conv->mode() == ConversationMode::ONE_TO_ONE) {
2882 1 : auto members = conv->getMembers(true, false, false);
2883 1 : if (members.empty()) {
2884 0 : return false;
2885 : }
2886 :
2887 3 : for (const auto& m : members) {
2888 2 : const auto& uri = m.at("uri");
2889 :
2890 2 : if (members.size() == 1 && uri == pimpl_->username_) {
2891 0 : if (conv->getInitialMembers().size() == 1 && conv->getInitialMembers()[0] == pimpl_->username_) {
2892 : // Self conversation, create new conversation and remove the old one
2893 0 : handleNewConversation(uri);
2894 : } else {
2895 0 : existingConvId = findMatchingOneToOneConversation(conversationId, conv->memberUris("", {}));
2896 0 : if (existingConvId.empty()) {
2897 : // If left with only ended conversation of peer
2898 0 : for (const auto& otherMember : conv->getInitialMembers()) {
2899 0 : if (otherMember != pimpl_->username_) {
2900 0 : handleNewConversation(otherMember);
2901 : }
2902 0 : }
2903 : }
2904 : }
2905 0 : break;
2906 : }
2907 :
2908 2 : if (uri == pimpl_->username_)
2909 1 : continue;
2910 :
2911 1 : existingConvId = findMatchingOneToOneConversation(conversationId, conv->memberUris("", {}));
2912 1 : if (!existingConvId.empty()) {
2913 : // Found an existing conversation, just update the contact
2914 0 : pimpl_->accountManager_->updateContactConversation(uri, existingConvId, true);
2915 0 : sendNotification(existingConvId);
2916 : } else {
2917 : // No existing conversation found, create a new one
2918 1 : handleNewConversation(uri);
2919 : }
2920 : }
2921 1 : }
2922 :
2923 7 : return pimpl_->removeConversation(conversationId);
2924 9 : }
2925 :
2926 : std::string
2927 1 : ConversationModule::findMatchingOneToOneConversation(const std::string& excludedConversationId,
2928 : const std::set<std::string>& targetUris) const
2929 : {
2930 1 : std::lock_guard lk(pimpl_->conversationsMtx_);
2931 2 : for (const auto& [otherConvId, otherConvPtr] : pimpl_->conversations_) {
2932 1 : if (otherConvId == excludedConversationId)
2933 1 : continue;
2934 :
2935 0 : std::lock_guard lk(otherConvPtr->mtx);
2936 0 : if (!otherConvPtr->conversation || otherConvPtr->conversation->mode() != ConversationMode::ONE_TO_ONE)
2937 0 : continue;
2938 :
2939 0 : const auto& info = otherConvPtr->info;
2940 0 : if (info.removed != 0 && info.isRemoved())
2941 0 : continue;
2942 :
2943 0 : auto otherUris = otherConvPtr->conversation->memberUris();
2944 :
2945 0 : if (otherUris == targetUris)
2946 0 : return otherConvId;
2947 0 : }
2948 :
2949 1 : return {};
2950 1 : }
2951 :
2952 : bool
2953 64 : ConversationModule::isHosting(const std::string& conversationId, const std::string& confId) const
2954 : {
2955 64 : if (conversationId.empty()) {
2956 54 : std::lock_guard lk(pimpl_->conversationsMtx_);
2957 54 : return std::find_if(pimpl_->conversations_.cbegin(),
2958 54 : pimpl_->conversations_.cend(),
2959 10 : [&](const auto& conv) {
2960 10 : return conv.second->conversation && conv.second->conversation->isHosting(confId);
2961 : })
2962 108 : != pimpl_->conversations_.cend();
2963 64 : } else if (auto conv = pimpl_->getConversation(conversationId)) {
2964 10 : if (conv->conversation) {
2965 10 : return conv->conversation->isHosting(confId);
2966 : }
2967 10 : }
2968 0 : return false;
2969 : }
2970 :
2971 : std::vector<std::map<std::string, std::string>>
2972 16 : ConversationModule::getActiveCalls(const std::string& conversationId) const
2973 : {
2974 16 : return pimpl_->withConversation(conversationId,
2975 32 : [](const auto& conversation) { return conversation.currentCalls(); });
2976 : }
2977 :
2978 : std::shared_ptr<SIPCall>
2979 22 : ConversationModule::call(const std::string& url,
2980 : const std::vector<libjami::MediaMap>& mediaList,
2981 : std::function<void(const std::string&, const DeviceId&, const std::shared_ptr<SIPCall>&)>&& cb)
2982 : {
2983 88 : std::string conversationId = "", confId = "", uri = "", deviceId = "";
2984 22 : if (url.find('/') == std::string::npos) {
2985 13 : conversationId = url;
2986 : } else {
2987 9 : auto parameters = jami::split_string(url, '/');
2988 9 : if (parameters.size() != 4) {
2989 0 : JAMI_ERROR("Incorrect url {:s}", url);
2990 0 : return {};
2991 : }
2992 9 : conversationId = parameters[0];
2993 9 : uri = parameters[1];
2994 9 : deviceId = parameters[2];
2995 9 : confId = parameters[3];
2996 9 : }
2997 :
2998 22 : auto conv = pimpl_->getConversation(conversationId);
2999 22 : if (!conv)
3000 0 : return {};
3001 22 : std::unique_lock lk(conv->mtx);
3002 22 : if (!conv->conversation) {
3003 0 : JAMI_ERROR("Conversation {:s} not found", conversationId);
3004 0 : return {};
3005 : }
3006 :
3007 : // Check if we want to join a specific conference
3008 : // So, if confId is specified or if there is some activeCalls
3009 : // or if we are the default host.
3010 22 : auto activeCalls = conv->conversation->currentCalls();
3011 22 : auto infos = conv->conversation->infos();
3012 22 : auto itRdvAccount = infos.find("rdvAccount");
3013 22 : auto itRdvDevice = infos.find("rdvDevice");
3014 22 : auto sendCallRequest = false;
3015 22 : if (!confId.empty()) {
3016 9 : sendCallRequest = true;
3017 36 : JAMI_DEBUG("Calling self, join conference");
3018 13 : } else if (!activeCalls.empty()) {
3019 : // Else, we try to join active calls
3020 0 : sendCallRequest = true;
3021 0 : auto& ac = *activeCalls.rbegin();
3022 0 : confId = ac.at("id");
3023 0 : uri = ac.at("uri");
3024 0 : deviceId = ac.at("device");
3025 13 : } else if (itRdvAccount != infos.end() && itRdvDevice != infos.end() && !itRdvAccount->second.empty()) {
3026 : // Else, creates "to" (accountId/deviceId/conversationId/confId) and ask remote host
3027 3 : sendCallRequest = true;
3028 3 : uri = itRdvAccount->second;
3029 3 : deviceId = itRdvDevice->second;
3030 3 : confId = "0";
3031 12 : JAMI_DEBUG("Remote host detected. Calling {:s} on device {:s}", uri, deviceId);
3032 : }
3033 22 : lk.unlock();
3034 :
3035 22 : auto account = pimpl_->account_.lock();
3036 22 : std::vector<libjami::MediaMap> mediaMap = mediaList.empty() ? MediaAttribute::mediaAttributesToMediaMaps(
3037 41 : pimpl_->account_.lock()->createDefaultMediaList(
3038 60 : pimpl_->account_.lock()->isVideoEnabled()))
3039 41 : : mediaList;
3040 :
3041 22 : if (!sendCallRequest || (uri == pimpl_->username_ && deviceId == pimpl_->deviceId_)) {
3042 11 : confId = confId == "0" ? Manager::instance().callFactory.getNewCallID() : confId;
3043 : // TODO attach host with media list
3044 11 : hostConference(conversationId, confId, "", mediaMap);
3045 11 : return {};
3046 : }
3047 :
3048 : // Else we need to create a call
3049 11 : auto& manager = Manager::instance();
3050 11 : std::shared_ptr<SIPCall> call = manager.callFactory.newSipCall(account, Call::CallType::OUTGOING, mediaMap);
3051 :
3052 11 : if (not call)
3053 0 : return {};
3054 :
3055 11 : auto callUri = fmt::format("{}/{}/{}/{}", conversationId, uri, deviceId, confId);
3056 44 : account->getIceOptions([call,
3057 11 : accountId = account->getAccountID(),
3058 : callUri,
3059 11 : uri = std::move(uri),
3060 : conversationId,
3061 : deviceId,
3062 11 : cb = std::move(cb)](auto&& opts) {
3063 11 : if (call->isIceEnabled()) {
3064 11 : if (not call->createIceMediaTransport(false)
3065 22 : or not call->initIceMediaTransport(true, std::forward<dhtnet::IceTransportOptions>(opts))) {
3066 0 : return;
3067 : }
3068 : }
3069 44 : JAMI_DEBUG("New outgoing call with {}", uri);
3070 11 : call->setPeerNumber(uri);
3071 11 : call->setPeerUri("swarm:" + uri);
3072 :
3073 44 : JAMI_DEBUG("Calling: {:s}", callUri);
3074 11 : call->setState(Call::ConnectionState::TRYING);
3075 11 : call->setPeerNumber(callUri);
3076 11 : call->setPeerUri("rdv:" + callUri);
3077 22 : call->addStateListener(
3078 11 : [accountId, conversationId](Call::CallState call_state, Call::ConnectionState cnx_state, int) {
3079 62 : if (cnx_state == Call::ConnectionState::DISCONNECTED && call_state == Call::CallState::MERROR) {
3080 2 : emitSignal<libjami::ConfigurationSignal::NeedsHost>(accountId, conversationId);
3081 2 : return true;
3082 : }
3083 60 : return true;
3084 : });
3085 11 : cb(callUri, DeviceId(deviceId), call);
3086 : });
3087 :
3088 11 : return call;
3089 22 : }
3090 :
3091 : void
3092 14 : ConversationModule::hostConference(const std::string& conversationId,
3093 : const std::string& confId,
3094 : const std::string& callId,
3095 : const std::vector<libjami::MediaMap>& mediaList)
3096 : {
3097 14 : auto acc = pimpl_->account_.lock();
3098 14 : if (!acc)
3099 0 : return;
3100 14 : auto conf = acc->getConference(confId);
3101 14 : auto createConf = !conf;
3102 14 : std::shared_ptr<SIPCall> call;
3103 14 : if (!callId.empty()) {
3104 3 : call = std::dynamic_pointer_cast<SIPCall>(acc->getCall(callId));
3105 3 : if (!call) {
3106 0 : JAMI_WARNING("No call with id {} found", callId);
3107 0 : return;
3108 : }
3109 : }
3110 14 : if (createConf) {
3111 14 : conf = std::make_shared<Conference>(acc, confId);
3112 14 : acc->attach(conf);
3113 : }
3114 :
3115 14 : if (!callId.empty())
3116 3 : conf->addSubCall(callId);
3117 :
3118 14 : if (callId.empty())
3119 11 : conf->attachHost(mediaList);
3120 :
3121 14 : if (createConf) {
3122 14 : emitSignal<libjami::CallSignal::ConferenceCreated>(acc->getAccountID(), conversationId, conf->getConfId());
3123 : } else {
3124 0 : conf->reportMediaNegotiationStatus();
3125 0 : emitSignal<libjami::CallSignal::ConferenceChanged>(acc->getAccountID(), conf->getConfId(), conf->getStateStr());
3126 0 : return;
3127 : }
3128 :
3129 14 : auto conv = pimpl_->getConversation(conversationId);
3130 14 : if (!conv)
3131 0 : return;
3132 14 : std::unique_lock lk(conv->mtx);
3133 14 : if (!conv->conversation) {
3134 0 : JAMI_ERROR("Conversation {} not found", conversationId);
3135 0 : return;
3136 : }
3137 : // Add commit to conversation
3138 14 : Json::Value value;
3139 14 : value["uri"] = pimpl_->username_;
3140 14 : value["device"] = pimpl_->deviceId_;
3141 14 : value["confId"] = conf->getConfId();
3142 14 : value["type"] = "application/call-history+json";
3143 28 : conv->conversation->hostConference(std::move(value),
3144 14 : [w = pimpl_->weak(), conversationId](bool ok, const std::string& commitId) {
3145 14 : if (ok) {
3146 14 : if (auto shared = w.lock())
3147 14 : shared->sendMessageNotification(conversationId, true, commitId);
3148 : } else {
3149 0 : JAMI_ERR("Failed to send message to conversation %s",
3150 : conversationId.c_str());
3151 : }
3152 14 : });
3153 :
3154 : // When conf finished = remove host & commit
3155 : // Master call, so when it's stopped, the conference will be stopped (as we use the hold
3156 : // state for detaching the call)
3157 28 : conf->onShutdown([w = pimpl_->weak(),
3158 14 : accountUri = pimpl_->username_,
3159 14 : confId = conf->getConfId(),
3160 : conversationId,
3161 : conv](int duration) {
3162 14 : auto shared = w.lock();
3163 14 : if (shared) {
3164 10 : Json::Value value;
3165 10 : value["uri"] = accountUri;
3166 10 : value["device"] = shared->deviceId_;
3167 10 : value["confId"] = confId;
3168 10 : value["type"] = "application/call-history+json";
3169 10 : value["duration"] = std::to_string(duration);
3170 :
3171 10 : std::lock_guard lk(conv->mtx);
3172 10 : if (!conv->conversation) {
3173 0 : JAMI_ERROR("Conversation {} not found", conversationId);
3174 0 : return;
3175 : }
3176 10 : conv->conversation
3177 10 : ->removeActiveConference(std::move(value), [w, conversationId](bool ok, const std::string& commitId) {
3178 10 : if (ok) {
3179 10 : if (auto shared = w.lock()) {
3180 10 : shared->sendMessageNotification(conversationId, true, commitId);
3181 10 : }
3182 : } else {
3183 0 : JAMI_ERROR("Failed to send message to conversation {}", conversationId);
3184 : }
3185 10 : });
3186 10 : }
3187 14 : });
3188 14 : }
3189 :
3190 : std::map<std::string, ConvInfo>
3191 878 : ConversationModule::convInfos(const std::string& accountId)
3192 : {
3193 1756 : return convInfosFromPath(fileutils::get_data_dir() / accountId);
3194 : }
3195 :
3196 : std::map<std::string, ConvInfo>
3197 918 : ConversationModule::convInfosFromPath(const std::filesystem::path& path)
3198 : {
3199 918 : std::map<std::string, ConvInfo> convInfos;
3200 : try {
3201 : // read file
3202 1835 : std::lock_guard lock(dhtnet::fileutils::getFileLock(path / "convInfo"));
3203 918 : auto file = fileutils::loadFile("convInfo", path);
3204 : // load values
3205 918 : msgpack::unpacked result;
3206 918 : msgpack::unpack(result, (const char*) file.data(), file.size());
3207 916 : result.get().convert(convInfos);
3208 924 : } catch (const std::exception& e) {
3209 2 : JAMI_WARN("[convInfo] error loading convInfo: %s", e.what());
3210 2 : }
3211 918 : return convInfos;
3212 0 : }
3213 :
3214 : std::map<std::string, ConversationRequest>
3215 866 : ConversationModule::convRequests(const std::string& accountId)
3216 : {
3217 1731 : return convRequestsFromPath(fileutils::get_data_dir() / accountId);
3218 : }
3219 :
3220 : std::map<std::string, ConversationRequest>
3221 906 : ConversationModule::convRequestsFromPath(const std::filesystem::path& path)
3222 : {
3223 906 : std::map<std::string, ConversationRequest> convRequests;
3224 : try {
3225 : // read file
3226 1812 : std::lock_guard lock(dhtnet::fileutils::getFileLock(path / "convRequests"));
3227 906 : auto file = fileutils::loadFile("convRequests", path);
3228 : // load values
3229 905 : msgpack::unpacked result;
3230 906 : msgpack::unpack(result, (const char*) file.data(), file.size(), 0);
3231 906 : result.get().convert(convRequests);
3232 906 : } catch (const std::exception& e) {
3233 0 : JAMI_WARN("[convInfo] error loading convInfo: %s", e.what());
3234 0 : }
3235 905 : return convRequests;
3236 0 : }
3237 :
3238 : void
3239 186 : ConversationModule::addConvInfo(const ConvInfo& info)
3240 : {
3241 186 : pimpl_->addConvInfo(info);
3242 186 : }
3243 :
3244 : void
3245 2190 : ConversationModule::Impl::setConversationMembers(const std::string& convId, const std::set<std::string>& members)
3246 : {
3247 2190 : if (auto conv = getConversation(convId)) {
3248 2189 : std::lock_guard lk(conv->mtx);
3249 2190 : conv->info.members = members;
3250 2190 : addConvInfo(conv->info);
3251 4380 : }
3252 2190 : }
3253 :
3254 : std::shared_ptr<Conversation>
3255 0 : ConversationModule::getConversation(const std::string& convId)
3256 : {
3257 0 : if (auto conv = pimpl_->getConversation(convId)) {
3258 0 : std::lock_guard lk(conv->mtx);
3259 0 : return conv->conversation;
3260 0 : }
3261 0 : return nullptr;
3262 : }
3263 :
3264 : std::shared_ptr<dhtnet::ChannelSocket>
3265 5308 : ConversationModule::gitSocket(std::string_view deviceId, std::string_view convId) const
3266 : {
3267 5308 : if (auto conv = pimpl_->getConversation(convId)) {
3268 5308 : std::lock_guard lk(conv->mtx);
3269 5308 : if (conv->conversation)
3270 9819 : return conv->conversation->gitSocket(DeviceId(deviceId));
3271 398 : else if (conv->pending)
3272 396 : return conv->pending->socket;
3273 10616 : }
3274 2 : return nullptr;
3275 : }
3276 :
3277 : void
3278 0 : ConversationModule::addGitSocket(std::string_view deviceId,
3279 : std::string_view convId,
3280 : const std::shared_ptr<dhtnet::ChannelSocket>& channel)
3281 : {
3282 0 : if (auto conv = pimpl_->getConversation(convId)) {
3283 0 : std::lock_guard lk(conv->mtx);
3284 0 : conv->conversation->addGitSocket(DeviceId(deviceId), channel);
3285 0 : } else
3286 0 : JAMI_WARNING("addGitSocket: Unable to find conversation {:s}", convId);
3287 0 : }
3288 :
3289 : void
3290 844 : ConversationModule::removeGitSocket(std::string_view deviceId, std::string_view convId)
3291 : {
3292 1669 : pimpl_->withConversation(convId, [&](auto& conv) { conv.removeGitSocket(DeviceId(deviceId)); });
3293 844 : }
3294 :
3295 : void
3296 693 : ConversationModule::shutdownConnections()
3297 : {
3298 1109 : for (const auto& c : pimpl_->getSyncedConversations()) {
3299 416 : std::lock_guard lkc(c->mtx);
3300 416 : if (c->conversation)
3301 381 : c->conversation->shutdownConnections();
3302 416 : if (c->pending)
3303 26 : c->pending->socket = {};
3304 1109 : }
3305 693 : }
3306 : void
3307 698 : ConversationModule::addSwarmChannel(const std::string& conversationId, std::shared_ptr<dhtnet::ChannelSocket> channel)
3308 : {
3309 1396 : pimpl_->withConversation(conversationId, [&](auto& conv) { conv.addSwarmChannel(std::move(channel)); });
3310 699 : }
3311 :
3312 : void
3313 0 : ConversationModule::connectivityChanged()
3314 : {
3315 0 : for (const auto& conv : pimpl_->getConversations())
3316 0 : conv->connectivityChanged();
3317 0 : }
3318 :
3319 : std::shared_ptr<Typers>
3320 9 : ConversationModule::getTypers(const std::string& convId)
3321 : {
3322 9 : if (auto c = pimpl_->getConversation(convId)) {
3323 9 : std::lock_guard lk(c->mtx);
3324 9 : if (c->conversation)
3325 9 : return c->conversation->typers();
3326 18 : }
3327 0 : return nullptr;
3328 : }
3329 :
3330 : } // namespace jami
|