Line data Source code
1 : /*
2 : * Copyright (C) 2021-2024 Savoir-faire Linux Inc.
3 : *
4 : * Author: Sébastien Blin <sebastien.blin@savoirfairelinux.com>
5 : *
6 : * This program is free software; you can redistribute it and/or modify
7 : * it under the terms of the GNU General Public License as published by
8 : * the Free Software Foundation; either version 3 of the License, or
9 : * (at your option) any later version.
10 : *
11 : * This program is distributed in the hope that it will be useful,
12 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 : * GNU General Public License for more details.
15 : *
16 : * You should have received a copy of the GNU General Public License
17 : * along with this program; if not, write to the Free Software
18 : * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
19 : */
20 :
21 : #include "conversation_module.h"
22 :
23 : #include <algorithm>
24 : #include <fstream>
25 :
26 : #include <opendht/thread_pool.h>
27 :
28 : #include "account_const.h"
29 : #include "call.h"
30 : #include "client/ring_signal.h"
31 : #include "fileutils.h"
32 : #include "jamidht/account_manager.h"
33 : #include "jamidht/jamiaccount.h"
34 : #include "manager.h"
35 : #include "sip/sipcall.h"
36 : #include "vcard.h"
37 :
38 : namespace jami {
39 :
40 : using ConvInfoMap = std::map<std::string, ConvInfo>;
41 :
42 : struct PendingConversationFetch
43 : {
44 : bool ready {false};
45 : bool cloning {false};
46 : std::string deviceId {};
47 : std::string removeId {};
48 : std::map<std::string, std::string> preferences {};
49 : std::map<std::string, std::map<std::string, std::string>> status {};
50 : std::set<std::string> connectingTo {};
51 : std::shared_ptr<dhtnet::ChannelSocket> socket {};
52 : };
53 :
54 : struct SyncedConversation
55 : {
56 : std::mutex mtx;
57 : ConvInfo info;
58 : std::unique_ptr<PendingConversationFetch> pending;
59 : std::shared_ptr<Conversation> conversation;
60 :
61 266 : SyncedConversation(const std::string& convId)
62 266 : : info {convId}
63 266 : {}
64 22 : SyncedConversation(const ConvInfo& info)
65 22 : : info {info}
66 22 : {}
67 :
68 3484 : bool startFetch(const std::string& deviceId, bool checkIfConv = false)
69 : {
70 : // conversation mtx must be locked
71 3484 : if (checkIfConv && conversation)
72 0 : return false; // Already a conversation
73 3484 : if (pending) {
74 1520 : if (pending->ready)
75 3 : return false; // Already doing stuff
76 : // if (pending->deviceId == deviceId)
77 : // return false; // Already fetching
78 1517 : if (pending->connectingTo.find(deviceId) != pending->connectingTo.end())
79 811 : return false; // Already connecting to this device
80 : } else {
81 1963 : pending = std::make_unique<PendingConversationFetch>();
82 1964 : pending->connectingTo.insert(deviceId);
83 1964 : return true;
84 : }
85 706 : return true;
86 : }
87 :
88 1902 : void stopFetch(const std::string& deviceId)
89 : {
90 : // conversation mtx must be locked
91 1902 : if (!pending)
92 109 : return;
93 1793 : pending->connectingTo.erase(deviceId);
94 1793 : if (pending->connectingTo.empty())
95 1336 : pending.reset();
96 : }
97 :
98 89 : std::vector<std::map<std::string, std::string>> getMembers(bool includeLeft,
99 : bool includeBanned) const
100 : {
101 : // conversation mtx must be locked
102 89 : if (conversation)
103 83 : return conversation->getMembers(true, includeLeft, includeBanned);
104 : // If we're cloning, we can return the initial members
105 6 : std::vector<std::map<std::string, std::string>> result;
106 6 : result.reserve(info.members.size());
107 17 : for (const auto& uri : info.members) {
108 22 : result.emplace_back(std::map<std::string, std::string> {{"uri", uri}});
109 : }
110 6 : return result;
111 6 : }
112 : };
113 :
114 : class ConversationModule::Impl : public std::enable_shared_from_this<Impl>
115 : {
116 : public:
117 : Impl(std::weak_ptr<JamiAccount>&& account,
118 : NeedsSyncingCb&& needsSyncingCb,
119 : SengMsgCb&& sendMsgCb,
120 : NeedSocketCb&& onNeedSocket,
121 : NeedSocketCb&& onNeedSwarmSocket,
122 : UpdateConvReq&& updateConvReqCb,
123 : OneToOneRecvCb&& oneToOneRecvCb);
124 :
125 : template<typename S, typename T>
126 89 : inline auto withConv(const S& convId, T&& cb) const
127 : {
128 178 : if (auto conv = getConversation(convId)) {
129 89 : std::lock_guard lk(conv->mtx);
130 89 : return cb(*conv);
131 89 : } else {
132 0 : JAMI_WARNING("Conversation {} not found", convId);
133 : }
134 0 : return decltype(cb(std::declval<SyncedConversation&>()))();
135 : }
136 : template<typename S, typename T>
137 2167 : inline auto withConversation(const S& convId, T&& cb)
138 : {
139 4336 : if (auto conv = getConversation(convId)) {
140 2163 : std::lock_guard lk(conv->mtx);
141 2162 : if (conv->conversation)
142 2142 : return cb(*conv->conversation);
143 2163 : } else {
144 18 : JAMI_WARNING("Conversation {} not found", convId);
145 : }
146 26 : return decltype(cb(std::declval<Conversation&>()))();
147 : }
148 :
149 : // Retrieving recent commits
150 : /**
151 : * Clone a conversation (initial) from device
152 : * @param deviceId
153 : * @param convId
154 : */
155 : void cloneConversation(const std::string& deviceId,
156 : const std::string& peer,
157 : const std::string& convId);
158 : void cloneConversation(const std::string& deviceId,
159 : const std::string& peer,
160 : const std::shared_ptr<SyncedConversation>& conv);
161 :
162 : /**
163 : * Pull remote device
164 : * @param peer Contact URI
165 : * @param deviceId Contact's device
166 : * @param conversationId
167 : * @param commitId (optional)
168 : */
169 : void fetchNewCommits(const std::string& peer,
170 : const std::string& deviceId,
171 : const std::string& conversationId,
172 : const std::string& commitId = "");
173 : /**
174 : * Handle events to receive new commits
175 : */
176 : void handlePendingConversation(const std::string& conversationId, const std::string& deviceId);
177 :
178 : // Requests
179 : std::optional<ConversationRequest> getRequest(const std::string& id) const;
180 :
181 : // Conversations
182 : /**
183 : * Get members
184 : * @param conversationId
185 : * @param includeBanned
186 : * @return a map of members with their role and details
187 : */
188 : std::vector<std::map<std::string, std::string>> getConversationMembers(
189 : const std::string& conversationId, bool includeBanned = false) const;
190 : void setConversationMembers(const std::string& convId, const std::set<std::string>& members);
191 :
192 : /**
193 : * Remove a repository and all files
194 : * @param convId
195 : * @param sync If we send an update to other account's devices
196 : * @param force True if ignore the removing flag
197 : */
198 : void removeRepository(const std::string& convId, bool sync, bool force = false);
199 : void removeRepositoryImpl(SyncedConversation& conv, bool sync, bool force = false);
200 : /**
201 : * Remove a conversation
202 : * @param conversationId
203 : */
204 : bool removeConversation(const std::string& conversationId);
205 : bool removeConversationImpl(SyncedConversation& conv);
206 :
207 : /**
208 : * Send a message notification to all members
209 : * @param conversation
210 : * @param commit
211 : * @param sync If we send an update to other account's devices
212 : * @param deviceId If we need to filter a specific device
213 : */
214 : void sendMessageNotification(const std::string& conversationId,
215 : bool sync,
216 : const std::string& commitId = "",
217 : const std::string& deviceId = "");
218 : void sendMessageNotification(Conversation& conversation,
219 : bool sync,
220 : const std::string& commitId = "",
221 : const std::string& deviceId = "");
222 :
223 : /**
224 : * @return if a convId is a valid conversation (repository cloned & usable)
225 : */
226 712 : bool isConversation(const std::string& convId) const
227 : {
228 712 : std::lock_guard lk(conversationsMtx_);
229 712 : auto c = conversations_.find(convId);
230 1424 : return c != conversations_.end() && c->second;
231 712 : }
232 :
233 1232 : void addConvInfo(const ConvInfo& info)
234 : {
235 1232 : std::lock_guard lk(convInfosMtx_);
236 1232 : convInfos_[info.id] = info;
237 1231 : saveConvInfos();
238 1232 : }
239 :
240 : std::string getOneToOneConversation(const std::string& uri) const noexcept;
241 :
242 89 : std::shared_ptr<SyncedConversation> getConversation(std::string_view convId) const
243 : {
244 89 : std::lock_guard lk(conversationsMtx_);
245 89 : auto c = conversations_.find(convId);
246 178 : return c != conversations_.end() ? c->second : nullptr;
247 89 : }
248 42495 : std::shared_ptr<SyncedConversation> getConversation(std::string_view convId)
249 : {
250 42495 : std::lock_guard lk(conversationsMtx_);
251 42497 : auto c = conversations_.find(convId);
252 84991 : return c != conversations_.end() ? c->second : nullptr;
253 42493 : }
254 290 : std::shared_ptr<SyncedConversation> startConversation(const std::string& convId)
255 : {
256 290 : std::lock_guard lk(conversationsMtx_);
257 290 : auto& c = conversations_[convId];
258 290 : if (!c)
259 259 : c = std::make_shared<SyncedConversation>(convId);
260 580 : return c;
261 290 : }
262 106 : std::shared_ptr<SyncedConversation> startConversation(const ConvInfo& info)
263 : {
264 106 : std::lock_guard lk(conversationsMtx_);
265 106 : auto& c = conversations_[info.id];
266 106 : if (!c)
267 10 : c = std::make_shared<SyncedConversation>(info);
268 212 : return c;
269 106 : }
270 :
271 : // Message send/load
272 : void sendMessage(const std::string& conversationId,
273 : Json::Value&& value,
274 : const std::string& replyTo = "",
275 : bool announce = true,
276 : OnCommitCb&& onCommit = {},
277 : OnDoneCb&& cb = {});
278 :
279 : void sendMessage(const std::string& conversationId,
280 : std::string message,
281 : const std::string& replyTo = "",
282 : const std::string& type = "text/plain",
283 : bool announce = true,
284 : OnCommitCb&& onCommit = {},
285 : OnDoneCb&& cb = {});
286 :
287 : void editMessage(const std::string& conversationId,
288 : const std::string& newBody,
289 : const std::string& editedId);
290 :
291 : void bootstrapCb(std::string convId);
292 :
293 : // The following methods modify what is stored on the disk
294 : /**
295 : * @note convInfosMtx_ should be locked
296 : */
297 1693 : void saveConvInfos() const { ConversationModule::saveConvInfos(accountId_, convInfos_); }
298 : /**
299 : * @note conversationsRequestsMtx_ should be locked
300 : */
301 413 : void saveConvRequests() const
302 : {
303 413 : ConversationModule::saveConvRequests(accountId_, conversationsRequests_);
304 413 : }
305 : void declineOtherConversationWith(const std::string& uri) noexcept;
306 179 : bool addConversationRequest(const std::string& id, const ConversationRequest& req)
307 : {
308 : // conversationsRequestsMtx_ MUST BE LOCKED
309 179 : if (isConversation(id))
310 0 : return false;
311 179 : auto it = conversationsRequests_.find(id);
312 179 : if (it != conversationsRequests_.end()) {
313 : // We only remove requests (if accepted) or change .declined
314 21 : if (!req.declined)
315 16 : return false;
316 158 : } else if (req.isOneToOne()) {
317 : // Check that we're not adding a second one to one trust request
318 : // NOTE: If a new one to one request is received, we can decline the previous one.
319 54 : declineOtherConversationWith(req.from);
320 : }
321 489 : JAMI_DEBUG("Adding conversation request from {} ({})", req.from, id);
322 163 : conversationsRequests_[id] = req;
323 163 : saveConvRequests();
324 163 : return true;
325 : }
326 239 : void rmConversationRequest(const std::string& id)
327 : {
328 : // conversationsRequestsMtx_ MUST BE LOCKED
329 239 : auto it = conversationsRequests_.find(id);
330 240 : if (it != conversationsRequests_.end()) {
331 138 : auto& md = syncingMetadatas_[id];
332 138 : md = it->second.metadatas;
333 138 : md["syncing"] = "true";
334 138 : md["created"] = std::to_string(it->second.received);
335 : }
336 240 : saveMetadatas();
337 240 : conversationsRequests_.erase(id);
338 240 : saveConvRequests();
339 240 : }
340 :
341 : std::weak_ptr<JamiAccount> account_;
342 : NeedsSyncingCb needsSyncingCb_;
343 : SengMsgCb sendMsgCb_;
344 : NeedSocketCb onNeedSocket_;
345 : NeedSocketCb onNeedSwarmSocket_;
346 : UpdateConvReq updateConvReqCb_;
347 : OneToOneRecvCb oneToOneRecvCb_;
348 :
349 : std::string accountId_ {};
350 : std::string deviceId_ {};
351 : std::string username_ {};
352 :
353 : // Requests
354 : mutable std::mutex conversationsRequestsMtx_;
355 : std::map<std::string, ConversationRequest> conversationsRequests_;
356 :
357 : // Conversations
358 : mutable std::mutex conversationsMtx_ {};
359 : std::map<std::string, std::shared_ptr<SyncedConversation>, std::less<>> conversations_;
360 :
361 : // The following information is stored on the disk
362 : mutable std::mutex convInfosMtx_; // Note, should be locked after conversationsMtx_ if needed
363 : std::map<std::string, ConvInfo> convInfos_;
364 :
365 : // When sending a new message, we need to send the notification to some peers of the
366 : // conversation. However, the conversation may not be bootstrapped yet, so the list will be empty.
367 : // notSyncedNotification_ will store the notification to announce until we have peers to sync
368 : // with.
369 : std::mutex notSyncedNotificationMtx_;
370 : std::map<std::string, std::string> notSyncedNotification_;
371 :
372 3632 : std::weak_ptr<Impl> weak() { return std::static_pointer_cast<Impl>(shared_from_this()); }
373 :
374 : // Replay conversations (after erasing/re-adding)
375 : std::mutex replayMtx_;
376 : std::map<std::string, std::vector<std::map<std::string, std::string>>> replay_;
377 : std::map<std::string, uint64_t> refreshMessage;
378 : std::atomic_int syncCnt {0};
379 :
380 : #ifdef LIBJAMI_TESTABLE
381 : std::function<void(std::string, Conversation::BootstrapStatus)> bootstrapCbTest_;
382 : #endif
383 :
384 : void fixStructures(
385 : std::shared_ptr<JamiAccount> account,
386 : const std::vector<std::tuple<std::string, std::string, std::string>>& updateContactConv,
387 : const std::set<std::string>& toRm);
388 :
389 : void cloneConversationFrom(const std::shared_ptr<SyncedConversation> conv,
390 : const std::string& deviceId,
391 : const std::string& oldConvId = "");
392 : void bootstrap(const std::string& convId);
393 : void cloneConversationFrom(const std::string& conversationId,
394 : const std::string& uri,
395 : const std::string& oldConvId = "");
396 :
397 : // While syncing, we do not want to lose metadata (avatar/title and mode)
398 : std::map<std::string, std::map<std::string, std::string>> syncingMetadatas_;
399 401 : void saveMetadatas()
400 : {
401 401 : auto path = fileutils::get_data_dir() / accountId_;
402 801 : std::ofstream file(path / "syncingMetadatas", std::ios::trunc | std::ios::binary);
403 401 : msgpack::pack(file, syncingMetadatas_);
404 400 : }
405 :
406 456 : void loadMetadatas()
407 : {
408 : try {
409 : // read file
410 456 : auto path = fileutils::get_data_dir() / accountId_;
411 912 : std::lock_guard lock(dhtnet::fileutils::getFileLock(path / "syncingMetadatas"));
412 912 : auto file = fileutils::loadFile("syncingMetadatas", path);
413 : // load values
414 0 : msgpack::unpacked result;
415 0 : msgpack::unpack(result, (const char*) file.data(), file.size(), 0);
416 0 : result.get().convert(syncingMetadatas_);
417 1368 : } catch (const std::exception& e) {
418 1368 : JAMI_WARNING("[ConversationModule] error loading syncingMetadatas_: {}", e.what());
419 456 : }
420 456 : }
421 : };
422 :
423 456 : ConversationModule::Impl::Impl(std::weak_ptr<JamiAccount>&& account,
424 : NeedsSyncingCb&& needsSyncingCb,
425 : SengMsgCb&& sendMsgCb,
426 : NeedSocketCb&& onNeedSocket,
427 : NeedSocketCb&& onNeedSwarmSocket,
428 : UpdateConvReq&& updateConvReqCb,
429 456 : OneToOneRecvCb&& oneToOneRecvCb)
430 456 : : account_(account)
431 456 : , needsSyncingCb_(needsSyncingCb)
432 456 : , sendMsgCb_(sendMsgCb)
433 456 : , onNeedSocket_(onNeedSocket)
434 456 : , onNeedSwarmSocket_(onNeedSwarmSocket)
435 456 : , updateConvReqCb_(updateConvReqCb)
436 912 : , oneToOneRecvCb_(oneToOneRecvCb)
437 : {
438 456 : if (auto shared = account.lock()) {
439 456 : accountId_ = shared->getAccountID();
440 456 : deviceId_ = shared->currentDeviceId();
441 456 : if (auto accm = shared->accountManager())
442 456 : if (const auto* info = accm->getInfo())
443 456 : username_ = info->accountId;
444 456 : }
445 456 : conversationsRequests_ = convRequests(accountId_);
446 456 : loadMetadatas();
447 456 : }
448 :
449 : void
450 22 : ConversationModule::Impl::cloneConversation(const std::string& deviceId,
451 : const std::string& peerUri,
452 : const std::string& convId)
453 : {
454 66 : JAMI_DEBUG("[Account {}] Clone conversation on device {}", accountId_, deviceId);
455 :
456 22 : auto conv = startConversation(convId);
457 22 : std::unique_lock lk(conv->mtx);
458 22 : cloneConversation(deviceId, peerUri, conv);
459 22 : }
460 :
461 : void
462 72 : ConversationModule::Impl::cloneConversation(const std::string& deviceId,
463 : const std::string& peerUri,
464 : const std::shared_ptr<SyncedConversation>& conv)
465 : {
466 : // conv->mtx must be locked
467 72 : if (!conv->conversation) {
468 : // Note: here we don't return and connect to all members
469 : // the first that will successfully connect will be used for
470 : // cloning.
471 : // This avoid the case when we try to clone from convInfos + sync message
472 : // at the same time.
473 72 : if (!conv->startFetch(deviceId, true)) {
474 153 : JAMI_WARNING("[Account {}] Already fetching {}", accountId_, conv->info.id);
475 51 : addConvInfo(conv->info);
476 51 : return;
477 : }
478 42 : onNeedSocket_(
479 21 : conv->info.id,
480 : deviceId,
481 21 : [=](const auto& channel) {
482 21 : std::lock_guard lk(conv->mtx);
483 21 : if (conv->pending && !conv->pending->ready) {
484 21 : if (channel) {
485 21 : conv->pending->ready = true;
486 21 : conv->pending->deviceId = channel->deviceId().toString();
487 21 : conv->pending->socket = channel;
488 21 : if (!conv->pending->cloning) {
489 21 : conv->pending->cloning = true;
490 63 : dht::ThreadPool::io().run([w = weak(),
491 21 : convId = conv->info.id,
492 21 : deviceId = conv->pending->deviceId]() {
493 42 : if (auto sthis = w.lock())
494 21 : sthis->handlePendingConversation(convId, deviceId);
495 : });
496 : }
497 21 : return true;
498 : } else {
499 0 : conv->stopFetch(deviceId);
500 : }
501 : }
502 0 : return false;
503 21 : },
504 : MIME_TYPE_GIT);
505 :
506 63 : JAMI_LOG("[Account {}] New conversation detected: {}. Ask device {} to clone it",
507 : accountId_,
508 : conv->info.id,
509 : deviceId);
510 21 : conv->info.members.emplace(username_);
511 21 : conv->info.members.emplace(peerUri);
512 21 : addConvInfo(conv->info);
513 : } else {
514 0 : JAMI_DEBUG("[Account {}] Already have conversation {}", accountId_, conv->info.id);
515 : }
516 : }
517 :
518 : void
519 27723 : ConversationModule::Impl::fetchNewCommits(const std::string& peer,
520 : const std::string& deviceId,
521 : const std::string& conversationId,
522 : const std::string& commitId)
523 : {
524 : {
525 27723 : std::lock_guard lk(convInfosMtx_);
526 27722 : auto itConv = convInfos_.find(conversationId);
527 27725 : if (itConv != convInfos_.end() && itConv->second.isRemoved()) {
528 : // If the conversation is removed and we receives a new commit,
529 : // it means that the contact was removed but not banned.
530 : // If he wants a new conversation, they must removes/re-add the contact who declined.
531 9 : JAMI_WARNING("[Account {:s}] Received a commit for {}, but conversation is removed",
532 : accountId_,
533 : conversationId);
534 3 : return;
535 : }
536 27722 : }
537 27721 : std::optional<ConversationRequest> oldReq;
538 : {
539 27721 : std::lock_guard lk(conversationsRequestsMtx_);
540 27723 : oldReq = getRequest(conversationId);
541 27719 : if (oldReq != std::nullopt && oldReq->declined) {
542 0 : JAMI_DEBUG("[Account {}] Received a request for a conversation already declined.",
543 : accountId_);
544 0 : return;
545 : }
546 27721 : }
547 83161 : JAMI_DEBUG("[Account {:s}] fetch commits from {:s}, for {:s}, commit {:s}",
548 : accountId_,
549 : peer,
550 : conversationId,
551 : commitId);
552 :
553 27724 : auto conv = getConversation(conversationId);
554 27724 : if (!conv) {
555 1212 : JAMI_WARNING("[Account {}] Could not find conversation {}, ask for an invite",
556 : accountId_,
557 : conversationId);
558 404 : sendMsgCb_(peer,
559 : {},
560 1212 : std::map<std::string, std::string> {{MIME_TYPE_INVITE, conversationId}},
561 : 0);
562 404 : return;
563 : }
564 27320 : std::unique_lock lk(conv->mtx);
565 :
566 27318 : if (conv->conversation) {
567 : // Check if we already have the commit
568 27302 : if (not commitId.empty() && conv->conversation->getCommit(commitId) != std::nullopt) {
569 24784 : return;
570 : }
571 3275 : if (conv->conversation->isRemoving()) {
572 0 : JAMI_WARNING("[Account {}] Conversation {} is being removed",
573 : accountId_,
574 : conversationId);
575 0 : return;
576 : }
577 3275 : if (!conv->conversation->isMember(peer, true)) {
578 3 : JAMI_WARNING("[Account {}] {} is not a member of {}", accountId_, peer, conversationId);
579 1 : return;
580 : }
581 3273 : if (conv->conversation->isBanned(deviceId)) {
582 0 : JAMI_WARNING("[Account {}] {} is a banned device in conversation {}",
583 : accountId_,
584 : deviceId,
585 : conversationId);
586 0 : return;
587 : }
588 :
589 : // Retrieve current last message
590 3273 : auto lastMessageId = conv->conversation->lastCommitId();
591 3273 : if (lastMessageId.empty()) {
592 0 : JAMI_ERROR("[Account {}] No message detected. This is a bug", accountId_);
593 0 : return;
594 : }
595 :
596 3274 : if (!conv->startFetch(deviceId)) {
597 2274 : JAMI_WARNING("[Account {}] Already fetching {}", accountId_, conversationId);
598 758 : return;
599 : }
600 :
601 2513 : syncCnt.fetch_add(1);
602 12572 : onNeedSocket_(
603 : conversationId,
604 : deviceId,
605 4388 : [w = weak(),
606 : conv,
607 2515 : conversationId = std::move(conversationId),
608 2516 : peer = std::move(peer),
609 2516 : deviceId = std::move(deviceId),
610 2514 : commitId = std::move(commitId)](const auto& channel) {
611 4388 : auto sthis = w.lock();
612 4387 : auto acc = sthis ? sthis->account_.lock() : nullptr;
613 4387 : std::unique_lock lk(conv->mtx);
614 4386 : auto conversation = conv->conversation;
615 4387 : if (!channel || !acc || !conversation) {
616 1896 : conv->stopFetch(deviceId);
617 1896 : if (sthis)
618 1896 : sthis->syncCnt.fetch_sub(1);
619 1896 : return false;
620 : }
621 2492 : conversation->addGitSocket(channel->deviceId(), channel);
622 2492 : lk.unlock();
623 14946 : conversation->sync(
624 2491 : peer,
625 2491 : deviceId,
626 2492 : [w,
627 2492 : conv,
628 2491 : conversationId = std::move(conversationId),
629 2490 : peer = std::move(peer),
630 2490 : deviceId = std::move(deviceId),
631 2491 : commitId = std::move(commitId)](bool ok) {
632 2492 : auto shared = w.lock();
633 2491 : if (!shared)
634 0 : return;
635 2492 : if (!ok) {
636 123 : JAMI_WARNING("[Account {}] Could not fetch new commit from "
637 : "{} for {}, other "
638 : "peer may be disconnected",
639 : shared->accountId_,
640 : deviceId,
641 : conversationId);
642 123 : JAMI_LOG("[Account {}] Relaunch sync with {} for {}",
643 : shared->accountId_,
644 : deviceId,
645 : conversationId);
646 : }
647 :
648 : {
649 2492 : std::lock_guard lk(conv->mtx);
650 2492 : conv->pending.reset();
651 : // Notify peers that a new commit is there (DRT)
652 2492 : if (not commitId.empty() && ok) {
653 4766 : shared->sendMessageNotification(*conv->conversation,
654 : false,
655 2383 : commitId,
656 2383 : deviceId);
657 : }
658 2492 : }
659 4984 : if (shared->syncCnt.fetch_sub(1) == 1) {
660 330 : if (auto account = shared->account_.lock())
661 165 : emitSignal<libjami::ConversationSignal::ConversationSyncFinished>(
662 165 : account->getAccountID().c_str());
663 : }
664 2492 : },
665 2492 : commitId);
666 2492 : return true;
667 4388 : },
668 : "");
669 3274 : } else {
670 17 : if (oldReq != std::nullopt)
671 0 : return;
672 17 : if (conv->pending)
673 17 : return;
674 0 : bool clone = !conv->info.isRemoved();
675 0 : if (clone) {
676 0 : cloneConversation(deviceId, peer, conv);
677 0 : return;
678 : }
679 0 : lk.unlock();
680 0 : JAMI_WARNING("[Account {}] Could not find conversation {}, ask for an invite",
681 : accountId_,
682 : conversationId);
683 0 : sendMsgCb_(peer,
684 : {},
685 0 : std::map<std::string, std::string> {{MIME_TYPE_INVITE, conversationId}},
686 : 0);
687 : }
688 77729 : }
689 :
690 : // Clone and store conversation
691 : void
692 147 : ConversationModule::Impl::handlePendingConversation(const std::string& conversationId,
693 : const std::string& deviceId)
694 : {
695 147 : auto acc = account_.lock();
696 147 : if (!acc)
697 0 : return;
698 147 : std::vector<DeviceId> kd;
699 326 : for (const auto& [id, _] : acc->getKnownDevices())
700 326 : kd.emplace_back(id);
701 :
702 147 : auto conv = getConversation(conversationId);
703 147 : if (!conv)
704 0 : return;
705 147 : std::unique_lock lk(conv->mtx, std::defer_lock);
706 291 : auto erasePending = [&] {
707 291 : std::string toRm;
708 291 : if (conv->pending && !conv->pending->removeId.empty())
709 5 : toRm = std::move(conv->pending->removeId);
710 291 : conv->pending.reset();
711 291 : lk.unlock();
712 291 : if (!toRm.empty())
713 5 : removeConversation(toRm);
714 291 : };
715 : try {
716 147 : auto conversation = std::make_shared<Conversation>(acc, deviceId, conversationId);
717 144 : conversation->onMembersChanged([this, conversationId](const auto& members) {
718 625 : setConversationMembers(conversationId, members);
719 625 : });
720 144 : conversation->onMessageStatusChanged([this, conversationId](const auto& status) {
721 462 : auto msg = std::make_shared<SyncMsg>();
722 924 : msg->ms = {{conversationId, status}};
723 462 : needsSyncingCb_(std::move(msg));
724 462 : });
725 144 : conversation->onNeedSocket(onNeedSwarmSocket_);
726 144 : if (!conversation->isMember(username_, true)) {
727 0 : JAMI_ERR("Conversation cloned but doesn't seems to be a valid member");
728 0 : conversation->erase();
729 0 : lk.lock();
730 0 : erasePending();
731 0 : return;
732 : }
733 :
734 144 : lk.lock();
735 :
736 144 : if (conv->pending && conv->pending->socket)
737 144 : conversation->addGitSocket(DeviceId(deviceId), std::move(conv->pending->socket));
738 144 : auto removeRepo = false;
739 : // Note: a removeContact while cloning. In this case, the conversation
740 : // must not be announced and removed.
741 144 : if (conv->info.isRemoved())
742 0 : removeRepo = true;
743 144 : std::map<std::string, std::string> preferences;
744 144 : std::map<std::string, std::map<std::string, std::string>> status;
745 144 : if (conv->pending) {
746 144 : preferences = std::move(conv->pending->preferences);
747 144 : status = std::move(conv->pending->status);
748 : }
749 144 : conv->conversation = conversation;
750 144 : if (removeRepo) {
751 0 : removeRepositoryImpl(*conv, false, true);
752 0 : erasePending();
753 0 : return;
754 : }
755 :
756 144 : auto commitId = conversation->join();
757 144 : std::vector<std::map<std::string, std::string>> messages;
758 : {
759 144 : std::lock_guard lk(replayMtx_);
760 144 : auto replayIt = replay_.find(conversationId);
761 144 : if (replayIt != replay_.end()) {
762 5 : messages = std::move(replayIt->second);
763 5 : replay_.erase(replayIt);
764 : }
765 144 : }
766 144 : if (!commitId.empty())
767 117 : sendMessageNotification(*conversation, false, commitId);
768 144 : erasePending(); // Will unlock
769 :
770 : #ifdef LIBJAMI_TESTABLE
771 144 : conversation->onBootstrapStatus(bootstrapCbTest_);
772 : #endif // LIBJAMI_TESTABLE
773 288 : conversation->bootstrap(std::bind(&ConversationModule::Impl::bootstrapCb,
774 144 : this,
775 288 : conversation->id()),
776 : kd);
777 :
778 144 : if (!preferences.empty())
779 0 : conversation->updatePreferences(preferences);
780 144 : if (!status.empty())
781 19 : conversation->updateMessageStatus(status);
782 144 : syncingMetadatas_.erase(conversationId);
783 144 : saveMetadatas();
784 :
785 : // Inform user that the conversation is ready
786 144 : emitSignal<libjami::ConversationSignal::ConversationReady>(accountId_, conversationId);
787 144 : needsSyncingCb_({});
788 144 : std::vector<Json::Value> values;
789 144 : values.reserve(messages.size());
790 149 : for (const auto& message : messages) {
791 : // For now, only replay text messages.
792 : // File transfers will need more logic, and don't care about calls for now.
793 5 : if (message.at("type") == "text/plain" && message.at("author") == username_) {
794 0 : Json::Value json;
795 0 : json["body"] = message.at("body");
796 0 : json["type"] = "text/plain";
797 0 : values.emplace_back(std::move(json));
798 0 : }
799 : }
800 144 : if (!values.empty())
801 0 : conversation->sendMessages(std::move(values),
802 0 : [w = weak(), conversationId](const auto& commits) {
803 0 : auto shared = w.lock();
804 0 : if (shared and not commits.empty())
805 0 : shared->sendMessageNotification(conversationId,
806 : true,
807 0 : *commits.rbegin());
808 0 : });
809 : // Download members profile on first sync
810 144 : auto isOneOne = conversation->mode() == ConversationMode::ONE_TO_ONE;
811 144 : auto askForProfile = isOneOne;
812 144 : if (!isOneOne) {
813 : // If not 1:1 only download profiles from self (to avoid non checked files)
814 99 : if (auto acc = account_.lock()) {
815 99 : auto cert = acc->certStore().getCertificate(deviceId);
816 99 : askForProfile = cert && cert->issuer
817 198 : && cert->issuer->getId().toString() == username_;
818 198 : }
819 : }
820 144 : if (askForProfile) {
821 56 : if (auto acc = account_.lock()) {
822 100 : for (const auto& member : conversation->memberUris(username_)) {
823 44 : acc->askForProfile(conversationId, deviceId, member);
824 56 : }
825 56 : }
826 : }
827 147 : } catch (const std::exception& e) {
828 3 : JAMI_WARN("Something went wrong when cloning conversation: %s", e.what());
829 3 : }
830 147 : lk.lock();
831 147 : erasePending();
832 147 : }
833 :
834 : std::optional<ConversationRequest>
835 28320 : ConversationModule::Impl::getRequest(const std::string& id) const
836 : {
837 : // ConversationsRequestsMtx MUST BE LOCKED
838 28320 : auto it = conversationsRequests_.find(id);
839 28318 : if (it != conversationsRequests_.end())
840 779 : return it->second;
841 27538 : return std::nullopt;
842 : }
843 :
844 : std::string
845 519 : ConversationModule::Impl::getOneToOneConversation(const std::string& uri) const noexcept
846 : {
847 519 : auto acc = account_.lock();
848 519 : if (!acc)
849 0 : return {};
850 519 : auto details = acc->getContactDetails(uri);
851 519 : auto itRemoved = details.find("removed");
852 : // If contact is removed there is no conversation
853 519 : if (itRemoved != details.end() && itRemoved->second != "0") {
854 40 : auto itBanned = details.find("banned");
855 : // If banned, conversation is still on disk
856 40 : if (itBanned == details.end() || itBanned->second == "0") {
857 : // Check if contact is removed
858 32 : auto itAdded = details.find("added");
859 32 : if (std::stoi(itRemoved->second) > std::stoi(itAdded->second))
860 31 : return {};
861 : }
862 : }
863 488 : auto it = details.find(libjami::Account::TrustRequest::CONVERSATIONID);
864 488 : if (it != details.end())
865 255 : return it->second;
866 233 : return {};
867 519 : }
868 :
869 : void
870 54 : ConversationModule::Impl::declineOtherConversationWith(const std::string& uri) noexcept
871 : {
872 : // conversationsRequestsMtx_ MUST BE LOCKED
873 55 : for (auto& [id, request] : conversationsRequests_) {
874 1 : if (request.declined)
875 0 : continue; // Ignore already declined requests
876 1 : if (request.isOneToOne() && request.from == uri) {
877 3 : JAMI_WARNING("Decline conversation request ({}) from {}", id, uri);
878 1 : request.declined = std::time(nullptr);
879 1 : syncingMetadatas_.erase(id);
880 1 : saveMetadatas();
881 1 : emitSignal<libjami::ConversationSignal::ConversationRequestDeclined>(accountId_, id);
882 : }
883 : }
884 54 : }
885 :
886 : std::vector<std::map<std::string, std::string>>
887 78 : ConversationModule::Impl::getConversationMembers(const std::string& conversationId,
888 : bool includeBanned) const
889 : {
890 : return withConv(conversationId,
891 156 : [&](const auto& conv) { return conv.getMembers(true, includeBanned); });
892 : }
893 :
894 : void
895 11 : ConversationModule::Impl::removeRepository(const std::string& conversationId, bool sync, bool force)
896 : {
897 11 : auto conv = getConversation(conversationId);
898 11 : if (!conv)
899 0 : return;
900 11 : std::unique_lock lk(conv->mtx);
901 11 : removeRepositoryImpl(*conv, sync, force);
902 11 : }
903 :
904 : void
905 22 : ConversationModule::Impl::removeRepositoryImpl(SyncedConversation& conv, bool sync, bool force)
906 : {
907 22 : if (conv.conversation && (force || conv.conversation->isRemoving())) {
908 : // Stop fetch!
909 17 : conv.pending.reset();
910 :
911 51 : JAMI_LOG("Remove conversation: {}", conv.info.id);
912 : try {
913 17 : if (conv.conversation->mode() == ConversationMode::ONE_TO_ONE) {
914 39 : for (const auto& member : conv.conversation->getInitialMembers()) {
915 26 : auto account = account_.lock();
916 26 : if (member != account->getUsername()) {
917 : // Note: this can happen while re-adding a contact.
918 : // In this case, check that we are removing the linked conversation.
919 13 : if (conv.info.id == getOneToOneConversation(member)) {
920 0 : if (auto am = account->accountManager())
921 0 : am->removeContactConversation(member);
922 : }
923 : }
924 39 : }
925 : }
926 0 : } catch (const std::exception& e) {
927 0 : JAMI_ERR() << e.what();
928 0 : }
929 17 : conv.conversation->erase();
930 17 : conv.conversation.reset();
931 :
932 17 : if (!sync)
933 1 : return;
934 :
935 16 : conv.info.erased = std::time(nullptr);
936 16 : needsSyncingCb_({});
937 16 : addConvInfo(conv.info);
938 : }
939 : }
940 :
941 : bool
942 11 : ConversationModule::Impl::removeConversation(const std::string& conversationId)
943 : {
944 22 : return withConv(conversationId, [this](auto& conv) { return removeConversationImpl(conv); });
945 : }
946 :
947 : bool
948 11 : ConversationModule::Impl::removeConversationImpl(SyncedConversation& conv)
949 : {
950 11 : auto members = conv.getMembers(false, false);
951 11 : auto isSyncing = !conv.conversation;
952 11 : auto hasMembers = !isSyncing // If syncing there is no member to inform
953 20 : && std::find_if(members.begin(),
954 : members.end(),
955 11 : [&](const auto& member) {
956 11 : return member.at("uri") == username_;
957 : })
958 20 : != members.end() // We must be still a member
959 21 : && members.size() != 1; // If there is only ourself
960 11 : conv.info.removed = std::time(nullptr);
961 11 : if (isSyncing)
962 1 : conv.info.erased = std::time(nullptr);
963 : // Sync now, because it can take some time to really removes the datas
964 11 : needsSyncingCb_({});
965 11 : addConvInfo(conv.info);
966 11 : emitSignal<libjami::ConversationSignal::ConversationRemoved>(accountId_, conv.info.id);
967 11 : if (isSyncing)
968 1 : return true;
969 10 : if (conv.conversation->mode() != ConversationMode::ONE_TO_ONE) {
970 : // For one to one, we do not notify the leave. The other can still generate request
971 : // and this is managed by the banned part. If we re-accept, the old conversation will be
972 : // retrieved
973 4 : auto commitId = conv.conversation->leave();
974 4 : if (hasMembers) {
975 3 : JAMI_LOG("Wait that someone sync that user left conversation {}", conv.info.id);
976 : // Commit that we left
977 1 : if (!commitId.empty()) {
978 : // Do not sync as it's synched by convInfos
979 1 : sendMessageNotification(*conv.conversation, false, commitId);
980 : } else {
981 0 : JAMI_ERROR("Failed to send message to conversation {}", conv.info.id);
982 : }
983 : // In this case, we wait that another peer sync the conversation
984 : // to definitely remove it from the device. This is to inform the
985 : // peer that we left the conversation and never want to receive
986 : // any messages
987 1 : return true;
988 : }
989 4 : } else {
990 18 : for (const auto& m : members)
991 12 : if (username_ != m.at("uri") && updateConvReqCb_)
992 6 : updateConvReqCb_(conv.info.id, m.at("uri"), false);
993 : }
994 : // Else we are the last member, so we can remove
995 9 : removeRepositoryImpl(conv, true);
996 9 : return true;
997 11 : }
998 :
999 : void
1000 439 : ConversationModule::Impl::sendMessageNotification(const std::string& conversationId,
1001 : bool sync,
1002 : const std::string& commitId,
1003 : const std::string& deviceId)
1004 : {
1005 439 : if (auto conv = getConversation(conversationId)) {
1006 439 : std::lock_guard lk(conv->mtx);
1007 438 : if (conv->conversation)
1008 438 : sendMessageNotification(*conv->conversation, sync, commitId, deviceId);
1009 878 : }
1010 439 : }
1011 :
1012 : void
1013 3044 : ConversationModule::Impl::sendMessageNotification(Conversation& conversation,
1014 : bool sync,
1015 : const std::string& commitId,
1016 : const std::string& deviceId)
1017 : {
1018 3044 : auto acc = account_.lock();
1019 3044 : if (!acc)
1020 0 : return;
1021 3044 : Json::Value message;
1022 3044 : auto commit = commitId == "" ? conversation.lastCommitId() : commitId;
1023 3044 : message["id"] = conversation.id();
1024 3044 : message["commit"] = commit;
1025 3043 : message["deviceId"] = deviceId_;
1026 3044 : Json::StreamWriterBuilder builder;
1027 3044 : const auto text = Json::writeString(builder, message);
1028 :
1029 : // Send message notification will announce the new commit in 3 steps.
1030 :
1031 : // First, because our account can have several devices, announce to other devices
1032 3044 : if (sync) {
1033 : // Announce to our devices
1034 543 : refreshMessage[username_] = sendMsgCb_(username_,
1035 : {},
1036 2172 : std::map<std::string, std::string> {
1037 1086 : {MIME_TYPE_GIT, text}},
1038 543 : refreshMessage[username_]);
1039 : }
1040 :
1041 : // Then, we announce to 2 random members in the conversation that aren't in the DRT
1042 : // This allow new devices without the ability to sync to their other devices to sync with us.
1043 : // Or they can also use an old backup.
1044 3044 : std::vector<std::string> nonConnectedMembers;
1045 3044 : std::vector<NodeId> devices;
1046 : {
1047 3044 : std::lock_guard lk(notSyncedNotificationMtx_);
1048 3044 : devices = conversation.peersToSyncWith();
1049 6088 : auto members = conversation.memberUris(username_, {MemberRole::BANNED});
1050 3044 : std::vector<std::string> connectedMembers;
1051 : // print all members
1052 3044 : if (auto acc = account_.lock()) {
1053 45947 : for (const auto& device : devices) {
1054 42902 : auto cert = acc->certStore().getCertificate(device.toString());
1055 42907 : if (cert && cert->issuer)
1056 42905 : connectedMembers.emplace_back(cert->issuer->getId().toString());
1057 42903 : }
1058 3043 : }
1059 3044 : std::sort(std::begin(connectedMembers), std::end(connectedMembers));
1060 3044 : std::set_difference(members.begin(),
1061 : members.end(),
1062 : connectedMembers.begin(),
1063 : connectedMembers.end(),
1064 : std::inserter(nonConnectedMembers, nonConnectedMembers.begin()));
1065 3044 : std::shuffle(nonConnectedMembers.begin(), nonConnectedMembers.end(), acc->rand);
1066 3044 : if (nonConnectedMembers.size() > 2)
1067 98 : nonConnectedMembers.resize(2);
1068 3044 : if (!conversation.isBootstraped()) {
1069 720 : JAMI_DEBUG("[Conversation {}] Not yet bootstraped, save notification",
1070 : conversation.id());
1071 : // Because we can get some git channels but not bootstraped, we should keep this
1072 : // to refresh when bootstraped.
1073 240 : notSyncedNotification_[conversation.id()] = commit;
1074 : }
1075 3044 : }
1076 :
1077 4224 : for (const auto& member : nonConnectedMembers) {
1078 1180 : refreshMessage[member] = sendMsgCb_(member,
1079 : {},
1080 4720 : std::map<std::string, std::string> {
1081 2360 : {MIME_TYPE_GIT, text}},
1082 1180 : refreshMessage[member]);
1083 : }
1084 :
1085 : // Finally we send to devices that the DRT choose.
1086 45952 : for (const auto& device : devices) {
1087 42908 : auto deviceIdStr = device.toString();
1088 42907 : auto memberUri = conversation.uriFromDevice(deviceIdStr);
1089 42903 : if (memberUri.empty() || deviceIdStr == deviceId)
1090 2381 : continue;
1091 40523 : refreshMessage[deviceIdStr] = sendMsgCb_(memberUri,
1092 : device,
1093 162099 : std::map<std::string, std::string> {
1094 81050 : {MIME_TYPE_GIT, text}},
1095 40524 : refreshMessage[deviceIdStr]);
1096 45288 : }
1097 3044 : }
1098 :
1099 : void
1100 55 : ConversationModule::Impl::sendMessage(const std::string& conversationId,
1101 : std::string message,
1102 : const std::string& replyTo,
1103 : const std::string& type,
1104 : bool announce,
1105 : OnCommitCb&& onCommit,
1106 : OnDoneCb&& cb)
1107 : {
1108 55 : Json::Value json;
1109 55 : json["body"] = std::move(message);
1110 55 : json["type"] = type;
1111 55 : sendMessage(conversationId,
1112 55 : std::move(json),
1113 : replyTo,
1114 : announce,
1115 55 : std::move(onCommit),
1116 55 : std::move(cb));
1117 55 : }
1118 :
1119 : void
1120 68 : ConversationModule::Impl::sendMessage(const std::string& conversationId,
1121 : Json::Value&& value,
1122 : const std::string& replyTo,
1123 : bool announce,
1124 : OnCommitCb&& onCommit,
1125 : OnDoneCb&& cb)
1126 : {
1127 68 : if (auto conv = getConversation(conversationId)) {
1128 68 : std::lock_guard lk(conv->mtx);
1129 68 : if (conv->conversation)
1130 68 : conv->conversation
1131 204 : ->sendMessage(std::move(value),
1132 : replyTo,
1133 68 : std::move(onCommit),
1134 68 : [this,
1135 : conversationId,
1136 : announce,
1137 68 : cb = std::move(cb)](bool ok, const std::string& commitId) {
1138 68 : if (cb)
1139 0 : cb(ok, commitId);
1140 68 : if (!announce)
1141 0 : return;
1142 68 : if (ok)
1143 67 : sendMessageNotification(conversationId, true, commitId);
1144 : else
1145 1 : JAMI_ERR("Failed to send message to conversation %s",
1146 : conversationId.c_str());
1147 : });
1148 136 : }
1149 68 : }
1150 :
1151 : void
1152 0 : ConversationModule::Impl::editMessage(const std::string& conversationId,
1153 : const std::string& newBody,
1154 : const std::string& editedId)
1155 : {
1156 : // Check that editedId is a valid commit, from ourself and plain/text
1157 0 : auto validCommit = false;
1158 0 : if (auto conv = getConversation(conversationId)) {
1159 0 : std::lock_guard lk(conv->mtx);
1160 0 : if (conv->conversation) {
1161 0 : auto commit = conv->conversation->getCommit(editedId);
1162 0 : if (commit != std::nullopt) {
1163 0 : validCommit = commit->at("author") == username_
1164 0 : && commit->at("type") == "text/plain";
1165 : }
1166 0 : }
1167 0 : }
1168 0 : if (!validCommit) {
1169 0 : JAMI_ERROR("Cannot edit commit {:s}", editedId);
1170 0 : return;
1171 : }
1172 : // Commit message edition
1173 0 : Json::Value json;
1174 0 : json["body"] = newBody;
1175 0 : json["edit"] = editedId;
1176 0 : json["type"] = "application/edited-message";
1177 0 : sendMessage(conversationId, std::move(json));
1178 0 : }
1179 :
1180 : void
1181 329 : ConversationModule::Impl::bootstrapCb(std::string convId)
1182 : {
1183 329 : std::string commitId;
1184 : {
1185 329 : std::lock_guard lk(notSyncedNotificationMtx_);
1186 329 : auto it = notSyncedNotification_.find(convId);
1187 329 : if (it != notSyncedNotification_.end()) {
1188 163 : commitId = it->second;
1189 163 : notSyncedNotification_.erase(it);
1190 : }
1191 329 : }
1192 987 : JAMI_DEBUG("[Conversation {}] Resend last message notification", convId);
1193 329 : dht::ThreadPool::io().run([w = weak(), convId, commitId = std::move(commitId)] {
1194 329 : if (auto sthis = w.lock())
1195 329 : sthis->sendMessageNotification(convId, true, commitId);
1196 329 : });
1197 329 : }
1198 :
1199 : void
1200 462 : ConversationModule::Impl::fixStructures(
1201 : std::shared_ptr<JamiAccount> acc,
1202 : const std::vector<std::tuple<std::string, std::string, std::string>>& updateContactConv,
1203 : const std::set<std::string>& toRm)
1204 : {
1205 462 : for (const auto& [uri, oldConv, newConv] : updateContactConv) {
1206 0 : acc->updateConvForContact(uri, oldConv, newConv);
1207 : }
1208 : ////////////////////////////////////////////////////////////////
1209 : // Note: This is only to homogeneize trust and convRequests
1210 462 : std::vector<std::string> invalidPendingRequests;
1211 : {
1212 462 : auto requests = acc->getTrustRequests();
1213 462 : std::lock_guard lk(conversationsRequestsMtx_);
1214 463 : for (const auto& request : requests) {
1215 1 : auto itConvId = request.find(libjami::Account::TrustRequest::CONVERSATIONID);
1216 1 : auto itConvFrom = request.find(libjami::Account::TrustRequest::FROM);
1217 1 : if (itConvId != request.end() && itConvFrom != request.end()) {
1218 : // Check if requests exists or is declined.
1219 1 : auto itReq = conversationsRequests_.find(itConvId->second);
1220 1 : auto declined = itReq == conversationsRequests_.end() || itReq->second.declined;
1221 1 : if (declined) {
1222 3 : JAMI_WARNING("Invalid trust request found: {:s}", itConvId->second);
1223 1 : invalidPendingRequests.emplace_back(itConvFrom->second);
1224 : }
1225 : }
1226 : }
1227 462 : auto requestRemoved = false;
1228 464 : for (auto it = conversationsRequests_.begin(); it != conversationsRequests_.end();) {
1229 2 : if (it->second.from == username_) {
1230 0 : JAMI_WARNING("Detected request from ourself, this makes no sense. Remove {}",
1231 : it->first);
1232 0 : it = conversationsRequests_.erase(it);
1233 : } else {
1234 2 : ++it;
1235 : }
1236 : }
1237 462 : if (requestRemoved) {
1238 0 : saveConvRequests();
1239 : }
1240 462 : }
1241 463 : for (const auto& invalidPendingRequest : invalidPendingRequests)
1242 1 : acc->discardTrustRequest(invalidPendingRequest);
1243 :
1244 : ////////////////////////////////////////////////////////////////
1245 462 : for (const auto& conv : toRm) {
1246 0 : JAMI_ERROR("Remove conversation ({})", conv);
1247 0 : removeConversation(conv);
1248 : }
1249 1386 : JAMI_DEBUG("[Account {}] Conversations loaded!", accountId_);
1250 462 : }
1251 :
1252 : void
1253 139 : ConversationModule::Impl::cloneConversationFrom(const std::shared_ptr<SyncedConversation> conv,
1254 : const std::string& deviceId,
1255 : const std::string& oldConvId)
1256 : {
1257 139 : std::lock_guard lk(conv->mtx);
1258 139 : const auto& conversationId = conv->info.id;
1259 139 : if (!conv->startFetch(deviceId, true)) {
1260 15 : JAMI_WARNING("[Account {}] Already fetching {}", accountId_, conversationId);
1261 5 : return;
1262 : }
1263 :
1264 134 : onNeedSocket_(
1265 : conversationId,
1266 : deviceId,
1267 134 : [sthis = shared_from_this(), conv, conversationId, oldConvId, deviceId](
1268 : const auto& channel) {
1269 134 : auto acc = sthis->account_.lock();
1270 134 : std::lock_guard lk(conv->mtx);
1271 134 : if (conv->pending && !conv->pending->ready) {
1272 132 : conv->pending->removeId = oldConvId;
1273 132 : if (channel) {
1274 126 : conv->pending->ready = true;
1275 126 : conv->pending->deviceId = channel->deviceId().toString();
1276 126 : conv->pending->socket = channel;
1277 126 : if (!conv->pending->cloning) {
1278 126 : conv->pending->cloning = true;
1279 378 : dht::ThreadPool::io().run([w = sthis->weak(),
1280 126 : conversationId,
1281 126 : deviceId = conv->pending->deviceId]() {
1282 252 : if (auto sthis = w.lock())
1283 126 : sthis->handlePendingConversation(conversationId, deviceId);
1284 : });
1285 : }
1286 126 : return true;
1287 : } else {
1288 6 : conv->stopFetch(deviceId);
1289 : }
1290 : }
1291 8 : return false;
1292 134 : },
1293 : MIME_TYPE_GIT);
1294 139 : }
1295 :
1296 : void
1297 594 : ConversationModule::Impl::bootstrap(const std::string& convId)
1298 : {
1299 594 : std::vector<DeviceId> kd;
1300 594 : if (auto acc = account_.lock())
1301 1241 : for (const auto& [id, _] : acc->getKnownDevices())
1302 1835 : kd.emplace_back(id);
1303 :
1304 110 : auto bootstrap = [&](auto& conv) {
1305 110 : if (conv) {
1306 : #ifdef LIBJAMI_TESTABLE
1307 110 : conv->onBootstrapStatus(bootstrapCbTest_);
1308 : #endif // LIBJAMI_TESTABLE
1309 110 : conv->bootstrap(std::bind(&ConversationModule::Impl::bootstrapCb, this, conv->id()), kd);
1310 : }
1311 110 : };
1312 594 : std::vector<std::string> toClone;
1313 594 : if (convId.empty()) {
1314 464 : std::lock_guard lk(convInfosMtx_);
1315 490 : for (const auto& [conversationId, convInfo] : convInfos_) {
1316 26 : auto conv = getConversation(conversationId);
1317 26 : if (!conv)
1318 0 : return;
1319 26 : if ((!conv->conversation && !conv->info.isRemoved())) {
1320 : // Because we're not tracking contact presence in order to sync now,
1321 : // we need to ask to clone requests when bootstraping all conversations
1322 : // else it can stay syncing
1323 3 : toClone.emplace_back(conversationId);
1324 23 : } else if (conv->conversation) {
1325 23 : bootstrap(conv->conversation);
1326 : }
1327 26 : }
1328 594 : } else if (auto conv = getConversation(convId)) {
1329 111 : std::lock_guard lk(conv->mtx);
1330 111 : if (conv->conversation)
1331 87 : bootstrap(conv->conversation);
1332 241 : }
1333 :
1334 597 : for (const auto& cid : toClone) {
1335 3 : auto members = getConversationMembers(cid);
1336 8 : for (const auto& member : members) {
1337 5 : if (member.at("uri") != username_)
1338 2 : cloneConversationFrom(cid, member.at("uri"));
1339 : }
1340 3 : }
1341 594 : }
1342 :
1343 : void
1344 140 : ConversationModule::Impl::cloneConversationFrom(const std::string& conversationId,
1345 : const std::string& uri,
1346 : const std::string& oldConvId)
1347 : {
1348 140 : auto acc = account_.lock();
1349 140 : auto memberHash = dht::InfoHash(uri);
1350 140 : if (!acc || !memberHash) {
1351 0 : JAMI_WARNING("Invalid member detected: {}", uri);
1352 0 : return;
1353 : }
1354 140 : auto conv = startConversation(conversationId);
1355 140 : std::lock_guard lk(conv->mtx);
1356 140 : conv->info = {};
1357 140 : conv->info.id = conversationId;
1358 140 : conv->info.created = std::time(nullptr);
1359 140 : conv->info.members.emplace(username_);
1360 140 : conv->info.members.emplace(uri);
1361 140 : acc->forEachDevice(memberHash,
1362 138 : [w = weak(), conv, conversationId, oldConvId](
1363 : const std::shared_ptr<dht::crypto::PublicKey>& pk) {
1364 138 : auto sthis = w.lock();
1365 138 : auto deviceId = pk->getLongId().toString();
1366 138 : if (!sthis or deviceId == sthis->deviceId_)
1367 0 : return;
1368 138 : sthis->cloneConversationFrom(conv, deviceId, oldConvId);
1369 138 : });
1370 140 : addConvInfo(conv->info);
1371 140 : }
1372 :
1373 : ////////////////////////////////////////////////////////////////
1374 :
1375 : void
1376 413 : ConversationModule::saveConvRequests(
1377 : const std::string& accountId,
1378 : const std::map<std::string, ConversationRequest>& conversationsRequests)
1379 : {
1380 413 : auto path = fileutils::get_data_dir() / accountId;
1381 413 : saveConvRequestsToPath(path, conversationsRequests);
1382 413 : }
1383 :
1384 : void
1385 934 : ConversationModule::saveConvRequestsToPath(
1386 : const std::filesystem::path& path,
1387 : const std::map<std::string, ConversationRequest>& conversationsRequests)
1388 : {
1389 934 : auto p = path / "convRequests";
1390 933 : std::lock_guard lock(dhtnet::fileutils::getFileLock(p));
1391 934 : std::ofstream file(p, std::ios::trunc | std::ios::binary);
1392 934 : msgpack::pack(file, conversationsRequests);
1393 934 : }
1394 :
1395 : void
1396 1694 : ConversationModule::saveConvInfos(const std::string& accountId, const ConvInfoMap& conversations)
1397 : {
1398 1694 : auto path = fileutils::get_data_dir() / accountId;
1399 1694 : saveConvInfosToPath(path, conversations);
1400 1694 : }
1401 :
1402 : void
1403 2215 : ConversationModule::saveConvInfosToPath(const std::filesystem::path& path,
1404 : const ConvInfoMap& conversations)
1405 : {
1406 4430 : std::ofstream file(path / "convInfo", std::ios::trunc | std::ios::binary);
1407 2215 : msgpack::pack(file, conversations);
1408 2215 : }
1409 :
1410 : ////////////////////////////////////////////////////////////////
1411 :
1412 456 : ConversationModule::ConversationModule(std::weak_ptr<JamiAccount>&& account,
1413 : NeedsSyncingCb&& needsSyncingCb,
1414 : SengMsgCb&& sendMsgCb,
1415 : NeedSocketCb&& onNeedSocket,
1416 : NeedSocketCb&& onNeedSwarmSocket,
1417 : UpdateConvReq&& updateConvReqCb,
1418 : OneToOneRecvCb&& oneToOneRecvCb,
1419 456 : bool autoLoadConversations)
1420 456 : : pimpl_ {std::make_unique<Impl>(std::move(account),
1421 456 : std::move(needsSyncingCb),
1422 456 : std::move(sendMsgCb),
1423 456 : std::move(onNeedSocket),
1424 456 : std::move(onNeedSwarmSocket),
1425 456 : std::move(updateConvReqCb),
1426 456 : std::move(oneToOneRecvCb))}
1427 : {
1428 456 : if (autoLoadConversations) {
1429 456 : loadConversations();
1430 : }
1431 456 : }
1432 :
#ifdef LIBJAMI_TESTABLE
/**
 * Test hook: store a bootstrap-status callback and install it on every
 * conversation that is currently loaded.
 * @param cb    Callback receiving (std::string, Conversation::BootstrapStatus)
 */
void
ConversationModule::onBootstrapStatus(
    const std::function<void(std::string, Conversation::BootstrapStatus)>& cb)
{
    pimpl_->bootstrapCbTest_ = cb;
    std::unique_lock lk(pimpl_->conversationsMtx_);
    for (auto& entry : pimpl_->conversations_) {
        auto& synced = entry.second;
        if (!synced || !synced->conversation)
            continue;
        synced->conversation->onBootstrapStatus(pimpl_->bootstrapCbTest_);
    }
}
#endif
1445 :
1446 : void
1447 462 : ConversationModule::loadConversations()
1448 : {
1449 462 : auto acc = pimpl_->account_.lock();
1450 462 : if (!acc)
1451 0 : return;
1452 1386 : JAMI_LOG("[Account {}] Start loading conversations…", pimpl_->accountId_);
1453 : auto conversationsRepositories = dhtnet::fileutils::readDirectory(
1454 924 : fileutils::get_data_dir() / pimpl_->accountId_ / "conversations");
1455 :
1456 : auto contacts = acc->getContacts(
1457 462 : true); // Avoid to lock configurationMtx while conv Mtx is locked
1458 462 : std::unique_lock lk(pimpl_->conversationsMtx_);
1459 462 : std::unique_lock ilk(pimpl_->convInfosMtx_);
1460 462 : pimpl_->convInfos_ = convInfos(pimpl_->accountId_);
1461 462 : pimpl_->conversations_.clear();
1462 :
1463 : struct Ctx
1464 : {
1465 : std::mutex cvMtx;
1466 : std::condition_variable cv;
1467 : std::mutex toRmMtx;
1468 : std::set<std::string> toRm;
1469 : std::mutex convMtx;
1470 : std::atomic_int convNb;
1471 : std::vector<std::map<std::string, std::string>> contacts;
1472 : std::vector<std::tuple<std::string, std::string, std::string>> updateContactConv;
1473 : };
1474 462 : auto ctx = std::make_shared<Ctx>();
1475 462 : ctx->convNb = conversationsRepositories.empty() ? 0 : conversationsRepositories.size();
1476 462 : ctx->contacts = std::move(contacts);
1477 :
1478 469 : for (auto repository : conversationsRepositories) {
1479 7 : dht::ThreadPool::io().run([this, ctx, repository, acc] {
1480 : try {
1481 7 : auto sconv = std::make_shared<SyncedConversation>(repository);
1482 :
1483 7 : auto conv = std::make_shared<Conversation>(acc, repository);
1484 7 : conv->onMessageStatusChanged([this, repository](const auto& status) {
1485 3 : auto msg = std::make_shared<SyncMsg>();
1486 6 : msg->ms = {{repository, status}};
1487 3 : pimpl_->needsSyncingCb_(std::move(msg));
1488 3 : });
1489 21 : conv->onMembersChanged(
1490 14 : [w = pimpl_->weak_from_this(), repository](const auto& members) {
1491 0 : if (auto p = w.lock())
1492 0 : p->setConversationMembers(repository, members);
1493 0 : });
1494 7 : conv->onNeedSocket(pimpl_->onNeedSwarmSocket_);
1495 7 : auto members = conv->memberUris(acc->getUsername(), {});
1496 : // NOTE: The following if is here to protect against any incorrect state
1497 : // that can be introduced
1498 7 : if (conv->mode() == ConversationMode::ONE_TO_ONE && members.size() == 1) {
1499 : // If we got a 1:1 conversation, but not in the contact details, it's rather a
1500 : // duplicate or a weird state
1501 1 : auto otherUri = *members.begin();
1502 1 : auto itContact = std::find_if(ctx->contacts.cbegin(),
1503 1 : ctx->contacts.cend(),
1504 1 : [&](const auto& c) {
1505 1 : return c.at("id") == otherUri;
1506 : });
1507 1 : if (itContact == ctx->contacts.end()) {
1508 0 : JAMI_WARNING("Contact {} not found", otherUri);
1509 0 : std::lock_guard lkCv {ctx->cvMtx};
1510 0 : --ctx->convNb;
1511 0 : ctx->cv.notify_all();
1512 0 : return;
1513 0 : }
1514 2 : std::string convFromDetails = itContact->at("conversationId");
1515 1 : auto removed = std::stoul(itContact->at("removed"));
1516 1 : auto added = std::stoul(itContact->at("added"));
1517 1 : auto isRemoved = removed > added;
1518 1 : if (convFromDetails != repository) {
1519 0 : if (convFromDetails.empty()) {
1520 0 : if (isRemoved) {
1521 : // If details is empty, contact is removed and not banned.
1522 0 : JAMI_ERROR("Conversation {} detected for {} and should be removed",
1523 : repository,
1524 : otherUri);
1525 0 : std::lock_guard lkMtx {ctx->toRmMtx};
1526 0 : ctx->toRm.insert(repository);
1527 0 : } else {
1528 0 : JAMI_ERROR("No conversation detected for {} but one exists ({}). "
1529 : "Update details",
1530 : otherUri,
1531 : repository);
1532 0 : std::lock_guard lkMtx {ctx->toRmMtx};
1533 0 : ctx->updateContactConv.emplace_back(
1534 0 : std::make_tuple(otherUri, convFromDetails, repository));
1535 0 : }
1536 : } else {
1537 0 : JAMI_ERROR("Multiple conversation detected for {} but ({} & {})",
1538 : otherUri,
1539 : repository,
1540 : convFromDetails);
1541 0 : std::lock_guard lkMtx {ctx->toRmMtx};
1542 0 : ctx->toRm.insert(repository);
1543 0 : }
1544 : }
1545 1 : }
1546 : {
1547 7 : std::lock_guard lkMtx {ctx->convMtx};
1548 7 : auto convInfo = pimpl_->convInfos_.find(repository);
1549 7 : if (convInfo == pimpl_->convInfos_.end()) {
1550 0 : JAMI_ERROR("Missing conv info for {}. This is a bug!", repository);
1551 0 : sconv->info.created = std::time(nullptr);
1552 0 : sconv->info.members = std::move(members);
1553 0 : sconv->info.lastDisplayed
1554 0 : = conv->infos()[ConversationMapKeys::LAST_DISPLAYED];
1555 : // convInfosMtx_ is already locked
1556 0 : pimpl_->convInfos_[repository] = sconv->info;
1557 : } else {
1558 7 : sconv->info = convInfo->second;
1559 7 : if (convInfo->second.isRemoved()) {
1560 : // A conversation was removed, but repository still exists
1561 0 : conv->setRemovingFlag();
1562 0 : std::lock_guard lkMtx {ctx->toRmMtx};
1563 0 : ctx->toRm.insert(repository);
1564 0 : }
1565 : }
1566 7 : }
1567 7 : auto commits = conv->commitsEndedCalls();
1568 :
1569 7 : if (!commits.empty()) {
1570 : // Note: here, this means that some calls were actives while the
1571 : // daemon finished (can be a crash).
1572 : // Notify other in the conversation that the call is finished
1573 0 : pimpl_->sendMessageNotification(*conv, true, *commits.rbegin());
1574 : }
1575 7 : sconv->conversation = conv;
1576 7 : std::lock_guard lkMtx {ctx->convMtx};
1577 7 : pimpl_->conversations_.emplace(repository, std::move(sconv));
1578 7 : } catch (const std::logic_error& e) {
1579 0 : JAMI_WARNING("[Account {}] Conversations not loaded: {}",
1580 : pimpl_->accountId_,
1581 : e.what());
1582 0 : }
1583 7 : std::lock_guard lkCv {ctx->cvMtx};
1584 7 : --ctx->convNb;
1585 7 : ctx->cv.notify_all();
1586 7 : });
1587 7 : }
1588 :
1589 462 : std::unique_lock lkCv {ctx->cvMtx};
1590 1400 : ctx->cv.wait(lkCv, [&] { return ctx->convNb.load() == 0; });
1591 :
1592 : // Prune any invalid conversations without members and
1593 : // set the removed flag if needed
1594 462 : std::set<std::string> removed;
1595 481 : for (auto itInfo = pimpl_->convInfos_.begin(); itInfo != pimpl_->convInfos_.end();) {
1596 19 : const auto& info = itInfo->second;
1597 19 : if (info.members.empty()) {
1598 0 : itInfo = pimpl_->convInfos_.erase(itInfo);
1599 0 : continue;
1600 : }
1601 19 : if (info.isRemoved())
1602 0 : removed.insert(info.id);
1603 19 : auto itConv = pimpl_->conversations_.find(info.id);
1604 19 : if (itConv == pimpl_->conversations_.end()) {
1605 : // convInfos_ can contain a conversation that is not yet cloned
1606 : // so we need to add it there.
1607 12 : itConv = pimpl_->conversations_
1608 12 : .emplace(info.id, std::make_shared<SyncedConversation>(info))
1609 : .first;
1610 : }
1611 38 : if (itConv != pimpl_->conversations_.end() && itConv->second && itConv->second->conversation
1612 38 : && info.isRemoved())
1613 0 : itConv->second->conversation->setRemovingFlag();
1614 19 : if (!info.isRemoved() && itConv == pimpl_->conversations_.end()) {
1615 : // In this case, the conversation is not synced and we only know ourself
1616 0 : if (info.members.size() == 1 && *info.members.begin() == acc->getUsername()) {
1617 0 : JAMI_WARNING("[Account {:s}] Conversation {:s} seems not present/synced.",
1618 : pimpl_->accountId_,
1619 : info.id);
1620 0 : emitSignal<libjami::ConversationSignal::ConversationRemoved>(pimpl_->accountId_,
1621 0 : info.id);
1622 0 : itInfo = pimpl_->convInfos_.erase(itInfo);
1623 0 : continue;
1624 0 : }
1625 : }
1626 19 : ++itInfo;
1627 : }
1628 : // On oldest version, removeConversation didn't update "appdata/contacts"
1629 : // causing a potential incorrect state between "appdata/contacts" and "appdata/convInfos"
1630 462 : if (!removed.empty())
1631 0 : acc->unlinkConversations(removed);
1632 : // Save if we've removed some invalid entries
1633 462 : pimpl_->saveConvInfos();
1634 :
1635 462 : ilk.unlock();
1636 462 : lk.unlock();
1637 :
1638 1386 : dht::ThreadPool::io().run([w = pimpl_->weak(),
1639 : acc,
1640 462 : updateContactConv = std::move(ctx->updateContactConv),
1641 462 : toRm = std::move(ctx->toRm)]() {
1642 : // Will lock account manager
1643 462 : if (auto shared = w.lock())
1644 462 : shared->fixStructures(acc, updateContactConv, toRm);
1645 462 : });
1646 462 : }
1647 :
1648 : void
1649 0 : ConversationModule::loadSingleConversation(const std::string& convId)
1650 : {
1651 0 : auto acc = pimpl_->account_.lock();
1652 0 : if (!acc)
1653 0 : return;
1654 0 : JAMI_LOG("[Account {}] Start loading conversation {}", pimpl_->accountId_, convId);
1655 :
1656 0 : std::unique_lock lk(pimpl_->conversationsMtx_);
1657 0 : std::unique_lock ilk(pimpl_->convInfosMtx_);
1658 : // Load convInfos to retrieve requests that have been accepted but not yet synchronized.
1659 0 : pimpl_->convInfos_ = convInfos(pimpl_->accountId_);
1660 0 : pimpl_->conversations_.clear();
1661 :
1662 : try {
1663 0 : auto sconv = std::make_shared<SyncedConversation>(convId);
1664 :
1665 0 : auto conv = std::make_shared<Conversation>(acc, convId);
1666 :
1667 0 : conv->onNeedSocket(pimpl_->onNeedSwarmSocket_);
1668 :
1669 0 : sconv->conversation = conv;
1670 0 : pimpl_->conversations_.emplace(convId, std::move(sconv));
1671 0 : } catch (const std::logic_error& e) {
1672 0 : JAMI_WARNING("[Account {}] Conversations not loaded: {}", pimpl_->accountId_, e.what());
1673 0 : }
1674 :
1675 : // Add all other conversations as dummy conversations to indicate their existence so
1676 : // isConversation could detect conversations correctly.
1677 : auto conversationsRepositoryIds = dhtnet::fileutils::readDirectory(
1678 0 : fileutils::get_data_dir() / pimpl_->accountId_ / "conversations");
1679 0 : for (auto repositoryId : conversationsRepositoryIds) {
1680 0 : if (repositoryId != convId) {
1681 0 : auto conv = std::make_shared<SyncedConversation>(convId);
1682 0 : pimpl_->conversations_.emplace(repositoryId, conv);
1683 0 : }
1684 0 : }
1685 :
1686 : // Add conversations from convInfos_ so isConversation could detect conversations correctly.
1687 : // This includes conversations that have been accepted but are not yet synchronized.
1688 0 : for (auto itInfo = pimpl_->convInfos_.begin(); itInfo != pimpl_->convInfos_.end();) {
1689 0 : const auto& info = itInfo->second;
1690 0 : if (info.members.empty()) {
1691 0 : itInfo = pimpl_->convInfos_.erase(itInfo);
1692 0 : continue;
1693 : }
1694 0 : auto itConv = pimpl_->conversations_.find(info.id);
1695 0 : if (itConv == pimpl_->conversations_.end()) {
1696 : // convInfos_ can contain a conversation that is not yet cloned
1697 : // so we need to add it there.
1698 0 : pimpl_->conversations_.emplace(info.id, std::make_shared<SyncedConversation>(info)).first;
1699 : }
1700 0 : ++itInfo;
1701 : }
1702 :
1703 0 : ilk.unlock();
1704 0 : lk.unlock();
1705 0 : }
1706 :
1707 : void
1708 594 : ConversationModule::bootstrap(const std::string& convId)
1709 : {
1710 594 : pimpl_->bootstrap(convId);
1711 594 : }
1712 :
1713 : void
1714 0 : ConversationModule::monitor()
1715 : {
1716 0 : std::unique_lock lk(pimpl_->conversationsMtx_);
1717 0 : for (auto& [_, conv] : pimpl_->conversations_) {
1718 0 : if (conv && conv->conversation) {
1719 0 : conv->conversation->monitor();
1720 : }
1721 : }
1722 0 : }
1723 :
1724 : void
1725 478 : ConversationModule::clearPendingFetch()
1726 : {
1727 : // Note: This is a workaround. convModule() is kept if account is disabled/re-enabled.
1728 : // iOS uses setAccountActive() a lot, and if for some reason the previous pending fetch
1729 : // is not erased (callback not called), it will block the new messages as it will not
1730 : // sync. The best way to debug this is to get logs from the last ICE connection for
1731 : // syncing the conversation. It may have been killed in some un-expected way avoiding to
1732 : // call the callbacks. This should never happen, but if it's the case, this will allow
1733 : // new messages to be synced correctly.
1734 478 : std::unique_lock lk(pimpl_->conversationsMtx_);
1735 499 : for (auto& [_, conv] : pimpl_->conversations_) {
1736 21 : if (conv && conv->pending) {
1737 0 : JAMI_ERR("This is a bug, seems to still fetch to some device on initializing");
1738 0 : conv->pending.reset();
1739 : }
1740 : }
1741 478 : }
1742 :
1743 : void
1744 0 : ConversationModule::reloadRequests()
1745 : {
1746 0 : pimpl_->conversationsRequests_ = convRequests(pimpl_->accountId_);
1747 0 : }
1748 :
1749 : std::vector<std::string>
1750 7 : ConversationModule::getConversations() const
1751 : {
1752 7 : std::vector<std::string> result;
1753 7 : std::lock_guard lk(pimpl_->convInfosMtx_);
1754 7 : result.reserve(pimpl_->convInfos_.size());
1755 14 : for (const auto& [key, conv] : pimpl_->convInfos_) {
1756 7 : if (conv.isRemoved())
1757 3 : continue;
1758 4 : result.emplace_back(key);
1759 : }
1760 14 : return result;
1761 7 : }
1762 :
1763 : std::string
1764 506 : ConversationModule::getOneToOneConversation(const std::string& uri) const noexcept
1765 : {
1766 506 : return pimpl_->getOneToOneConversation(uri);
1767 : }
1768 :
1769 : std::vector<std::map<std::string, std::string>>
1770 11 : ConversationModule::getConversationRequests() const
1771 : {
1772 11 : std::vector<std::map<std::string, std::string>> requests;
1773 11 : std::lock_guard lk(pimpl_->conversationsRequestsMtx_);
1774 11 : requests.reserve(pimpl_->conversationsRequests_.size());
1775 22 : for (const auto& [id, request] : pimpl_->conversationsRequests_) {
1776 11 : if (request.declined)
1777 6 : continue; // Do not add declined requests
1778 5 : requests.emplace_back(request.toMap());
1779 : }
1780 22 : return requests;
1781 11 : }
1782 :
1783 : void
1784 61 : ConversationModule::onTrustRequest(const std::string& uri,
1785 : const std::string& conversationId,
1786 : const std::vector<uint8_t>& payload,
1787 : time_t received)
1788 : {
1789 61 : auto oldConv = getOneToOneConversation(uri);
1790 61 : if (!oldConv.empty() && pimpl_->isConversation(oldConv)) {
1791 : // If there is already an active one to one conversation here, it's an active
1792 : // contact and the contact will reclone this activeConv, so ignore the request
1793 12 : JAMI_WARNING(
1794 : "Contact is sending a request for a non active conversation. Ignore. They will "
1795 : "clone the old one");
1796 4 : return;
1797 : }
1798 57 : std::unique_lock lk(pimpl_->conversationsRequestsMtx_);
1799 57 : ConversationRequest req;
1800 57 : req.from = uri;
1801 57 : req.conversationId = conversationId;
1802 57 : req.received = std::time(nullptr);
1803 171 : req.metadatas = ConversationRepository::infosFromVCard(vCard::utils::toMap(
1804 114 : std::string_view(reinterpret_cast<const char*>(payload.data()), payload.size())));
1805 57 : auto reqMap = req.toMap();
1806 57 : if (pimpl_->addConversationRequest(conversationId, std::move(req))) {
1807 49 : lk.unlock();
1808 49 : emitSignal<libjami::ConfigurationSignal::IncomingTrustRequest>(pimpl_->accountId_,
1809 : conversationId,
1810 : uri,
1811 : payload,
1812 : received);
1813 49 : emitSignal<libjami::ConversationSignal::ConversationRequestReceived>(pimpl_->accountId_,
1814 : conversationId,
1815 : reqMap);
1816 49 : pimpl_->needsSyncingCb_({});
1817 : } else {
1818 24 : JAMI_DEBUG("[Account {}] Received a request for a conversation "
1819 : "already existing. Ignore",
1820 : pimpl_->accountId_);
1821 : }
1822 61 : }
1823 :
1824 : void
1825 509 : ConversationModule::onConversationRequest(const std::string& from, const Json::Value& value)
1826 : {
1827 509 : ConversationRequest req(value);
1828 509 : auto acc = pimpl_->account_.lock();
1829 509 : auto isOneToOne = req.isOneToOne();
1830 509 : std::string oldConv;
1831 509 : if (acc && isOneToOne) {
1832 6 : oldConv = getOneToOneConversation(from);
1833 : }
1834 509 : std::unique_lock lk(pimpl_->conversationsRequestsMtx_);
1835 1527 : JAMI_DEBUG("[Account {}] Receive a new conversation request for conversation {} from {}",
1836 : pimpl_->accountId_,
1837 : req.conversationId,
1838 : from);
1839 509 : auto convId = req.conversationId;
1840 :
1841 : // Already accepted request, do nothing
1842 509 : if (pimpl_->isConversation(convId))
1843 56 : return;
1844 453 : auto oldReq = pimpl_->getRequest(convId);
1845 453 : if (oldReq != std::nullopt) {
1846 1038 : JAMI_DEBUG("[Account {}] Received a request for a conversation already existing. "
1847 : "Ignore. Declined: {}",
1848 : pimpl_->accountId_,
1849 : static_cast<int>(oldReq->declined));
1850 346 : return;
1851 : }
1852 :
1853 107 : if (!oldConv.empty()) {
1854 4 : lk.unlock();
1855 : // Already a conversation with the contact.
1856 : // If there is already an active one to one conversation here, it's an active
1857 : // contact and the contact will reclone this activeConv, so ignore the request
1858 12 : JAMI_WARNING(
1859 : "Contact is sending a request for a non active conversation. Ignore. They will "
1860 : "clone the old one");
1861 4 : return;
1862 : }
1863 :
1864 103 : req.received = std::time(nullptr);
1865 103 : req.from = from;
1866 103 : auto reqMap = req.toMap();
1867 103 : if (pimpl_->addConversationRequest(convId, std::move(req))) {
1868 103 : lk.unlock();
1869 : // Note: no need to sync here because other connected devices should receive
1870 : // the same conversation request. Will sync when the conversation will be added
1871 103 : if (isOneToOne)
1872 1 : pimpl_->oneToOneRecvCb_(convId, from);
1873 103 : emitSignal<libjami::ConversationSignal::ConversationRequestReceived>(pimpl_->accountId_,
1874 : convId,
1875 : reqMap);
1876 : }
1877 2483 : }
1878 :
1879 : std::string
1880 3 : ConversationModule::peerFromConversationRequest(const std::string& convId) const
1881 : {
1882 3 : std::lock_guard lk(pimpl_->conversationsRequestsMtx_);
1883 3 : auto it = pimpl_->conversationsRequests_.find(convId);
1884 3 : if (it != pimpl_->conversationsRequests_.end()) {
1885 3 : return it->second.from;
1886 : }
1887 0 : return {};
1888 3 : }
1889 :
1890 : void
1891 408 : ConversationModule::onNeedConversationRequest(const std::string& from,
1892 : const std::string& conversationId)
1893 : {
1894 408 : pimpl_->withConversation(conversationId, [&](auto& conversation) {
1895 407 : if (!conversation.isMember(from, true)) {
1896 0 : JAMI_WARNING("{} is asking a new invite for {}, but not a member", from, conversationId);
1897 0 : return;
1898 : }
1899 1221 : JAMI_LOG("{} is asking a new invite for {}", from, conversationId);
1900 407 : pimpl_->sendMsgCb_(from, {}, conversation.generateInvitation(), 0);
1901 : });
1902 408 : }
1903 :
1904 : void
1905 146 : ConversationModule::acceptConversationRequest(const std::string& conversationId,
1906 : const std::string& deviceId)
1907 : {
1908 : // For all conversation members, try to open a git channel with this conversation ID
1909 146 : std::unique_lock lkCr(pimpl_->conversationsRequestsMtx_);
1910 146 : auto request = pimpl_->getRequest(conversationId);
1911 146 : if (request == std::nullopt) {
1912 13 : lkCr.unlock();
1913 13 : if (auto conv = pimpl_->getConversation(conversationId)) {
1914 6 : std::unique_lock lk(conv->mtx);
1915 6 : if (!conv->conversation) {
1916 1 : lk.unlock();
1917 1 : pimpl_->cloneConversationFrom(conv, deviceId);
1918 : }
1919 19 : }
1920 39 : JAMI_WARNING("[Account {}] Request not found for conversation {}",
1921 : pimpl_->accountId_,
1922 : conversationId);
1923 13 : return;
1924 : }
1925 133 : pimpl_->rmConversationRequest(conversationId);
1926 133 : lkCr.unlock();
1927 133 : if (pimpl_->updateConvReqCb_)
1928 133 : pimpl_->updateConvReqCb_(conversationId, request->from, true);
1929 133 : cloneConversationFrom(conversationId, request->from);
1930 159 : }
1931 :
1932 : void
1933 6 : ConversationModule::declineConversationRequest(const std::string& conversationId)
1934 : {
1935 6 : std::lock_guard lk(pimpl_->conversationsRequestsMtx_);
1936 6 : auto it = pimpl_->conversationsRequests_.find(conversationId);
1937 6 : if (it != pimpl_->conversationsRequests_.end()) {
1938 6 : it->second.declined = std::time(nullptr);
1939 6 : pimpl_->saveConvRequests();
1940 : }
1941 6 : pimpl_->syncingMetadatas_.erase(conversationId);
1942 6 : pimpl_->saveMetadatas();
1943 6 : emitSignal<libjami::ConversationSignal::ConversationRequestDeclined>(pimpl_->accountId_,
1944 : conversationId);
1945 6 : pimpl_->needsSyncingCb_({});
1946 6 : }
1947 :
1948 : std::string
1949 128 : ConversationModule::startConversation(ConversationMode mode, const std::string& otherMember)
1950 : {
1951 128 : auto acc = pimpl_->account_.lock();
1952 128 : if (!acc)
1953 0 : return {};
1954 128 : std::vector<DeviceId> kd;
1955 258 : for (const auto& [id, _] : acc->getKnownDevices())
1956 258 : kd.emplace_back(id);
1957 : // Create the conversation object
1958 128 : std::shared_ptr<Conversation> conversation;
1959 : try {
1960 128 : conversation = std::make_shared<Conversation>(acc, mode, otherMember);
1961 128 : auto conversationId = conversation->id();
1962 128 : conversation->onMessageStatusChanged([this, conversationId](const auto& status) {
1963 502 : auto msg = std::make_shared<SyncMsg>();
1964 1004 : msg->ms = {{conversationId, status}};
1965 502 : pimpl_->needsSyncingCb_(std::move(msg));
1966 502 : });
1967 128 : conversation->onMembersChanged([this, conversationId](const auto& members) {
1968 223 : pimpl_->setConversationMembers(conversationId, members);
1969 223 : });
1970 128 : conversation->onNeedSocket(pimpl_->onNeedSwarmSocket_);
1971 : #ifdef LIBJAMI_TESTABLE
1972 128 : conversation->onBootstrapStatus(pimpl_->bootstrapCbTest_);
1973 : #endif // LIBJAMI_TESTABLE
1974 256 : conversation->bootstrap(std::bind(&ConversationModule::Impl::bootstrapCb,
1975 128 : pimpl_.get(),
1976 : conversationId),
1977 : kd);
1978 128 : } catch (const std::exception& e) {
1979 0 : JAMI_ERR("[Account %s] Error while generating a conversation %s",
1980 : pimpl_->accountId_.c_str(),
1981 : e.what());
1982 0 : return {};
1983 0 : }
1984 128 : auto convId = conversation->id();
1985 128 : auto conv = pimpl_->startConversation(convId);
1986 128 : std::unique_lock lk(conv->mtx);
1987 128 : conv->info.created = std::time(nullptr);
1988 128 : conv->info.members.emplace(pimpl_->username_);
1989 128 : if (!otherMember.empty())
1990 51 : conv->info.members.emplace(otherMember);
1991 128 : conv->conversation = conversation;
1992 128 : addConvInfo(conv->info);
1993 128 : lk.unlock();
1994 :
1995 128 : pimpl_->needsSyncingCb_({});
1996 128 : emitSignal<libjami::ConversationSignal::ConversationReady>(pimpl_->accountId_, convId);
1997 128 : return convId;
1998 128 : }
1999 :
2000 : void
2001 138 : ConversationModule::cloneConversationFrom(const std::string& conversationId,
2002 : const std::string& uri,
2003 : const std::string& oldConvId)
2004 : {
2005 138 : pimpl_->cloneConversationFrom(conversationId, uri, oldConvId);
2006 138 : }
2007 :
2008 : // Message send/load
2009 : void
2010 55 : ConversationModule::sendMessage(const std::string& conversationId,
2011 : std::string message,
2012 : const std::string& replyTo,
2013 : const std::string& type,
2014 : bool announce,
2015 : OnCommitCb&& onCommit,
2016 : OnDoneCb&& cb)
2017 : {
2018 110 : pimpl_->sendMessage(conversationId,
2019 55 : std::move(message),
2020 : replyTo,
2021 : type,
2022 : announce,
2023 55 : std::move(onCommit),
2024 55 : std::move(cb));
2025 55 : }
2026 :
2027 : void
2028 13 : ConversationModule::sendMessage(const std::string& conversationId,
2029 : Json::Value&& value,
2030 : const std::string& replyTo,
2031 : bool announce,
2032 : OnCommitCb&& onCommit,
2033 : OnDoneCb&& cb)
2034 : {
2035 26 : pimpl_->sendMessage(conversationId,
2036 13 : std::move(value),
2037 : replyTo,
2038 : announce,
2039 13 : std::move(onCommit),
2040 13 : std::move(cb));
2041 13 : }
2042 :
2043 : void
2044 0 : ConversationModule::editMessage(const std::string& conversationId,
2045 : const std::string& newBody,
2046 : const std::string& editedId)
2047 : {
2048 0 : pimpl_->editMessage(conversationId, newBody, editedId);
2049 0 : }
2050 :
2051 : void
2052 0 : ConversationModule::reactToMessage(const std::string& conversationId,
2053 : const std::string& newBody,
2054 : const std::string& reactToId)
2055 : {
2056 : // Commit message edition
2057 0 : Json::Value json;
2058 0 : json["body"] = newBody;
2059 0 : json["react-to"] = reactToId;
2060 0 : json["type"] = "text/plain";
2061 0 : pimpl_->sendMessage(conversationId, std::move(json));
2062 0 : }
2063 :
2064 : void
2065 52 : ConversationModule::addCallHistoryMessage(const std::string& uri,
2066 : uint64_t duration_ms,
2067 : const std::string& reason)
2068 : {
2069 52 : auto finalUri = uri.substr(0, uri.find("@ring.dht"));
2070 52 : finalUri = finalUri.substr(0, uri.find("@jami.dht"));
2071 52 : auto convId = getOneToOneConversation(finalUri);
2072 52 : if (!convId.empty()) {
2073 3 : Json::Value value;
2074 3 : value["to"] = finalUri;
2075 3 : value["type"] = "application/call-history+json";
2076 3 : value["duration"] = std::to_string(duration_ms);
2077 3 : if (!reason.empty())
2078 3 : value["reason"] = reason;
2079 3 : sendMessage(convId, std::move(value));
2080 3 : }
2081 52 : }
2082 :
2083 : bool
2084 12 : ConversationModule::onMessageDisplayed(const std::string& peer,
2085 : const std::string& conversationId,
2086 : const std::string& interactionId)
2087 : {
2088 12 : if (auto conv = pimpl_->getConversation(conversationId)) {
2089 12 : std::unique_lock lk(conv->mtx);
2090 12 : if (auto conversation = conv->conversation) {
2091 12 : lk.unlock();
2092 12 : return conversation->setMessageDisplayed(peer, interactionId);
2093 12 : }
2094 24 : }
2095 0 : return false;
2096 : }
2097 :
2098 : std::map<std::string, std::map<std::string, std::map<std::string, std::string>>>
2099 220 : ConversationModule::convMessageStatus() const
2100 : {
2101 220 : std::map<std::string, std::map<std::string, std::map<std::string, std::string>>> messageStatus;
2102 220 : std::lock_guard lk(pimpl_->conversationsMtx_);
2103 347 : for (const auto& [id, conv] : pimpl_->conversations_) {
2104 127 : if (conv && conv->conversation) {
2105 85 : auto d = conv->conversation->messageStatus();
2106 85 : if (!d.empty())
2107 66 : messageStatus[id] = std::move(d);
2108 85 : }
2109 : }
2110 439 : return messageStatus;
2111 219 : }
2112 :
2113 : uint32_t
2114 0 : ConversationModule::loadConversationMessages(const std::string& conversationId,
2115 : const std::string& fromMessage,
2116 : size_t n)
2117 : {
2118 0 : auto acc = pimpl_->account_.lock();
2119 0 : if (auto conv = pimpl_->getConversation(conversationId)) {
2120 0 : std::lock_guard lk(conv->mtx);
2121 0 : if (conv->conversation) {
2122 0 : const uint32_t id = std::uniform_int_distribution<uint32_t> {}(acc->rand);
2123 0 : LogOptions options;
2124 0 : options.from = fromMessage;
2125 0 : options.nbOfCommits = n;
2126 0 : conv->conversation->loadMessages(
2127 0 : [accountId = pimpl_->accountId_, conversationId, id](auto&& messages) {
2128 0 : emitSignal<libjami::ConversationSignal::ConversationLoaded>(id,
2129 0 : accountId,
2130 0 : conversationId,
2131 : messages);
2132 0 : },
2133 : options);
2134 0 : return id;
2135 0 : }
2136 0 : }
2137 0 : return 0;
2138 0 : }
2139 :
2140 : void
2141 0 : ConversationModule::clearCache(const std::string& conversationId)
2142 : {
2143 0 : if (auto conv = pimpl_->getConversation(conversationId)) {
2144 0 : std::lock_guard lk(conv->mtx);
2145 0 : if (conv->conversation) {
2146 0 : conv->conversation->clearCache();
2147 : }
2148 0 : }
2149 0 : }
2150 :
2151 : uint32_t
2152 2 : ConversationModule::loadConversation(const std::string& conversationId,
2153 : const std::string& fromMessage,
2154 : size_t n)
2155 : {
2156 2 : auto acc = pimpl_->account_.lock();
2157 2 : if (auto conv = pimpl_->getConversation(conversationId)) {
2158 2 : std::lock_guard lk(conv->mtx);
2159 2 : if (conv->conversation) {
2160 2 : const uint32_t id = std::uniform_int_distribution<uint32_t> {}(acc->rand);
2161 2 : LogOptions options;
2162 2 : options.from = fromMessage;
2163 2 : options.nbOfCommits = n;
2164 4 : conv->conversation->loadMessages2(
2165 2 : [accountId = pimpl_->accountId_, conversationId, id](auto&& messages) {
2166 4 : emitSignal<libjami::ConversationSignal::SwarmLoaded>(id,
2167 2 : accountId,
2168 2 : conversationId,
2169 : messages);
2170 2 : },
2171 : options);
2172 2 : return id;
2173 2 : }
2174 4 : }
2175 0 : return 0;
2176 2 : }
2177 :
2178 : uint32_t
2179 0 : ConversationModule::loadConversationUntil(const std::string& conversationId,
2180 : const std::string& fromMessage,
2181 : const std::string& toMessage)
2182 : {
2183 0 : auto acc = pimpl_->account_.lock();
2184 0 : if (auto conv = pimpl_->getConversation(conversationId)) {
2185 0 : std::lock_guard lk(conv->mtx);
2186 0 : if (conv->conversation) {
2187 0 : const uint32_t id = std::uniform_int_distribution<uint32_t> {}(acc->rand);
2188 0 : LogOptions options;
2189 0 : options.from = fromMessage;
2190 0 : options.to = toMessage;
2191 0 : options.includeTo = true;
2192 0 : conv->conversation->loadMessages(
2193 0 : [accountId = pimpl_->accountId_, conversationId, id](auto&& messages) {
2194 0 : emitSignal<libjami::ConversationSignal::ConversationLoaded>(id,
2195 0 : accountId,
2196 0 : conversationId,
2197 : messages);
2198 0 : },
2199 : options);
2200 0 : return id;
2201 0 : }
2202 0 : }
2203 0 : return 0;
2204 0 : }
2205 :
2206 : uint32_t
2207 0 : ConversationModule::loadSwarmUntil(const std::string& conversationId,
2208 : const std::string& fromMessage,
2209 : const std::string& toMessage)
2210 : {
2211 0 : auto acc = pimpl_->account_.lock();
2212 0 : if (auto conv = pimpl_->getConversation(conversationId)) {
2213 0 : std::lock_guard lk(conv->mtx);
2214 0 : if (conv->conversation) {
2215 0 : const uint32_t id = std::uniform_int_distribution<uint32_t> {}(acc->rand);
2216 0 : LogOptions options;
2217 0 : options.from = fromMessage;
2218 0 : options.to = toMessage;
2219 0 : options.includeTo = true;
2220 0 : conv->conversation->loadMessages2(
2221 0 : [accountId = pimpl_->accountId_, conversationId, id](auto&& messages) {
2222 0 : emitSignal<libjami::ConversationSignal::SwarmLoaded>(id,
2223 0 : accountId,
2224 0 : conversationId,
2225 : messages);
2226 0 : },
2227 : options);
2228 0 : return id;
2229 0 : }
2230 0 : }
2231 0 : return 0;
2232 0 : }
2233 :
2234 : std::shared_ptr<TransferManager>
2235 62 : ConversationModule::dataTransfer(const std::string& conversationId) const
2236 : {
2237 62 : return pimpl_->withConversation(conversationId,
2238 122 : [](auto& conversation) { return conversation.dataTransfer(); });
2239 : }
2240 :
2241 : bool
2242 11 : ConversationModule::onFileChannelRequest(const std::string& conversationId,
2243 : const std::string& member,
2244 : const std::string& fileId,
2245 : bool verifyShaSum) const
2246 : {
2247 11 : if (auto conv = pimpl_->getConversation(conversationId)) {
2248 11 : std::lock_guard lk(conv->mtx);
2249 11 : if (conv->conversation)
2250 11 : return conv->conversation->onFileChannelRequest(member, fileId, verifyShaSum);
2251 22 : }
2252 0 : return false;
2253 : }
2254 :
2255 : bool
2256 11 : ConversationModule::downloadFile(const std::string& conversationId,
2257 : const std::string& interactionId,
2258 : const std::string& fileId,
2259 : const std::string& path,
2260 : size_t start,
2261 : size_t end)
2262 : {
2263 11 : if (auto conv = pimpl_->getConversation(conversationId)) {
2264 11 : std::lock_guard lk(conv->mtx);
2265 11 : if (conv->conversation)
2266 11 : return conv->conversation->downloadFile(interactionId, fileId, path, "", "", start, end);
2267 22 : }
2268 0 : return false;
2269 : }
2270 :
2271 : void
2272 1094 : ConversationModule::syncConversations(const std::string& peer, const std::string& deviceId)
2273 : {
2274 : // Sync conversations where peer is member
2275 1094 : std::set<std::string> toFetch;
2276 1094 : std::set<std::string> toClone;
2277 : {
2278 1094 : std::lock_guard lkCI(pimpl_->conversationsMtx_);
2279 1676 : for (const auto& [key, conv] : pimpl_->conversations_) {
2280 582 : std::lock_guard lk(conv->mtx);
2281 582 : if (conv->conversation) {
2282 555 : if (!conv->conversation->isRemoving() && conv->conversation->isMember(peer, false)) {
2283 113 : toFetch.emplace(key);
2284 : // TODO connect to Swarm
2285 : // if (!conv->conversation->hasSwarmChannel(deviceId)) {
2286 : // if (auto acc = pimpl_->account_.lock()) {
2287 : // }
2288 : // }
2289 : }
2290 27 : } else if (!conv->info.isRemoved()
2291 71 : && std::find(conv->info.members.begin(), conv->info.members.end(), peer)
2292 71 : != conv->info.members.end()) {
2293 : // In this case the conversation was never cloned (can be after an import)
2294 22 : toClone.emplace(key);
2295 : }
2296 582 : }
2297 1094 : }
2298 1209 : for (const auto& cid : toFetch)
2299 113 : pimpl_->fetchNewCommits(peer, deviceId, cid);
2300 1118 : for (const auto& cid : toClone)
2301 22 : pimpl_->cloneConversation(deviceId, peer, cid);
2302 2192 : if (pimpl_->syncCnt.load() == 0) {
2303 705 : if (auto acc = pimpl_->account_.lock())
2304 705 : emitSignal<libjami::ConversationSignal::ConversationSyncFinished>(
2305 1410 : acc->getAccountID().c_str());
2306 : }
2307 1096 : }
2308 :
2309 : void
2310 163 : ConversationModule::onSyncData(const SyncMsg& msg,
2311 : const std::string& peerId,
2312 : const std::string& deviceId)
2313 : {
2314 163 : std::vector<std::string> toClone;
2315 268 : for (const auto& [key, convInfo] : msg.c) {
2316 106 : const auto& convId = convInfo.id;
2317 : {
2318 106 : std::lock_guard lk(pimpl_->conversationsRequestsMtx_);
2319 106 : pimpl_->rmConversationRequest(convId);
2320 106 : }
2321 :
2322 106 : auto conv = pimpl_->startConversation(convInfo);
2323 106 : std::unique_lock lk(conv->mtx);
2324 : // Skip outdated info
2325 106 : if (std::max(convInfo.created, convInfo.removed)
2326 106 : < std::max(conv->info.created, conv->info.removed))
2327 2 : continue;
2328 104 : if (not convInfo.isRemoved()) {
2329 : // If multi devices, it can detect a conversation that was already
2330 : // removed, so just check if the convinfo contains a removed conv
2331 92 : if (conv->info.removed) {
2332 0 : if (conv->info.removed >= convInfo.created) {
2333 : // Only reclone if re-added, else the peer is not synced yet (could be
2334 : // offline before)
2335 0 : continue;
2336 : }
2337 0 : JAMI_DEBUG("Re-add previously removed conversation {:s}", convId);
2338 : }
2339 92 : conv->info = convInfo;
2340 92 : if (!conv->conversation) {
2341 50 : if (deviceId != "") {
2342 50 : pimpl_->cloneConversation(deviceId, peerId, conv);
2343 : } else {
2344 : // In this case, informations are from JAMS
2345 : // JAMS doesn't store the conversation itself, so we
2346 : // must use informations to clone the conversation
2347 0 : addConvInfo(convInfo);
2348 0 : toClone.emplace_back(convId);
2349 : }
2350 : }
2351 : } else {
2352 12 : if (conv->conversation && !conv->conversation->isRemoving()) {
2353 3 : emitSignal<libjami::ConversationSignal::ConversationRemoved>(pimpl_->accountId_,
2354 : convId);
2355 3 : conv->conversation->setRemovingFlag();
2356 : }
2357 12 : auto update = false;
2358 12 : if (!conv->info.removed) {
2359 2 : update = true;
2360 2 : conv->info.removed = std::time(nullptr);
2361 : }
2362 12 : if (convInfo.erased && !conv->info.erased) {
2363 1 : conv->info.erased = std::time(nullptr);
2364 1 : pimpl_->addConvInfo(conv->info);
2365 1 : pimpl_->removeRepositoryImpl(*conv, false);
2366 11 : } else if (update) {
2367 2 : pimpl_->addConvInfo(conv->info);
2368 : }
2369 : }
2370 108 : }
2371 :
2372 163 : for (const auto& cid : toClone) {
2373 0 : auto members = getConversationMembers(cid);
2374 0 : for (const auto& member : members) {
2375 0 : if (member.at("uri") != pimpl_->username_)
2376 0 : cloneConversationFrom(cid, member.at("uri"));
2377 : }
2378 0 : }
2379 :
2380 183 : for (const auto& [convId, req] : msg.cr) {
2381 20 : if (req.from == pimpl_->username_) {
2382 0 : JAMI_WARNING("Detected request from ourself, ignore {}.", convId);
2383 15 : continue;
2384 0 : }
2385 20 : std::unique_lock lk(pimpl_->conversationsRequestsMtx_);
2386 20 : if (pimpl_->isConversation(convId)) {
2387 : // Already handled request
2388 1 : pimpl_->rmConversationRequest(convId);
2389 1 : continue;
2390 : }
2391 :
2392 : // New request
2393 19 : if (!pimpl_->addConversationRequest(convId, req))
2394 8 : continue;
2395 11 : lk.unlock();
2396 :
2397 11 : if (req.declined != 0) {
2398 : // Request declined
2399 18 : JAMI_LOG("[Account {:s}] Declined request detected for conversation {:s} (device {:s})",
2400 : pimpl_->accountId_,
2401 : convId,
2402 : deviceId);
2403 6 : pimpl_->syncingMetadatas_.erase(convId);
2404 6 : pimpl_->saveMetadatas();
2405 6 : emitSignal<libjami::ConversationSignal::ConversationRequestDeclined>(pimpl_->accountId_,
2406 : convId);
2407 6 : continue;
2408 6 : }
2409 :
2410 15 : JAMI_LOG("[Account {:s}] New request detected for conversation {:s} (device {:s})",
2411 : pimpl_->accountId_,
2412 : convId,
2413 : deviceId);
2414 :
2415 5 : emitSignal<libjami::ConversationSignal::ConversationRequestReceived>(pimpl_->accountId_,
2416 : convId,
2417 10 : req.toMap());
2418 20 : }
2419 :
2420 : // Updates preferences for conversations
2421 163 : for (const auto& [convId, p] : msg.p) {
2422 0 : if (auto conv = pimpl_->getConversation(convId)) {
2423 0 : std::unique_lock lk(conv->mtx);
2424 0 : if (conv->conversation) {
2425 0 : auto conversation = conv->conversation;
2426 0 : lk.unlock();
2427 0 : conversation->updatePreferences(p);
2428 0 : } else if (conv->pending) {
2429 0 : conv->pending->preferences = p;
2430 : }
2431 0 : }
2432 : }
2433 :
2434 : // Updates displayed for conversations
2435 238 : for (const auto& [convId, ms] : msg.ms) {
2436 75 : if (auto conv = pimpl_->getConversation(convId)) {
2437 75 : std::unique_lock lk(conv->mtx);
2438 75 : if (conv->conversation) {
2439 29 : auto conversation = conv->conversation;
2440 29 : lk.unlock();
2441 29 : conversation->updateMessageStatus(ms);
2442 75 : } else if (conv->pending) {
2443 44 : conv->pending->status = ms;
2444 : }
2445 150 : }
2446 : }
2447 163 : }
2448 :
2449 : bool
2450 2 : ConversationModule::needsSyncingWith(const std::string& memberUri, const std::string& deviceId) const
2451 : {
2452 : // Check if a conversation needs to fetch remote or to be cloned
2453 2 : std::lock_guard lk(pimpl_->conversationsMtx_);
2454 2 : for (const auto& [key, ci] : pimpl_->conversations_) {
2455 1 : std::lock_guard lk(ci->mtx);
2456 1 : if (ci->conversation) {
2457 0 : if (ci->conversation->isRemoving() && ci->conversation->isMember(memberUri, false))
2458 0 : return true;
2459 1 : } else if (!ci->info.removed
2460 3 : && std::find(ci->info.members.begin(), ci->info.members.end(), memberUri)
2461 3 : != ci->info.members.end()) {
2462 : // In this case the conversation was never cloned (can be after an import)
2463 1 : return true;
2464 : }
2465 1 : }
2466 1 : return false;
2467 2 : }
2468 :
2469 : void
2470 996 : ConversationModule::setFetched(const std::string& conversationId,
2471 : const std::string& deviceId,
2472 : const std::string& commitId)
2473 : {
2474 996 : if (auto conv = pimpl_->getConversation(conversationId)) {
2475 997 : std::lock_guard lk(conv->mtx);
2476 997 : if (conv->conversation) {
2477 997 : bool remove = conv->conversation->isRemoving();
2478 997 : conv->conversation->hasFetched(deviceId, commitId);
2479 997 : if (remove)
2480 1 : pimpl_->removeRepositoryImpl(*conv, true);
2481 : }
2482 1994 : }
2483 997 : }
2484 :
2485 : void
2486 27610 : ConversationModule::fetchNewCommits(const std::string& peer,
2487 : const std::string& deviceId,
2488 : const std::string& conversationId,
2489 : const std::string& commitId)
2490 : {
2491 27610 : pimpl_->fetchNewCommits(peer, deviceId, conversationId, commitId);
2492 27610 : }
2493 :
2494 : void
2495 107 : ConversationModule::addConversationMember(const std::string& conversationId,
2496 : const std::string& contactUri,
2497 : bool sendRequest)
2498 : {
2499 107 : auto conv = pimpl_->getConversation(conversationId);
2500 107 : if (not conv || not conv->conversation) {
2501 0 : JAMI_ERROR("Conversation {:s} doesn't exist", conversationId);
2502 0 : return;
2503 : }
2504 107 : std::unique_lock lk(conv->mtx);
2505 :
2506 107 : if (conv->conversation->isMember(contactUri, true)) {
2507 0 : JAMI_DEBUG("{:s} is already a member of {:s}, resend invite", contactUri, conversationId);
2508 : // Note: This should not be necessary, but if for whatever reason the other side didn't
2509 : // join we should not forbid new invites
2510 0 : auto invite = conv->conversation->generateInvitation();
2511 0 : lk.unlock();
2512 0 : pimpl_->sendMsgCb_(contactUri, {}, std::move(invite), 0);
2513 0 : return;
2514 0 : }
2515 :
2516 107 : conv->conversation->addMember(
2517 : contactUri,
2518 419 : [this, conv, conversationId, sendRequest, contactUri](bool ok, const std::string& commitId) {
2519 107 : if (ok) {
2520 105 : std::unique_lock lk(conv->mtx);
2521 105 : pimpl_->sendMessageNotification(*conv->conversation,
2522 : true,
2523 : commitId); // For the other members
2524 105 : if (sendRequest) {
2525 102 : auto invite = conv->conversation->generateInvitation();
2526 102 : lk.unlock();
2527 102 : pimpl_->sendMsgCb_(contactUri, {}, std::move(invite), 0);
2528 102 : }
2529 105 : }
2530 107 : });
2531 107 : }
2532 :
2533 : void
2534 13 : ConversationModule::removeConversationMember(const std::string& conversationId,
2535 : const std::string& contactUri,
2536 : bool isDevice)
2537 : {
2538 13 : if (auto conv = pimpl_->getConversation(conversationId)) {
2539 13 : std::lock_guard lk(conv->mtx);
2540 13 : if (conv->conversation)
2541 13 : return conv->conversation->removeMember(
2542 24 : contactUri, isDevice, [this, conversationId](bool ok, const std::string& commitId) {
2543 13 : if (ok) {
2544 11 : pimpl_->sendMessageNotification(conversationId, true, commitId);
2545 : }
2546 26 : });
2547 26 : }
2548 : }
2549 :
2550 : std::vector<std::map<std::string, std::string>>
2551 75 : ConversationModule::getConversationMembers(const std::string& conversationId,
2552 : bool includeBanned) const
2553 : {
2554 75 : return pimpl_->getConversationMembers(conversationId, includeBanned);
2555 : }
2556 :
2557 : uint32_t
2558 0 : ConversationModule::countInteractions(const std::string& convId,
2559 : const std::string& toId,
2560 : const std::string& fromId,
2561 : const std::string& authorUri) const
2562 : {
2563 0 : if (auto conv = pimpl_->getConversation(convId)) {
2564 0 : std::lock_guard lk(conv->mtx);
2565 0 : if (conv->conversation)
2566 0 : return conv->conversation->countInteractions(toId, fromId, authorUri);
2567 0 : }
2568 0 : return 0;
2569 : }
2570 :
2571 : void
2572 0 : ConversationModule::search(uint32_t req, const std::string& convId, const Filter& filter) const
2573 : {
2574 0 : if (convId.empty()) {
2575 0 : auto finishedFlag = std::make_shared<std::atomic_int>(pimpl_->conversations_.size());
2576 0 : std::unique_lock lk(pimpl_->conversationsMtx_);
2577 0 : for (const auto& [cid, conv] : pimpl_->conversations_) {
2578 0 : std::lock_guard lk(conv->mtx);
2579 0 : if (!conv->conversation) {
2580 0 : if ((*finishedFlag)-- == 1) {
2581 0 : emitSignal<libjami::ConversationSignal::MessagesFound>(
2582 : req,
2583 0 : pimpl_->accountId_,
2584 0 : std::string {},
2585 0 : std::vector<std::map<std::string, std::string>> {});
2586 : }
2587 : } else {
2588 0 : conv->conversation->search(req, filter, finishedFlag);
2589 : }
2590 0 : }
2591 0 : } else if (auto conv = pimpl_->getConversation(convId)) {
2592 0 : std::lock_guard lk(conv->mtx);
2593 0 : if (conv->conversation)
2594 0 : conv->conversation->search(req, filter, std::make_shared<std::atomic_int>(1));
2595 0 : }
2596 0 : }
2597 :
2598 : void
2599 4 : ConversationModule::updateConversationInfos(const std::string& conversationId,
2600 : const std::map<std::string, std::string>& infos,
2601 : bool sync)
2602 : {
2603 4 : auto conv = pimpl_->getConversation(conversationId);
2604 4 : if (not conv or not conv->conversation) {
2605 0 : JAMI_ERROR("Conversation {:s} doesn't exist", conversationId);
2606 0 : return;
2607 : }
2608 4 : std::lock_guard lk(conv->mtx);
2609 4 : conv->conversation
2610 4 : ->updateInfos(infos, [this, conversationId, sync](bool ok, const std::string& commitId) {
2611 4 : if (ok && sync) {
2612 4 : pimpl_->sendMessageNotification(conversationId, true, commitId);
2613 0 : } else if (sync)
2614 0 : JAMI_WARNING("Couldn't update infos on {:s}", conversationId);
2615 4 : });
2616 4 : }
2617 :
2618 : std::map<std::string, std::string>
2619 1 : ConversationModule::conversationInfos(const std::string& conversationId) const
2620 : {
2621 : {
2622 1 : std::lock_guard lk(pimpl_->conversationsRequestsMtx_);
2623 1 : auto itReq = pimpl_->conversationsRequests_.find(conversationId);
2624 1 : if (itReq != pimpl_->conversationsRequests_.end())
2625 0 : return itReq->second.metadatas;
2626 1 : }
2627 1 : if (auto conv = pimpl_->getConversation(conversationId)) {
2628 1 : std::lock_guard lk(conv->mtx);
2629 1 : std::map<std::string, std::string> md;
2630 : {
2631 1 : auto syncingMetadatasIt = pimpl_->syncingMetadatas_.find(conversationId);
2632 1 : if (syncingMetadatasIt != pimpl_->syncingMetadatas_.end()) {
2633 1 : if (conv->conversation) {
2634 0 : pimpl_->syncingMetadatas_.erase(syncingMetadatasIt);
2635 0 : pimpl_->saveMetadatas();
2636 : } else {
2637 1 : md = syncingMetadatasIt->second;
2638 : }
2639 : }
2640 : }
2641 1 : if (conv->conversation)
2642 0 : return conv->conversation->infos();
2643 : else
2644 1 : return md;
2645 2 : }
2646 0 : JAMI_ERROR("Conversation {:s} doesn't exist", conversationId);
2647 0 : return {};
2648 : }
2649 :
2650 : void
2651 1 : ConversationModule::setConversationPreferences(const std::string& conversationId,
2652 : const std::map<std::string, std::string>& prefs)
2653 : {
2654 1 : if (auto conv = pimpl_->getConversation(conversationId)) {
2655 1 : std::unique_lock lk(conv->mtx);
2656 1 : if (not conv->conversation) {
2657 0 : JAMI_ERROR("Conversation {:s} doesn't exist", conversationId);
2658 0 : return;
2659 : }
2660 1 : auto conversation = conv->conversation;
2661 1 : lk.unlock();
2662 1 : conversation->updatePreferences(prefs);
2663 1 : auto msg = std::make_shared<SyncMsg>();
2664 2 : msg->p = {{conversationId, conversation->preferences(true)}};
2665 1 : pimpl_->needsSyncingCb_(std::move(msg));
2666 2 : }
2667 : }
2668 :
2669 : std::map<std::string, std::string>
2670 10 : ConversationModule::getConversationPreferences(const std::string& conversationId,
2671 : bool includeCreated) const
2672 : {
2673 10 : if (auto conv = pimpl_->getConversation(conversationId)) {
2674 10 : std::lock_guard lk(conv->mtx);
2675 10 : if (conv->conversation)
2676 10 : return conv->conversation->preferences(includeCreated);
2677 20 : }
2678 0 : return {};
2679 : }
2680 :
2681 : std::map<std::string, std::map<std::string, std::string>>
2682 220 : ConversationModule::convPreferences() const
2683 : {
2684 220 : std::map<std::string, std::map<std::string, std::string>> p;
2685 220 : std::lock_guard lk(pimpl_->conversationsMtx_);
2686 347 : for (const auto& [id, conv] : pimpl_->conversations_) {
2687 127 : if (conv && conv->conversation) {
2688 85 : auto prefs = conv->conversation->preferences(true);
2689 85 : if (!prefs.empty())
2690 0 : p[id] = std::move(prefs);
2691 85 : }
2692 : }
2693 440 : return p;
2694 220 : }
2695 :
2696 : std::vector<uint8_t>
2697 0 : ConversationModule::conversationVCard(const std::string& conversationId) const
2698 : {
2699 0 : if (auto conv = pimpl_->getConversation(conversationId)) {
2700 0 : std::lock_guard lk(conv->mtx);
2701 0 : if (conv->conversation)
2702 0 : return conv->conversation->vCard();
2703 0 : }
2704 0 : JAMI_ERROR("Conversation {:s} doesn't exist", conversationId);
2705 0 : return {};
2706 : }
2707 :
2708 : bool
2709 4036 : ConversationModule::isBanned(const std::string& convId, const std::string& uri) const
2710 : {
2711 4036 : if (auto conv = pimpl_->getConversation(convId)) {
2712 4023 : std::lock_guard lk(conv->mtx);
2713 4025 : if (!conv->conversation)
2714 63 : return true;
2715 3959 : if (conv->conversation->mode() != ConversationMode::ONE_TO_ONE)
2716 3501 : return conv->conversation->isBanned(uri);
2717 8059 : }
2718 : // If 1:1 we check the certificate status
2719 470 : if (auto acc = pimpl_->account_.lock()) {
2720 470 : if (auto am = acc->accountManager())
2721 470 : return am->getCertificateStatus(uri)
2722 470 : == dhtnet::tls::TrustStore::PermissionStatus::BANNED;
2723 470 : }
2724 0 : return true;
2725 : }
2726 :
2727 : void
2728 23 : ConversationModule::removeContact(const std::string& uri, bool banned)
2729 : {
2730 : // Remove linked conversation's requests
2731 : {
2732 23 : std::lock_guard lk(pimpl_->conversationsRequestsMtx_);
2733 23 : auto update = false;
2734 23 : for (auto it = pimpl_->conversationsRequests_.begin();
2735 27 : it != pimpl_->conversationsRequests_.end();
2736 4 : ++it) {
2737 4 : if (it->second.from == uri && !it->second.declined) {
2738 12 : JAMI_DEBUG("Declining conversation request {:s} from {:s}", it->first, uri);
2739 4 : pimpl_->syncingMetadatas_.erase(it->first);
2740 4 : pimpl_->saveMetadatas();
2741 4 : emitSignal<libjami::ConversationSignal::ConversationRequestDeclined>(
2742 4 : pimpl_->accountId_, it->first);
2743 4 : update = true;
2744 4 : it->second.declined = std::time(nullptr);
2745 : }
2746 : }
2747 23 : if (update) {
2748 4 : pimpl_->saveConvRequests();
2749 4 : pimpl_->needsSyncingCb_({});
2750 : }
2751 23 : }
2752 23 : if (banned) {
2753 7 : auto conversationId = getOneToOneConversation(uri);
2754 11 : pimpl_->withConversation(conversationId, [&](auto& conv) { conv.shutdownConnections(); });
2755 7 : return; // Keep the conversation in banned model but stop connections
2756 7 : }
2757 : // Remove related conversation
2758 16 : auto isSelf = uri == pimpl_->username_;
2759 16 : std::vector<std::string> toRm;
2760 14 : auto updateClient = [&](const auto& convId) {
2761 14 : if (pimpl_->updateConvReqCb_)
2762 14 : pimpl_->updateConvReqCb_(convId, uri, false);
2763 14 : emitSignal<libjami::ConversationSignal::ConversationRemoved>(pimpl_->accountId_, convId);
2764 14 : };
2765 15 : auto removeConvInfo = [&](const auto& conv, const auto& members) {
2766 1 : if ((isSelf && members.size() == 1)
2767 16 : || (!isSelf && std::find(members.begin(), members.end(), uri) != members.end())) {
2768 : // Mark as removed
2769 14 : conv->info.removed = std::time(nullptr);
2770 14 : updateClient(conv->info.id);
2771 14 : pimpl_->addConvInfo(conv->info);
2772 14 : return true;
2773 : }
2774 1 : return false;
2775 16 : };
2776 : {
2777 16 : std::lock_guard lk(pimpl_->conversationsMtx_);
2778 31 : for (auto& [convId, conv] : pimpl_->conversations_) {
2779 15 : std::lock_guard lk(conv->mtx);
2780 15 : if (conv->conversation) {
2781 : try {
2782 : // Note it's important to check getUsername(), else
2783 : // removing self can remove all conversations
2784 12 : if (conv->conversation->mode() == ConversationMode::ONE_TO_ONE) {
2785 12 : auto initMembers = conv->conversation->getInitialMembers();
2786 12 : if (removeConvInfo(conv, initMembers))
2787 11 : toRm.emplace_back(convId);
2788 12 : }
2789 0 : } catch (const std::exception& e) {
2790 0 : JAMI_WARN("%s", e.what());
2791 0 : }
2792 : } else {
2793 3 : removeConvInfo(conv, conv->info.members);
2794 : }
2795 15 : }
2796 16 : }
2797 : // Note, if we ban the device, we don't send the leave cause the other peer will just
2798 : // never got the notifications, so just erase the datas
2799 27 : for (const auto& id : toRm)
2800 11 : pimpl_->removeRepository(id, true, true);
2801 16 : }
2802 :
2803 : bool
2804 6 : ConversationModule::removeConversation(const std::string& conversationId)
2805 : {
2806 6 : return pimpl_->removeConversation(conversationId);
2807 : }
2808 :
2809 : void
2810 5 : ConversationModule::initReplay(const std::string& oldConvId, const std::string& newConvId)
2811 : {
2812 5 : if (auto conv = pimpl_->getConversation(oldConvId)) {
2813 5 : std::lock_guard lk(conv->mtx);
2814 5 : if (conv->conversation) {
2815 5 : std::promise<bool> waitLoad;
2816 5 : std::future<bool> fut = waitLoad.get_future();
2817 : // we should wait for loadMessage, because it will be deleted after this.
2818 5 : conv->conversation->loadMessages(
2819 5 : [&](auto&& messages) {
2820 5 : std::reverse(messages.begin(),
2821 : messages.end()); // Log is inverted as we want to replay
2822 5 : std::lock_guard lk(pimpl_->replayMtx_);
2823 5 : pimpl_->replay_[newConvId] = std::move(messages);
2824 5 : waitLoad.set_value(true);
2825 5 : },
2826 : {});
2827 5 : fut.wait();
2828 5 : }
2829 10 : }
2830 5 : }
2831 :
2832 : bool
2833 14 : ConversationModule::isHosting(const std::string& conversationId, const std::string& confId) const
2834 : {
2835 14 : if (conversationId.empty()) {
2836 4 : std::lock_guard lk(pimpl_->conversationsMtx_);
2837 4 : return std::find_if(pimpl_->conversations_.cbegin(),
2838 4 : pimpl_->conversations_.cend(),
2839 4 : [&](const auto& conv) {
2840 4 : return conv.second->conversation
2841 4 : && conv.second->conversation->isHosting(confId);
2842 : })
2843 8 : != pimpl_->conversations_.cend();
2844 14 : } else if (auto conv = pimpl_->getConversation(conversationId)) {
2845 10 : if (conv->conversation) {
2846 10 : return conv->conversation->isHosting(confId);
2847 : }
2848 10 : }
2849 0 : return false;
2850 : }
2851 :
2852 : std::vector<std::map<std::string, std::string>>
2853 17 : ConversationModule::getActiveCalls(const std::string& conversationId) const
2854 : {
2855 34 : return pimpl_->withConversation(conversationId, [](const auto& conversation) {
2856 17 : return conversation.currentCalls();
2857 17 : });
2858 : }
2859 :
2860 : void
2861 22 : ConversationModule::call(const std::string& url,
2862 : const std::shared_ptr<SIPCall>& call,
2863 : std::function<void(const std::string&, const DeviceId&)>&& cb)
2864 : {
2865 88 : std::string conversationId = "", confId = "", uri = "", deviceId = "";
2866 22 : if (url.find('/') == std::string::npos) {
2867 13 : conversationId = url;
2868 : } else {
2869 9 : auto parameters = jami::split_string(url, '/');
2870 9 : if (parameters.size() != 4) {
2871 0 : JAMI_ERROR("Incorrect url {:s}", url);
2872 0 : return;
2873 : }
2874 9 : conversationId = parameters[0];
2875 9 : uri = parameters[1];
2876 9 : deviceId = parameters[2];
2877 9 : confId = parameters[3];
2878 9 : }
2879 :
2880 22 : std::string callUri;
2881 11 : auto sendCall = [&]() {
2882 11 : call->setState(Call::ConnectionState::TRYING);
2883 11 : call->setPeerNumber(callUri);
2884 11 : call->setPeerUri("rdv:" + callUri);
2885 11 : call->addStateListener([w = pimpl_->weak(), conversationId](Call::CallState call_state,
2886 : Call::ConnectionState cnx_state,
2887 : int) {
2888 62 : if (cnx_state == Call::ConnectionState::DISCONNECTED
2889 22 : && call_state == Call::CallState::MERROR) {
2890 1 : auto shared = w.lock();
2891 1 : if (!shared)
2892 0 : return false;
2893 1 : if (auto acc = shared->account_.lock())
2894 1 : emitSignal<libjami::ConfigurationSignal::NeedsHost>(acc->getAccountID(),
2895 2 : conversationId);
2896 1 : return true;
2897 1 : }
2898 61 : return true;
2899 : });
2900 11 : cb(callUri, DeviceId(deviceId));
2901 11 : };
2902 :
2903 22 : auto conv = pimpl_->getConversation(conversationId);
2904 22 : if (!conv)
2905 0 : return;
2906 22 : std::unique_lock lk(conv->mtx);
2907 22 : if (!conv->conversation) {
2908 0 : JAMI_ERROR("Conversation {:s} not found", conversationId);
2909 0 : return;
2910 : }
2911 :
2912 : // Check if we want to join a specific conference
2913 : // So, if confId is specified or if there is some activeCalls
2914 : // or if we are the default host.
2915 22 : auto activeCalls = conv->conversation->currentCalls();
2916 22 : auto infos = conv->conversation->infos();
2917 22 : auto itRdvAccount = infos.find("rdvAccount");
2918 22 : auto itRdvDevice = infos.find("rdvDevice");
2919 22 : auto sendCallRequest = false;
2920 22 : if (confId != "") {
2921 9 : sendCallRequest = true;
2922 9 : confId = confId == "0" ? Manager::instance().callFactory.getNewCallID() : confId;
2923 27 : JAMI_DEBUG("Calling self, join conference");
2924 13 : } else if (!activeCalls.empty()) {
2925 : // Else, we try to join active calls
2926 0 : sendCallRequest = true;
2927 0 : auto& ac = *activeCalls.rbegin();
2928 0 : confId = ac.at("id");
2929 0 : uri = ac.at("uri");
2930 0 : deviceId = ac.at("device");
2931 0 : JAMI_DEBUG("Calling last active call: {:s}", callUri);
2932 13 : } else if (itRdvAccount != infos.end() && itRdvDevice != infos.end()) {
2933 : // Else, creates "to" (accountId/deviceId/conversationId/confId) and ask remote host
2934 3 : sendCallRequest = true;
2935 3 : uri = itRdvAccount->second;
2936 3 : deviceId = itRdvDevice->second;
2937 3 : confId = "0";
2938 9 : JAMI_DEBUG("Remote host detected. Calling {:s} on device {:s}", uri, deviceId);
2939 : }
2940 :
2941 22 : if (sendCallRequest) {
2942 12 : callUri = fmt::format("{}/{}/{}/{}", conversationId, uri, deviceId, confId);
2943 12 : if (uri == pimpl_->username_ && deviceId == pimpl_->deviceId_) {
2944 : // In this case, we're probably hosting the conference.
2945 1 : call->setState(Call::ConnectionState::CONNECTED);
2946 : // In this case, the call is the only one in the conference
2947 : // and there is no peer, so media succeeded and are shown to
2948 : // the client.
2949 1 : call->reportMediaNegotiationStatus();
2950 1 : lk.unlock();
2951 1 : if (confId == "0")
2952 1 : confId = call->getCallId();
2953 1 : hostConference(conversationId, confId, call->getCallId());
2954 1 : return;
2955 : }
2956 33 : JAMI_DEBUG("Calling: {:s}", callUri);
2957 11 : sendCall();
2958 11 : return;
2959 : }
2960 :
2961 : // Else, we are the host.
2962 10 : confId = Manager::instance().callFactory.getNewCallID();
2963 10 : call->setState(Call::ConnectionState::CONNECTED);
2964 : // In this case, the call is the only one in the conference
2965 : // and there is no peer, so media succeeded and are shown to
2966 : // the client.
2967 10 : call->reportMediaNegotiationStatus();
2968 10 : lk.unlock();
2969 10 : hostConference(conversationId, confId, call->getCallId());
2970 118 : }
2971 :
2972 : void
2973 14 : ConversationModule::hostConference(const std::string& conversationId,
2974 : const std::string& confId,
2975 : const std::string& callId)
2976 : {
2977 14 : auto acc = pimpl_->account_.lock();
2978 14 : if (!acc)
2979 0 : return;
2980 14 : std::shared_ptr<Call> call;
2981 14 : call = acc->getCall(callId);
2982 14 : if (!call) {
2983 0 : JAMI_WARN("No call with id %s found", callId.c_str());
2984 0 : return;
2985 : }
2986 14 : auto conf = acc->getConference(confId);
2987 14 : auto createConf = !conf;
2988 14 : if (createConf) {
2989 14 : conf = std::make_shared<Conference>(acc, confId, true, call->getMediaAttributeList());
2990 14 : acc->attach(conf);
2991 : }
2992 14 : conf->addParticipant(callId);
2993 :
2994 14 : if (createConf) {
2995 14 : emitSignal<libjami::CallSignal::ConferenceCreated>(acc->getAccountID(), confId);
2996 : } else {
2997 0 : conf->attachLocalParticipant();
2998 0 : conf->reportMediaNegotiationStatus();
2999 0 : emitSignal<libjami::CallSignal::ConferenceChanged>(acc->getAccountID(),
3000 0 : conf->getConfId(),
3001 : conf->getStateStr());
3002 0 : return;
3003 : }
3004 :
3005 14 : auto conv = pimpl_->getConversation(conversationId);
3006 14 : if (!conv)
3007 0 : return;
3008 14 : std::unique_lock lk(conv->mtx);
3009 14 : if (!conv->conversation) {
3010 0 : JAMI_ERROR("Conversation {} not found", conversationId);
3011 0 : return;
3012 : }
3013 : // Add commit to conversation
3014 14 : Json::Value value;
3015 14 : value["uri"] = pimpl_->username_;
3016 14 : value["device"] = pimpl_->deviceId_;
3017 14 : value["confId"] = confId;
3018 14 : value["type"] = "application/call-history+json";
3019 28 : conv->conversation->hostConference(std::move(value),
3020 14 : [w = pimpl_->weak(),
3021 : conversationId](bool ok, const std::string& commitId) {
3022 14 : if (ok) {
3023 14 : if (auto shared = w.lock())
3024 14 : shared->sendMessageNotification(conversationId,
3025 : true,
3026 14 : commitId);
3027 : } else {
3028 0 : JAMI_ERR("Failed to send message to conversation %s",
3029 : conversationId.c_str());
3030 : }
3031 14 : });
3032 :
3033 : // When conf finished = remove host & commit
3034 : // Master call, so when it's stopped, the conference will be stopped (as we use the hold
3035 : // state for detaching the call)
3036 42 : conf->onShutdown(
3037 28 : [w = pimpl_->weak(), accountUri = pimpl_->username_, confId, conversationId, call, conv](
3038 : int duration) {
3039 14 : auto shared = w.lock();
3040 14 : if (shared) {
3041 14 : Json::Value value;
3042 14 : value["uri"] = accountUri;
3043 14 : value["device"] = shared->deviceId_;
3044 14 : value["confId"] = confId;
3045 14 : value["type"] = "application/call-history+json";
3046 14 : value["duration"] = std::to_string(duration);
3047 :
3048 14 : std::lock_guard lk(conv->mtx);
3049 14 : if (!conv->conversation) {
3050 0 : JAMI_ERROR("Conversation {} not found", conversationId);
3051 0 : return;
3052 : }
3053 42 : conv->conversation->removeActiveConference(
3054 28 : std::move(value), [w, conversationId](bool ok, const std::string& commitId) {
3055 14 : if (ok) {
3056 14 : if (auto shared = w.lock()) {
3057 14 : shared->sendMessageNotification(conversationId, true, commitId);
3058 14 : }
3059 : } else {
3060 0 : JAMI_ERROR("Failed to send message to conversation {}", conversationId);
3061 : }
3062 14 : });
3063 14 : }
3064 14 : });
3065 14 : }
3066 :
3067 : std::map<std::string, ConvInfo>
3068 683 : ConversationModule::convInfos(const std::string& accountId)
3069 : {
3070 683 : auto path = fileutils::get_data_dir() / accountId;
3071 1368 : return convInfosFromPath(path);
3072 684 : }
3073 :
3074 : std::map<std::string, ConvInfo>
3075 717 : ConversationModule::convInfosFromPath(const std::filesystem::path& path)
3076 : {
3077 717 : std::map<std::string, ConvInfo> convInfos;
3078 : try {
3079 : // read file
3080 1435 : std::lock_guard lock(dhtnet::fileutils::getFileLock(path / "convInfo"));
3081 718 : auto file = fileutils::loadFile("convInfo", path);
3082 : // load values
3083 718 : msgpack::unpacked result;
3084 718 : msgpack::unpack(result, (const char*) file.data(), file.size(), 0);
3085 709 : result.get().convert(convInfos);
3086 744 : } catch (const std::exception& e) {
3087 9 : JAMI_WARN("[convInfo] error loading convInfo: %s", e.what());
3088 9 : }
3089 718 : return convInfos;
3090 0 : }
3091 :
3092 : std::map<std::string, ConversationRequest>
3093 676 : ConversationModule::convRequests(const std::string& accountId)
3094 : {
3095 676 : auto path = fileutils::get_data_dir() / accountId;
3096 1352 : return convRequestsFromPath(path.string());
3097 676 : }
3098 :
3099 : std::map<std::string, ConversationRequest>
3100 710 : ConversationModule::convRequestsFromPath(const std::filesystem::path& path)
3101 : {
3102 710 : std::map<std::string, ConversationRequest> convRequests;
3103 : try {
3104 : // read file
3105 1420 : std::lock_guard lock(dhtnet::fileutils::getFileLock(path / "convRequests"));
3106 710 : auto file = fileutils::loadFile("convRequests", path);
3107 : // load values
3108 710 : msgpack::unpacked result;
3109 710 : msgpack::unpack(result, (const char*) file.data(), file.size(), 0);
3110 710 : result.get().convert(convRequests);
3111 710 : } catch (const std::exception& e) {
3112 0 : JAMI_WARN("[convInfo] error loading convInfo: %s", e.what());
3113 0 : }
3114 710 : return convRequests;
3115 0 : }
3116 :
3117 : void
3118 128 : ConversationModule::addConvInfo(const ConvInfo& info)
3119 : {
3120 128 : pimpl_->addConvInfo(info);
3121 128 : }
3122 :
3123 : void
3124 848 : ConversationModule::Impl::setConversationMembers(const std::string& convId,
3125 : const std::set<std::string>& members)
3126 : {
3127 848 : if (auto conv = getConversation(convId)) {
3128 848 : std::lock_guard lk(conv->mtx);
3129 848 : conv->info.members = members;
3130 848 : addConvInfo(conv->info);
3131 1696 : }
3132 848 : }
3133 :
3134 : std::shared_ptr<Conversation>
3135 0 : ConversationModule::getConversation(const std::string& convId)
3136 : {
3137 0 : if (auto conv = pimpl_->getConversation(convId)) {
3138 0 : std::lock_guard lk(conv->mtx);
3139 0 : return conv->conversation;
3140 0 : }
3141 0 : return nullptr;
3142 : }
3143 :
3144 : std::shared_ptr<dhtnet::ChannelSocket>
3145 5588 : ConversationModule::gitSocket(std::string_view deviceId, std::string_view convId) const
3146 : {
3147 5588 : if (auto conv = pimpl_->getConversation(convId)) {
3148 5583 : std::lock_guard lk(conv->mtx);
3149 5587 : if (conv->conversation)
3150 10565 : return conv->conversation->gitSocket(DeviceId(deviceId));
3151 303 : else if (conv->pending)
3152 302 : return conv->pending->socket;
3153 11174 : }
3154 2 : return nullptr;
3155 : }
3156 :
3157 : void
3158 0 : ConversationModule::addGitSocket(std::string_view deviceId,
3159 : std::string_view convId,
3160 : const std::shared_ptr<dhtnet::ChannelSocket>& channel)
3161 : {
3162 0 : if (auto conv = pimpl_->getConversation(convId)) {
3163 0 : std::lock_guard lk(conv->mtx);
3164 0 : conv->conversation->addGitSocket(DeviceId(deviceId), channel);
3165 0 : } else
3166 0 : JAMI_WARNING("addGitSocket: can't find conversation {:s}", convId);
3167 0 : }
3168 :
3169 : void
3170 768 : ConversationModule::removeGitSocket(std::string_view deviceId, std::string_view convId)
3171 : {
3172 1521 : pimpl_->withConversation(convId, [&](auto& conv) { conv.removeGitSocket(DeviceId(deviceId)); });
3173 768 : }
3174 :
3175 : void
3176 500 : ConversationModule::shutdownConnections()
3177 : {
3178 500 : std::vector<std::shared_ptr<SyncedConversation>> conversations;
3179 : {
3180 500 : std::lock_guard lk(pimpl_->conversationsMtx_);
3181 500 : conversations.reserve(pimpl_->conversations_.size());
3182 817 : for (auto& [k, c] : pimpl_->conversations_) {
3183 317 : conversations.emplace_back(c);
3184 : }
3185 500 : }
3186 817 : for (const auto& c : conversations) {
3187 317 : std::lock_guard lkc(c->mtx);
3188 317 : if (c->conversation)
3189 290 : c->conversation->shutdownConnections();
3190 317 : if (c->pending)
3191 8 : c->pending->socket = {};
3192 317 : }
3193 500 : }
3194 : void
3195 906 : ConversationModule::addSwarmChannel(const std::string& conversationId,
3196 : std::shared_ptr<dhtnet::ChannelSocket> channel)
3197 : {
3198 906 : pimpl_->withConversation(conversationId,
3199 900 : [&](auto& conv) { conv.addSwarmChannel(std::move(channel)); });
3200 907 : }
3201 :
3202 : void
3203 0 : ConversationModule::connectivityChanged()
3204 : {
3205 0 : std::vector<std::shared_ptr<SyncedConversation>> syncedConversations;
3206 : {
3207 0 : std::lock_guard lk(pimpl_->conversationsMtx_);
3208 0 : syncedConversations.reserve(pimpl_->conversations_.size());
3209 0 : for (const auto& [k, c] : pimpl_->conversations_) {
3210 0 : syncedConversations.emplace_back(c);
3211 : }
3212 0 : }
3213 0 : std::vector<std::shared_ptr<Conversation>> conversations;
3214 0 : conversations.reserve(syncedConversations.size());
3215 0 : for (const auto& c : syncedConversations) {
3216 0 : std::lock_guard lkc(c->mtx);
3217 0 : if (c->conversation)
3218 0 : conversations.emplace_back(c->conversation);
3219 0 : }
3220 0 : for (const auto& conv : conversations)
3221 0 : conv->connectivityChanged();
3222 0 : }
3223 :
3224 : std::shared_ptr<Typers>
3225 9 : ConversationModule::getTypers(const std::string& convId)
3226 : {
3227 9 : if (auto c = pimpl_->getConversation(convId)) {
3228 9 : std::lock_guard lk(c->mtx);
3229 9 : if (c->conversation)
3230 9 : return c->conversation->typers();
3231 18 : }
3232 0 : return nullptr;
3233 : }
3234 :
3235 : } // namespace jami
|