Line data Source code
1 : /*
2 : * Copyright (C) 2004-2024 Savoir-faire Linux Inc.
3 : *
4 : * This program is free software: you can redistribute it and/or modify
5 : * it under the terms of the GNU General Public License as published by
6 : * the Free Software Foundation, either version 3 of the License, or
7 : * (at your option) any later version.
8 : *
9 : * This program is distributed in the hope that it will be useful,
10 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 : * GNU General Public License for more details.
13 : *
14 : * You should have received a copy of the GNU General Public License
15 : * along with this program. If not, see <https://www.gnu.org/licenses/>.
16 : */
17 :
18 : #include "conversation_module.h"
19 :
20 : #include <algorithm>
21 : #include <fstream>
22 :
23 : #include <opendht/thread_pool.h>
24 :
25 : #include "account_const.h"
26 : #include "call.h"
27 : #include "client/ring_signal.h"
28 : #include "fileutils.h"
29 : #include "jamidht/account_manager.h"
30 : #include "jamidht/jamiaccount.h"
31 : #include "manager.h"
32 : #include "sip/sipcall.h"
33 : #include "vcard.h"
34 :
35 : namespace jami {
36 :
37 : using ConvInfoMap = std::map<std::string, ConvInfo>;
38 :
39 : struct PendingConversationFetch
40 : {
41 : bool ready {false};
42 : bool cloning {false};
43 : std::string deviceId {};
44 : std::string removeId {};
45 : std::map<std::string, std::string> preferences {};
46 : std::map<std::string, std::map<std::string, std::string>> status {};
47 : std::set<std::string> connectingTo {};
48 : std::shared_ptr<dhtnet::ChannelSocket> socket {};
49 : };
50 :
51 : constexpr std::chrono::seconds MAX_FALLBACK {12 * 3600s};
52 :
53 : struct SyncedConversation
54 : {
55 : std::mutex mtx;
56 : std::unique_ptr<asio::steady_timer> fallbackClone;
57 : std::chrono::seconds fallbackTimer {5s};
58 : ConvInfo info;
59 : std::unique_ptr<PendingConversationFetch> pending;
60 : std::shared_ptr<Conversation> conversation;
61 :
62 365 : SyncedConversation(const std::string& convId)
63 365 : : info {convId}
64 : {
65 365 : fallbackClone = std::make_unique<asio::steady_timer>(*Manager::instance().ioContext());
66 365 : }
67 31 : SyncedConversation(const ConvInfo& info)
68 31 : : info {info}
69 : {
70 31 : fallbackClone = std::make_unique<asio::steady_timer>(*Manager::instance().ioContext());
71 31 : }
72 :
73 2951 : bool startFetch(const std::string& deviceId, bool checkIfConv = false)
74 : {
75 : // conversation mtx must be locked
76 2951 : if (checkIfConv && conversation)
77 20 : return false; // Already a conversation
78 2931 : if (pending) {
79 1107 : if (pending->ready)
80 46 : return false; // Already doing stuff
81 : // if (pending->deviceId == deviceId)
82 : // return false; // Already fetching
83 1061 : if (pending->connectingTo.find(deviceId) != pending->connectingTo.end())
84 443 : return false; // Already connecting to this device
85 : } else {
86 1824 : pending = std::make_unique<PendingConversationFetch>();
87 1826 : pending->connectingTo.insert(deviceId);
88 1825 : return true;
89 : }
90 617 : return true;
91 : }
92 :
93 1599 : void stopFetch(const std::string& deviceId)
94 : {
95 : // conversation mtx must be locked
96 1599 : if (!pending)
97 79 : return;
98 1520 : pending->connectingTo.erase(deviceId);
99 1520 : if (pending->connectingTo.empty())
100 1136 : pending.reset();
101 : }
102 :
103 117 : std::vector<std::map<std::string, std::string>> getMembers(bool includeLeft,
104 : bool includeBanned) const
105 : {
106 : // conversation mtx must be locked
107 117 : if (conversation)
108 99 : return conversation->getMembers(true, includeLeft, includeBanned);
109 : // If we're cloning, we can return the initial members
110 18 : std::vector<std::map<std::string, std::string>> result;
111 18 : result.reserve(info.members.size());
112 53 : for (const auto& uri : info.members) {
113 70 : result.emplace_back(std::map<std::string, std::string> {{"uri", uri}});
114 : }
115 18 : return result;
116 18 : }
117 : };
118 :
119 : class ConversationModule::Impl : public std::enable_shared_from_this<Impl>
120 : {
121 : public:
122 : Impl(std::shared_ptr<JamiAccount>&& account,
123 : std::shared_ptr<AccountManager>&& accountManager,
124 : NeedsSyncingCb&& needsSyncingCb,
125 : SengMsgCb&& sendMsgCb,
126 : NeedSocketCb&& onNeedSocket,
127 : NeedSocketCb&& onNeedSwarmSocket,
128 : OneToOneRecvCb&& oneToOneRecvCb);
129 :
130 : template<typename S, typename T>
131 118 : inline auto withConv(const S& convId, T&& cb) const
132 : {
133 236 : if (auto conv = getConversation(convId)) {
134 117 : std::lock_guard lk(conv->mtx);
135 117 : return cb(*conv);
136 117 : } else {
137 3 : JAMI_WARNING("Conversation {} not found", convId);
138 : }
139 1 : return decltype(cb(std::declval<SyncedConversation&>()))();
140 : }
141 : template<typename S, typename T>
142 4737 : inline auto withConversation(const S& convId, T&& cb)
143 : {
144 9474 : if (auto conv = getConversation(convId)) {
145 4727 : std::lock_guard lk(conv->mtx);
146 4727 : if (conv->conversation)
147 4697 : return cb(*conv->conversation);
148 4727 : } else {
149 30 : JAMI_WARNING("Conversation {} not found", convId);
150 : }
151 40 : return decltype(cb(std::declval<Conversation&>()))();
152 : }
153 :
154 : // Retrieving recent commits
155 : /**
156 : * Clone a conversation (initial) from device
157 : * @param deviceId
158 : * @param convId
159 : */
160 : void cloneConversation(const std::string& deviceId,
161 : const std::string& peer,
162 : const std::string& convId);
163 : void cloneConversation(const std::string& deviceId,
164 : const std::string& peer,
165 : const std::shared_ptr<SyncedConversation>& conv);
166 :
167 : /**
168 : * Pull remote device
169 : * @param peer Contact URI
170 : * @param deviceId Contact's device
171 : * @param conversationId
172 : * @param commitId (optional)
173 : */
174 : void fetchNewCommits(const std::string& peer,
175 : const std::string& deviceId,
176 : const std::string& conversationId,
177 : const std::string& commitId = "");
178 : /**
179 : * Handle events to receive new commits
180 : */
181 : void handlePendingConversation(const std::string& conversationId, const std::string& deviceId);
182 :
183 : // Requests
184 : std::optional<ConversationRequest> getRequest(const std::string& id) const;
185 :
186 : // Conversations
187 : /**
188 : * Get members
189 : * @param conversationId
190 : * @param includeBanned
191 : * @return a map of members with their role and details
192 : */
193 : std::vector<std::map<std::string, std::string>> getConversationMembers(
194 : const std::string& conversationId, bool includeBanned = false) const;
195 : void setConversationMembers(const std::string& convId, const std::set<std::string>& members);
196 :
197 : /**
198 : * Remove a repository and all files
199 : * @param convId
200 : * @param sync If we send an update to other account's devices
201 : * @param force True if ignore the removing flag
202 : */
203 : void removeRepository(const std::string& convId, bool sync, bool force = false);
204 : void removeRepositoryImpl(SyncedConversation& conv, bool sync, bool force = false);
205 : /**
206 : * Remove a conversation
207 : * @param conversationId
208 : */
209 : bool removeConversation(const std::string& conversationId);
210 : bool removeConversationImpl(SyncedConversation& conv);
211 :
212 : /**
213 : * Send a message notification to all members
214 : * @param conversation
215 : * @param commit
216 : * @param sync If we send an update to other account's devices
217 : * @param deviceId If we need to filter a specific device
218 : */
219 : void sendMessageNotification(const std::string& conversationId,
220 : bool sync,
221 : const std::string& commitId = "",
222 : const std::string& deviceId = "");
223 : void sendMessageNotification(Conversation& conversation,
224 : bool sync,
225 : const std::string& commitId = "",
226 : const std::string& deviceId = "");
227 :
228 : /**
229 : * @return if a convId is a valid conversation (repository cloned & usable)
230 : */
231 9704 : bool isConversation(const std::string& convId) const
232 : {
233 9704 : std::lock_guard lk(conversationsMtx_);
234 9704 : auto c = conversations_.find(convId);
235 19408 : return c != conversations_.end() && c->second;
236 9704 : }
237 :
238 2761 : void addConvInfo(const ConvInfo& info)
239 : {
240 2761 : std::lock_guard lk(convInfosMtx_);
241 2761 : convInfos_[info.id] = info;
242 2761 : saveConvInfos();
243 2761 : }
244 :
245 : std::string getOneToOneConversation(const std::string& uri) const noexcept;
246 :
247 : bool updateConvForContact(const std::string& uri,
248 : const std::string& oldConv,
249 : const std::string& newConv);
250 :
251 :
252 118 : std::shared_ptr<SyncedConversation> getConversation(std::string_view convId) const
253 : {
254 118 : std::lock_guard lk(conversationsMtx_);
255 118 : auto c = conversations_.find(convId);
256 236 : return c != conversations_.end() ? c->second : nullptr;
257 118 : }
258 35959 : std::shared_ptr<SyncedConversation> getConversation(std::string_view convId)
259 : {
260 35959 : std::lock_guard lk(conversationsMtx_);
261 35961 : auto c = conversations_.find(convId);
262 71917 : return c != conversations_.end() ? c->second : nullptr;
263 35959 : }
264 396 : std::shared_ptr<SyncedConversation> startConversation(const std::string& convId)
265 : {
266 396 : std::lock_guard lk(conversationsMtx_);
267 396 : auto& c = conversations_[convId];
268 396 : if (!c)
269 347 : c = std::make_shared<SyncedConversation>(convId);
270 792 : return c;
271 396 : }
272 107 : std::shared_ptr<SyncedConversation> startConversation(const ConvInfo& info)
273 : {
274 107 : std::lock_guard lk(conversationsMtx_);
275 107 : auto& c = conversations_[info.id];
276 107 : if (!c)
277 17 : c = std::make_shared<SyncedConversation>(info);
278 214 : return c;
279 107 : }
280 2782 : std::vector<std::shared_ptr<SyncedConversation>> getSyncedConversations() const
281 : {
282 2782 : std::lock_guard lk(conversationsMtx_);
283 2782 : std::vector<std::shared_ptr<SyncedConversation>> result;
284 2782 : result.reserve(conversations_.size());
285 3922 : for (const auto& [_, c] : conversations_)
286 1140 : result.emplace_back(c);
287 5564 : return result;
288 2782 : }
289 420 : std::vector<std::shared_ptr<Conversation>> getConversations() const
290 : {
291 420 : std::lock_guard lk(conversationsMtx_);
292 420 : std::vector<std::shared_ptr<Conversation>> result;
293 419 : result.reserve(conversations_.size());
294 670 : for (const auto& [_, sc] : conversations_) {
295 250 : if (auto c = sc->conversation)
296 250 : result.emplace_back(std::move(c));
297 : }
298 840 : return result;
299 420 : }
300 :
301 : // Message send/load
302 : void sendMessage(const std::string& conversationId,
303 : Json::Value&& value,
304 : const std::string& replyTo = "",
305 : bool announce = true,
306 : OnCommitCb&& onCommit = {},
307 : OnDoneCb&& cb = {});
308 :
309 : void sendMessage(const std::string& conversationId,
310 : std::string message,
311 : const std::string& replyTo = "",
312 : const std::string& type = "text/plain",
313 : bool announce = true,
314 : OnCommitCb&& onCommit = {},
315 : OnDoneCb&& cb = {});
316 :
317 : void editMessage(const std::string& conversationId,
318 : const std::string& newBody,
319 : const std::string& editedId);
320 :
321 : void bootstrapCb(std::string convId);
322 :
323 : // The following methods modify what is stored on the disk
324 : /**
325 : * @note convInfosMtx_ should be locked
326 : */
327 3450 : void saveConvInfos() const { ConversationModule::saveConvInfos(accountId_, convInfos_); }
328 : /**
329 : * @note conversationsRequestsMtx_ should be locked
330 : */
331 489 : void saveConvRequests() const
332 : {
333 489 : ConversationModule::saveConvRequests(accountId_, conversationsRequests_);
334 489 : }
335 : void declineOtherConversationWith(const std::string& uri) noexcept;
336 224 : bool addConversationRequest(const std::string& id, const ConversationRequest& req)
337 : {
338 : // conversationsRequestsMtx_ MUST BE LOCKED
339 224 : if (isConversation(id))
340 0 : return false;
341 224 : auto it = conversationsRequests_.find(id);
342 224 : if (it != conversationsRequests_.end()) {
343 : // We only remove requests (if accepted) or change .declined
344 24 : if (!req.declined)
345 20 : return false;
346 200 : } else if (req.isOneToOne()) {
347 : // Check that we're not adding a second one to one trust request
348 : // NOTE: If a new one to one request is received, we can decline the previous one.
349 66 : declineOtherConversationWith(req.from);
350 : }
351 612 : JAMI_DEBUG("Adding conversation request from {} ({})", req.from, id);
352 204 : conversationsRequests_[id] = req;
353 204 : saveConvRequests();
354 204 : return true;
355 : }
356 275 : void rmConversationRequest(const std::string& id)
357 : {
358 : // conversationsRequestsMtx_ MUST BE LOCKED
359 275 : auto it = conversationsRequests_.find(id);
360 275 : if (it != conversationsRequests_.end()) {
361 176 : auto& md = syncingMetadatas_[id];
362 176 : md = it->second.metadatas;
363 176 : md["syncing"] = "true";
364 176 : md["created"] = std::to_string(it->second.received);
365 : }
366 275 : saveMetadatas();
367 275 : conversationsRequests_.erase(id);
368 275 : saveConvRequests();
369 275 : }
370 :
371 : std::weak_ptr<JamiAccount> account_;
372 : std::shared_ptr<AccountManager> accountManager_;
373 : NeedsSyncingCb needsSyncingCb_;
374 : SengMsgCb sendMsgCb_;
375 : NeedSocketCb onNeedSocket_;
376 : NeedSocketCb onNeedSwarmSocket_;
377 : OneToOneRecvCb oneToOneRecvCb_;
378 :
379 : std::string accountId_ {};
380 : std::string deviceId_ {};
381 : std::string username_ {};
382 :
383 : // Requests
384 : mutable std::mutex conversationsRequestsMtx_;
385 : std::map<std::string, ConversationRequest> conversationsRequests_;
386 :
387 : // Conversations
388 : mutable std::mutex conversationsMtx_ {};
389 : std::map<std::string, std::shared_ptr<SyncedConversation>, std::less<>> conversations_;
390 :
391 : // The following information are stored on the disk
392 : mutable std::mutex convInfosMtx_; // Note, should be locked after conversationsMtx_ if needed
393 : std::map<std::string, ConvInfo> convInfos_;
394 :
395 : // When sending a new message, we need to send the notification to some peers of the
396 : // conversation However, the conversation may be not bootstraped, so the list will be empty.
397 : // notSyncedNotification_ will store the notifiaction to announce until we have peers to sync
398 : // with.
399 : std::mutex notSyncedNotificationMtx_;
400 : std::map<std::string, std::string> notSyncedNotification_;
401 :
402 3715 : std::weak_ptr<Impl> weak() { return std::static_pointer_cast<Impl>(shared_from_this()); }
403 :
404 : // Replay conversations (after erasing/re-adding)
405 : std::mutex replayMtx_;
406 : std::map<std::string, std::vector<std::map<std::string, std::string>>> replay_;
407 : std::map<std::string, uint64_t> refreshMessage;
408 : std::atomic_int syncCnt {0};
409 :
410 : #ifdef LIBJAMI_TESTABLE
411 : std::function<void(std::string, Conversation::BootstrapStatus)> bootstrapCbTest_;
412 : #endif
413 :
414 : void fixStructures(
415 : std::shared_ptr<JamiAccount> account,
416 : const std::vector<std::tuple<std::string, std::string, std::string>>& updateContactConv,
417 : const std::set<std::string>& toRm);
418 :
419 : void cloneConversationFrom(const std::shared_ptr<SyncedConversation> conv,
420 : const std::string& deviceId,
421 : const std::string& oldConvId = "");
422 : void bootstrap(const std::string& convId);
423 : void fallbackClone(const asio::error_code& ec, const std::string& conversationId);
424 : void cloneConversationFrom(const std::string& conversationId,
425 : const std::string& uri,
426 : const std::string& oldConvId = "");
427 :
428 : // While syncing, we do not want to lose metadata (avatar/title and mode)
429 : std::map<std::string, std::map<std::string, std::string>> syncingMetadatas_;
430 477 : void saveMetadatas()
431 : {
432 477 : auto path = fileutils::get_data_dir() / accountId_;
433 954 : std::ofstream file(path / "syncingMetadatas", std::ios::trunc | std::ios::binary);
434 477 : msgpack::pack(file, syncingMetadatas_);
435 477 : }
436 :
437 677 : void loadMetadatas()
438 : {
439 : try {
440 : // read file
441 677 : auto path = fileutils::get_data_dir() / accountId_;
442 1354 : std::lock_guard lock(dhtnet::fileutils::getFileLock(path / "syncingMetadatas"));
443 1354 : auto file = fileutils::loadFile("syncingMetadatas", path);
444 : // load values
445 0 : msgpack::unpacked result;
446 0 : msgpack::unpack(result, (const char*) file.data(), file.size(), 0);
447 0 : result.get().convert(syncingMetadatas_);
448 2031 : } catch (const std::exception& e) {
449 2031 : JAMI_WARNING("[ConversationModule] error loading syncingMetadatas_: {}", e.what());
450 677 : }
451 677 : }
452 : };
453 :
454 677 : ConversationModule::Impl::Impl(std::shared_ptr<JamiAccount>&& account,
455 : std::shared_ptr<AccountManager>&& accountManager,
456 : NeedsSyncingCb&& needsSyncingCb,
457 : SengMsgCb&& sendMsgCb,
458 : NeedSocketCb&& onNeedSocket,
459 : NeedSocketCb&& onNeedSwarmSocket,
460 677 : OneToOneRecvCb&& oneToOneRecvCb)
461 677 : : account_(account)
462 677 : , accountManager_(accountManager)
463 677 : , needsSyncingCb_(needsSyncingCb)
464 677 : , sendMsgCb_(sendMsgCb)
465 676 : , onNeedSocket_(onNeedSocket)
466 677 : , onNeedSwarmSocket_(onNeedSwarmSocket)
467 676 : , oneToOneRecvCb_(oneToOneRecvCb)
468 1353 : , accountId_(account->getAccountID())
469 : {
470 677 : if (auto accm = account->accountManager())
471 677 : if (const auto* info = accm->getInfo()) {
472 677 : deviceId_ = info->deviceId;
473 677 : username_ = info->accountId;
474 677 : }
475 677 : conversationsRequests_ = convRequests(accountId_);
476 677 : loadMetadatas();
477 677 : }
478 :
479 : void
480 25 : ConversationModule::Impl::cloneConversation(const std::string& deviceId,
481 : const std::string& peerUri,
482 : const std::string& convId)
483 : {
484 75 : JAMI_DEBUG("[Account {}] Clone conversation on device {}", accountId_, deviceId);
485 :
486 25 : auto conv = startConversation(convId);
487 25 : std::unique_lock lk(conv->mtx);
488 25 : cloneConversation(deviceId, peerUri, conv);
489 25 : }
490 :
491 : void
492 154 : ConversationModule::Impl::cloneConversation(const std::string& deviceId,
493 : const std::string& peerUri,
494 : const std::shared_ptr<SyncedConversation>& conv)
495 : {
496 : // conv->mtx must be locked
497 154 : if (!conv->conversation) {
498 : // Note: here we don't return and connect to all members
499 : // the first that will successfully connect will be used for
500 : // cloning.
501 : // This avoid the case when we try to clone from convInfos + sync message
502 : // at the same time.
503 154 : if (!conv->startFetch(deviceId, true)) {
504 129 : JAMI_WARNING("[Account {}] Already fetching {}", accountId_, conv->info.id);
505 43 : addConvInfo(conv->info);
506 43 : return;
507 : }
508 222 : onNeedSocket_(
509 111 : conv->info.id,
510 : deviceId,
511 111 : [w = weak(), conv, deviceId](const auto& channel) {
512 111 : std::lock_guard lk(conv->mtx);
513 111 : if (conv->pending && !conv->pending->ready) {
514 111 : if (channel) {
515 111 : conv->pending->ready = true;
516 111 : conv->pending->deviceId = channel->deviceId().toString();
517 111 : conv->pending->socket = channel;
518 111 : if (!conv->pending->cloning) {
519 111 : conv->pending->cloning = true;
520 333 : dht::ThreadPool::io().run([w,
521 111 : convId = conv->info.id,
522 111 : deviceId = conv->pending->deviceId]() {
523 222 : if (auto sthis = w.lock())
524 111 : sthis->handlePendingConversation(convId, deviceId);
525 : });
526 : }
527 111 : return true;
528 : } else {
529 0 : conv->stopFetch(deviceId);
530 : }
531 : }
532 0 : return false;
533 111 : },
534 : MIME_TYPE_GIT);
535 :
536 333 : JAMI_LOG("[Account {}] New conversation detected: {}. Ask device {} to clone it",
537 : accountId_,
538 : conv->info.id,
539 : deviceId);
540 111 : conv->info.members.emplace(username_);
541 111 : conv->info.members.emplace(peerUri);
542 111 : addConvInfo(conv->info);
543 : } else {
544 0 : JAMI_DEBUG("[Account {}] Already have conversation {}", accountId_, conv->info.id);
545 : }
546 : }
547 :
548 : void
549 16295 : ConversationModule::Impl::fetchNewCommits(const std::string& peer,
550 : const std::string& deviceId,
551 : const std::string& conversationId,
552 : const std::string& commitId)
553 : {
554 : {
555 16295 : std::lock_guard lk(convInfosMtx_);
556 16300 : auto itConv = convInfos_.find(conversationId);
557 16300 : if (itConv != convInfos_.end() && itConv->second.isRemoved()) {
558 : // If the conversation is removed and we receives a new commit,
559 : // it means that the contact was removed but not banned.
560 : // If he wants a new conversation, they must removes/re-add the contact who declined.
561 6 : JAMI_WARNING("[Account {:s}] Received a commit for {}, but conversation is removed",
562 : accountId_,
563 : conversationId);
564 2 : return;
565 : }
566 16296 : }
567 16295 : std::optional<ConversationRequest> oldReq;
568 : {
569 16295 : std::lock_guard lk(conversationsRequestsMtx_);
570 16294 : oldReq = getRequest(conversationId);
571 16292 : if (oldReq != std::nullopt && oldReq->declined) {
572 6 : JAMI_DEBUG("[Account {}] Received a request for a conversation already declined.",
573 : accountId_);
574 2 : return;
575 : }
576 16296 : }
577 48873 : JAMI_DEBUG("[Account {:s}] fetch commits from {:s}, for {:s}, commit {:s}",
578 : accountId_,
579 : peer,
580 : conversationId,
581 : commitId);
582 :
583 16296 : auto conv = getConversation(conversationId);
584 16296 : if (!conv) {
585 3801 : JAMI_WARNING("[Account {}] Unable to find conversation {}, ask for an invite",
586 : accountId_,
587 : conversationId);
588 1267 : sendMsgCb_(peer,
589 : {},
590 3801 : std::map<std::string, std::string> {{MIME_TYPE_INVITE, conversationId}},
591 : 0);
592 1267 : return;
593 : }
594 15029 : std::unique_lock lk(conv->mtx);
595 :
596 15028 : if (conv->conversation) {
597 : // Check if we already have the commit
598 14813 : if (not commitId.empty() && conv->conversation->getCommit(commitId) != std::nullopt) {
599 12571 : return;
600 : }
601 2623 : if (conv->conversation->isRemoving()) {
602 0 : JAMI_WARNING("[Account {}] Conversation {} is being removed",
603 : accountId_,
604 : conversationId);
605 0 : return;
606 : }
607 2623 : if (!conv->conversation->isMember(peer, true)) {
608 9 : JAMI_WARNING("[Account {}] {} is not a member of {}", accountId_, peer, conversationId);
609 3 : return;
610 : }
611 2620 : if (conv->conversation->isBanned(deviceId)) {
612 0 : JAMI_WARNING("[Account {}] {} is a banned device in conversation {}",
613 : accountId_,
614 : deviceId,
615 : conversationId);
616 0 : return;
617 : }
618 :
619 : // Retrieve current last message
620 2620 : auto lastMessageId = conv->conversation->lastCommitId();
621 2619 : if (lastMessageId.empty()) {
622 0 : JAMI_ERROR("[Account {}] No message detected. This is a bug", accountId_);
623 0 : return;
624 : }
625 :
626 2619 : if (!conv->startFetch(deviceId)) {
627 1137 : JAMI_WARNING("[Account {}] Already fetching {}", accountId_, conversationId);
628 379 : return;
629 : }
630 :
631 2238 : syncCnt.fetch_add(1);
632 11200 : onNeedSocket_(
633 : conversationId,
634 : deviceId,
635 3754 : [w = weak(),
636 : conv,
637 2241 : conversationId = std::move(conversationId),
638 2241 : peer = std::move(peer),
639 2241 : deviceId = std::move(deviceId),
640 2240 : commitId = std::move(commitId)](const auto& channel) {
641 3754 : auto sthis = w.lock();
642 3753 : auto acc = sthis ? sthis->account_.lock() : nullptr;
643 3751 : std::unique_lock lk(conv->mtx);
644 3753 : auto conversation = conv->conversation;
645 3753 : if (!channel || !acc || !conversation) {
646 1591 : conv->stopFetch(deviceId);
647 1591 : if (sthis)
648 1591 : sthis->syncCnt.fetch_sub(1);
649 1591 : return false;
650 : }
651 2163 : conversation->addGitSocket(channel->deviceId(), channel);
652 2163 : lk.unlock();
653 12974 : conversation->sync(
654 2161 : peer,
655 2161 : deviceId,
656 2163 : [w,
657 2163 : conv,
658 2163 : conversationId = std::move(conversationId),
659 2163 : peer = std::move(peer),
660 2162 : deviceId = std::move(deviceId),
661 2162 : commitId = std::move(commitId)](bool ok) {
662 2163 : auto shared = w.lock();
663 2163 : if (!shared)
664 0 : return;
665 2163 : if (!ok) {
666 3207 : JAMI_WARNING("[Account {}] Unable to fetch new commit from "
667 : "{} for {}, other "
668 : "peer may be disconnected",
669 : shared->accountId_,
670 : deviceId,
671 : conversationId);
672 3207 : JAMI_LOG("[Account {}] Relaunch sync with {} for {}",
673 : shared->accountId_,
674 : deviceId,
675 : conversationId);
676 : }
677 :
678 : {
679 2163 : std::lock_guard lk(conv->mtx);
680 2163 : conv->pending.reset();
681 : // Notify peers that a new commit is there (DRT)
682 2163 : if (not commitId.empty() && ok) {
683 2030 : shared->sendMessageNotification(*conv->conversation,
684 : false,
685 1015 : commitId,
686 1015 : deviceId);
687 : }
688 2163 : }
689 4326 : if (shared->syncCnt.fetch_sub(1) == 1) {
690 199 : emitSignal<libjami::ConversationSignal::ConversationSyncFinished>(shared->accountId_);
691 : }
692 2163 : },
693 2162 : commitId);
694 2163 : return true;
695 3754 : },
696 : "");
697 2620 : } else {
698 216 : if (oldReq != std::nullopt)
699 3 : return;
700 213 : if (conv->pending)
701 130 : return;
702 83 : bool clone = !conv->info.isRemoved();
703 83 : if (clone) {
704 82 : cloneConversation(deviceId, peer, conv);
705 82 : return;
706 : }
707 1 : lk.unlock();
708 3 : JAMI_WARNING("[Account {}] Unable to find conversation {}, ask for an invite",
709 : accountId_,
710 : conversationId);
711 1 : sendMsgCb_(peer,
712 : {},
713 3 : std::map<std::string, std::string> {{MIME_TYPE_INVITE, conversationId}},
714 : 0);
715 : }
716 43135 : }
717 :
718 : // Clone and store conversation
719 : void
720 193 : ConversationModule::Impl::handlePendingConversation(const std::string& conversationId,
721 : const std::string& deviceId)
722 : {
723 193 : auto acc = account_.lock();
724 193 : if (!acc)
725 0 : return;
726 193 : std::vector<DeviceId> kd;
727 : {
728 193 : std::unique_lock lk(conversationsMtx_);
729 193 : const auto& devices = accountManager_->getKnownDevices();
730 193 : kd.reserve(devices.size());
731 1427 : for (const auto& [id, _] : devices)
732 1234 : kd.emplace_back(id);
733 193 : }
734 193 : auto conv = getConversation(conversationId);
735 193 : if (!conv)
736 0 : return;
737 193 : std::unique_lock lk(conv->mtx, std::defer_lock);
738 379 : auto erasePending = [&] {
739 379 : std::string toRm;
740 379 : if (conv->pending && !conv->pending->removeId.empty())
741 6 : toRm = std::move(conv->pending->removeId);
742 379 : conv->pending.reset();
743 379 : lk.unlock();
744 379 : if (!toRm.empty())
745 6 : removeConversation(toRm);
746 379 : };
747 : try {
748 193 : auto conversation = std::make_shared<Conversation>(acc, deviceId, conversationId);
749 186 : conversation->onMembersChanged([w=weak_from_this(), conversationId](const auto& members) {
750 : // Delay in another thread to avoid deadlocks
751 2832 : dht::ThreadPool::io().run([w, conversationId, members = std::move(members)] {
752 2832 : if (auto sthis = w.lock())
753 1416 : sthis->setConversationMembers(conversationId, members);
754 : });
755 1416 : });
756 186 : conversation->onMessageStatusChanged([this, conversationId](const auto& status) {
757 492 : auto msg = std::make_shared<SyncMsg>();
758 984 : msg->ms = {{conversationId, status}};
759 492 : needsSyncingCb_(std::move(msg));
760 492 : });
761 186 : conversation->onNeedSocket(onNeedSwarmSocket_);
762 186 : if (!conversation->isMember(username_, true)) {
763 0 : JAMI_ERR("Conversation cloned but does not seems to be a valid member");
764 0 : conversation->erase();
765 0 : lk.lock();
766 0 : erasePending();
767 0 : return;
768 : }
769 :
770 : // Make sure that the list of members stored in convInfos_ matches the
771 : // one from the conversation's repository.
772 : // (https://git.jami.net/savoirfairelinux/jami-daemon/-/issues/1026)
773 186 : setConversationMembers(conversationId, conversation->memberUris("", {}));
774 :
775 186 : lk.lock();
776 :
777 186 : if (conv->pending && conv->pending->socket)
778 186 : conversation->addGitSocket(DeviceId(deviceId), std::move(conv->pending->socket));
779 186 : auto removeRepo = false;
780 : // Note: a removeContact while cloning. In this case, the conversation
781 : // must not be announced and removed.
782 186 : if (conv->info.isRemoved())
783 0 : removeRepo = true;
784 186 : std::map<std::string, std::string> preferences;
785 186 : std::map<std::string, std::map<std::string, std::string>> status;
786 186 : if (conv->pending) {
787 186 : preferences = std::move(conv->pending->preferences);
788 186 : status = std::move(conv->pending->status);
789 : }
790 186 : conv->conversation = conversation;
791 186 : if (removeRepo) {
792 0 : removeRepositoryImpl(*conv, false, true);
793 0 : erasePending();
794 0 : return;
795 : }
796 :
797 186 : auto commitId = conversation->join();
798 186 : std::vector<std::map<std::string, std::string>> messages;
799 : {
800 186 : std::lock_guard lk(replayMtx_);
801 186 : auto replayIt = replay_.find(conversationId);
802 186 : if (replayIt != replay_.end()) {
803 7 : messages = std::move(replayIt->second);
804 7 : replay_.erase(replayIt);
805 : }
806 186 : }
807 186 : if (!commitId.empty())
808 150 : sendMessageNotification(*conversation, false, commitId);
809 186 : erasePending(); // Will unlock
810 :
811 : #ifdef LIBJAMI_TESTABLE
812 186 : conversation->onBootstrapStatus(bootstrapCbTest_);
813 : #endif // LIBJAMI_TESTABLE
814 372 : conversation->bootstrap(std::bind(&ConversationModule::Impl::bootstrapCb,
815 186 : this,
816 372 : conversation->id()),
817 : kd);
818 :
819 186 : if (!preferences.empty())
820 1 : conversation->updatePreferences(preferences);
821 186 : if (!status.empty())
822 25 : conversation->updateMessageStatus(status);
823 186 : syncingMetadatas_.erase(conversationId);
824 186 : saveMetadatas();
825 :
826 : // Inform user that the conversation is ready
827 186 : emitSignal<libjami::ConversationSignal::ConversationReady>(accountId_, conversationId);
828 186 : needsSyncingCb_({});
829 186 : std::vector<Json::Value> values;
830 186 : values.reserve(messages.size());
831 195 : for (const auto& message : messages) {
832 : // For now, only replay text messages.
833 : // File transfers will need more logic, and don't care about calls for now.
834 9 : if (message.at("type") == "text/plain" && message.at("author") == username_) {
835 2 : Json::Value json;
836 2 : json["body"] = message.at("body");
837 2 : json["type"] = "text/plain";
838 2 : values.emplace_back(std::move(json));
839 2 : }
840 : }
841 186 : if (!values.empty())
842 1 : conversation->sendMessages(std::move(values),
843 1 : [w = weak(), conversationId](const auto& commits) {
844 1 : auto shared = w.lock();
845 1 : if (shared and not commits.empty())
846 3 : shared->sendMessageNotification(conversationId,
847 : true,
848 2 : *commits.rbegin());
849 1 : });
850 : // Download members profile on first sync
851 186 : auto isOneOne = conversation->mode() == ConversationMode::ONE_TO_ONE;
852 186 : auto askForProfile = isOneOne;
853 186 : if (!isOneOne) {
854 : // If not 1:1 only download profiles from self (to avoid non checked files)
855 127 : auto cert = acc->certStore().getCertificate(deviceId);
856 127 : askForProfile = cert && cert->issuer
857 254 : && cert->issuer->getId().toString() == username_;
858 127 : }
859 186 : if (askForProfile) {
860 132 : for (const auto& member : conversation->memberUris(username_)) {
861 59 : acc->askForProfile(conversationId, deviceId, member);
862 73 : }
863 : }
864 193 : } catch (const std::exception& e) {
865 21 : JAMI_WARNING("Something went wrong when cloning conversation: {}. Re-clone in {}s", e.what(), conv->fallbackTimer.count());
866 7 : conv->fallbackClone->expires_at(std::chrono::steady_clock::now() + conv->fallbackTimer);
867 7 : conv->fallbackTimer *= 2;
868 7 : if (conv->fallbackTimer > MAX_FALLBACK)
869 0 : conv->fallbackTimer = MAX_FALLBACK;
870 14 : conv->fallbackClone->async_wait(
871 14 : std::bind(&ConversationModule::Impl::fallbackClone,
872 14 : shared_from_this(),
873 : std::placeholders::_1,
874 : conversationId));
875 :
876 7 : }
877 193 : lk.lock();
878 193 : erasePending();
879 193 : }
880 :
881 : std::optional<ConversationRequest>
882 23690 : ConversationModule::Impl::getRequest(const std::string& id) const
883 : {
884 : // ConversationsRequestsMtx MUST BE LOCKED
885 23690 : auto it = conversationsRequests_.find(id);
886 23689 : if (it != conversationsRequests_.end())
887 8280 : return it->second;
888 15407 : return std::nullopt;
889 : }
890 :
891 : std::string
892 815 : ConversationModule::Impl::getOneToOneConversation(const std::string& uri) const noexcept
893 : {
894 815 : auto details = accountManager_->getContactDetails(uri);
895 815 : auto itRemoved = details.find("removed");
896 : // If contact is removed there is no conversation
897 815 : if (itRemoved != details.end() && itRemoved->second != "0") {
898 53 : auto itBanned = details.find("banned");
899 : // If banned, conversation is still on disk
900 53 : if (itBanned == details.end() || itBanned->second == "0") {
901 : // Check if contact is removed
902 44 : auto itAdded = details.find("added");
903 44 : if (std::stoi(itRemoved->second) > std::stoi(itAdded->second))
904 43 : return {};
905 : }
906 : }
907 772 : auto it = details.find(libjami::Account::TrustRequest::CONVERSATIONID);
908 772 : if (it != details.end())
909 367 : return it->second;
910 405 : return {};
911 815 : }
912 :
913 : bool
914 33 : ConversationModule::Impl::updateConvForContact(const std::string& uri,
915 : const std::string& oldConv,
916 : const std::string& newConv)
917 : {
918 33 : if (newConv != oldConv) {
919 33 : auto conversation = getOneToOneConversation(uri);
920 33 : if (conversation != oldConv) {
921 66 : JAMI_DEBUG("Old conversation is not found in details {} - found: {}",
922 : oldConv,
923 : conversation);
924 22 : return false;
925 : }
926 11 : accountManager_->updateContactConversation(uri, newConv);
927 11 : return true;
928 33 : }
929 0 : return false;
930 : }
931 :
932 : void
933 66 : ConversationModule::Impl::declineOtherConversationWith(const std::string& uri) noexcept
934 : {
935 : // conversationsRequestsMtx_ MUST BE LOCKED
936 67 : for (auto& [id, request] : conversationsRequests_) {
937 1 : if (request.declined)
938 0 : continue; // Ignore already declined requests
939 1 : if (request.isOneToOne() && request.from == uri) {
940 3 : JAMI_WARNING("Decline conversation request ({}) from {}", id, uri);
941 1 : request.declined = std::time(nullptr);
942 1 : syncingMetadatas_.erase(id);
943 1 : saveMetadatas();
944 1 : emitSignal<libjami::ConversationSignal::ConversationRequestDeclined>(accountId_, id);
945 : }
946 : }
947 66 : }
948 :
949 : std::vector<std::map<std::string, std::string>>
950 101 : ConversationModule::Impl::getConversationMembers(const std::string& conversationId,
951 : bool includeBanned) const
952 : {
953 : return withConv(conversationId,
954 202 : [&](const auto& conv) { return conv.getMembers(true, includeBanned); });
955 : }
956 :
957 : void
958 14 : ConversationModule::Impl::removeRepository(const std::string& conversationId, bool sync, bool force)
959 : {
960 14 : auto conv = getConversation(conversationId);
961 14 : if (!conv)
962 0 : return;
963 14 : std::unique_lock lk(conv->mtx);
964 14 : removeRepositoryImpl(*conv, sync, force);
965 14 : }
966 :
967 : void
968 30 : ConversationModule::Impl::removeRepositoryImpl(SyncedConversation& conv, bool sync, bool force)
969 : {
970 30 : if (conv.conversation && (force || conv.conversation->isRemoving())) {
971 : // Stop fetch!
972 22 : conv.pending.reset();
973 :
974 66 : JAMI_LOG("Remove conversation: {}", conv.info.id);
975 : try {
976 22 : if (conv.conversation->mode() == ConversationMode::ONE_TO_ONE) {
977 45 : for (const auto& member : conv.conversation->getInitialMembers()) {
978 30 : if (member != username_) {
979 : // Note: this can happen while re-adding a contact.
980 : // In this case, check that we are removing the linked conversation.
981 15 : if (conv.info.id == getOneToOneConversation(member)) {
982 0 : accountManager_->removeContactConversation(member);
983 : }
984 : }
985 15 : }
986 : }
987 0 : } catch (const std::exception& e) {
988 0 : JAMI_ERR() << e.what();
989 0 : }
990 22 : conv.conversation->erase();
991 22 : conv.conversation.reset();
992 :
993 22 : if (!sync)
994 1 : return;
995 :
996 21 : conv.info.erased = std::time(nullptr);
997 21 : needsSyncingCb_({});
998 21 : addConvInfo(conv.info);
999 : }
1000 : }
1001 :
1002 : bool
1003 17 : ConversationModule::Impl::removeConversation(const std::string& conversationId)
1004 : {
1005 33 : return withConv(conversationId, [this](auto& conv) { return removeConversationImpl(conv); });
1006 : }
1007 :
1008 : bool
1009 16 : ConversationModule::Impl::removeConversationImpl(SyncedConversation& conv)
1010 : {
1011 16 : auto members = conv.getMembers(false, false);
1012 16 : auto isSyncing = !conv.conversation;
1013 16 : auto hasMembers = !isSyncing // If syncing there is no member to inform
1014 30 : && std::find_if(members.begin(),
1015 : members.end(),
1016 16 : [&](const auto& member) {
1017 16 : return member.at("uri") == username_;
1018 : })
1019 30 : != members.end() // We must be still a member
1020 31 : && members.size() != 1; // If there is only ourself
1021 16 : conv.info.removed = std::time(nullptr);
1022 16 : if (isSyncing)
1023 1 : conv.info.erased = std::time(nullptr);
1024 : // Sync now, because it can take some time to really removes the datas
1025 16 : needsSyncingCb_({});
1026 16 : addConvInfo(conv.info);
1027 16 : emitSignal<libjami::ConversationSignal::ConversationRemoved>(accountId_, conv.info.id);
1028 16 : if (isSyncing)
1029 1 : return true;
1030 15 : if (conv.conversation->mode() != ConversationMode::ONE_TO_ONE) {
1031 : // For one to one, we do not notify the leave. The other can still generate request
1032 : // and this is managed by the banned part. If we re-accept, the old conversation will be
1033 : // retrieved
1034 6 : auto commitId = conv.conversation->leave();
1035 6 : if (hasMembers) {
1036 3 : JAMI_LOG("Wait that someone sync that user left conversation {}", conv.info.id);
1037 : // Commit that we left
1038 1 : if (!commitId.empty()) {
1039 : // Do not sync as it's synched by convInfos
1040 1 : sendMessageNotification(*conv.conversation, false, commitId);
1041 : } else {
1042 0 : JAMI_ERROR("Failed to send message to conversation {}", conv.info.id);
1043 : }
1044 : // In this case, we wait that another peer sync the conversation
1045 : // to definitely remove it from the device. This is to inform the
1046 : // peer that we left the conversation and never want to receive
1047 : // any messages
1048 1 : return true;
1049 : }
1050 6 : } else {
1051 27 : for (const auto& m : members)
1052 18 : if (username_ != m.at("uri"))
1053 9 : updateConvForContact(m.at("uri"), conv.info.id, "");
1054 : }
1055 : // Else we are the last member, so we can remove
1056 14 : removeRepositoryImpl(conv, true);
1057 14 : return true;
1058 16 : }
1059 :
1060 : void
1061 614 : ConversationModule::Impl::sendMessageNotification(const std::string& conversationId,
1062 : bool sync,
1063 : const std::string& commitId,
1064 : const std::string& deviceId)
1065 : {
1066 614 : if (auto conv = getConversation(conversationId)) {
1067 615 : std::lock_guard lk(conv->mtx);
1068 614 : if (conv->conversation)
1069 613 : sendMessageNotification(*conv->conversation, sync, commitId, deviceId);
1070 1230 : }
1071 615 : }
1072 :
1073 : void
1074 1915 : ConversationModule::Impl::sendMessageNotification(Conversation& conversation,
1075 : bool sync,
1076 : const std::string& commitId,
1077 : const std::string& deviceId)
1078 : {
1079 1915 : auto acc = account_.lock();
1080 1914 : if (!acc)
1081 0 : return;
1082 1914 : Json::Value message;
1083 1914 : auto commit = commitId == "" ? conversation.lastCommitId() : commitId;
1084 1914 : message["id"] = conversation.id();
1085 1915 : message["commit"] = commit;
1086 1915 : message["deviceId"] = deviceId_;
1087 1915 : Json::StreamWriterBuilder builder;
1088 1915 : const auto text = Json::writeString(builder, message);
1089 :
1090 : // Send message notification will announce the new commit in 3 steps.
1091 :
1092 : // First, because our account can have several devices, announce to other devices
1093 1915 : if (sync) {
1094 : // Announce to our devices
1095 749 : refreshMessage[username_] = sendMsgCb_(username_,
1096 : {},
1097 2996 : std::map<std::string, std::string> {
1098 1498 : {MIME_TYPE_GIT, text}},
1099 749 : refreshMessage[username_]);
1100 : }
1101 :
1102 : // Then, we announce to 2 random members in the conversation that aren't in the DRT
1103 : // This allow new devices without the ability to sync to their other devices to sync with us.
1104 : // Or they can also use an old backup.
1105 1915 : std::vector<std::string> nonConnectedMembers;
1106 1915 : std::vector<NodeId> devices;
1107 : {
1108 1915 : std::lock_guard lk(notSyncedNotificationMtx_);
1109 1915 : devices = conversation.peersToSyncWith();
1110 3830 : auto members = conversation.memberUris(username_, {MemberRole::BANNED});
1111 1915 : std::vector<std::string> connectedMembers;
1112 : // print all members
1113 18478 : for (const auto& device : devices) {
1114 16564 : auto cert = acc->certStore().getCertificate(device.toString());
1115 16564 : if (cert && cert->issuer)
1116 16562 : connectedMembers.emplace_back(cert->issuer->getId().toString());
1117 16563 : }
1118 1913 : std::sort(std::begin(connectedMembers), std::end(connectedMembers));
1119 1915 : std::set_difference(members.begin(),
1120 : members.end(),
1121 : connectedMembers.begin(),
1122 : connectedMembers.end(),
1123 : std::inserter(nonConnectedMembers, nonConnectedMembers.begin()));
1124 1915 : std::shuffle(nonConnectedMembers.begin(), nonConnectedMembers.end(), acc->rand);
1125 1914 : if (nonConnectedMembers.size() > 2)
1126 143 : nonConnectedMembers.resize(2);
1127 1914 : if (!conversation.isBootstraped()) {
1128 981 : JAMI_DEBUG("[Conversation {}] Not yet bootstraped, save notification",
1129 : conversation.id());
1130 : // Because we can get some git channels but not bootstraped, we should keep this
1131 : // to refresh when bootstraped.
1132 327 : notSyncedNotification_[conversation.id()] = commit;
1133 : }
1134 1915 : }
1135 :
1136 2861 : for (const auto& member : nonConnectedMembers) {
1137 946 : refreshMessage[member] = sendMsgCb_(member,
1138 : {},
1139 3784 : std::map<std::string, std::string> {
1140 1892 : {MIME_TYPE_GIT, text}},
1141 946 : refreshMessage[member]);
1142 : }
1143 :
1144 : // Finally we send to devices that the DRT choose.
1145 18480 : for (const auto& device : devices) {
1146 16565 : auto deviceIdStr = device.toString();
1147 16564 : auto memberUri = conversation.uriFromDevice(deviceIdStr);
1148 16564 : if (memberUri.empty() || deviceIdStr == deviceId)
1149 1010 : continue;
1150 15555 : refreshMessage[deviceIdStr] = sendMsgCb_(memberUri,
1151 : device,
1152 62219 : std::map<std::string, std::string> {
1153 31110 : {MIME_TYPE_GIT, text}},
1154 15551 : refreshMessage[deviceIdStr]);
1155 17575 : }
1156 1915 : }
1157 :
1158 : void
1159 92 : ConversationModule::Impl::sendMessage(const std::string& conversationId,
1160 : std::string message,
1161 : const std::string& replyTo,
1162 : const std::string& type,
1163 : bool announce,
1164 : OnCommitCb&& onCommit,
1165 : OnDoneCb&& cb)
1166 : {
1167 92 : Json::Value json;
1168 92 : json["body"] = std::move(message);
1169 92 : json["type"] = type;
1170 92 : sendMessage(conversationId,
1171 92 : std::move(json),
1172 : replyTo,
1173 : announce,
1174 92 : std::move(onCommit),
1175 92 : std::move(cb));
1176 92 : }
1177 :
1178 : void
1179 114 : ConversationModule::Impl::sendMessage(const std::string& conversationId,
1180 : Json::Value&& value,
1181 : const std::string& replyTo,
1182 : bool announce,
1183 : OnCommitCb&& onCommit,
1184 : OnDoneCb&& cb)
1185 : {
1186 114 : if (auto conv = getConversation(conversationId)) {
1187 114 : std::lock_guard lk(conv->mtx);
1188 114 : if (conv->conversation)
1189 114 : conv->conversation
1190 342 : ->sendMessage(std::move(value),
1191 : replyTo,
1192 114 : std::move(onCommit),
1193 113 : [this,
1194 : conversationId,
1195 : announce,
1196 114 : cb = std::move(cb)](bool ok, const std::string& commitId) {
1197 113 : if (cb)
1198 3 : cb(ok, commitId);
1199 113 : if (!announce)
1200 0 : return;
1201 113 : if (ok)
1202 112 : sendMessageNotification(conversationId, true, commitId);
1203 : else
1204 1 : JAMI_ERR("Failed to send message to conversation %s",
1205 : conversationId.c_str());
1206 : });
1207 228 : }
1208 114 : }
1209 :
1210 : void
1211 7 : ConversationModule::Impl::editMessage(const std::string& conversationId,
1212 : const std::string& newBody,
1213 : const std::string& editedId)
1214 : {
1215 : // Check that editedId is a valid commit, from ourself and plain/text
1216 7 : auto validCommit = false;
1217 7 : std::string type, tid;
1218 7 : if (auto conv = getConversation(conversationId)) {
1219 7 : std::lock_guard lk(conv->mtx);
1220 7 : if (conv->conversation) {
1221 7 : auto commit = conv->conversation->getCommit(editedId);
1222 7 : if (commit != std::nullopt) {
1223 6 : type = commit->at("type");
1224 6 : if (type == "application/data-transfer+json")
1225 1 : tid = commit->at("tid");
1226 18 : validCommit = commit->at("author") == username_
1227 18 : && (type == "text/plain" || type == "application/data-transfer+json");
1228 : }
1229 7 : }
1230 14 : }
1231 7 : if (!validCommit) {
1232 6 : JAMI_ERROR("Unable to edit commit {:s}", editedId);
1233 2 : return;
1234 : }
1235 : // Commit message edition
1236 5 : Json::Value json;
1237 5 : if (type == "application/data-transfer+json") {
1238 1 : json["tid"] = "";
1239 : // Remove file!
1240 2 : auto path = fileutils::get_data_dir() / accountId_
1241 4 : / "conversation_data" / conversationId
1242 3 : / fmt::format("{}_{}", editedId, tid);
1243 1 : dhtnet::fileutils::remove(path, true);
1244 1 : } else {
1245 4 : json["body"] = newBody;
1246 : }
1247 5 : json["edit"] = editedId;
1248 5 : json["type"] = type;
1249 5 : sendMessage(conversationId, std::move(json));
1250 9 : }
1251 :
1252 : void
1253 456 : ConversationModule::Impl::bootstrapCb(std::string convId)
1254 : {
1255 456 : std::string commitId;
1256 : {
1257 456 : std::lock_guard lk(notSyncedNotificationMtx_);
1258 456 : auto it = notSyncedNotification_.find(convId);
1259 456 : if (it != notSyncedNotification_.end()) {
1260 215 : commitId = it->second;
1261 215 : notSyncedNotification_.erase(it);
1262 : }
1263 456 : }
1264 1368 : JAMI_DEBUG("[Conversation {}] Resend last message notification", convId);
1265 456 : dht::ThreadPool::io().run([w = weak(), convId, commitId = std::move(commitId)] {
1266 455 : if (auto sthis = w.lock())
1267 455 : sthis->sendMessageNotification(convId, true, commitId);
1268 456 : });
1269 456 : }
1270 :
1271 : void
1272 689 : ConversationModule::Impl::fixStructures(
1273 : std::shared_ptr<JamiAccount> acc,
1274 : const std::vector<std::tuple<std::string, std::string, std::string>>& updateContactConv,
1275 : const std::set<std::string>& toRm)
1276 : {
1277 690 : for (const auto& [uri, oldConv, newConv] : updateContactConv) {
1278 1 : updateConvForContact(uri, oldConv, newConv);
1279 : }
1280 : ////////////////////////////////////////////////////////////////
1281 : // Note: This is only to homogeneize trust and convRequests
1282 689 : std::vector<std::string> invalidPendingRequests;
1283 : {
1284 689 : auto requests = acc->getTrustRequests();
1285 689 : std::lock_guard lk(conversationsRequestsMtx_);
1286 690 : for (const auto& request : requests) {
1287 1 : auto itConvId = request.find(libjami::Account::TrustRequest::CONVERSATIONID);
1288 1 : auto itConvFrom = request.find(libjami::Account::TrustRequest::FROM);
1289 1 : if (itConvId != request.end() && itConvFrom != request.end()) {
1290 : // Check if requests exists or is declined.
1291 1 : auto itReq = conversationsRequests_.find(itConvId->second);
1292 1 : auto declined = itReq == conversationsRequests_.end() || itReq->second.declined;
1293 1 : if (declined) {
1294 3 : JAMI_WARNING("Invalid trust request found: {:s}", itConvId->second);
1295 1 : invalidPendingRequests.emplace_back(itConvFrom->second);
1296 : }
1297 : }
1298 : }
1299 689 : auto requestRemoved = false;
1300 691 : for (auto it = conversationsRequests_.begin(); it != conversationsRequests_.end();) {
1301 2 : if (it->second.from == username_) {
1302 0 : JAMI_WARNING("Detected request from ourself, this makes no sense. Remove {}",
1303 : it->first);
1304 0 : it = conversationsRequests_.erase(it);
1305 : } else {
1306 2 : ++it;
1307 : }
1308 : }
1309 689 : if (requestRemoved) {
1310 0 : saveConvRequests();
1311 : }
1312 689 : }
1313 690 : for (const auto& invalidPendingRequest : invalidPendingRequests)
1314 1 : acc->discardTrustRequest(invalidPendingRequest);
1315 :
1316 : ////////////////////////////////////////////////////////////////
1317 691 : for (const auto& conv : toRm) {
1318 6 : JAMI_ERROR("Remove conversation ({})", conv);
1319 2 : removeConversation(conv);
1320 : }
1321 2067 : JAMI_DEBUG("[Account {}] Conversations loaded!", accountId_);
1322 689 : }
1323 :
1324 : void
1325 179 : ConversationModule::Impl::cloneConversationFrom(const std::shared_ptr<SyncedConversation> conv,
1326 : const std::string& deviceId,
1327 : const std::string& oldConvId)
1328 : {
1329 179 : std::lock_guard lk(conv->mtx);
1330 179 : const auto& conversationId = conv->info.id;
1331 179 : if (!conv->startFetch(deviceId, true)) {
1332 261 : JAMI_WARNING("[Account {}] Already fetching {}", accountId_, conversationId);
1333 87 : return;
1334 : }
1335 :
1336 92 : onNeedSocket_(
1337 : conversationId,
1338 : deviceId,
1339 92 : [wthis=weak_from_this(), conv, conversationId, oldConvId, deviceId](const auto& channel) {
1340 92 : std::lock_guard lk(conv->mtx);
1341 92 : if (conv->pending && !conv->pending->ready) {
1342 90 : conv->pending->removeId = oldConvId;
1343 90 : if (channel) {
1344 82 : conv->pending->ready = true;
1345 82 : conv->pending->deviceId = channel->deviceId().toString();
1346 82 : conv->pending->socket = channel;
1347 82 : if (!conv->pending->cloning) {
1348 82 : conv->pending->cloning = true;
1349 246 : dht::ThreadPool::io().run([wthis,
1350 82 : conversationId,
1351 82 : deviceId = conv->pending->deviceId]() {
1352 164 : if (auto sthis = wthis.lock())
1353 82 : sthis->handlePendingConversation(conversationId, deviceId);
1354 : });
1355 : }
1356 82 : return true;
1357 16 : } else if (auto sthis = wthis.lock()) {
1358 8 : conv->stopFetch(deviceId);
1359 24 : JAMI_WARNING("Clone failed. Re-clone in {}s", conv->fallbackTimer.count());
1360 8 : conv->fallbackClone->expires_at(std::chrono::steady_clock::now() + conv->fallbackTimer);
1361 8 : conv->fallbackTimer *= 2;
1362 8 : if (conv->fallbackTimer > MAX_FALLBACK)
1363 0 : conv->fallbackTimer = MAX_FALLBACK;
1364 16 : conv->fallbackClone->async_wait(
1365 : std::bind(&ConversationModule::Impl::fallbackClone,
1366 : sthis,
1367 : std::placeholders::_1,
1368 8 : conversationId));
1369 :
1370 :
1371 : }
1372 : }
1373 10 : return false;
1374 92 : },
1375 : MIME_TYPE_GIT);
1376 179 : }
1377 :
1378 : void
1379 13 : ConversationModule::Impl::fallbackClone(const asio::error_code& ec, const std::string& conversationId)
1380 : {
1381 13 : if (ec == asio::error::operation_aborted)
1382 1 : return;
1383 12 : auto conv = getConversation(conversationId);
1384 12 : if (!conv || conv->conversation)
1385 0 : return;
1386 12 : auto members = getConversationMembers(conversationId);
1387 36 : for (const auto& member : members)
1388 24 : if (member.at("uri") != username_)
1389 12 : cloneConversationFrom(conversationId, member.at("uri"));
1390 12 : }
1391 :
1392 : void
1393 848 : ConversationModule::Impl::bootstrap(const std::string& convId)
1394 : {
1395 848 : std::vector<DeviceId> kd;
1396 : {
1397 848 : std::unique_lock lk(conversationsMtx_);
1398 848 : const auto& devices = accountManager_->getKnownDevices();
1399 848 : kd.reserve(devices.size());
1400 2759 : for (const auto& [id, _] : devices)
1401 1911 : kd.emplace_back(id);
1402 848 : }
1403 144 : auto bootstrap = [&](auto& conv) {
1404 144 : if (conv) {
1405 : #ifdef LIBJAMI_TESTABLE
1406 144 : conv->onBootstrapStatus(bootstrapCbTest_);
1407 : #endif // LIBJAMI_TESTABLE
1408 144 : conv->bootstrap(std::bind(&ConversationModule::Impl::bootstrapCb, this, conv->id()), kd);
1409 : }
1410 144 : };
1411 848 : std::vector<std::string> toClone;
1412 848 : if (convId.empty()) {
1413 683 : std::lock_guard lk(convInfosMtx_);
1414 722 : for (const auto& [conversationId, convInfo] : convInfos_) {
1415 39 : auto conv = getConversation(conversationId);
1416 39 : if (!conv)
1417 0 : return;
1418 39 : if ((!conv->conversation && !conv->info.isRemoved())) {
1419 : // Because we're not tracking contact presence in order to sync now,
1420 : // we need to ask to clone requests when bootstraping all conversations
1421 : // else it can stay syncing
1422 4 : toClone.emplace_back(conversationId);
1423 35 : } else if (conv->conversation) {
1424 34 : bootstrap(conv->conversation);
1425 : }
1426 39 : }
1427 848 : } else if (auto conv = getConversation(convId)) {
1428 110 : std::lock_guard lk(conv->mtx);
1429 110 : if (conv->conversation)
1430 110 : bootstrap(conv->conversation);
1431 275 : }
1432 :
1433 852 : for (const auto& cid : toClone) {
1434 4 : auto members = getConversationMembers(cid);
1435 11 : for (const auto& member : members) {
1436 7 : if (member.at("uri") != username_)
1437 3 : cloneConversationFrom(cid, member.at("uri"));
1438 : }
1439 4 : }
1440 848 : }
1441 :
1442 : void
1443 190 : ConversationModule::Impl::cloneConversationFrom(const std::string& conversationId,
1444 : const std::string& uri,
1445 : const std::string& oldConvId)
1446 : {
1447 190 : auto memberHash = dht::InfoHash(uri);
1448 190 : if (!memberHash) {
1449 0 : JAMI_WARNING("Invalid member detected: {}", uri);
1450 0 : return;
1451 : }
1452 190 : auto conv = startConversation(conversationId);
1453 190 : std::lock_guard lk(conv->mtx);
1454 190 : conv->info = {conversationId};
1455 190 : conv->info.created = std::time(nullptr);
1456 190 : conv->info.members.emplace(username_);
1457 190 : conv->info.members.emplace(uri);
1458 190 : accountManager_->forEachDevice(
1459 : memberHash,
1460 175 : [w = weak(), conv, conversationId, oldConvId](
1461 : const std::shared_ptr<dht::crypto::PublicKey>& pk) {
1462 175 : auto sthis = w.lock();
1463 175 : auto deviceId = pk->getLongId().toString();
1464 175 : if (!sthis or deviceId == sthis->deviceId_)
1465 0 : return;
1466 175 : sthis->cloneConversationFrom(conv, deviceId, oldConvId);
1467 175 : });
1468 190 : addConvInfo(conv->info);
1469 190 : }
1470 :
1471 : ////////////////////////////////////////////////////////////////
1472 :
1473 : void
1474 489 : ConversationModule::saveConvRequests(
1475 : const std::string& accountId,
1476 : const std::map<std::string, ConversationRequest>& conversationsRequests)
1477 : {
1478 489 : auto path = fileutils::get_data_dir() / accountId;
1479 489 : saveConvRequestsToPath(path, conversationsRequests);
1480 489 : }
1481 :
1482 : void
1483 1272 : ConversationModule::saveConvRequestsToPath(
1484 : const std::filesystem::path& path,
1485 : const std::map<std::string, ConversationRequest>& conversationsRequests)
1486 : {
1487 1272 : auto p = path / "convRequests";
1488 1272 : std::lock_guard lock(dhtnet::fileutils::getFileLock(p));
1489 1272 : std::ofstream file(p, std::ios::trunc | std::ios::binary);
1490 1272 : msgpack::pack(file, conversationsRequests);
1491 1272 : }
1492 :
1493 : void
1494 3450 : ConversationModule::saveConvInfos(const std::string& accountId, const ConvInfoMap& conversations)
1495 : {
1496 3450 : auto path = fileutils::get_data_dir() / accountId;
1497 3450 : saveConvInfosToPath(path, conversations);
1498 3450 : }
1499 :
1500 : void
1501 4233 : ConversationModule::saveConvInfosToPath(const std::filesystem::path& path,
1502 : const ConvInfoMap& conversations)
1503 : {
1504 8466 : std::ofstream file(path / "convInfo", std::ios::trunc | std::ios::binary);
1505 4233 : msgpack::pack(file, conversations);
1506 4233 : }
1507 :
1508 : ////////////////////////////////////////////////////////////////
1509 :
1510 677 : ConversationModule::ConversationModule(std::shared_ptr<JamiAccount> account,
1511 : std::shared_ptr<AccountManager> accountManager,
1512 : NeedsSyncingCb&& needsSyncingCb,
1513 : SengMsgCb&& sendMsgCb,
1514 : NeedSocketCb&& onNeedSocket,
1515 : NeedSocketCb&& onNeedSwarmSocket,
1516 : OneToOneRecvCb&& oneToOneRecvCb,
1517 677 : bool autoLoadConversations)
1518 677 : : pimpl_ {std::make_unique<Impl>(std::move(account),
1519 677 : std::move(accountManager),
1520 677 : std::move(needsSyncingCb),
1521 677 : std::move(sendMsgCb),
1522 677 : std::move(onNeedSocket),
1523 677 : std::move(onNeedSwarmSocket),
1524 677 : std::move(oneToOneRecvCb))}
1525 : {
1526 677 : if (autoLoadConversations) {
1527 677 : loadConversations();
1528 : }
1529 677 : }
1530 :
1531 : void
1532 16 : ConversationModule::setAccountManager(std::shared_ptr<AccountManager> accountManager)
1533 : {
1534 16 : std::unique_lock lk(pimpl_->conversationsMtx_);
1535 16 : pimpl_->accountManager_ = accountManager;
1536 16 : }
1537 :
1538 : #ifdef LIBJAMI_TESTABLE
1539 : void
1540 18 : ConversationModule::onBootstrapStatus(
1541 : const std::function<void(std::string, Conversation::BootstrapStatus)>& cb)
1542 : {
1543 18 : pimpl_->bootstrapCbTest_ = cb;
1544 19 : for (auto& c : pimpl_->getConversations())
1545 19 : c->onBootstrapStatus(pimpl_->bootstrapCbTest_);
1546 18 : }
1547 : #endif
1548 :
1549 : void
1550 689 : ConversationModule::loadConversations()
1551 : {
1552 689 : auto acc = pimpl_->account_.lock();
1553 689 : if (!acc)
1554 0 : return;
1555 2067 : JAMI_LOG("[Account {}] Start loading conversations…", pimpl_->accountId_);
1556 : auto conversationsRepositories = dhtnet::fileutils::readDirectory(
1557 1378 : fileutils::get_data_dir() / pimpl_->accountId_ / "conversations");
1558 :
1559 689 : std::unique_lock lk(pimpl_->conversationsMtx_);
1560 689 : auto contacts = pimpl_->accountManager_->getContacts(true); // Avoid to lock configurationMtx while conv Mtx is locked
1561 689 : std::unique_lock ilk(pimpl_->convInfosMtx_);
1562 689 : pimpl_->convInfos_ = convInfos(pimpl_->accountId_);
1563 689 : pimpl_->conversations_.clear();
1564 :
1565 : struct Ctx
1566 : {
1567 : std::mutex cvMtx;
1568 : std::condition_variable cv;
1569 : std::mutex toRmMtx;
1570 : std::set<std::string> toRm;
1571 : std::mutex convMtx;
1572 : size_t convNb;
1573 : std::vector<std::map<std::string, std::string>> contacts;
1574 : std::vector<std::tuple<std::string, std::string, std::string>> updateContactConv;
1575 : };
1576 689 : auto ctx = std::make_shared<Ctx>();
1577 689 : ctx->convNb = conversationsRepositories.size();
1578 689 : ctx->contacts = std::move(contacts);
1579 :
1580 707 : for (auto&& r : conversationsRepositories) {
1581 18 : dht::ThreadPool::io().run([this, ctx, repository=std::move(r), acc] {
1582 : try {
1583 18 : auto sconv = std::make_shared<SyncedConversation>(repository);
1584 18 : auto conv = std::make_shared<Conversation>(acc, repository);
1585 18 : conv->onMessageStatusChanged([this, repository](const auto& status) {
1586 7 : auto msg = std::make_shared<SyncMsg>();
1587 14 : msg->ms = {{repository, status}};
1588 7 : pimpl_->needsSyncingCb_(std::move(msg));
1589 7 : });
1590 54 : conv->onMembersChanged(
1591 36 : [w = pimpl_->weak_from_this(), repository](const auto& members) {
1592 : // Delay in another thread to avoid deadlocks
1593 10 : dht::ThreadPool::io().run([w, repository, members = std::move(members)] {
1594 10 : if (auto sthis = w.lock())
1595 5 : sthis->setConversationMembers(repository, members);
1596 : });
1597 5 : });
1598 18 : conv->onNeedSocket(pimpl_->onNeedSwarmSocket_);
1599 18 : auto members = conv->memberUris(acc->getUsername(), {});
1600 : // NOTE: The following if is here to protect against any incorrect state
1601 : // that can be introduced
1602 18 : if (conv->mode() == ConversationMode::ONE_TO_ONE && members.size() == 1) {
1603 : // If we got a 1:1 conversation, but not in the contact details, it's rather a
1604 : // duplicate or a weird state
1605 5 : auto otherUri = *members.begin();
1606 5 : auto itContact = std::find_if(ctx->contacts.cbegin(),
1607 5 : ctx->contacts.cend(),
1608 5 : [&](const auto& c) {
1609 5 : return c.at("id") == otherUri;
1610 : });
1611 5 : if (itContact == ctx->contacts.end()) {
1612 0 : JAMI_WARNING("Contact {} not found", otherUri);
1613 0 : std::lock_guard lkCv {ctx->cvMtx};
1614 0 : --ctx->convNb;
1615 0 : ctx->cv.notify_all();
1616 0 : return;
1617 0 : }
1618 5 : const std::string& convFromDetails = itContact->at("conversationId");
1619 5 : auto removed = std::stoul(itContact->at("removed"));
1620 5 : auto added = std::stoul(itContact->at("added"));
1621 5 : auto isRemoved = removed > added;
1622 5 : if (convFromDetails != repository) {
1623 2 : if (convFromDetails.empty()) {
1624 1 : if (isRemoved) {
1625 : // If details is empty, contact is removed and not banned.
1626 0 : JAMI_ERROR("Conversation {} detected for {} and should be removed",
1627 : repository,
1628 : otherUri);
1629 0 : std::lock_guard lkMtx {ctx->toRmMtx};
1630 0 : ctx->toRm.insert(repository);
1631 0 : } else {
1632 3 : JAMI_ERROR("No conversation detected for {} but one exists ({}). "
1633 : "Update details",
1634 : otherUri,
1635 : repository);
1636 1 : std::lock_guard lkMtx {ctx->toRmMtx};
1637 2 : ctx->updateContactConv.emplace_back(
1638 2 : std::make_tuple(otherUri, convFromDetails, repository));
1639 1 : }
1640 : } else {
1641 3 : JAMI_ERROR("Multiple conversation detected for {} but ({} & {})",
1642 : otherUri,
1643 : repository,
1644 : convFromDetails);
1645 1 : std::lock_guard lkMtx {ctx->toRmMtx};
1646 1 : ctx->toRm.insert(repository);
1647 1 : }
1648 : }
1649 5 : }
1650 : {
1651 18 : std::lock_guard lkMtx {ctx->convMtx};
1652 18 : auto convInfo = pimpl_->convInfos_.find(repository);
1653 18 : if (convInfo == pimpl_->convInfos_.end()) {
1654 9 : JAMI_ERROR("Missing conv info for {}. This is a bug!", repository);
1655 3 : sconv->info.created = std::time(nullptr);
1656 3 : sconv->info.lastDisplayed
1657 3 : = conv->infos()[ConversationMapKeys::LAST_DISPLAYED];
1658 : } else {
1659 15 : sconv->info = convInfo->second;
1660 15 : if (convInfo->second.isRemoved()) {
1661 : // A conversation was removed, but repository still exists
1662 1 : conv->setRemovingFlag();
1663 1 : std::lock_guard lkMtx {ctx->toRmMtx};
1664 1 : ctx->toRm.insert(repository);
1665 1 : }
1666 : }
1667 : // Even if we found the conversation in convInfos_, unable to assume that the list of members
1668 : // stored in `convInfo` is correct (https://git.jami.net/savoirfairelinux/jami-daemon/-/issues/1025).
1669 : // For this reason, we always use the list we got from the conversation repository to set
1670 : // the value of `sconv->info.members`.
1671 18 : members.emplace(acc->getUsername());
1672 18 : sconv->info.members = std::move(members);
1673 : // convInfosMtx_ is already locked
1674 18 : pimpl_->convInfos_[repository] = sconv->info;
1675 18 : }
1676 18 : auto commits = conv->commitsEndedCalls();
1677 :
1678 18 : if (!commits.empty()) {
1679 : // Note: here, this means that some calls were actives while the
1680 : // daemon finished (can be a crash).
1681 : // Notify other in the conversation that the call is finished
1682 0 : pimpl_->sendMessageNotification(*conv, true, *commits.rbegin());
1683 : }
1684 18 : sconv->conversation = conv;
1685 18 : std::lock_guard lkMtx {ctx->convMtx};
1686 18 : pimpl_->conversations_.emplace(repository, std::move(sconv));
1687 18 : } catch (const std::logic_error& e) {
1688 0 : JAMI_WARNING("[Account {}] Conversations not loaded: {}",
1689 : pimpl_->accountId_,
1690 : e.what());
1691 0 : }
1692 18 : std::lock_guard lkCv {ctx->cvMtx};
1693 18 : --ctx->convNb;
1694 18 : ctx->cv.notify_all();
1695 18 : });
1696 : }
1697 :
1698 689 : std::unique_lock lkCv(ctx->cvMtx);
1699 1396 : ctx->cv.wait(lkCv, [&] { return ctx->convNb == 0; });
1700 :
1701 : // Prune any invalid conversations without members and
1702 : // set the removed flag if needed
1703 689 : std::set<std::string> removed;
1704 721 : for (auto itInfo = pimpl_->convInfos_.begin(); itInfo != pimpl_->convInfos_.end();) {
1705 32 : const auto& info = itInfo->second;
1706 32 : if (info.members.empty()) {
1707 0 : itInfo = pimpl_->convInfos_.erase(itInfo);
1708 0 : continue;
1709 : }
1710 32 : if (info.isRemoved())
1711 2 : removed.insert(info.id);
1712 32 : auto itConv = pimpl_->conversations_.find(info.id);
1713 32 : if (itConv == pimpl_->conversations_.end()) {
1714 : // convInfos_ can contain a conversation that is not yet cloned
1715 : // so we need to add it there.
1716 14 : itConv = pimpl_->conversations_
1717 14 : .emplace(info.id, std::make_shared<SyncedConversation>(info))
1718 : .first;
1719 : }
1720 64 : if (itConv != pimpl_->conversations_.end() && itConv->second && itConv->second->conversation
1721 64 : && info.isRemoved())
1722 1 : itConv->second->conversation->setRemovingFlag();
1723 32 : if (!info.isRemoved() && itConv == pimpl_->conversations_.end()) {
1724 : // In this case, the conversation is not synced and we only know ourself
1725 0 : if (info.members.size() == 1 && *info.members.begin() == acc->getUsername()) {
1726 0 : JAMI_WARNING("[Account {:s}] Conversation {:s} seems not present/synced.",
1727 : pimpl_->accountId_,
1728 : info.id);
1729 0 : emitSignal<libjami::ConversationSignal::ConversationRemoved>(pimpl_->accountId_,
1730 0 : info.id);
1731 0 : itInfo = pimpl_->convInfos_.erase(itInfo);
1732 0 : continue;
1733 0 : }
1734 : }
1735 32 : ++itInfo;
1736 : }
1737 : // On oldest version, removeConversation didn't update "appdata/contacts"
1738 : // causing a potential incorrect state between "appdata/contacts" and "appdata/convInfos"
1739 689 : if (!removed.empty())
1740 2 : acc->unlinkConversations(removed);
1741 : // Save if we've removed some invalid entries
1742 689 : pimpl_->saveConvInfos();
1743 :
1744 689 : ilk.unlock();
1745 689 : lk.unlock();
1746 :
1747 2067 : dht::ThreadPool::io().run([w = pimpl_->weak(),
1748 : acc,
1749 689 : updateContactConv = std::move(ctx->updateContactConv),
1750 689 : toRm = std::move(ctx->toRm)]() {
1751 : // Will lock account manager
1752 689 : if (auto shared = w.lock())
1753 689 : shared->fixStructures(acc, updateContactConv, toRm);
1754 689 : });
1755 689 : }
1756 :
1757 : void
1758 0 : ConversationModule::loadSingleConversation(const std::string& convId)
1759 : {
1760 0 : auto acc = pimpl_->account_.lock();
1761 0 : if (!acc)
1762 0 : return;
1763 0 : JAMI_LOG("[Account {}] Start loading conversation {}", pimpl_->accountId_, convId);
1764 :
1765 0 : std::unique_lock lk(pimpl_->conversationsMtx_);
1766 0 : std::unique_lock ilk(pimpl_->convInfosMtx_);
1767 : // Load convInfos to retrieve requests that have been accepted but not yet synchronized.
1768 0 : pimpl_->convInfos_ = convInfos(pimpl_->accountId_);
1769 0 : pimpl_->conversations_.clear();
1770 :
1771 : try {
1772 0 : auto sconv = std::make_shared<SyncedConversation>(convId);
1773 :
1774 0 : auto conv = std::make_shared<Conversation>(acc, convId);
1775 :
1776 0 : conv->onNeedSocket(pimpl_->onNeedSwarmSocket_);
1777 :
1778 0 : sconv->conversation = conv;
1779 0 : pimpl_->conversations_.emplace(convId, std::move(sconv));
1780 0 : } catch (const std::logic_error& e) {
1781 0 : JAMI_WARNING("[Account {}] Conversations not loaded: {}", pimpl_->accountId_, e.what());
1782 0 : }
1783 :
1784 : // Add all other conversations as dummy conversations to indicate their existence so
1785 : // isConversation could detect conversations correctly.
1786 : auto conversationsRepositoryIds = dhtnet::fileutils::readDirectory(
1787 0 : fileutils::get_data_dir() / pimpl_->accountId_ / "conversations");
1788 0 : for (auto repositoryId : conversationsRepositoryIds) {
1789 0 : if (repositoryId != convId) {
1790 0 : auto conv = std::make_shared<SyncedConversation>(convId);
1791 0 : pimpl_->conversations_.emplace(repositoryId, conv);
1792 0 : }
1793 0 : }
1794 :
1795 : // Add conversations from convInfos_ so isConversation could detect conversations correctly.
1796 : // This includes conversations that have been accepted but are not yet synchronized.
1797 0 : for (auto itInfo = pimpl_->convInfos_.begin(); itInfo != pimpl_->convInfos_.end();) {
1798 0 : const auto& info = itInfo->second;
1799 0 : if (info.members.empty()) {
1800 0 : itInfo = pimpl_->convInfos_.erase(itInfo);
1801 0 : continue;
1802 : }
1803 0 : auto itConv = pimpl_->conversations_.find(info.id);
1804 0 : if (itConv == pimpl_->conversations_.end()) {
1805 : // convInfos_ can contain a conversation that is not yet cloned
1806 : // so we need to add it there.
1807 0 : pimpl_->conversations_.emplace(info.id, std::make_shared<SyncedConversation>(info)).first;
1808 : }
1809 0 : ++itInfo;
1810 : }
1811 :
1812 0 : ilk.unlock();
1813 0 : lk.unlock();
1814 0 : }
1815 :
1816 : void
1817 848 : ConversationModule::bootstrap(const std::string& convId)
1818 : {
1819 848 : pimpl_->bootstrap(convId);
1820 848 : }
1821 :
1822 : void
1823 0 : ConversationModule::monitor()
1824 : {
1825 0 : for (auto& conv : pimpl_->getConversations())
1826 0 : conv->monitor();
1827 0 : }
1828 :
1829 : void
1830 699 : ConversationModule::clearPendingFetch()
1831 : {
1832 : // Note: This is a workaround. convModule() is kept if account is disabled/re-enabled.
1833 : // iOS uses setAccountActive() a lot, and if for some reason the previous pending fetch
1834 : // is not erased (callback not called), it will block the new messages as it will not
1835 : // sync. The best way to debug this is to get logs from the last ICE connection for
1836 : // syncing the conversation. It may have been killed in some un-expected way avoiding to
1837 : // call the callbacks. This should never happen, but if it's the case, this will allow
1838 : // new messages to be synced correctly.
1839 727 : for (auto& conv : pimpl_->getSyncedConversations()) {
1840 28 : std::lock_guard lk(conv->mtx);
1841 28 : if (conv && conv->pending) {
1842 0 : JAMI_ERR("This is a bug, seems to still fetch to some device on initializing");
1843 0 : conv->pending.reset();
1844 : }
1845 727 : }
1846 699 : }
1847 :
1848 : void
1849 0 : ConversationModule::reloadRequests()
1850 : {
1851 0 : pimpl_->conversationsRequests_ = convRequests(pimpl_->accountId_);
1852 0 : }
1853 :
1854 : std::vector<std::string>
1855 12 : ConversationModule::getConversations() const
1856 : {
1857 12 : std::vector<std::string> result;
1858 12 : std::lock_guard lk(pimpl_->convInfosMtx_);
1859 12 : result.reserve(pimpl_->convInfos_.size());
1860 24 : for (const auto& [key, conv] : pimpl_->convInfos_) {
1861 12 : if (conv.isRemoved())
1862 4 : continue;
1863 8 : result.emplace_back(key);
1864 : }
1865 24 : return result;
1866 12 : }
1867 :
1868 : std::string
1869 638 : ConversationModule::getOneToOneConversation(const std::string& uri) const noexcept
1870 : {
1871 638 : return pimpl_->getOneToOneConversation(uri);
1872 : }
1873 :
1874 : bool
1875 8 : ConversationModule::updateConvForContact(const std::string& uri,
1876 : const std::string& oldConv,
1877 : const std::string& newConv)
1878 : {
1879 8 : return pimpl_->updateConvForContact(uri, oldConv, newConv);
1880 : }
1881 :
1882 : std::vector<std::map<std::string, std::string>>
1883 12 : ConversationModule::getConversationRequests() const
1884 : {
1885 12 : std::vector<std::map<std::string, std::string>> requests;
1886 12 : std::lock_guard lk(pimpl_->conversationsRequestsMtx_);
1887 12 : requests.reserve(pimpl_->conversationsRequests_.size());
1888 24 : for (const auto& [id, request] : pimpl_->conversationsRequests_) {
1889 12 : if (request.declined)
1890 6 : continue; // Do not add declined requests
1891 6 : requests.emplace_back(request.toMap());
1892 : }
1893 24 : return requests;
1894 12 : }
1895 :
1896 : void
1897 79 : ConversationModule::onTrustRequest(const std::string& uri,
1898 : const std::string& conversationId,
1899 : const std::vector<uint8_t>& payload,
1900 : time_t received)
1901 : {
1902 79 : auto oldConv = getOneToOneConversation(uri);
1903 79 : if (!oldConv.empty() && pimpl_->isConversation(oldConv)) {
1904 : // If there is already an active one to one conversation here, it's an active
1905 : // contact and the contact will reclone this activeConv, so ignore the request
1906 21 : JAMI_WARNING(
1907 : "Contact is sending a request for a non active conversation. Ignore. They will "
1908 : "clone the old one");
1909 7 : return;
1910 : }
1911 72 : std::unique_lock lk(pimpl_->conversationsRequestsMtx_);
1912 72 : ConversationRequest req;
1913 72 : req.from = uri;
1914 72 : req.conversationId = conversationId;
1915 72 : req.received = std::time(nullptr);
1916 216 : req.metadatas = ConversationRepository::infosFromVCard(vCard::utils::toMap(
1917 144 : std::string_view(reinterpret_cast<const char*>(payload.data()), payload.size())));
1918 72 : auto reqMap = req.toMap();
1919 72 : if (pimpl_->addConversationRequest(conversationId, std::move(req))) {
1920 59 : lk.unlock();
1921 59 : emitSignal<libjami::ConfigurationSignal::IncomingTrustRequest>(pimpl_->accountId_,
1922 : conversationId,
1923 : uri,
1924 : payload,
1925 : received);
1926 59 : emitSignal<libjami::ConversationSignal::ConversationRequestReceived>(pimpl_->accountId_,
1927 : conversationId,
1928 : reqMap);
1929 59 : pimpl_->needsSyncingCb_({});
1930 : } else {
1931 39 : JAMI_DEBUG("[Account {}] Received a request for a conversation "
1932 : "already existing. Ignore",
1933 : pimpl_->accountId_);
1934 : }
1935 79 : }
1936 :
1937 : void
1938 9454 : ConversationModule::onConversationRequest(const std::string& from, const Json::Value& value)
1939 : {
1940 9454 : ConversationRequest req(value);
1941 9453 : auto isOneToOne = req.isOneToOne();
1942 9454 : std::string oldConv;
1943 9454 : if (isOneToOne) {
1944 129 : oldConv = pimpl_->getOneToOneConversation(from);
1945 : }
1946 9454 : std::unique_lock lk(pimpl_->conversationsRequestsMtx_);
1947 28362 : JAMI_DEBUG("[Account {}] Receive a new conversation request for conversation {} from {}",
1948 : pimpl_->accountId_,
1949 : req.conversationId,
1950 : from);
1951 9454 : auto convId = req.conversationId;
1952 :
1953 : // Already accepted request, do nothing
1954 9454 : if (pimpl_->isConversation(convId))
1955 2248 : return;
1956 7206 : auto oldReq = pimpl_->getRequest(convId);
1957 7206 : if (oldReq != std::nullopt) {
1958 21084 : JAMI_DEBUG("[Account {}] Received a request for a conversation already existing. "
1959 : "Ignore. Declined: {}",
1960 : pimpl_->accountId_,
1961 : static_cast<int>(oldReq->declined));
1962 7028 : return;
1963 : }
1964 :
1965 178 : if (!oldConv.empty()) {
1966 45 : lk.unlock();
1967 : // Already a conversation with the contact.
1968 : // If there is already an active one to one conversation here, it's an active
1969 : // contact and the contact will reclone this activeConv, so ignore the request
1970 135 : JAMI_WARNING(
1971 : "Contact is sending a request for a non active conversation. Ignore. They will "
1972 : "clone the old one");
1973 45 : return;
1974 : }
1975 :
1976 133 : req.received = std::time(nullptr);
1977 133 : req.from = from;
1978 133 : auto reqMap = req.toMap();
1979 133 : if (pimpl_->addConversationRequest(convId, std::move(req))) {
1980 133 : lk.unlock();
1981 : // Note: no need to sync here because other connected devices should receive
1982 : // the same conversation request. Will sync when the conversation will be added
1983 133 : if (isOneToOne)
1984 1 : pimpl_->oneToOneRecvCb_(convId, from);
1985 133 : emitSignal<libjami::ConversationSignal::ConversationRequestReceived>(pimpl_->accountId_,
1986 : convId,
1987 : reqMap);
1988 : }
1989 44490 : }
1990 :
1991 : std::string
1992 3 : ConversationModule::peerFromConversationRequest(const std::string& convId) const
1993 : {
1994 3 : std::lock_guard lk(pimpl_->conversationsRequestsMtx_);
1995 3 : auto it = pimpl_->conversationsRequests_.find(convId);
1996 3 : if (it != pimpl_->conversationsRequests_.end()) {
1997 3 : return it->second.from;
1998 : }
1999 0 : return {};
2000 3 : }
2001 :
2002 : void
2003 2790 : ConversationModule::onNeedConversationRequest(const std::string& from,
2004 : const std::string& conversationId)
2005 : {
2006 2790 : pimpl_->withConversation(conversationId, [&](auto& conversation) {
2007 2785 : if (!conversation.isMember(from, true)) {
2008 0 : JAMI_WARNING("{} is asking a new invite for {}, but not a member", from, conversationId);
2009 0 : return;
2010 : }
2011 8355 : JAMI_LOG("{} is asking a new invite for {}", from, conversationId);
2012 2785 : pimpl_->sendMsgCb_(from, {}, conversation.generateInvitation(), 0);
2013 : });
2014 2790 : }
2015 :
2016 : void
2017 189 : ConversationModule::acceptConversationRequest(const std::string& conversationId,
2018 : const std::string& deviceId)
2019 : {
2020 : // For all conversation members, try to open a git channel with this conversation ID
2021 189 : std::unique_lock lkCr(pimpl_->conversationsRequestsMtx_);
2022 189 : auto request = pimpl_->getRequest(conversationId);
2023 189 : if (request == std::nullopt) {
2024 21 : lkCr.unlock();
2025 21 : if (auto conv = pimpl_->getConversation(conversationId)) {
2026 11 : std::unique_lock lk(conv->mtx);
2027 11 : if (!conv->conversation) {
2028 4 : lk.unlock();
2029 4 : pimpl_->cloneConversationFrom(conv, deviceId);
2030 : }
2031 32 : }
2032 63 : JAMI_WARNING("[Account {}] Request not found for conversation {}",
2033 : pimpl_->accountId_,
2034 : conversationId);
2035 21 : return;
2036 : }
2037 168 : pimpl_->rmConversationRequest(conversationId);
2038 168 : lkCr.unlock();
2039 168 : pimpl_->accountManager_->acceptTrustRequest(request->from, true);
2040 168 : cloneConversationFrom(conversationId, request->from);
2041 210 : }
2042 :
2043 : void
2044 6 : ConversationModule::declineConversationRequest(const std::string& conversationId)
2045 : {
2046 6 : std::lock_guard lk(pimpl_->conversationsRequestsMtx_);
2047 6 : auto it = pimpl_->conversationsRequests_.find(conversationId);
2048 6 : if (it != pimpl_->conversationsRequests_.end()) {
2049 6 : it->second.declined = std::time(nullptr);
2050 6 : pimpl_->saveConvRequests();
2051 : }
2052 6 : pimpl_->syncingMetadatas_.erase(conversationId);
2053 6 : pimpl_->saveMetadatas();
2054 6 : emitSignal<libjami::ConversationSignal::ConversationRequestDeclined>(pimpl_->accountId_,
2055 : conversationId);
2056 6 : pimpl_->needsSyncingCb_({});
2057 6 : }
2058 :
2059 : std::string
2060 181 : ConversationModule::startConversation(ConversationMode mode, const dht::InfoHash& otherMember)
2061 : {
2062 181 : auto acc = pimpl_->account_.lock();
2063 181 : if (!acc)
2064 0 : return {};
2065 181 : std::vector<DeviceId> kd;
2066 1364 : for (const auto& [id, _] : acc->getKnownDevices())
2067 1364 : kd.emplace_back(id);
2068 : // Create the conversation object
2069 181 : std::shared_ptr<Conversation> conversation;
2070 : try {
2071 181 : conversation = std::make_shared<Conversation>(acc, mode, otherMember.toString());
2072 181 : auto conversationId = conversation->id();
2073 181 : conversation->onMessageStatusChanged([this, conversationId](const auto& status) {
2074 575 : auto msg = std::make_shared<SyncMsg>();
2075 1150 : msg->ms = {{conversationId, status}};
2076 575 : pimpl_->needsSyncingCb_(std::move(msg));
2077 575 : });
2078 181 : conversation->onMembersChanged([w=pimpl_->weak_from_this(), conversationId](const auto& members) {
2079 : // Delay in another thread to avoid deadlocks
2080 1148 : dht::ThreadPool::io().run([w, conversationId, members = std::move(members)] {
2081 1148 : if (auto sthis = w.lock())
2082 574 : sthis->setConversationMembers(conversationId, members);
2083 : });
2084 574 : });
2085 181 : conversation->onNeedSocket(pimpl_->onNeedSwarmSocket_);
2086 : #ifdef LIBJAMI_TESTABLE
2087 181 : conversation->onBootstrapStatus(pimpl_->bootstrapCbTest_);
2088 : #endif // LIBJAMI_TESTABLE
2089 362 : conversation->bootstrap(std::bind(&ConversationModule::Impl::bootstrapCb,
2090 181 : pimpl_.get(),
2091 : conversationId),
2092 : kd);
2093 181 : } catch (const std::exception& e) {
2094 0 : JAMI_ERROR("[Account {}] Error while generating a conversation {}",
2095 : pimpl_->accountId_, e.what());
2096 0 : return {};
2097 0 : }
2098 181 : auto convId = conversation->id();
2099 181 : auto conv = pimpl_->startConversation(convId);
2100 181 : std::unique_lock lk(conv->mtx);
2101 181 : conv->info.created = std::time(nullptr);
2102 181 : conv->info.members.emplace(pimpl_->username_);
2103 181 : if (otherMember)
2104 66 : conv->info.members.emplace(otherMember.toString());
2105 181 : conv->conversation = conversation;
2106 181 : addConvInfo(conv->info);
2107 181 : lk.unlock();
2108 :
2109 181 : pimpl_->needsSyncingCb_({});
2110 181 : emitSignal<libjami::ConversationSignal::ConversationReady>(pimpl_->accountId_, convId);
2111 181 : return convId;
2112 181 : }
2113 :
2114 : void
2115 175 : ConversationModule::cloneConversationFrom(const std::string& conversationId,
2116 : const std::string& uri,
2117 : const std::string& oldConvId)
2118 : {
2119 175 : pimpl_->cloneConversationFrom(conversationId, uri, oldConvId);
2120 175 : }
2121 :
2122 : // Message send/load
2123 : void
2124 92 : ConversationModule::sendMessage(const std::string& conversationId,
2125 : std::string message,
2126 : const std::string& replyTo,
2127 : const std::string& type,
2128 : bool announce,
2129 : OnCommitCb&& onCommit,
2130 : OnDoneCb&& cb)
2131 : {
2132 184 : pimpl_->sendMessage(conversationId,
2133 92 : std::move(message),
2134 : replyTo,
2135 : type,
2136 : announce,
2137 92 : std::move(onCommit),
2138 92 : std::move(cb));
2139 92 : }
2140 :
2141 : void
2142 14 : ConversationModule::sendMessage(const std::string& conversationId,
2143 : Json::Value&& value,
2144 : const std::string& replyTo,
2145 : bool announce,
2146 : OnCommitCb&& onCommit,
2147 : OnDoneCb&& cb)
2148 : {
2149 28 : pimpl_->sendMessage(conversationId,
2150 14 : std::move(value),
2151 : replyTo,
2152 : announce,
2153 14 : std::move(onCommit),
2154 14 : std::move(cb));
2155 14 : }
2156 :
2157 : void
2158 7 : ConversationModule::editMessage(const std::string& conversationId,
2159 : const std::string& newBody,
2160 : const std::string& editedId)
2161 : {
2162 7 : pimpl_->editMessage(conversationId, newBody, editedId);
2163 7 : }
2164 :
2165 : void
2166 3 : ConversationModule::reactToMessage(const std::string& conversationId,
2167 : const std::string& newBody,
2168 : const std::string& reactToId)
2169 : {
2170 : // Commit message edition
2171 3 : Json::Value json;
2172 3 : json["body"] = newBody;
2173 3 : json["react-to"] = reactToId;
2174 3 : json["type"] = "text/plain";
2175 3 : pimpl_->sendMessage(conversationId, std::move(json));
2176 3 : }
2177 :
2178 : void
2179 100 : ConversationModule::addCallHistoryMessage(const std::string& uri,
2180 : uint64_t duration_ms,
2181 : const std::string& reason)
2182 : {
2183 100 : auto finalUri = uri.substr(0, uri.find("@ring.dht"));
2184 100 : finalUri = finalUri.substr(0, uri.find("@jami.dht"));
2185 100 : auto convId = getOneToOneConversation(finalUri);
2186 100 : if (!convId.empty()) {
2187 3 : Json::Value value;
2188 3 : value["to"] = finalUri;
2189 3 : value["type"] = "application/call-history+json";
2190 3 : value["duration"] = std::to_string(duration_ms);
2191 3 : if (!reason.empty())
2192 3 : value["reason"] = reason;
2193 3 : sendMessage(convId, std::move(value));
2194 3 : }
2195 100 : }
2196 :
2197 : bool
2198 18 : ConversationModule::onMessageDisplayed(const std::string& peer,
2199 : const std::string& conversationId,
2200 : const std::string& interactionId)
2201 : {
2202 18 : if (auto conv = pimpl_->getConversation(conversationId)) {
2203 18 : std::unique_lock lk(conv->mtx);
2204 18 : if (auto conversation = conv->conversation) {
2205 18 : lk.unlock();
2206 18 : return conversation->setMessageDisplayed(peer, interactionId);
2207 18 : }
2208 36 : }
2209 0 : return false;
2210 : }
2211 :
2212 : std::map<std::string, std::map<std::string, std::map<std::string, std::string>>>
2213 201 : ConversationModule::convMessageStatus() const
2214 : {
2215 201 : std::map<std::string, std::map<std::string, std::map<std::string, std::string>>> messageStatus;
2216 290 : for (const auto& conv : pimpl_->getConversations()) {
2217 89 : auto d = conv->messageStatus();
2218 89 : if (!d.empty())
2219 70 : messageStatus[conv->id()] = std::move(d);
2220 290 : }
2221 201 : return messageStatus;
2222 0 : }
2223 :
2224 : uint32_t
2225 0 : ConversationModule::loadConversationMessages(const std::string& conversationId,
2226 : const std::string& fromMessage,
2227 : size_t n)
2228 : {
2229 0 : auto acc = pimpl_->account_.lock();
2230 0 : if (auto conv = pimpl_->getConversation(conversationId)) {
2231 0 : std::lock_guard lk(conv->mtx);
2232 0 : if (conv->conversation) {
2233 0 : const uint32_t id = std::uniform_int_distribution<uint32_t> {}(acc->rand);
2234 0 : LogOptions options;
2235 0 : options.from = fromMessage;
2236 0 : options.nbOfCommits = n;
2237 0 : conv->conversation->loadMessages(
2238 0 : [accountId = pimpl_->accountId_, conversationId, id](auto&& messages) {
2239 0 : emitSignal<libjami::ConversationSignal::ConversationLoaded>(id,
2240 0 : accountId,
2241 0 : conversationId,
2242 : messages);
2243 0 : },
2244 : options);
2245 0 : return id;
2246 0 : }
2247 0 : }
2248 0 : return 0;
2249 0 : }
2250 :
2251 : void
2252 0 : ConversationModule::clearCache(const std::string& conversationId)
2253 : {
2254 0 : if (auto conv = pimpl_->getConversation(conversationId)) {
2255 0 : std::lock_guard lk(conv->mtx);
2256 0 : if (conv->conversation) {
2257 0 : conv->conversation->clearCache();
2258 : }
2259 0 : }
2260 0 : }
2261 :
2262 : uint32_t
2263 2 : ConversationModule::loadConversation(const std::string& conversationId,
2264 : const std::string& fromMessage,
2265 : size_t n)
2266 : {
2267 2 : auto acc = pimpl_->account_.lock();
2268 2 : if (auto conv = pimpl_->getConversation(conversationId)) {
2269 2 : std::lock_guard lk(conv->mtx);
2270 2 : if (conv->conversation) {
2271 2 : const uint32_t id = std::uniform_int_distribution<uint32_t> {}(acc->rand);
2272 2 : LogOptions options;
2273 2 : options.from = fromMessage;
2274 2 : options.nbOfCommits = n;
2275 4 : conv->conversation->loadMessages2(
2276 2 : [accountId = pimpl_->accountId_, conversationId, id](auto&& messages) {
2277 4 : emitSignal<libjami::ConversationSignal::SwarmLoaded>(id,
2278 2 : accountId,
2279 2 : conversationId,
2280 : messages);
2281 2 : },
2282 : options);
2283 2 : return id;
2284 2 : }
2285 4 : }
2286 0 : return 0;
2287 2 : }
2288 :
2289 : uint32_t
2290 0 : ConversationModule::loadConversationUntil(const std::string& conversationId,
2291 : const std::string& fromMessage,
2292 : const std::string& toMessage)
2293 : {
2294 0 : auto acc = pimpl_->account_.lock();
2295 0 : if (auto conv = pimpl_->getConversation(conversationId)) {
2296 0 : std::lock_guard lk(conv->mtx);
2297 0 : if (conv->conversation) {
2298 0 : const uint32_t id = std::uniform_int_distribution<uint32_t> {}(acc->rand);
2299 0 : LogOptions options;
2300 0 : options.from = fromMessage;
2301 0 : options.to = toMessage;
2302 0 : options.includeTo = true;
2303 0 : conv->conversation->loadMessages(
2304 0 : [accountId = pimpl_->accountId_, conversationId, id](auto&& messages) {
2305 0 : emitSignal<libjami::ConversationSignal::ConversationLoaded>(id,
2306 0 : accountId,
2307 0 : conversationId,
2308 : messages);
2309 0 : },
2310 : options);
2311 0 : return id;
2312 0 : }
2313 0 : }
2314 0 : return 0;
2315 0 : }
2316 :
2317 : uint32_t
2318 0 : ConversationModule::loadSwarmUntil(const std::string& conversationId,
2319 : const std::string& fromMessage,
2320 : const std::string& toMessage)
2321 : {
2322 0 : auto acc = pimpl_->account_.lock();
2323 0 : if (auto conv = pimpl_->getConversation(conversationId)) {
2324 0 : std::lock_guard lk(conv->mtx);
2325 0 : if (conv->conversation) {
2326 0 : const uint32_t id = std::uniform_int_distribution<uint32_t> {}(acc->rand);
2327 0 : LogOptions options;
2328 0 : options.from = fromMessage;
2329 0 : options.to = toMessage;
2330 0 : options.includeTo = true;
2331 0 : conv->conversation->loadMessages2(
2332 0 : [accountId = pimpl_->accountId_, conversationId, id](auto&& messages) {
2333 0 : emitSignal<libjami::ConversationSignal::SwarmLoaded>(id,
2334 0 : accountId,
2335 0 : conversationId,
2336 : messages);
2337 0 : },
2338 : options);
2339 0 : return id;
2340 0 : }
2341 0 : }
2342 0 : return 0;
2343 0 : }
2344 :
2345 : std::shared_ptr<TransferManager>
2346 78 : ConversationModule::dataTransfer(const std::string& conversationId) const
2347 : {
2348 78 : return pimpl_->withConversation(conversationId,
2349 153 : [](auto& conversation) { return conversation.dataTransfer(); });
2350 : }
2351 :
2352 : bool
2353 13 : ConversationModule::onFileChannelRequest(const std::string& conversationId,
2354 : const std::string& member,
2355 : const std::string& fileId,
2356 : bool verifyShaSum) const
2357 : {
2358 13 : if (auto conv = pimpl_->getConversation(conversationId)) {
2359 13 : std::lock_guard lk(conv->mtx);
2360 13 : if (conv->conversation)
2361 13 : return conv->conversation->onFileChannelRequest(member, fileId, verifyShaSum);
2362 26 : }
2363 0 : return false;
2364 : }
2365 :
2366 : bool
2367 11 : ConversationModule::downloadFile(const std::string& conversationId,
2368 : const std::string& interactionId,
2369 : const std::string& fileId,
2370 : const std::string& path,
2371 : size_t start,
2372 : size_t end)
2373 : {
2374 11 : if (auto conv = pimpl_->getConversation(conversationId)) {
2375 11 : std::lock_guard lk(conv->mtx);
2376 11 : if (conv->conversation)
2377 11 : return conv->conversation->downloadFile(interactionId, fileId, path, "", "", start, end);
2378 22 : }
2379 0 : return false;
2380 : }
2381 :
2382 : void
2383 1359 : ConversationModule::syncConversations(const std::string& peer, const std::string& deviceId)
2384 : {
2385 : // Sync conversations where peer is member
2386 1359 : std::set<std::string> toFetch;
2387 1359 : std::set<std::string> toClone;
2388 2049 : for (const auto& conv : pimpl_->getSyncedConversations()) {
2389 690 : std::lock_guard lk(conv->mtx);
2390 690 : if (conv->conversation) {
2391 658 : if (!conv->conversation->isRemoving() && conv->conversation->isMember(peer, false)) {
2392 174 : toFetch.emplace(conv->info.id);
2393 : }
2394 32 : } else if (!conv->info.isRemoved()
2395 82 : && std::find(conv->info.members.begin(), conv->info.members.end(), peer)
2396 82 : != conv->info.members.end()) {
2397 : // In this case the conversation was never cloned (can be after an import)
2398 25 : toClone.emplace(conv->info.id);
2399 : }
2400 2048 : }
2401 1532 : for (const auto& cid : toFetch)
2402 174 : pimpl_->fetchNewCommits(peer, deviceId, cid);
2403 1384 : for (const auto& cid : toClone)
2404 25 : pimpl_->cloneConversation(deviceId, peer, cid);
2405 2718 : if (pimpl_->syncCnt.load() == 0)
2406 915 : emitSignal<libjami::ConversationSignal::ConversationSyncFinished>(pimpl_->accountId_);
2407 1359 : }
2408 :
2409 : void
2410 177 : ConversationModule::onSyncData(const SyncMsg& msg,
2411 : const std::string& peerId,
2412 : const std::string& deviceId)
2413 : {
2414 177 : std::vector<std::string> toClone;
2415 284 : for (const auto& [key, convInfo] : msg.c) {
2416 107 : const auto& convId = convInfo.id;
2417 : {
2418 107 : std::lock_guard lk(pimpl_->conversationsRequestsMtx_);
2419 107 : pimpl_->rmConversationRequest(convId);
2420 107 : }
2421 :
2422 107 : auto conv = pimpl_->startConversation(convInfo);
2423 107 : std::unique_lock lk(conv->mtx);
2424 : // Skip outdated info
2425 107 : if (std::max(convInfo.created, convInfo.removed)
2426 107 : < std::max(conv->info.created, conv->info.removed))
2427 2 : continue;
2428 105 : if (not convInfo.isRemoved()) {
2429 : // If multi devices, it can detect a conversation that was already
2430 : // removed, so just check if the convinfo contains a removed conv
2431 94 : if (conv->info.removed) {
2432 0 : if (conv->info.removed >= convInfo.created) {
2433 : // Only reclone if re-added, else the peer is not synced yet (could be
2434 : // offline before)
2435 0 : continue;
2436 : }
2437 0 : JAMI_DEBUG("Re-add previously removed conversation {:s}", convId);
2438 : }
2439 94 : conv->info = convInfo;
2440 94 : if (!conv->conversation) {
2441 47 : if (deviceId != "") {
2442 47 : pimpl_->cloneConversation(deviceId, peerId, conv);
2443 : } else {
2444 : // In this case, information is from JAMS
2445 : // JAMS does not store the conversation itself, so we
2446 : // must use information to clone the conversation
2447 0 : addConvInfo(convInfo);
2448 0 : toClone.emplace_back(convId);
2449 : }
2450 : }
2451 : } else {
2452 11 : if (conv->conversation && !conv->conversation->isRemoving()) {
2453 3 : emitSignal<libjami::ConversationSignal::ConversationRemoved>(pimpl_->accountId_,
2454 : convId);
2455 3 : conv->conversation->setRemovingFlag();
2456 : }
2457 11 : auto update = false;
2458 11 : if (!conv->info.removed) {
2459 2 : update = true;
2460 2 : conv->info.removed = std::time(nullptr);
2461 : }
2462 11 : if (convInfo.erased && !conv->info.erased) {
2463 1 : conv->info.erased = std::time(nullptr);
2464 1 : pimpl_->addConvInfo(conv->info);
2465 1 : pimpl_->removeRepositoryImpl(*conv, false);
2466 10 : } else if (update) {
2467 2 : pimpl_->addConvInfo(conv->info);
2468 : }
2469 : }
2470 109 : }
2471 :
2472 177 : for (const auto& cid : toClone) {
2473 0 : auto members = getConversationMembers(cid);
2474 0 : for (const auto& member : members) {
2475 0 : if (member.at("uri") != pimpl_->username_)
2476 0 : cloneConversationFrom(cid, member.at("uri"));
2477 : }
2478 0 : }
2479 :
2480 196 : for (const auto& [convId, req] : msg.cr) {
2481 19 : if (req.from == pimpl_->username_) {
2482 0 : JAMI_WARNING("Detected request from ourself, ignore {}.", convId);
2483 12 : continue;
2484 0 : }
2485 19 : std::unique_lock lk(pimpl_->conversationsRequestsMtx_);
2486 19 : if (pimpl_->isConversation(convId)) {
2487 : // Already handled request
2488 0 : pimpl_->rmConversationRequest(convId);
2489 0 : continue;
2490 : }
2491 :
2492 : // New request
2493 19 : if (!pimpl_->addConversationRequest(convId, req))
2494 7 : continue;
2495 12 : lk.unlock();
2496 :
2497 12 : if (req.declined != 0) {
2498 : // Request declined
2499 15 : JAMI_LOG("[Account {:s}] Declined request detected for conversation {:s} (device {:s})",
2500 : pimpl_->accountId_,
2501 : convId,
2502 : deviceId);
2503 5 : pimpl_->syncingMetadatas_.erase(convId);
2504 5 : pimpl_->saveMetadatas();
2505 5 : emitSignal<libjami::ConversationSignal::ConversationRequestDeclined>(pimpl_->accountId_,
2506 : convId);
2507 5 : continue;
2508 5 : }
2509 :
2510 21 : JAMI_LOG("[Account {:s}] New request detected for conversation {:s} (device {:s})",
2511 : pimpl_->accountId_,
2512 : convId,
2513 : deviceId);
2514 :
2515 7 : emitSignal<libjami::ConversationSignal::ConversationRequestReceived>(pimpl_->accountId_,
2516 : convId,
2517 14 : req.toMap());
2518 19 : }
2519 :
2520 : // Updates preferences for conversations
2521 181 : for (const auto& [convId, p] : msg.p) {
2522 4 : if (auto conv = pimpl_->getConversation(convId)) {
2523 4 : std::unique_lock lk(conv->mtx);
2524 4 : if (conv->conversation) {
2525 2 : auto conversation = conv->conversation;
2526 2 : lk.unlock();
2527 2 : conversation->updatePreferences(p);
2528 4 : } else if (conv->pending) {
2529 2 : conv->pending->preferences = p;
2530 : }
2531 8 : }
2532 : }
2533 :
2534 : // Updates displayed for conversations
2535 255 : for (const auto& [convId, ms] : msg.ms) {
2536 78 : if (auto conv = pimpl_->getConversation(convId)) {
2537 78 : std::unique_lock lk(conv->mtx);
2538 77 : if (conv->conversation) {
2539 34 : auto conversation = conv->conversation;
2540 34 : lk.unlock();
2541 35 : conversation->updateMessageStatus(ms);
2542 78 : } else if (conv->pending) {
2543 41 : conv->pending->status = ms;
2544 : }
2545 156 : }
2546 : }
2547 177 : }
2548 :
2549 : bool
2550 2 : ConversationModule::needsSyncingWith(const std::string& memberUri, const std::string& deviceId) const
2551 : {
2552 : // Check if a conversation needs to fetch remote or to be cloned
2553 2 : std::lock_guard lk(pimpl_->conversationsMtx_);
2554 2 : for (const auto& [key, ci] : pimpl_->conversations_) {
2555 1 : std::lock_guard lk(ci->mtx);
2556 1 : if (ci->conversation) {
2557 0 : if (ci->conversation->isRemoving() && ci->conversation->isMember(memberUri, false))
2558 0 : return true;
2559 1 : } else if (!ci->info.removed
2560 3 : && std::find(ci->info.members.begin(), ci->info.members.end(), memberUri)
2561 3 : != ci->info.members.end()) {
2562 : // In this case the conversation was never cloned (can be after an import)
2563 1 : return true;
2564 : }
2565 1 : }
2566 1 : return false;
2567 2 : }
2568 :
2569 : void
2570 1109 : ConversationModule::setFetched(const std::string& conversationId,
2571 : const std::string& deviceId,
2572 : const std::string& commitId)
2573 : {
2574 1109 : if (auto conv = pimpl_->getConversation(conversationId)) {
2575 1109 : std::lock_guard lk(conv->mtx);
2576 1109 : if (conv->conversation) {
2577 1109 : bool remove = conv->conversation->isRemoving();
2578 1109 : conv->conversation->hasFetched(deviceId, commitId);
2579 1109 : if (remove)
2580 1 : pimpl_->removeRepositoryImpl(*conv, true);
2581 : }
2582 2218 : }
2583 1109 : }
2584 :
2585 : void
2586 16119 : ConversationModule::fetchNewCommits(const std::string& peer,
2587 : const std::string& deviceId,
2588 : const std::string& conversationId,
2589 : const std::string& commitId)
2590 : {
2591 16119 : pimpl_->fetchNewCommits(peer, deviceId, conversationId, commitId);
2592 16123 : }
2593 :
2594 : void
2595 137 : ConversationModule::addConversationMember(const std::string& conversationId,
2596 : const dht::InfoHash& contactUri,
2597 : bool sendRequest)
2598 : {
2599 137 : auto conv = pimpl_->getConversation(conversationId);
2600 137 : if (not conv || not conv->conversation) {
2601 0 : JAMI_ERROR("Conversation {:s} does not exist", conversationId);
2602 0 : return;
2603 : }
2604 137 : std::unique_lock lk(conv->mtx);
2605 :
2606 137 : auto contactUriStr = contactUri.toString();
2607 137 : if (conv->conversation->isMember(contactUriStr, true)) {
2608 0 : JAMI_DEBUG("{:s} is already a member of {:s}, resend invite", contactUriStr, conversationId);
2609 : // Note: This should not be necessary, but if for whatever reason the other side didn't
2610 : // join we should not forbid new invites
2611 0 : auto invite = conv->conversation->generateInvitation();
2612 0 : lk.unlock();
2613 0 : pimpl_->sendMsgCb_(contactUriStr, {}, std::move(invite), 0);
2614 0 : return;
2615 0 : }
2616 :
2617 137 : conv->conversation->addMember(
2618 : contactUriStr,
2619 538 : [this, conv, conversationId, sendRequest, contactUriStr](bool ok, const std::string& commitId) {
2620 137 : if (ok) {
2621 135 : std::unique_lock lk(conv->mtx);
2622 135 : pimpl_->sendMessageNotification(*conv->conversation,
2623 : true,
2624 : commitId); // For the other members
2625 135 : if (sendRequest) {
2626 131 : auto invite = conv->conversation->generateInvitation();
2627 131 : lk.unlock();
2628 131 : pimpl_->sendMsgCb_(contactUriStr, {}, std::move(invite), 0);
2629 131 : }
2630 135 : }
2631 137 : });
2632 137 : }
2633 :
2634 : void
2635 15 : ConversationModule::removeConversationMember(const std::string& conversationId,
2636 : const dht::InfoHash& contactUri,
2637 : bool isDevice)
2638 : {
2639 15 : auto contactUriStr = contactUri.toString();
2640 15 : if (auto conv = pimpl_->getConversation(conversationId)) {
2641 15 : std::lock_guard lk(conv->mtx);
2642 15 : if (conv->conversation)
2643 15 : return conv->conversation->removeMember(
2644 28 : contactUriStr, isDevice, [this, conversationId](bool ok, const std::string& commitId) {
2645 15 : if (ok) {
2646 13 : pimpl_->sendMessageNotification(conversationId, true, commitId);
2647 : }
2648 30 : });
2649 30 : }
2650 15 : }
2651 :
2652 : std::vector<std::map<std::string, std::string>>
2653 85 : ConversationModule::getConversationMembers(const std::string& conversationId,
2654 : bool includeBanned) const
2655 : {
2656 85 : return pimpl_->getConversationMembers(conversationId, includeBanned);
2657 : }
2658 :
2659 : uint32_t
2660 4 : ConversationModule::countInteractions(const std::string& convId,
2661 : const std::string& toId,
2662 : const std::string& fromId,
2663 : const std::string& authorUri) const
2664 : {
2665 4 : if (auto conv = pimpl_->getConversation(convId)) {
2666 4 : std::lock_guard lk(conv->mtx);
2667 4 : if (conv->conversation)
2668 4 : return conv->conversation->countInteractions(toId, fromId, authorUri);
2669 8 : }
2670 0 : return 0;
2671 : }
2672 :
2673 : void
2674 4 : ConversationModule::search(uint32_t req, const std::string& convId, const Filter& filter) const
2675 : {
2676 4 : if (convId.empty()) {
2677 0 : auto convs = pimpl_->getConversations();
2678 0 : if (convs.empty()) {
2679 0 : emitSignal<libjami::ConversationSignal::MessagesFound>(
2680 : req,
2681 0 : pimpl_->accountId_,
2682 0 : std::string {},
2683 0 : std::vector<std::map<std::string, std::string>> {});
2684 0 : return;
2685 : }
2686 0 : auto finishedFlag = std::make_shared<std::atomic_int>(convs.size());
2687 0 : for (const auto& conv : convs) {
2688 0 : conv->search(req, filter, finishedFlag);
2689 : }
2690 4 : } else if (auto conv = pimpl_->getConversation(convId)) {
2691 4 : std::lock_guard lk(conv->mtx);
2692 4 : if (conv->conversation)
2693 4 : conv->conversation->search(req, filter, std::make_shared<std::atomic_int>(1));
2694 8 : }
2695 : }
2696 :
2697 : void
2698 9 : ConversationModule::updateConversationInfos(const std::string& conversationId,
2699 : const std::map<std::string, std::string>& infos,
2700 : bool sync)
2701 : {
2702 9 : auto conv = pimpl_->getConversation(conversationId);
2703 9 : if (not conv or not conv->conversation) {
2704 0 : JAMI_ERROR("Conversation {:s} does not exist", conversationId);
2705 0 : return;
2706 : }
2707 9 : std::lock_guard lk(conv->mtx);
2708 9 : conv->conversation
2709 9 : ->updateInfos(infos, [this, conversationId, sync](bool ok, const std::string& commitId) {
2710 9 : if (ok && sync) {
2711 8 : pimpl_->sendMessageNotification(conversationId, true, commitId);
2712 1 : } else if (sync)
2713 3 : JAMI_WARNING("Unable to update info on {:s}", conversationId);
2714 9 : });
2715 9 : }
2716 :
2717 : std::map<std::string, std::string>
2718 5 : ConversationModule::conversationInfos(const std::string& conversationId) const
2719 : {
2720 : {
2721 5 : std::lock_guard lk(pimpl_->conversationsRequestsMtx_);
2722 5 : auto itReq = pimpl_->conversationsRequests_.find(conversationId);
2723 5 : if (itReq != pimpl_->conversationsRequests_.end())
2724 1 : return itReq->second.metadatas;
2725 5 : }
2726 4 : if (auto conv = pimpl_->getConversation(conversationId)) {
2727 4 : std::lock_guard lk(conv->mtx);
2728 4 : std::map<std::string, std::string> md;
2729 : {
2730 4 : auto syncingMetadatasIt = pimpl_->syncingMetadatas_.find(conversationId);
2731 4 : if (syncingMetadatasIt != pimpl_->syncingMetadatas_.end()) {
2732 2 : if (conv->conversation) {
2733 0 : pimpl_->syncingMetadatas_.erase(syncingMetadatasIt);
2734 0 : pimpl_->saveMetadatas();
2735 : } else {
2736 2 : md = syncingMetadatasIt->second;
2737 : }
2738 : }
2739 : }
2740 4 : if (conv->conversation)
2741 2 : return conv->conversation->infos();
2742 : else
2743 2 : return md;
2744 8 : }
2745 0 : JAMI_ERROR("Conversation {:s} does not exist", conversationId);
2746 0 : return {};
2747 : }
2748 :
2749 : void
2750 5 : ConversationModule::setConversationPreferences(const std::string& conversationId,
2751 : const std::map<std::string, std::string>& prefs)
2752 : {
2753 5 : if (auto conv = pimpl_->getConversation(conversationId)) {
2754 5 : std::unique_lock lk(conv->mtx);
2755 5 : if (not conv->conversation) {
2756 0 : JAMI_ERROR("Conversation {:s} does not exist", conversationId);
2757 0 : return;
2758 : }
2759 5 : auto conversation = conv->conversation;
2760 5 : lk.unlock();
2761 5 : conversation->updatePreferences(prefs);
2762 5 : auto msg = std::make_shared<SyncMsg>();
2763 10 : msg->p = {{conversationId, conversation->preferences(true)}};
2764 5 : pimpl_->needsSyncingCb_(std::move(msg));
2765 10 : }
2766 : }
2767 :
2768 : std::map<std::string, std::string>
2769 14 : ConversationModule::getConversationPreferences(const std::string& conversationId,
2770 : bool includeCreated) const
2771 : {
2772 14 : if (auto conv = pimpl_->getConversation(conversationId)) {
2773 14 : std::lock_guard lk(conv->mtx);
2774 14 : if (conv->conversation)
2775 13 : return conv->conversation->preferences(includeCreated);
2776 28 : }
2777 1 : return {};
2778 : }
2779 :
2780 : std::map<std::string, std::map<std::string, std::string>>
2781 201 : ConversationModule::convPreferences() const
2782 : {
2783 201 : std::map<std::string, std::map<std::string, std::string>> p;
2784 290 : for (const auto& conv : pimpl_->getConversations()) {
2785 89 : auto prefs = conv->preferences(true);
2786 89 : if (!prefs.empty())
2787 3 : p[conv->id()] = std::move(prefs);
2788 290 : }
2789 201 : return p;
2790 0 : }
2791 :
2792 : std::vector<uint8_t>
2793 0 : ConversationModule::conversationVCard(const std::string& conversationId) const
2794 : {
2795 0 : if (auto conv = pimpl_->getConversation(conversationId)) {
2796 0 : std::lock_guard lk(conv->mtx);
2797 0 : if (conv->conversation)
2798 0 : return conv->conversation->vCard();
2799 0 : }
2800 0 : JAMI_ERROR("Conversation {:s} does not exist", conversationId);
2801 0 : return {};
2802 : }
2803 :
2804 : bool
2805 4570 : ConversationModule::isBanned(const std::string& convId, const std::string& uri) const
2806 : {
2807 4570 : if (auto conv = pimpl_->getConversation(convId)) {
2808 4555 : std::lock_guard lk(conv->mtx);
2809 4556 : if (!conv->conversation)
2810 82 : return true;
2811 4473 : if (conv->conversation->mode() != ConversationMode::ONE_TO_ONE)
2812 3814 : return conv->conversation->isBanned(uri);
2813 9127 : }
2814 : // If 1:1 we check the certificate status
2815 675 : std::lock_guard lk(pimpl_->conversationsMtx_);
2816 675 : return pimpl_->accountManager_->getCertificateStatus(uri) == dhtnet::tls::TrustStore::PermissionStatus::BANNED;
2817 675 : }
2818 :
2819 : void
2820 27 : ConversationModule::removeContact(const std::string& uri, bool banned)
2821 : {
2822 : // Remove linked conversation's requests
2823 : {
2824 27 : std::lock_guard lk(pimpl_->conversationsRequestsMtx_);
2825 27 : auto update = false;
2826 27 : for (auto it = pimpl_->conversationsRequests_.begin();
2827 31 : it != pimpl_->conversationsRequests_.end();
2828 4 : ++it) {
2829 4 : if (it->second.from == uri && !it->second.declined) {
2830 12 : JAMI_DEBUG("Declining conversation request {:s} from {:s}", it->first, uri);
2831 4 : pimpl_->syncingMetadatas_.erase(it->first);
2832 4 : pimpl_->saveMetadatas();
2833 4 : emitSignal<libjami::ConversationSignal::ConversationRequestDeclined>(
2834 4 : pimpl_->accountId_, it->first);
2835 4 : update = true;
2836 4 : it->second.declined = std::time(nullptr);
2837 : }
2838 : }
2839 27 : if (update) {
2840 4 : pimpl_->saveConvRequests();
2841 4 : pimpl_->needsSyncingCb_({});
2842 : }
2843 27 : }
2844 27 : if (banned) {
2845 8 : auto conversationId = getOneToOneConversation(uri);
2846 13 : pimpl_->withConversation(conversationId, [&](auto& conv) { conv.shutdownConnections(); });
2847 8 : return; // Keep the conversation in banned model but stop connections
2848 8 : }
2849 : // Remove related conversation
2850 19 : auto isSelf = uri == pimpl_->username_;
2851 19 : std::vector<std::string> toRm;
2852 15 : auto updateClient = [&](const auto& convId) {
2853 15 : pimpl_->updateConvForContact(uri, convId, "");
2854 15 : emitSignal<libjami::ConversationSignal::ConversationRemoved>(pimpl_->accountId_, convId);
2855 15 : };
2856 18 : auto removeConvInfo = [&](const auto& conv, const auto& members) {
2857 1 : if ((isSelf && members.size() == 1)
2858 19 : || (!isSelf && std::find(members.begin(), members.end(), uri) != members.end())) {
2859 : // Mark the conversation as removed if it wasn't already
2860 17 : if (!conv->info.isRemoved()) {
2861 15 : conv->info.removed = std::time(nullptr);
2862 15 : updateClient(conv->info.id);
2863 15 : pimpl_->addConvInfo(conv->info);
2864 15 : return true;
2865 : }
2866 : }
2867 3 : return false;
2868 19 : };
2869 : {
2870 19 : std::lock_guard lk(pimpl_->conversationsMtx_);
2871 37 : for (auto& [convId, conv] : pimpl_->conversations_) {
2872 18 : std::lock_guard lk(conv->mtx);
2873 18 : if (conv->conversation) {
2874 : try {
2875 : // Note it's important to check getUsername(), else
2876 : // removing self can remove all conversations
2877 15 : if (conv->conversation->mode() == ConversationMode::ONE_TO_ONE) {
2878 15 : auto initMembers = conv->conversation->getInitialMembers();
2879 15 : if (removeConvInfo(conv, initMembers))
2880 14 : toRm.emplace_back(convId);
2881 15 : }
2882 0 : } catch (const std::exception& e) {
2883 0 : JAMI_WARN("%s", e.what());
2884 0 : }
2885 : } else {
2886 3 : removeConvInfo(conv, conv->info.members);
2887 : }
2888 18 : }
2889 19 : }
2890 : // Note, if we ban the device, we don't send the leave cause the other peer will just
2891 : // never got the notifications, so just erase the datas
2892 33 : for (const auto& id : toRm)
2893 14 : pimpl_->removeRepository(id, true, true);
2894 19 : }
2895 :
2896 : bool
2897 9 : ConversationModule::removeConversation(const std::string& conversationId)
2898 : {
2899 9 : return pimpl_->removeConversation(conversationId);
2900 : }
2901 :
2902 : void
2903 7 : ConversationModule::initReplay(const std::string& oldConvId, const std::string& newConvId)
2904 : {
2905 7 : if (auto conv = pimpl_->getConversation(oldConvId)) {
2906 7 : std::lock_guard lk(conv->mtx);
2907 7 : if (conv->conversation) {
2908 7 : std::promise<bool> waitLoad;
2909 7 : std::future<bool> fut = waitLoad.get_future();
2910 : // we should wait for loadMessage, because it will be deleted after this.
2911 7 : conv->conversation->loadMessages(
2912 7 : [&](auto&& messages) {
2913 7 : std::reverse(messages.begin(),
2914 : messages.end()); // Log is inverted as we want to replay
2915 7 : std::lock_guard lk(pimpl_->replayMtx_);
2916 7 : pimpl_->replay_[newConvId] = std::move(messages);
2917 7 : waitLoad.set_value(true);
2918 7 : },
2919 : {});
2920 7 : fut.wait();
2921 7 : }
2922 14 : }
2923 7 : }
2924 :
2925 : bool
2926 62 : ConversationModule::isHosting(const std::string& conversationId, const std::string& confId) const
2927 : {
2928 62 : if (conversationId.empty()) {
2929 52 : std::lock_guard lk(pimpl_->conversationsMtx_);
2930 52 : return std::find_if(pimpl_->conversations_.cbegin(),
2931 52 : pimpl_->conversations_.cend(),
2932 10 : [&](const auto& conv) {
2933 10 : return conv.second->conversation
2934 10 : && conv.second->conversation->isHosting(confId);
2935 : })
2936 104 : != pimpl_->conversations_.cend();
2937 62 : } else if (auto conv = pimpl_->getConversation(conversationId)) {
2938 10 : if (conv->conversation) {
2939 10 : return conv->conversation->isHosting(confId);
2940 : }
2941 10 : }
2942 0 : return false;
2943 : }
2944 :
2945 : std::vector<std::map<std::string, std::string>>
2946 17 : ConversationModule::getActiveCalls(const std::string& conversationId) const
2947 : {
2948 34 : return pimpl_->withConversation(conversationId, [](const auto& conversation) {
2949 17 : return conversation.currentCalls();
2950 17 : });
2951 : }
2952 :
2953 : std::shared_ptr<SIPCall>
2954 22 : ConversationModule::call(const std::string& url,
2955 : const std::vector<libjami::MediaMap>& mediaList,
2956 : std::function<void(const std::string&, const DeviceId&, const std::shared_ptr<SIPCall>&)>&& cb)
2957 : {
2958 88 : std::string conversationId = "", confId = "", uri = "", deviceId = "";
2959 22 : if (url.find('/') == std::string::npos) {
2960 13 : conversationId = url;
2961 : } else {
2962 9 : auto parameters = jami::split_string(url, '/');
2963 9 : if (parameters.size() != 4) {
2964 0 : JAMI_ERROR("Incorrect url {:s}", url);
2965 0 : return {};
2966 : }
2967 9 : conversationId = parameters[0];
2968 9 : uri = parameters[1];
2969 9 : deviceId = parameters[2];
2970 9 : confId = parameters[3];
2971 9 : }
2972 :
2973 :
2974 22 : auto conv = pimpl_->getConversation(conversationId);
2975 22 : if (!conv)
2976 0 : return {};
2977 22 : std::unique_lock lk(conv->mtx);
2978 22 : if (!conv->conversation) {
2979 0 : JAMI_ERROR("Conversation {:s} not found", conversationId);
2980 0 : return {};
2981 : }
2982 :
2983 : // Check if we want to join a specific conference
2984 : // So, if confId is specified or if there is some activeCalls
2985 : // or if we are the default host.
2986 22 : auto activeCalls = conv->conversation->currentCalls();
2987 22 : auto infos = conv->conversation->infos();
2988 22 : auto itRdvAccount = infos.find("rdvAccount");
2989 22 : auto itRdvDevice = infos.find("rdvDevice");
2990 22 : auto sendCallRequest = false;
2991 22 : if (!confId.empty()) {
2992 9 : sendCallRequest = true;
2993 27 : JAMI_DEBUG("Calling self, join conference");
2994 13 : } else if (!activeCalls.empty()) {
2995 : // Else, we try to join active calls
2996 0 : sendCallRequest = true;
2997 0 : auto& ac = *activeCalls.rbegin();
2998 0 : confId = ac.at("id");
2999 0 : uri = ac.at("uri");
3000 0 : deviceId = ac.at("device");
3001 16 : } else if (itRdvAccount != infos.end() && itRdvDevice != infos.end()
3002 16 : && !itRdvAccount->second.empty()) {
3003 : // Else, creates "to" (accountId/deviceId/conversationId/confId) and ask remote host
3004 3 : sendCallRequest = true;
3005 3 : uri = itRdvAccount->second;
3006 3 : deviceId = itRdvDevice->second;
3007 3 : confId = "0";
3008 9 : JAMI_DEBUG("Remote host detected. Calling {:s} on device {:s}", uri, deviceId);
3009 : }
3010 22 : lk.unlock();
3011 :
3012 22 : auto account = pimpl_->account_.lock();
3013 22 : std::vector<libjami::MediaMap> mediaMap = mediaList.empty()
3014 : ? MediaAttribute::mediaAttributesToMediaMaps(
3015 41 : pimpl_->account_.lock()->createDefaultMediaList(
3016 60 : pimpl_->account_.lock()->isVideoEnabled()))
3017 41 : : mediaList;
3018 :
3019 44 : if (!sendCallRequest
3020 22 : || (uri == pimpl_->username_ && deviceId == pimpl_->deviceId_)) {
3021 11 : confId = confId == "0" ? Manager::instance().callFactory.getNewCallID() : confId;
3022 : // TODO attach host with media list
3023 11 : hostConference(conversationId, confId, "", mediaMap);
3024 11 : return {};
3025 : }
3026 :
3027 : // Else we need to create a call
3028 11 : auto& manager = Manager::instance();
3029 11 : std::shared_ptr<SIPCall> call = manager.callFactory.newSipCall(account, Call::CallType::OUTGOING, mediaMap);
3030 :
3031 11 : if (not call)
3032 0 : return {};
3033 :
3034 0 : auto callUri = fmt::format("{}/{}/{}/{}", conversationId, uri, deviceId, confId);
3035 11 : account->getIceOptions([call, accountId = account->getAccountID(), callUri, uri = std::move(uri), conversationId, deviceId, cb=std::move(cb)](auto&& opts) {
3036 11 : if (call->isIceEnabled()) {
3037 11 : if (not call->createIceMediaTransport(false)
3038 22 : or not call->initIceMediaTransport(true,
3039 : std::forward<dhtnet::IceTransportOptions>(opts))) {
3040 0 : return;
3041 : }
3042 : }
3043 33 : JAMI_DEBUG("New outgoing call with {}", uri);
3044 11 : call->setPeerNumber(uri);
3045 11 : call->setPeerUri("swarm:" + uri);
3046 :
3047 33 : JAMI_DEBUG("Calling: {:s}", callUri);
3048 11 : call->setState(Call::ConnectionState::TRYING);
3049 11 : call->setPeerNumber(callUri);
3050 11 : call->setPeerUri("rdv:" + callUri);
3051 11 : call->addStateListener([accountId, conversationId](Call::CallState call_state,
3052 : Call::ConnectionState cnx_state,
3053 : int) {
3054 63 : if (cnx_state == Call::ConnectionState::DISCONNECTED
3055 23 : && call_state == Call::CallState::MERROR) {
3056 2 : emitSignal<libjami::ConfigurationSignal::NeedsHost>(accountId, conversationId);
3057 2 : return true;
3058 : }
3059 61 : return true;
3060 : });
3061 11 : cb(callUri, DeviceId(deviceId), call);
3062 : });
3063 :
3064 11 : return call;
3065 22 : }
3066 :
3067 : void
3068 14 : ConversationModule::hostConference(const std::string& conversationId,
3069 : const std::string& confId,
3070 : const std::string& callId,
3071 : const std::vector<libjami::MediaMap>& mediaList)
3072 : {
3073 14 : auto acc = pimpl_->account_.lock();
3074 14 : if (!acc)
3075 0 : return;
3076 14 : auto conf = acc->getConference(confId);
3077 14 : auto createConf = !conf;
3078 14 : std::shared_ptr<SIPCall> call;
3079 14 : if (!callId.empty()) {
3080 3 : call = std::dynamic_pointer_cast<SIPCall>(acc->getCall(callId));
3081 3 : if (!call) {
3082 0 : JAMI_WARNING("No call with id {} found", callId);
3083 0 : return;
3084 : }
3085 : }
3086 14 : if (createConf) {
3087 14 : conf = std::make_shared<Conference>(acc, confId);
3088 14 : acc->attach(conf);
3089 : }
3090 :
3091 14 : if (!callId.empty())
3092 3 : conf->addSubCall(callId);
3093 :
3094 14 : if (callId.empty())
3095 11 : conf->attachHost(mediaList);
3096 :
3097 14 : if (createConf) {
3098 14 : emitSignal<libjami::CallSignal::ConferenceCreated>(acc->getAccountID(), conversationId, conf->getConfId());
3099 : } else {
3100 0 : conf->reportMediaNegotiationStatus();
3101 0 : emitSignal<libjami::CallSignal::ConferenceChanged>(acc->getAccountID(),
3102 0 : conf->getConfId(),
3103 : conf->getStateStr());
3104 0 : return;
3105 : }
3106 :
3107 14 : auto conv = pimpl_->getConversation(conversationId);
3108 14 : if (!conv)
3109 0 : return;
3110 14 : std::unique_lock lk(conv->mtx);
3111 14 : if (!conv->conversation) {
3112 0 : JAMI_ERROR("Conversation {} not found", conversationId);
3113 0 : return;
3114 : }
3115 : // Add commit to conversation
3116 14 : Json::Value value;
3117 14 : value["uri"] = pimpl_->username_;
3118 14 : value["device"] = pimpl_->deviceId_;
3119 14 : value["confId"] = conf->getConfId();
3120 14 : value["type"] = "application/call-history+json";
3121 28 : conv->conversation->hostConference(std::move(value),
3122 14 : [w = pimpl_->weak(),
3123 : conversationId](bool ok, const std::string& commitId) {
3124 14 : if (ok) {
3125 14 : if (auto shared = w.lock())
3126 14 : shared->sendMessageNotification(conversationId,
3127 : true,
3128 14 : commitId);
3129 : } else {
3130 0 : JAMI_ERR("Failed to send message to conversation %s",
3131 : conversationId.c_str());
3132 : }
3133 14 : });
3134 :
3135 : // When conf finished = remove host & commit
3136 : // Master call, so when it's stopped, the conference will be stopped (as we use the hold
3137 : // state for detaching the call)
3138 42 : conf->onShutdown(
3139 28 : [w = pimpl_->weak(), accountUri = pimpl_->username_, confId=conf->getConfId(), conversationId, conv](
3140 : int duration) {
3141 14 : auto shared = w.lock();
3142 14 : if (shared) {
3143 11 : Json::Value value;
3144 11 : value["uri"] = accountUri;
3145 11 : value["device"] = shared->deviceId_;
3146 11 : value["confId"] = confId;
3147 11 : value["type"] = "application/call-history+json";
3148 11 : value["duration"] = std::to_string(duration);
3149 :
3150 11 : std::lock_guard lk(conv->mtx);
3151 11 : if (!conv->conversation) {
3152 0 : JAMI_ERROR("Conversation {} not found", conversationId);
3153 0 : return;
3154 : }
3155 33 : conv->conversation->removeActiveConference(
3156 22 : std::move(value), [w, conversationId](bool ok, const std::string& commitId) {
3157 11 : if (ok) {
3158 11 : if (auto shared = w.lock()) {
3159 11 : shared->sendMessageNotification(conversationId, true, commitId);
3160 11 : }
3161 : } else {
3162 0 : JAMI_ERROR("Failed to send message to conversation {}", conversationId);
3163 : }
3164 11 : });
3165 11 : }
3166 14 : });
3167 14 : }
3168 :
3169 : std::map<std::string, ConvInfo>
3170 890 : ConversationModule::convInfos(const std::string& accountId)
3171 : {
3172 1780 : return convInfosFromPath(fileutils::get_data_dir() / accountId);
3173 : }
3174 :
3175 : std::map<std::string, ConvInfo>
3176 931 : ConversationModule::convInfosFromPath(const std::filesystem::path& path)
3177 : {
3178 931 : std::map<std::string, ConvInfo> convInfos;
3179 : try {
3180 : // read file
3181 1862 : std::lock_guard lock(dhtnet::fileutils::getFileLock(path / "convInfo"));
3182 931 : auto file = fileutils::loadFile("convInfo", path);
3183 : // load values
3184 931 : msgpack::unpacked result;
3185 931 : msgpack::unpack(result, (const char*) file.data(), file.size());
3186 926 : result.get().convert(convInfos);
3187 946 : } catch (const std::exception& e) {
3188 5 : JAMI_WARN("[convInfo] error loading convInfo: %s", e.what());
3189 5 : }
3190 931 : return convInfos;
3191 0 : }
3192 :
3193 : std::map<std::string, ConversationRequest>
3194 878 : ConversationModule::convRequests(const std::string& accountId)
3195 : {
3196 878 : auto path = fileutils::get_data_dir() / accountId;
3197 1756 : return convRequestsFromPath(path.string());
3198 878 : }
3199 :
3200 : std::map<std::string, ConversationRequest>
3201 919 : ConversationModule::convRequestsFromPath(const std::filesystem::path& path)
3202 : {
3203 919 : std::map<std::string, ConversationRequest> convRequests;
3204 : try {
3205 : // read file
3206 1837 : std::lock_guard lock(dhtnet::fileutils::getFileLock(path / "convRequests"));
3207 919 : auto file = fileutils::loadFile("convRequests", path);
3208 : // load values
3209 919 : msgpack::unpacked result;
3210 919 : msgpack::unpack(result, (const char*) file.data(), file.size(), 0);
3211 919 : result.get().convert(convRequests);
3212 919 : } catch (const std::exception& e) {
3213 0 : JAMI_WARN("[convInfo] error loading convInfo: %s", e.what());
3214 0 : }
3215 918 : return convRequests;
3216 0 : }
3217 :
3218 : void
3219 181 : ConversationModule::addConvInfo(const ConvInfo& info)
3220 : {
3221 181 : pimpl_->addConvInfo(info);
3222 181 : }
3223 :
3224 : void
3225 2181 : ConversationModule::Impl::setConversationMembers(const std::string& convId,
3226 : const std::set<std::string>& members)
3227 : {
3228 2181 : if (auto conv = getConversation(convId)) {
3229 2181 : std::lock_guard lk(conv->mtx);
3230 2181 : conv->info.members = members;
3231 2181 : addConvInfo(conv->info);
3232 4362 : }
3233 2181 : }
3234 :
3235 : std::shared_ptr<Conversation>
3236 0 : ConversationModule::getConversation(const std::string& convId)
3237 : {
3238 0 : if (auto conv = pimpl_->getConversation(convId)) {
3239 0 : std::lock_guard lk(conv->mtx);
3240 0 : return conv->conversation;
3241 0 : }
3242 0 : return nullptr;
3243 : }
3244 :
3245 : std::shared_ptr<dhtnet::ChannelSocket>
3246 5509 : ConversationModule::gitSocket(std::string_view deviceId, std::string_view convId) const
3247 : {
3248 5509 : if (auto conv = pimpl_->getConversation(convId)) {
3249 5507 : std::lock_guard lk(conv->mtx);
3250 5506 : if (conv->conversation)
3251 10223 : return conv->conversation->gitSocket(DeviceId(deviceId));
3252 396 : else if (conv->pending)
3253 396 : return conv->pending->socket;
3254 11017 : }
3255 1 : return nullptr;
3256 : }
3257 :
3258 : void
3259 0 : ConversationModule::addGitSocket(std::string_view deviceId,
3260 : std::string_view convId,
3261 : const std::shared_ptr<dhtnet::ChannelSocket>& channel)
3262 : {
3263 0 : if (auto conv = pimpl_->getConversation(convId)) {
3264 0 : std::lock_guard lk(conv->mtx);
3265 0 : conv->conversation->addGitSocket(DeviceId(deviceId), channel);
3266 0 : } else
3267 0 : JAMI_WARNING("addGitSocket: Unable to find conversation {:s}", convId);
3268 0 : }
3269 :
3270 : void
3271 844 : ConversationModule::removeGitSocket(std::string_view deviceId, std::string_view convId)
3272 : {
3273 1667 : pimpl_->withConversation(convId, [&](auto& conv) { conv.removeGitSocket(DeviceId(deviceId)); });
3274 844 : }
3275 :
3276 : void
3277 724 : ConversationModule::shutdownConnections()
3278 : {
3279 1146 : for (const auto& c : pimpl_->getSyncedConversations()) {
3280 422 : std::lock_guard lkc(c->mtx);
3281 422 : if (c->conversation)
3282 389 : c->conversation->shutdownConnections();
3283 422 : if (c->pending)
3284 22 : c->pending->socket = {};
3285 1146 : }
3286 724 : }
3287 : void
3288 999 : ConversationModule::addSwarmChannel(const std::string& conversationId,
3289 : std::shared_ptr<dhtnet::ChannelSocket> channel)
3290 : {
3291 999 : pimpl_->withConversation(conversationId,
3292 992 : [&](auto& conv) { conv.addSwarmChannel(std::move(channel)); });
3293 1000 : }
3294 :
3295 : void
3296 0 : ConversationModule::connectivityChanged()
3297 : {
3298 0 : for (const auto& conv : pimpl_->getConversations())
3299 0 : conv->connectivityChanged();
3300 0 : }
3301 :
3302 : std::shared_ptr<Typers>
3303 9 : ConversationModule::getTypers(const std::string& convId)
3304 : {
3305 9 : if (auto c = pimpl_->getConversation(convId)) {
3306 9 : std::lock_guard lk(c->mtx);
3307 9 : if (c->conversation)
3308 9 : return c->conversation->typers();
3309 18 : }
3310 0 : return nullptr;
3311 : }
3312 :
3313 : } // namespace jami
|