Line data Source code
1 : /*
2 : * Copyright (C) 2004-2024 Savoir-faire Linux Inc.
3 : *
4 : * This program is free software: you can redistribute it and/or modify
5 : * it under the terms of the GNU General Public License as published by
6 : * the Free Software Foundation, either version 3 of the License, or
7 : * (at your option) any later version.
8 : *
9 : * This program is distributed in the hope that it will be useful,
10 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 : * GNU General Public License for more details.
13 : *
14 : * You should have received a copy of the GNU General Public License
15 : * along with this program. If not, see <https://www.gnu.org/licenses/>.
16 : */
17 :
18 : #include "conversation_module.h"
19 :
20 : #include <algorithm>
21 : #include <fstream>
22 :
23 : #include <opendht/thread_pool.h>
24 :
25 : #include "account_const.h"
26 : #include "call.h"
27 : #include "client/ring_signal.h"
28 : #include "fileutils.h"
29 : #include "jamidht/account_manager.h"
30 : #include "jamidht/jamiaccount.h"
31 : #include "manager.h"
32 : #include "sip/sipcall.h"
33 : #include "vcard.h"
34 :
35 : namespace jami {
36 :
37 : using ConvInfoMap = std::map<std::string, ConvInfo>;
38 :
39 : struct PendingConversationFetch
40 : {
41 : bool ready {false};
42 : bool cloning {false};
43 : std::string deviceId {};
44 : std::string removeId {};
45 : std::map<std::string, std::string> preferences {};
46 : std::map<std::string, std::map<std::string, std::string>> status {};
47 : std::set<std::string> connectingTo {};
48 : std::shared_ptr<dhtnet::ChannelSocket> socket {};
49 : };
50 :
51 : constexpr std::chrono::seconds MAX_FALLBACK {12 * 3600s};
52 :
53 : struct SyncedConversation
54 : {
55 : std::mutex mtx;
56 : std::unique_ptr<asio::steady_timer> fallbackClone;
57 : std::chrono::seconds fallbackTimer {5s};
58 : ConvInfo info;
59 : std::unique_ptr<PendingConversationFetch> pending;
60 : std::shared_ptr<Conversation> conversation;
61 :
62 365 : SyncedConversation(const std::string& convId)
63 365 : : info {convId}
64 : {
65 365 : fallbackClone = std::make_unique<asio::steady_timer>(*Manager::instance().ioContext());
66 365 : }
67 29 : SyncedConversation(const ConvInfo& info)
68 29 : : info {info}
69 : {
70 29 : fallbackClone = std::make_unique<asio::steady_timer>(*Manager::instance().ioContext());
71 29 : }
72 :
73 3106 : bool startFetch(const std::string& deviceId, bool checkIfConv = false)
74 : {
75 : // conversation mtx must be locked
76 3106 : if (checkIfConv && conversation)
77 10 : return false; // Already a conversation
78 3096 : if (pending) {
79 1109 : if (pending->ready)
80 31 : return false; // Already doing stuff
81 : // if (pending->deviceId == deviceId)
82 : // return false; // Already fetching
83 1077 : if (pending->connectingTo.find(deviceId) != pending->connectingTo.end())
84 367 : return false; // Already connecting to this device
85 : } else {
86 1984 : pending = std::make_unique<PendingConversationFetch>();
87 1986 : pending->connectingTo.insert(deviceId);
88 1987 : return true;
89 : }
90 710 : return true;
91 : }
92 :
93 1883 : void stopFetch(const std::string& deviceId)
94 : {
95 : // conversation mtx must be locked
96 1883 : if (!pending)
97 100 : return;
98 1783 : pending->connectingTo.erase(deviceId);
99 1783 : if (pending->connectingTo.empty())
100 1294 : pending.reset();
101 : }
102 :
103 118 : std::vector<std::map<std::string, std::string>> getMembers(bool includeLeft,
104 : bool includeBanned) const
105 : {
106 : // conversation mtx must be locked
107 118 : if (conversation)
108 100 : return conversation->getMembers(true, includeLeft, includeBanned);
109 : // If we're cloning, we can return the initial members
110 18 : std::vector<std::map<std::string, std::string>> result;
111 18 : result.reserve(info.members.size());
112 55 : for (const auto& uri : info.members) {
113 74 : result.emplace_back(std::map<std::string, std::string> {{"uri", uri}});
114 : }
115 18 : return result;
116 18 : }
117 : };
118 :
119 : class ConversationModule::Impl : public std::enable_shared_from_this<Impl>
120 : {
121 : public:
122 : Impl(std::shared_ptr<JamiAccount>&& account,
123 : std::shared_ptr<AccountManager>&& accountManager,
124 : NeedsSyncingCb&& needsSyncingCb,
125 : SengMsgCb&& sendMsgCb,
126 : NeedSocketCb&& onNeedSocket,
127 : NeedSocketCb&& onNeedSwarmSocket,
128 : OneToOneRecvCb&& oneToOneRecvCb);
129 :
130 : template<typename S, typename T>
131 119 : inline auto withConv(const S& convId, T&& cb) const
132 : {
133 238 : if (auto conv = getConversation(convId)) {
134 118 : std::lock_guard lk(conv->mtx);
135 118 : return cb(*conv);
136 118 : } else {
137 3 : JAMI_WARNING("Conversation {} not found", convId);
138 : }
139 1 : return decltype(cb(std::declval<SyncedConversation&>()))();
140 : }
141 : template<typename S, typename T>
142 2065 : inline auto withConversation(const S& convId, T&& cb)
143 : {
144 4129 : if (auto conv = getConversation(convId)) {
145 2058 : std::lock_guard lk(conv->mtx);
146 2058 : if (conv->conversation)
147 2033 : return cb(*conv->conversation);
148 2058 : } else {
149 21 : JAMI_WARNING("Conversation {} not found", convId);
150 : }
151 32 : return decltype(cb(std::declval<Conversation&>()))();
152 : }
153 :
154 : // Retrieving recent commits
155 : /**
156 : * Clone a conversation (initial) from device
157 : * @param deviceId
158 : * @param convId
159 : */
160 : void cloneConversation(const std::string& deviceId,
161 : const std::string& peer,
162 : const std::string& convId);
163 : void cloneConversation(const std::string& deviceId,
164 : const std::string& peer,
165 : const std::shared_ptr<SyncedConversation>& conv);
166 :
167 : /**
168 : * Pull remote device
169 : * @param peer Contact URI
170 : * @param deviceId Contact's device
171 : * @param conversationId
172 : * @param commitId (optional)
173 : */
174 : void fetchNewCommits(const std::string& peer,
175 : const std::string& deviceId,
176 : const std::string& conversationId,
177 : const std::string& commitId = "");
178 : /**
179 : * Handle events to receive new commits
180 : */
181 : void handlePendingConversation(const std::string& conversationId, const std::string& deviceId);
182 :
183 : // Requests
184 : std::optional<ConversationRequest> getRequest(const std::string& id) const;
185 :
186 : // Conversations
187 : /**
188 : * Get members
189 : * @param conversationId
190 : * @param includeBanned
191 : * @return a map of members with their role and details
192 : */
193 : std::vector<std::map<std::string, std::string>> getConversationMembers(
194 : const std::string& conversationId, bool includeBanned = false) const;
195 : void setConversationMembers(const std::string& convId, const std::set<std::string>& members);
196 :
197 : /**
198 : * Remove a repository and all files
199 : * @param convId
200 : * @param sync If we send an update to other account's devices
201 : * @param force True if ignore the removing flag
202 : */
203 : void removeRepository(const std::string& convId, bool sync, bool force = false);
204 : void removeRepositoryImpl(SyncedConversation& conv, bool sync, bool force = false);
205 : /**
206 : * Remove a conversation
207 : * @param conversationId
208 : */
209 : bool removeConversation(const std::string& conversationId);
210 : bool removeConversationImpl(SyncedConversation& conv);
211 :
212 : /**
213 : * Send a message notification to all members
214 : * @param conversation
215 : * @param commit
216 : * @param sync If we send an update to other account's devices
217 : * @param deviceId If we need to filter a specific device
218 : */
219 : void sendMessageNotification(const std::string& conversationId,
220 : bool sync,
221 : const std::string& commitId = "",
222 : const std::string& deviceId = "");
223 : void sendMessageNotification(Conversation& conversation,
224 : bool sync,
225 : const std::string& commitId = "",
226 : const std::string& deviceId = "");
227 :
228 : /**
229 : * @return if a convId is a valid conversation (repository cloned & usable)
230 : */
231 595 : bool isConversation(const std::string& convId) const
232 : {
233 595 : std::lock_guard lk(conversationsMtx_);
234 594 : auto c = conversations_.find(convId);
235 1190 : return c != conversations_.end() && c->second;
236 594 : }
237 :
238 2714 : void addConvInfo(const ConvInfo& info)
239 : {
240 2714 : std::lock_guard lk(convInfosMtx_);
241 2714 : convInfos_[info.id] = info;
242 2714 : saveConvInfos();
243 2714 : }
244 :
245 : std::string getOneToOneConversation(const std::string& uri) const noexcept;
246 :
247 : bool updateConvForContact(const std::string& uri,
248 : const std::string& oldConv,
249 : const std::string& newConv);
250 :
251 :
252 119 : std::shared_ptr<SyncedConversation> getConversation(std::string_view convId) const
253 : {
254 119 : std::lock_guard lk(conversationsMtx_);
255 119 : auto c = conversations_.find(convId);
256 238 : return c != conversations_.end() ? c->second : nullptr;
257 119 : }
258 32945 : std::shared_ptr<SyncedConversation> getConversation(std::string_view convId)
259 : {
260 32945 : std::lock_guard lk(conversationsMtx_);
261 32944 : auto c = conversations_.find(convId);
262 65888 : return c != conversations_.end() ? c->second : nullptr;
263 32943 : }
264 396 : std::shared_ptr<SyncedConversation> startConversation(const std::string& convId)
265 : {
266 396 : std::lock_guard lk(conversationsMtx_);
267 396 : auto& c = conversations_[convId];
268 396 : if (!c)
269 347 : c = std::make_shared<SyncedConversation>(convId);
270 792 : return c;
271 396 : }
272 111 : std::shared_ptr<SyncedConversation> startConversation(const ConvInfo& info)
273 : {
274 111 : std::lock_guard lk(conversationsMtx_);
275 111 : auto& c = conversations_[info.id];
276 111 : if (!c)
277 15 : c = std::make_shared<SyncedConversation>(info);
278 222 : return c;
279 111 : }
280 2790 : std::vector<std::shared_ptr<SyncedConversation>> getSyncedConversations() const
281 : {
282 2790 : std::lock_guard lk(conversationsMtx_);
283 2789 : std::vector<std::shared_ptr<SyncedConversation>> result;
284 2789 : result.reserve(conversations_.size());
285 3929 : for (const auto& [_, c] : conversations_)
286 1140 : result.emplace_back(c);
287 5579 : return result;
288 2789 : }
289 438 : std::vector<std::shared_ptr<Conversation>> getConversations() const
290 : {
291 438 : std::lock_guard lk(conversationsMtx_);
292 438 : std::vector<std::shared_ptr<Conversation>> result;
293 438 : result.reserve(conversations_.size());
294 697 : for (const auto& [_, sc] : conversations_) {
295 260 : if (auto c = sc->conversation)
296 260 : result.emplace_back(std::move(c));
297 : }
298 876 : return result;
299 438 : }
300 :
301 : // Message send/load
302 : void sendMessage(const std::string& conversationId,
303 : Json::Value&& value,
304 : const std::string& replyTo = "",
305 : bool announce = true,
306 : OnCommitCb&& onCommit = {},
307 : OnDoneCb&& cb = {});
308 :
309 : void sendMessage(const std::string& conversationId,
310 : std::string message,
311 : const std::string& replyTo = "",
312 : const std::string& type = "text/plain",
313 : bool announce = true,
314 : OnCommitCb&& onCommit = {},
315 : OnDoneCb&& cb = {});
316 :
317 : void editMessage(const std::string& conversationId,
318 : const std::string& newBody,
319 : const std::string& editedId);
320 :
321 : void bootstrapCb(std::string convId);
322 :
323 : // The following methods modify what is stored on the disk
324 : /**
325 : * @note convInfosMtx_ should be locked
326 : */
327 3403 : void saveConvInfos() const { ConversationModule::saveConvInfos(accountId_, convInfos_); }
328 : /**
329 : * @note conversationsRequestsMtx_ should be locked
330 : */
331 494 : void saveConvRequests() const
332 : {
333 494 : ConversationModule::saveConvRequests(accountId_, conversationsRequests_);
334 494 : }
335 : void declineOtherConversationWith(const std::string& uri) noexcept;
336 224 : bool addConversationRequest(const std::string& id, const ConversationRequest& req)
337 : {
338 : // conversationsRequestsMtx_ MUST BE LOCKED
339 224 : if (isConversation(id))
340 0 : return false;
341 224 : auto it = conversationsRequests_.find(id);
342 224 : if (it != conversationsRequests_.end()) {
343 : // We only remove requests (if accepted) or change .declined
344 24 : if (!req.declined)
345 20 : return false;
346 200 : } else if (req.isOneToOne()) {
347 : // Check that we're not adding a second one to one trust request
348 : // NOTE: If a new one to one request is received, we can decline the previous one.
349 67 : declineOtherConversationWith(req.from);
350 : }
351 612 : JAMI_DEBUG("Adding conversation request from {} ({})", req.from, id);
352 204 : conversationsRequests_[id] = req;
353 204 : saveConvRequests();
354 204 : return true;
355 : }
356 280 : void rmConversationRequest(const std::string& id)
357 : {
358 : // conversationsRequestsMtx_ MUST BE LOCKED
359 280 : auto it = conversationsRequests_.find(id);
360 280 : if (it != conversationsRequests_.end()) {
361 175 : auto& md = syncingMetadatas_[id];
362 175 : md = it->second.metadatas;
363 175 : md["syncing"] = "true";
364 175 : md["created"] = std::to_string(it->second.received);
365 : }
366 280 : saveMetadatas();
367 280 : conversationsRequests_.erase(id);
368 280 : saveConvRequests();
369 280 : }
370 :
371 : std::weak_ptr<JamiAccount> account_;
372 : std::shared_ptr<AccountManager> accountManager_;
373 : NeedsSyncingCb needsSyncingCb_;
374 : SengMsgCb sendMsgCb_;
375 : NeedSocketCb onNeedSocket_;
376 : NeedSocketCb onNeedSwarmSocket_;
377 : OneToOneRecvCb oneToOneRecvCb_;
378 :
379 : std::string accountId_ {};
380 : std::string deviceId_ {};
381 : std::string username_ {};
382 :
383 : // Requests
384 : mutable std::mutex conversationsRequestsMtx_;
385 : std::map<std::string, ConversationRequest> conversationsRequests_;
386 :
387 : // Conversations
388 : mutable std::mutex conversationsMtx_ {};
389 : std::map<std::string, std::shared_ptr<SyncedConversation>, std::less<>> conversations_;
390 :
391 : // The following information are stored on the disk
392 : mutable std::mutex convInfosMtx_; // Note, should be locked after conversationsMtx_ if needed
393 : std::map<std::string, ConvInfo> convInfos_;
394 :
395 : // When sending a new message, we need to send the notification to some peers of the
396 : // conversation However, the conversation may be not bootstraped, so the list will be empty.
397 : // notSyncedNotification_ will store the notifiaction to announce until we have peers to sync
398 : // with.
399 : std::mutex notSyncedNotificationMtx_;
400 : std::map<std::string, std::string> notSyncedNotification_;
401 :
402 3933 : std::weak_ptr<Impl> weak() { return std::static_pointer_cast<Impl>(shared_from_this()); }
403 :
404 : // Replay conversations (after erasing/re-adding)
405 : std::mutex replayMtx_;
406 : std::map<std::string, std::vector<std::map<std::string, std::string>>> replay_;
407 : std::map<std::string, uint64_t> refreshMessage;
408 : std::atomic_int syncCnt {0};
409 :
410 : #ifdef LIBJAMI_TESTABLE
411 : std::function<void(std::string, Conversation::BootstrapStatus)> bootstrapCbTest_;
412 : #endif
413 :
414 : void fixStructures(
415 : std::shared_ptr<JamiAccount> account,
416 : const std::vector<std::tuple<std::string, std::string, std::string>>& updateContactConv,
417 : const std::set<std::string>& toRm);
418 :
419 : void cloneConversationFrom(const std::shared_ptr<SyncedConversation> conv,
420 : const std::string& deviceId,
421 : const std::string& oldConvId = "");
422 : void bootstrap(const std::string& convId);
423 : void fallbackClone(const asio::error_code& ec, const std::string& conversationId);
424 : void cloneConversationFrom(const std::string& conversationId,
425 : const std::string& uri,
426 : const std::string& oldConvId = "");
427 :
428 : // While syncing, we do not want to lose metadata (avatar/title and mode)
429 : std::map<std::string, std::map<std::string, std::string>> syncingMetadatas_;
430 480 : void saveMetadatas()
431 : {
432 480 : auto path = fileutils::get_data_dir() / accountId_;
433 960 : std::ofstream file(path / "syncingMetadatas", std::ios::trunc | std::ios::binary);
434 480 : msgpack::pack(file, syncingMetadatas_);
435 480 : }
436 :
437 677 : void loadMetadatas()
438 : {
439 : try {
440 : // read file
441 677 : auto path = fileutils::get_data_dir() / accountId_;
442 1354 : std::lock_guard lock(dhtnet::fileutils::getFileLock(path / "syncingMetadatas"));
443 1354 : auto file = fileutils::loadFile("syncingMetadatas", path);
444 : // load values
445 0 : msgpack::unpacked result;
446 0 : msgpack::unpack(result, (const char*) file.data(), file.size(), 0);
447 0 : result.get().convert(syncingMetadatas_);
448 2031 : } catch (const std::exception& e) {
449 2031 : JAMI_WARNING("[ConversationModule] error loading syncingMetadatas_: {}", e.what());
450 677 : }
451 677 : }
452 : };
453 :
454 677 : ConversationModule::Impl::Impl(std::shared_ptr<JamiAccount>&& account,
455 : std::shared_ptr<AccountManager>&& accountManager,
456 : NeedsSyncingCb&& needsSyncingCb,
457 : SengMsgCb&& sendMsgCb,
458 : NeedSocketCb&& onNeedSocket,
459 : NeedSocketCb&& onNeedSwarmSocket,
460 677 : OneToOneRecvCb&& oneToOneRecvCb)
461 677 : : account_(account)
462 677 : , accountManager_(accountManager)
463 677 : , needsSyncingCb_(needsSyncingCb)
464 677 : , sendMsgCb_(sendMsgCb)
465 677 : , onNeedSocket_(onNeedSocket)
466 677 : , onNeedSwarmSocket_(onNeedSwarmSocket)
467 677 : , oneToOneRecvCb_(oneToOneRecvCb)
468 1354 : , accountId_(account->getAccountID())
469 : {
470 677 : if (auto accm = account->accountManager())
471 677 : if (const auto* info = accm->getInfo()) {
472 677 : deviceId_ = info->deviceId;
473 677 : username_ = info->accountId;
474 677 : }
475 677 : conversationsRequests_ = convRequests(accountId_);
476 677 : loadMetadatas();
477 677 : }
478 :
479 : void
480 23 : ConversationModule::Impl::cloneConversation(const std::string& deviceId,
481 : const std::string& peerUri,
482 : const std::string& convId)
483 : {
484 69 : JAMI_DEBUG("[Account {}] Clone conversation on device {}", accountId_, deviceId);
485 :
486 23 : auto conv = startConversation(convId);
487 23 : std::unique_lock lk(conv->mtx);
488 23 : cloneConversation(deviceId, peerUri, conv);
489 23 : }
490 :
491 : void
492 107 : ConversationModule::Impl::cloneConversation(const std::string& deviceId,
493 : const std::string& peerUri,
494 : const std::shared_ptr<SyncedConversation>& conv)
495 : {
496 : // conv->mtx must be locked
497 107 : if (!conv->conversation) {
498 : // Note: here we don't return and connect to all members
499 : // the first that will successfully connect will be used for
500 : // cloning.
501 : // This avoid the case when we try to clone from convInfos + sync message
502 : // at the same time.
503 107 : if (!conv->startFetch(deviceId, true)) {
504 126 : JAMI_WARNING("[Account {}] Already fetching {}", accountId_, conv->info.id);
505 42 : addConvInfo(conv->info);
506 42 : return;
507 : }
508 130 : onNeedSocket_(
509 65 : conv->info.id,
510 : deviceId,
511 65 : [w = weak(), conv, deviceId](const auto& channel) {
512 65 : std::lock_guard lk(conv->mtx);
513 65 : if (conv->pending && !conv->pending->ready) {
514 64 : if (channel) {
515 64 : conv->pending->ready = true;
516 64 : conv->pending->deviceId = channel->deviceId().toString();
517 64 : conv->pending->socket = channel;
518 64 : if (!conv->pending->cloning) {
519 64 : conv->pending->cloning = true;
520 192 : dht::ThreadPool::io().run([w,
521 64 : convId = conv->info.id,
522 64 : deviceId = conv->pending->deviceId]() {
523 128 : if (auto sthis = w.lock())
524 64 : sthis->handlePendingConversation(convId, deviceId);
525 : });
526 : }
527 64 : return true;
528 : } else {
529 0 : conv->stopFetch(deviceId);
530 : }
531 : }
532 1 : return false;
533 65 : },
534 : MIME_TYPE_GIT);
535 :
536 195 : JAMI_LOG("[Account {}] New conversation detected: {}. Ask device {} to clone it",
537 : accountId_,
538 : conv->info.id,
539 : deviceId);
540 65 : conv->info.members.emplace(username_);
541 65 : conv->info.members.emplace(peerUri);
542 65 : addConvInfo(conv->info);
543 : } else {
544 0 : JAMI_DEBUG("[Account {}] Already have conversation {}", accountId_, conv->info.id);
545 : }
546 : }
547 :
548 : void
549 15692 : ConversationModule::Impl::fetchNewCommits(const std::string& peer,
550 : const std::string& deviceId,
551 : const std::string& conversationId,
552 : const std::string& commitId)
553 : {
554 : {
555 15692 : std::lock_guard lk(convInfosMtx_);
556 15693 : auto itConv = convInfos_.find(conversationId);
557 15691 : if (itConv != convInfos_.end() && itConv->second.isRemoved()) {
558 : // If the conversation is removed and we receives a new commit,
559 : // it means that the contact was removed but not banned.
560 : // If he wants a new conversation, they must removes/re-add the contact who declined.
561 12 : JAMI_WARNING("[Account {:s}] Received a commit for {}, but conversation is removed",
562 : accountId_,
563 : conversationId);
564 4 : return;
565 : }
566 15690 : }
567 15688 : std::optional<ConversationRequest> oldReq;
568 : {
569 15688 : std::lock_guard lk(conversationsRequestsMtx_);
570 15690 : oldReq = getRequest(conversationId);
571 15685 : if (oldReq != std::nullopt && oldReq->declined) {
572 3 : JAMI_DEBUG("[Account {}] Received a request for a conversation already declined.",
573 : accountId_);
574 1 : return;
575 : }
576 15685 : }
577 47059 : JAMI_DEBUG("[Account {:s}] fetch commits from {:s}, for {:s}, commit {:s}",
578 : accountId_,
579 : peer,
580 : conversationId,
581 : commitId);
582 :
583 15689 : auto conv = getConversation(conversationId);
584 15689 : if (!conv) {
585 622 : if (oldReq == std::nullopt) {
586 : // We didn't find a conversation or a request with the given ID.
587 : // This suggests that someone tried to send us an invitation but
588 : // that we didn't receive it, so we ask for a new one.
589 438 : JAMI_WARNING("[Account {}] Unable to find conversation {}, ask for an invite",
590 : accountId_,
591 : conversationId);
592 146 : sendMsgCb_(peer,
593 : {},
594 438 : std::map<std::string, std::string> {{MIME_TYPE_INVITE, conversationId}},
595 : 0);
596 : }
597 622 : return;
598 : }
599 15067 : std::unique_lock lk(conv->mtx);
600 :
601 15067 : if (conv->conversation) {
602 : // Check if we already have the commit
603 14995 : if (not commitId.empty() && conv->conversation->getCommit(commitId) != std::nullopt) {
604 12500 : return;
605 : }
606 2815 : if (conv->conversation->isRemoving()) {
607 0 : JAMI_WARNING("[Account {}] Conversation {} is being removed",
608 : accountId_,
609 : conversationId);
610 0 : return;
611 : }
612 2814 : if (!conv->conversation->isMember(peer, true)) {
613 9 : JAMI_WARNING("[Account {}] {} is not a member of {}", accountId_, peer, conversationId);
614 3 : return;
615 : }
616 2813 : if (conv->conversation->isBanned(deviceId)) {
617 0 : JAMI_WARNING("[Account {}] {} is a banned device in conversation {}",
618 : accountId_,
619 : deviceId,
620 : conversationId);
621 0 : return;
622 : }
623 :
624 : // Retrieve current last message
625 2813 : auto lastMessageId = conv->conversation->lastCommitId();
626 2812 : if (lastMessageId.empty()) {
627 0 : JAMI_ERROR("[Account {}] No message detected. This is a bug", accountId_);
628 0 : return;
629 : }
630 :
631 2811 : if (!conv->startFetch(deviceId)) {
632 954 : JAMI_WARNING("[Account {}] Already fetching {}", accountId_, conversationId);
633 318 : return;
634 : }
635 :
636 2493 : syncCnt.fetch_add(1);
637 12469 : onNeedSocket_(
638 : conversationId,
639 : deviceId,
640 4295 : [w = weak(),
641 : conv,
642 2494 : conversationId = std::move(conversationId),
643 2495 : peer = std::move(peer),
644 2494 : deviceId = std::move(deviceId),
645 2495 : commitId = std::move(commitId)](const auto& channel) {
646 4295 : auto sthis = w.lock();
647 4296 : auto acc = sthis ? sthis->account_.lock() : nullptr;
648 4296 : std::unique_lock lk(conv->mtx);
649 4294 : auto conversation = conv->conversation;
650 4296 : if (!channel || !acc || !conversation) {
651 1874 : conv->stopFetch(deviceId);
652 1874 : if (sthis)
653 1874 : sthis->syncCnt.fetch_sub(1);
654 1874 : return false;
655 : }
656 2421 : conversation->addGitSocket(channel->deviceId(), channel);
657 2421 : lk.unlock();
658 14529 : conversation->sync(
659 2421 : peer,
660 2421 : deviceId,
661 2422 : [w,
662 2422 : conv,
663 2421 : conversationId = std::move(conversationId),
664 2422 : peer = std::move(peer),
665 2422 : deviceId = std::move(deviceId),
666 2421 : commitId = std::move(commitId)](bool ok) {
667 2422 : auto shared = w.lock();
668 2422 : if (!shared)
669 0 : return;
670 2421 : if (!ok) {
671 3867 : JAMI_WARNING("[Account {}] Unable to fetch new commit from "
672 : "{} for {}, other "
673 : "peer may be disconnected",
674 : shared->accountId_,
675 : deviceId,
676 : conversationId);
677 3867 : JAMI_LOG("[Account {}] Relaunch sync with {} for {}",
678 : shared->accountId_,
679 : deviceId,
680 : conversationId);
681 : }
682 :
683 : {
684 2421 : std::lock_guard lk(conv->mtx);
685 2422 : conv->pending.reset();
686 : // Notify peers that a new commit is there (DRT)
687 2420 : if (not commitId.empty() && ok) {
688 2115 : shared->sendMessageNotification(*conv->conversation,
689 : false,
690 1058 : commitId,
691 1058 : deviceId);
692 : }
693 2422 : }
694 4844 : if (shared->syncCnt.fetch_sub(1) == 1) {
695 187 : emitSignal<libjami::ConversationSignal::ConversationSyncFinished>(shared->accountId_);
696 : }
697 2422 : },
698 2421 : commitId);
699 2422 : return true;
700 4296 : },
701 : "");
702 2813 : } else {
703 72 : if (oldReq != std::nullopt)
704 0 : return;
705 72 : if (conv->pending)
706 31 : return;
707 41 : bool clone = !conv->info.isRemoved();
708 41 : if (clone) {
709 40 : cloneConversation(deviceId, peer, conv);
710 40 : return;
711 : }
712 1 : lk.unlock();
713 3 : JAMI_WARNING("[Account {}] Unable to find conversation {}, ask for an invite",
714 : accountId_,
715 : conversationId);
716 1 : sendMsgCb_(peer,
717 : {},
718 3 : std::map<std::string, std::string> {{MIME_TYPE_INVITE, conversationId}},
719 : 0);
720 : }
721 41453 : }
722 :
723 : // Clone and store conversation
724 : void
725 190 : ConversationModule::Impl::handlePendingConversation(const std::string& conversationId,
726 : const std::string& deviceId)
727 : {
728 190 : auto acc = account_.lock();
729 190 : if (!acc)
730 0 : return;
731 190 : std::vector<DeviceId> kd;
732 : {
733 190 : std::unique_lock lk(conversationsMtx_);
734 190 : const auto& devices = accountManager_->getKnownDevices();
735 190 : kd.reserve(devices.size());
736 1419 : for (const auto& [id, _] : devices)
737 1229 : kd.emplace_back(id);
738 190 : }
739 190 : auto conv = getConversation(conversationId);
740 190 : if (!conv)
741 0 : return;
742 190 : std::unique_lock lk(conv->mtx, std::defer_lock);
743 374 : auto erasePending = [&] {
744 374 : std::string toRm;
745 374 : if (conv->pending && !conv->pending->removeId.empty())
746 7 : toRm = std::move(conv->pending->removeId);
747 374 : conv->pending.reset();
748 374 : lk.unlock();
749 374 : if (!toRm.empty())
750 7 : removeConversation(toRm);
751 374 : };
752 : try {
753 190 : auto conversation = std::make_shared<Conversation>(acc, deviceId, conversationId);
754 184 : conversation->onMembersChanged([w=weak_from_this(), conversationId](const auto& members) {
755 : // Delay in another thread to avoid deadlocks
756 2837 : dht::ThreadPool::io().run([w, conversationId, members = std::move(members)] {
757 2838 : if (auto sthis = w.lock())
758 1419 : sthis->setConversationMembers(conversationId, members);
759 : });
760 1419 : });
761 184 : conversation->onMessageStatusChanged([this, conversationId](const auto& status) {
762 501 : auto msg = std::make_shared<SyncMsg>();
763 1002 : msg->ms = {{conversationId, status}};
764 501 : needsSyncingCb_(std::move(msg));
765 501 : });
766 184 : conversation->onNeedSocket(onNeedSwarmSocket_);
767 184 : if (!conversation->isMember(username_, true)) {
768 0 : JAMI_ERR("Conversation cloned but does not seems to be a valid member");
769 0 : conversation->erase();
770 0 : lk.lock();
771 0 : erasePending();
772 0 : return;
773 : }
774 :
775 : // Make sure that the list of members stored in convInfos_ matches the
776 : // one from the conversation's repository.
777 : // (https://git.jami.net/savoirfairelinux/jami-daemon/-/issues/1026)
778 184 : setConversationMembers(conversationId, conversation->memberUris("", {}));
779 :
780 184 : lk.lock();
781 :
782 184 : if (conv->pending && conv->pending->socket)
783 184 : conversation->addGitSocket(DeviceId(deviceId), std::move(conv->pending->socket));
784 184 : auto removeRepo = false;
785 : // Note: a removeContact while cloning. In this case, the conversation
786 : // must not be announced and removed.
787 184 : if (conv->info.isRemoved())
788 0 : removeRepo = true;
789 184 : std::map<std::string, std::string> preferences;
790 184 : std::map<std::string, std::map<std::string, std::string>> status;
791 184 : if (conv->pending) {
792 184 : preferences = std::move(conv->pending->preferences);
793 184 : status = std::move(conv->pending->status);
794 : }
795 184 : conv->conversation = conversation;
796 184 : if (removeRepo) {
797 0 : removeRepositoryImpl(*conv, false, true);
798 0 : erasePending();
799 0 : return;
800 : }
801 :
802 184 : auto commitId = conversation->join();
803 184 : std::vector<std::map<std::string, std::string>> messages;
804 : {
805 184 : std::lock_guard lk(replayMtx_);
806 184 : auto replayIt = replay_.find(conversationId);
807 184 : if (replayIt != replay_.end()) {
808 7 : messages = std::move(replayIt->second);
809 7 : replay_.erase(replayIt);
810 : }
811 184 : }
812 184 : if (!commitId.empty())
813 150 : sendMessageNotification(*conversation, false, commitId);
814 184 : erasePending(); // Will unlock
815 :
816 : #ifdef LIBJAMI_TESTABLE
817 184 : conversation->onBootstrapStatus(bootstrapCbTest_);
818 : #endif // LIBJAMI_TESTABLE
819 368 : conversation->bootstrap(std::bind(&ConversationModule::Impl::bootstrapCb,
820 184 : this,
821 368 : conversation->id()),
822 : kd);
823 :
824 184 : if (!preferences.empty())
825 1 : conversation->updatePreferences(preferences);
826 184 : if (!status.empty())
827 23 : conversation->updateMessageStatus(status);
828 184 : syncingMetadatas_.erase(conversationId);
829 184 : saveMetadatas();
830 :
831 : // Inform user that the conversation is ready
832 184 : emitSignal<libjami::ConversationSignal::ConversationReady>(accountId_, conversationId);
833 184 : needsSyncingCb_({});
834 184 : std::vector<Json::Value> values;
835 184 : values.reserve(messages.size());
836 193 : for (const auto& message : messages) {
837 : // For now, only replay text messages.
838 : // File transfers will need more logic, and don't care about calls for now.
839 9 : if (message.at("type") == "text/plain" && message.at("author") == username_) {
840 2 : Json::Value json;
841 2 : json["body"] = message.at("body");
842 2 : json["type"] = "text/plain";
843 2 : values.emplace_back(std::move(json));
844 2 : }
845 : }
846 184 : if (!values.empty())
847 1 : conversation->sendMessages(std::move(values),
848 1 : [w = weak(), conversationId](const auto& commits) {
849 1 : auto shared = w.lock();
850 1 : if (shared and not commits.empty())
851 3 : shared->sendMessageNotification(conversationId,
852 : true,
853 2 : *commits.rbegin());
854 1 : });
855 : // Download members profile on first sync
856 184 : auto isOneOne = conversation->mode() == ConversationMode::ONE_TO_ONE;
857 184 : auto askForProfile = isOneOne;
858 184 : if (!isOneOne) {
859 : // If not 1:1 only download profiles from self (to avoid non checked files)
860 125 : auto cert = acc->certStore().getCertificate(deviceId);
861 125 : askForProfile = cert && cert->issuer
862 250 : && cert->issuer->getId().toString() == username_;
863 125 : }
864 184 : if (askForProfile) {
865 130 : for (const auto& member : conversation->memberUris(username_)) {
866 58 : acc->askForProfile(conversationId, deviceId, member);
867 72 : }
868 : }
869 190 : } catch (const std::exception& e) {
870 18 : JAMI_WARNING("Something went wrong when cloning conversation: {}. Re-clone in {}s", e.what(), conv->fallbackTimer.count());
871 6 : conv->fallbackClone->expires_at(std::chrono::steady_clock::now() + conv->fallbackTimer);
872 6 : conv->fallbackTimer *= 2;
873 6 : if (conv->fallbackTimer > MAX_FALLBACK)
874 0 : conv->fallbackTimer = MAX_FALLBACK;
875 12 : conv->fallbackClone->async_wait(
876 12 : std::bind(&ConversationModule::Impl::fallbackClone,
877 12 : shared_from_this(),
878 : std::placeholders::_1,
879 : conversationId));
880 :
881 6 : }
882 190 : lk.lock();
883 190 : erasePending();
884 190 : }
885 :
886 : std::optional<ConversationRequest>
887 16105 : ConversationModule::Impl::getRequest(const std::string& id) const
888 : {
889 : // ConversationsRequestsMtx MUST BE LOCKED
890 16105 : auto it = conversationsRequests_.find(id);
891 16104 : if (it != conversationsRequests_.end())
892 734 : return it->second;
893 15370 : return std::nullopt;
894 : }
895 :
896 : std::string
897 702 : ConversationModule::Impl::getOneToOneConversation(const std::string& uri) const noexcept
898 : {
899 702 : auto details = accountManager_->getContactDetails(uri);
900 702 : auto itRemoved = details.find("removed");
901 : // If contact is removed there is no conversation
902 702 : if (itRemoved != details.end() && itRemoved->second != "0") {
903 53 : auto itBanned = details.find("banned");
904 : // If banned, conversation is still on disk
905 53 : if (itBanned == details.end() || itBanned->second == "0") {
906 : // Check if contact is removed
907 45 : auto itAdded = details.find("added");
908 45 : if (std::stoi(itRemoved->second) > std::stoi(itAdded->second))
909 39 : return {};
910 : }
911 : }
912 663 : auto it = details.find(libjami::Account::TrustRequest::CONVERSATIONID);
913 663 : if (it != details.end())
914 337 : return it->second;
915 326 : return {};
916 702 : }
917 :
918 : bool
919 35 : ConversationModule::Impl::updateConvForContact(const std::string& uri,
920 : const std::string& oldConv,
921 : const std::string& newConv)
922 : {
923 35 : if (newConv != oldConv) {
924 35 : auto conversation = getOneToOneConversation(uri);
925 35 : if (conversation != oldConv) {
926 63 : JAMI_DEBUG("Old conversation is not found in details {} - found: {}",
927 : oldConv,
928 : conversation);
929 21 : return false;
930 : }
931 14 : accountManager_->updateContactConversation(uri, newConv);
932 14 : return true;
933 35 : }
934 0 : return false;
935 : }
936 :
937 : void
938 67 : ConversationModule::Impl::declineOtherConversationWith(const std::string& uri) noexcept
939 : {
940 : // conversationsRequestsMtx_ MUST BE LOCKED
941 68 : for (auto& [id, request] : conversationsRequests_) {
942 1 : if (request.declined)
943 0 : continue; // Ignore already declined requests
944 1 : if (request.isOneToOne() && request.from == uri) {
945 3 : JAMI_WARNING("Decline conversation request ({}) from {}", id, uri);
946 1 : request.declined = std::time(nullptr);
947 1 : syncingMetadatas_.erase(id);
948 1 : saveMetadatas();
949 1 : emitSignal<libjami::ConversationSignal::ConversationRequestDeclined>(accountId_, id);
950 : }
951 : }
952 67 : }
953 :
954 : std::vector<std::map<std::string, std::string>>
955 101 : ConversationModule::Impl::getConversationMembers(const std::string& conversationId,
956 : bool includeBanned) const
957 : {
958 : return withConv(conversationId,
959 202 : [&](const auto& conv) { return conv.getMembers(true, includeBanned); });
960 : }
961 :
962 : void
963 14 : ConversationModule::Impl::removeRepository(const std::string& conversationId, bool sync, bool force)
964 : {
965 14 : auto conv = getConversation(conversationId);
966 14 : if (!conv)
967 0 : return;
968 14 : std::unique_lock lk(conv->mtx);
969 14 : removeRepositoryImpl(*conv, sync, force);
970 14 : }
971 :
972 : void
973 32 : ConversationModule::Impl::removeRepositoryImpl(SyncedConversation& conv, bool sync, bool force)
974 : {
975 32 : if (conv.conversation && (force || conv.conversation->isRemoving())) {
976 : // Stop fetch!
977 24 : conv.pending.reset();
978 :
979 72 : JAMI_LOG("Remove conversation: {}", conv.info.id);
980 : try {
981 24 : if (conv.conversation->mode() == ConversationMode::ONE_TO_ONE) {
982 51 : for (const auto& member : conv.conversation->getInitialMembers()) {
983 34 : if (member != username_) {
984 : // Note: this can happen while re-adding a contact.
985 : // In this case, check that we are removing the linked conversation.
986 17 : if (conv.info.id == getOneToOneConversation(member)) {
987 0 : accountManager_->removeContactConversation(member);
988 : }
989 : }
990 17 : }
991 : }
992 0 : } catch (const std::exception& e) {
993 0 : JAMI_ERR() << e.what();
994 0 : }
995 24 : conv.conversation->erase();
996 24 : conv.conversation.reset();
997 :
998 24 : if (!sync)
999 2 : return;
1000 :
1001 22 : conv.info.erased = std::time(nullptr);
1002 22 : needsSyncingCb_({});
1003 22 : addConvInfo(conv.info);
1004 : }
1005 : }
1006 :
1007 : bool
1008 18 : ConversationModule::Impl::removeConversation(const std::string& conversationId)
1009 : {
1010 35 : return withConv(conversationId, [this](auto& conv) { return removeConversationImpl(conv); });
1011 : }
1012 :
1013 : bool
1014 17 : ConversationModule::Impl::removeConversationImpl(SyncedConversation& conv)
1015 : {
1016 17 : auto members = conv.getMembers(false, false);
1017 17 : auto isSyncing = !conv.conversation;
1018 17 : auto hasMembers = !isSyncing // If syncing there is no member to inform
1019 32 : && std::find_if(members.begin(),
1020 : members.end(),
1021 17 : [&](const auto& member) {
1022 17 : return member.at("uri") == username_;
1023 : })
1024 32 : != members.end() // We must be still a member
1025 33 : && members.size() != 1; // If there is only ourself
1026 17 : conv.info.removed = std::time(nullptr);
1027 17 : if (isSyncing)
1028 1 : conv.info.erased = std::time(nullptr);
1029 : // Sync now, because it can take some time to really removes the datas
1030 17 : needsSyncingCb_({});
1031 17 : addConvInfo(conv.info);
1032 17 : emitSignal<libjami::ConversationSignal::ConversationRemoved>(accountId_, conv.info.id);
1033 17 : if (isSyncing)
1034 1 : return true;
1035 16 : if (conv.conversation->mode() != ConversationMode::ONE_TO_ONE) {
1036 : // For one to one, we do not notify the leave. The other can still generate request
1037 : // and this is managed by the banned part. If we re-accept, the old conversation will be
1038 : // retrieved
1039 6 : auto commitId = conv.conversation->leave();
1040 6 : if (hasMembers) {
1041 3 : JAMI_LOG("Wait that someone sync that user left conversation {}", conv.info.id);
1042 : // Commit that we left
1043 1 : if (!commitId.empty()) {
1044 : // Do not sync as it's synched by convInfos
1045 1 : sendMessageNotification(*conv.conversation, false, commitId);
1046 : } else {
1047 0 : JAMI_ERROR("Failed to send message to conversation {}", conv.info.id);
1048 : }
1049 : // In this case, we wait that another peer sync the conversation
1050 : // to definitely remove it from the device. This is to inform the
1051 : // peer that we left the conversation and never want to receive
1052 : // any messages
1053 1 : return true;
1054 : }
1055 6 : } else {
1056 30 : for (const auto& m : members)
1057 20 : if (username_ != m.at("uri"))
1058 10 : updateConvForContact(m.at("uri"), conv.info.id, "");
1059 : }
1060 : // Else we are the last member, so we can remove
1061 15 : removeRepositoryImpl(conv, true);
1062 15 : return true;
1063 17 : }
1064 :
1065 : void
1066 623 : ConversationModule::Impl::sendMessageNotification(const std::string& conversationId,
1067 : bool sync,
1068 : const std::string& commitId,
1069 : const std::string& deviceId)
1070 : {
1071 623 : if (auto conv = getConversation(conversationId)) {
1072 623 : std::lock_guard lk(conv->mtx);
1073 623 : if (conv->conversation)
1074 621 : sendMessageNotification(*conv->conversation, sync, commitId, deviceId);
1075 1246 : }
1076 623 : }
1077 :
1078 : void
1079 1964 : ConversationModule::Impl::sendMessageNotification(Conversation& conversation,
1080 : bool sync,
1081 : const std::string& commitId,
1082 : const std::string& deviceId)
1083 : {
1084 1964 : auto acc = account_.lock();
1085 1963 : if (!acc)
1086 0 : return;
1087 1963 : Json::Value message;
1088 1965 : auto commit = commitId == "" ? conversation.lastCommitId() : commitId;
1089 1965 : message["id"] = conversation.id();
1090 1965 : message["commit"] = commit;
1091 1965 : message["deviceId"] = deviceId_;
1092 1965 : Json::StreamWriterBuilder builder;
1093 1965 : const auto text = Json::writeString(builder, message);
1094 :
1095 : // Send message notification will announce the new commit in 3 steps.
1096 :
1097 : // First, because our account can have several devices, announce to other devices
1098 1965 : if (sync) {
1099 : // Announce to our devices
1100 756 : refreshMessage[username_] = sendMsgCb_(username_,
1101 : {},
1102 3024 : std::map<std::string, std::string> {
1103 1512 : {MIME_TYPE_GIT, text}},
1104 756 : refreshMessage[username_]);
1105 : }
1106 :
1107 : // Then, we announce to 2 random members in the conversation that aren't in the DRT
1108 : // This allow new devices without the ability to sync to their other devices to sync with us.
1109 : // Or they can also use an old backup.
1110 1965 : std::vector<std::string> nonConnectedMembers;
1111 1965 : std::vector<NodeId> devices;
1112 : {
1113 1965 : std::lock_guard lk(notSyncedNotificationMtx_);
1114 1965 : devices = conversation.peersToSyncWith();
1115 3929 : auto members = conversation.memberUris(username_, {MemberRole::BANNED});
1116 1964 : std::vector<std::string> connectedMembers;
1117 : // print all members
1118 19461 : for (const auto& device : devices) {
1119 17495 : auto cert = acc->certStore().getCertificate(device.toString());
1120 17500 : if (cert && cert->issuer)
1121 17499 : connectedMembers.emplace_back(cert->issuer->getId().toString());
1122 17499 : }
1123 1964 : std::sort(std::begin(connectedMembers), std::end(connectedMembers));
1124 1965 : std::set_difference(members.begin(),
1125 : members.end(),
1126 : connectedMembers.begin(),
1127 : connectedMembers.end(),
1128 : std::inserter(nonConnectedMembers, nonConnectedMembers.begin()));
1129 1965 : std::shuffle(nonConnectedMembers.begin(), nonConnectedMembers.end(), acc->rand);
1130 1965 : if (nonConnectedMembers.size() > 2)
1131 150 : nonConnectedMembers.resize(2);
1132 1965 : if (!conversation.isBootstraped()) {
1133 975 : JAMI_DEBUG("[Conversation {}] Not yet bootstraped, save notification",
1134 : conversation.id());
1135 : // Because we can get some git channels but not bootstraped, we should keep this
1136 : // to refresh when bootstraped.
1137 325 : notSyncedNotification_[conversation.id()] = commit;
1138 : }
1139 1965 : }
1140 :
1141 2927 : for (const auto& member : nonConnectedMembers) {
1142 962 : refreshMessage[member] = sendMsgCb_(member,
1143 : {},
1144 3848 : std::map<std::string, std::string> {
1145 1924 : {MIME_TYPE_GIT, text}},
1146 962 : refreshMessage[member]);
1147 : }
1148 :
1149 : // Finally we send to devices that the DRT choose.
1150 19465 : for (const auto& device : devices) {
1151 17498 : auto deviceIdStr = device.toString();
1152 17500 : auto memberUri = conversation.uriFromDevice(deviceIdStr);
1153 17499 : if (memberUri.empty() || deviceIdStr == deviceId)
1154 1054 : continue;
1155 16445 : refreshMessage[deviceIdStr] = sendMsgCb_(memberUri,
1156 : device,
1157 65781 : std::map<std::string, std::string> {
1158 32890 : {MIME_TYPE_GIT, text}},
1159 16445 : refreshMessage[deviceIdStr]);
1160 18553 : }
1161 1964 : }
1162 :
1163 : void
1164 92 : ConversationModule::Impl::sendMessage(const std::string& conversationId,
1165 : std::string message,
1166 : const std::string& replyTo,
1167 : const std::string& type,
1168 : bool announce,
1169 : OnCommitCb&& onCommit,
1170 : OnDoneCb&& cb)
1171 : {
1172 92 : Json::Value json;
1173 92 : json["body"] = std::move(message);
1174 92 : json["type"] = type;
1175 92 : sendMessage(conversationId,
1176 92 : std::move(json),
1177 : replyTo,
1178 : announce,
1179 92 : std::move(onCommit),
1180 92 : std::move(cb));
1181 92 : }
1182 :
1183 : void
1184 114 : ConversationModule::Impl::sendMessage(const std::string& conversationId,
1185 : Json::Value&& value,
1186 : const std::string& replyTo,
1187 : bool announce,
1188 : OnCommitCb&& onCommit,
1189 : OnDoneCb&& cb)
1190 : {
1191 114 : if (auto conv = getConversation(conversationId)) {
1192 114 : std::lock_guard lk(conv->mtx);
1193 114 : if (conv->conversation)
1194 114 : conv->conversation
1195 342 : ->sendMessage(std::move(value),
1196 : replyTo,
1197 114 : std::move(onCommit),
1198 113 : [this,
1199 : conversationId,
1200 : announce,
1201 114 : cb = std::move(cb)](bool ok, const std::string& commitId) {
1202 113 : if (cb)
1203 3 : cb(ok, commitId);
1204 113 : if (!announce)
1205 0 : return;
1206 113 : if (ok)
1207 112 : sendMessageNotification(conversationId, true, commitId);
1208 : else
1209 1 : JAMI_ERR("Failed to send message to conversation %s",
1210 : conversationId.c_str());
1211 : });
1212 228 : }
1213 114 : }
1214 :
1215 : void
1216 7 : ConversationModule::Impl::editMessage(const std::string& conversationId,
1217 : const std::string& newBody,
1218 : const std::string& editedId)
1219 : {
1220 : // Check that editedId is a valid commit, from ourself and plain/text
1221 7 : auto validCommit = false;
1222 7 : std::string type, tid;
1223 7 : if (auto conv = getConversation(conversationId)) {
1224 7 : std::lock_guard lk(conv->mtx);
1225 7 : if (conv->conversation) {
1226 7 : auto commit = conv->conversation->getCommit(editedId);
1227 7 : if (commit != std::nullopt) {
1228 6 : type = commit->at("type");
1229 6 : if (type == "application/data-transfer+json")
1230 1 : tid = commit->at("tid");
1231 18 : validCommit = commit->at("author") == username_
1232 18 : && (type == "text/plain" || type == "application/data-transfer+json");
1233 : }
1234 7 : }
1235 14 : }
1236 7 : if (!validCommit) {
1237 6 : JAMI_ERROR("Unable to edit commit {:s}", editedId);
1238 2 : return;
1239 : }
1240 : // Commit message edition
1241 5 : Json::Value json;
1242 5 : if (type == "application/data-transfer+json") {
1243 1 : json["tid"] = "";
1244 : // Remove file!
1245 2 : auto path = fileutils::get_data_dir() / accountId_
1246 4 : / "conversation_data" / conversationId
1247 3 : / fmt::format("{}_{}", editedId, tid);
1248 1 : dhtnet::fileutils::remove(path, true);
1249 1 : } else {
1250 4 : json["body"] = newBody;
1251 : }
1252 5 : json["edit"] = editedId;
1253 5 : json["type"] = type;
1254 5 : sendMessage(conversationId, std::move(json));
1255 9 : }
1256 :
1257 : void
1258 465 : ConversationModule::Impl::bootstrapCb(std::string convId)
1259 : {
1260 465 : std::string commitId;
1261 : {
1262 465 : std::lock_guard lk(notSyncedNotificationMtx_);
1263 465 : auto it = notSyncedNotification_.find(convId);
1264 465 : if (it != notSyncedNotification_.end()) {
1265 211 : commitId = it->second;
1266 211 : notSyncedNotification_.erase(it);
1267 : }
1268 465 : }
1269 1395 : JAMI_DEBUG("[Conversation {}] Resend last message notification", convId);
1270 465 : dht::ThreadPool::io().run([w = weak(), convId, commitId = std::move(commitId)] {
1271 465 : if (auto sthis = w.lock())
1272 465 : sthis->sendMessageNotification(convId, true, commitId);
1273 465 : });
1274 465 : }
1275 :
1276 : void
1277 689 : ConversationModule::Impl::fixStructures(
1278 : std::shared_ptr<JamiAccount> acc,
1279 : const std::vector<std::tuple<std::string, std::string, std::string>>& updateContactConv,
1280 : const std::set<std::string>& toRm)
1281 : {
1282 691 : for (const auto& [uri, oldConv, newConv] : updateContactConv) {
1283 2 : updateConvForContact(uri, oldConv, newConv);
1284 : }
1285 : ////////////////////////////////////////////////////////////////
1286 : // Note: This is only to homogeneize trust and convRequests
1287 689 : std::vector<std::string> invalidPendingRequests;
1288 : {
1289 689 : auto requests = acc->getTrustRequests();
1290 689 : std::lock_guard lk(conversationsRequestsMtx_);
1291 690 : for (const auto& request : requests) {
1292 1 : auto itConvId = request.find(libjami::Account::TrustRequest::CONVERSATIONID);
1293 1 : auto itConvFrom = request.find(libjami::Account::TrustRequest::FROM);
1294 1 : if (itConvId != request.end() && itConvFrom != request.end()) {
1295 : // Check if requests exists or is declined.
1296 1 : auto itReq = conversationsRequests_.find(itConvId->second);
1297 1 : auto declined = itReq == conversationsRequests_.end() || itReq->second.declined;
1298 1 : if (declined) {
1299 3 : JAMI_WARNING("Invalid trust request found: {:s}", itConvId->second);
1300 1 : invalidPendingRequests.emplace_back(itConvFrom->second);
1301 : }
1302 : }
1303 : }
1304 689 : auto requestRemoved = false;
1305 691 : for (auto it = conversationsRequests_.begin(); it != conversationsRequests_.end();) {
1306 2 : if (it->second.from == username_) {
1307 0 : JAMI_WARNING("Detected request from ourself, this makes no sense. Remove {}",
1308 : it->first);
1309 0 : it = conversationsRequests_.erase(it);
1310 : } else {
1311 2 : ++it;
1312 : }
1313 : }
1314 689 : if (requestRemoved) {
1315 0 : saveConvRequests();
1316 : }
1317 689 : }
1318 690 : for (const auto& invalidPendingRequest : invalidPendingRequests)
1319 1 : acc->discardTrustRequest(invalidPendingRequest);
1320 :
1321 : ////////////////////////////////////////////////////////////////
1322 691 : for (const auto& conv : toRm) {
1323 6 : JAMI_ERROR("Remove conversation ({})", conv);
1324 2 : removeConversation(conv);
1325 : }
1326 2067 : JAMI_DEBUG("[Account {}] Conversations loaded!", accountId_);
1327 689 : }
1328 :
1329 : void
1330 186 : ConversationModule::Impl::cloneConversationFrom(const std::shared_ptr<SyncedConversation> conv,
1331 : const std::string& deviceId,
1332 : const std::string& oldConvId)
1333 : {
1334 186 : std::lock_guard lk(conv->mtx);
1335 186 : const auto& conversationId = conv->info.id;
1336 186 : if (!conv->startFetch(deviceId, true)) {
1337 144 : JAMI_WARNING("[Account {}] Already fetching {}", accountId_, conversationId);
1338 48 : return;
1339 : }
1340 :
1341 138 : onNeedSocket_(
1342 : conversationId,
1343 : deviceId,
1344 138 : [wthis=weak_from_this(), conv, conversationId, oldConvId, deviceId](const auto& channel) {
1345 138 : std::lock_guard lk(conv->mtx);
1346 138 : if (conv->pending && !conv->pending->ready) {
1347 135 : conv->pending->removeId = oldConvId;
1348 135 : if (channel) {
1349 126 : conv->pending->ready = true;
1350 126 : conv->pending->deviceId = channel->deviceId().toString();
1351 126 : conv->pending->socket = channel;
1352 126 : if (!conv->pending->cloning) {
1353 126 : conv->pending->cloning = true;
1354 378 : dht::ThreadPool::io().run([wthis,
1355 126 : conversationId,
1356 126 : deviceId = conv->pending->deviceId]() {
1357 252 : if (auto sthis = wthis.lock())
1358 126 : sthis->handlePendingConversation(conversationId, deviceId);
1359 : });
1360 : }
1361 126 : return true;
1362 18 : } else if (auto sthis = wthis.lock()) {
1363 9 : conv->stopFetch(deviceId);
1364 27 : JAMI_WARNING("Clone failed. Re-clone in {}s", conv->fallbackTimer.count());
1365 9 : conv->fallbackClone->expires_at(std::chrono::steady_clock::now() + conv->fallbackTimer);
1366 9 : conv->fallbackTimer *= 2;
1367 9 : if (conv->fallbackTimer > MAX_FALLBACK)
1368 0 : conv->fallbackTimer = MAX_FALLBACK;
1369 18 : conv->fallbackClone->async_wait(
1370 : std::bind(&ConversationModule::Impl::fallbackClone,
1371 : sthis,
1372 : std::placeholders::_1,
1373 9 : conversationId));
1374 :
1375 :
1376 : }
1377 : }
1378 12 : return false;
1379 138 : },
1380 : MIME_TYPE_GIT);
1381 186 : }
1382 :
1383 : void
1384 13 : ConversationModule::Impl::fallbackClone(const asio::error_code& ec, const std::string& conversationId)
1385 : {
1386 13 : if (ec == asio::error::operation_aborted)
1387 1 : return;
1388 12 : auto conv = getConversation(conversationId);
1389 12 : if (!conv || conv->conversation)
1390 0 : return;
1391 12 : auto members = getConversationMembers(conversationId);
1392 36 : for (const auto& member : members)
1393 24 : if (member.at("uri") != username_)
1394 12 : cloneConversationFrom(conversationId, member.at("uri"));
1395 12 : }
1396 :
1397 : void
1398 847 : ConversationModule::Impl::bootstrap(const std::string& convId)
1399 : {
1400 847 : std::vector<DeviceId> kd;
1401 : {
1402 847 : std::unique_lock lk(conversationsMtx_);
1403 847 : const auto& devices = accountManager_->getKnownDevices();
1404 847 : kd.reserve(devices.size());
1405 2759 : for (const auto& [id, _] : devices)
1406 1912 : kd.emplace_back(id);
1407 847 : }
1408 143 : auto bootstrap = [&](auto& conv) {
1409 143 : if (conv) {
1410 : #ifdef LIBJAMI_TESTABLE
1411 143 : conv->onBootstrapStatus(bootstrapCbTest_);
1412 : #endif // LIBJAMI_TESTABLE
1413 143 : conv->bootstrap(std::bind(&ConversationModule::Impl::bootstrapCb, this, conv->id()), kd);
1414 : }
1415 143 : };
1416 847 : std::vector<std::string> toClone;
1417 847 : if (convId.empty()) {
1418 681 : std::lock_guard lk(convInfosMtx_);
1419 718 : for (const auto& [conversationId, convInfo] : convInfos_) {
1420 37 : auto conv = getConversation(conversationId);
1421 37 : if (!conv)
1422 0 : return;
1423 37 : if ((!conv->conversation && !conv->info.isRemoved())) {
1424 : // Because we're not tracking contact presence in order to sync now,
1425 : // we need to ask to clone requests when bootstraping all conversations
1426 : // else it can stay syncing
1427 3 : toClone.emplace_back(conversationId);
1428 34 : } else if (conv->conversation) {
1429 32 : bootstrap(conv->conversation);
1430 : }
1431 37 : }
1432 847 : } else if (auto conv = getConversation(convId)) {
1433 112 : std::lock_guard lk(conv->mtx);
1434 112 : if (conv->conversation)
1435 111 : bootstrap(conv->conversation);
1436 278 : }
1437 :
1438 850 : for (const auto& cid : toClone) {
1439 3 : auto members = getConversationMembers(cid);
1440 10 : for (const auto& member : members) {
1441 7 : if (member.at("uri") != username_)
1442 4 : cloneConversationFrom(cid, member.at("uri"));
1443 : }
1444 3 : }
1445 847 : }
1446 :
1447 : void
1448 192 : ConversationModule::Impl::cloneConversationFrom(const std::string& conversationId,
1449 : const std::string& uri,
1450 : const std::string& oldConvId)
1451 : {
1452 192 : auto memberHash = dht::InfoHash(uri);
1453 192 : if (!memberHash) {
1454 0 : JAMI_WARNING("Invalid member detected: {}", uri);
1455 0 : return;
1456 : }
1457 192 : auto conv = startConversation(conversationId);
1458 192 : std::lock_guard lk(conv->mtx);
1459 192 : conv->info = {conversationId};
1460 192 : conv->info.created = std::time(nullptr);
1461 192 : conv->info.members.emplace(username_);
1462 192 : conv->info.members.emplace(uri);
1463 192 : accountManager_->forEachDevice(
1464 : memberHash,
1465 182 : [w = weak(), conv, conversationId, oldConvId](
1466 : const std::shared_ptr<dht::crypto::PublicKey>& pk) {
1467 182 : auto sthis = w.lock();
1468 182 : auto deviceId = pk->getLongId().toString();
1469 182 : if (!sthis or deviceId == sthis->deviceId_)
1470 0 : return;
1471 182 : sthis->cloneConversationFrom(conv, deviceId, oldConvId);
1472 182 : });
1473 192 : addConvInfo(conv->info);
1474 192 : }
1475 :
1476 : ////////////////////////////////////////////////////////////////
1477 :
1478 : void
1479 494 : ConversationModule::saveConvRequests(
1480 : const std::string& accountId,
1481 : const std::map<std::string, ConversationRequest>& conversationsRequests)
1482 : {
1483 494 : auto path = fileutils::get_data_dir() / accountId;
1484 494 : saveConvRequestsToPath(path, conversationsRequests);
1485 494 : }
1486 :
1487 : void
1488 1277 : ConversationModule::saveConvRequestsToPath(
1489 : const std::filesystem::path& path,
1490 : const std::map<std::string, ConversationRequest>& conversationsRequests)
1491 : {
1492 1277 : auto p = path / "convRequests";
1493 1277 : std::lock_guard lock(dhtnet::fileutils::getFileLock(p));
1494 1277 : std::ofstream file(p, std::ios::trunc | std::ios::binary);
1495 1277 : msgpack::pack(file, conversationsRequests);
1496 1277 : }
1497 :
1498 : void
1499 3403 : ConversationModule::saveConvInfos(const std::string& accountId, const ConvInfoMap& conversations)
1500 : {
1501 3403 : auto path = fileutils::get_data_dir() / accountId;
1502 3403 : saveConvInfosToPath(path, conversations);
1503 3403 : }
1504 :
1505 : void
1506 4186 : ConversationModule::saveConvInfosToPath(const std::filesystem::path& path,
1507 : const ConvInfoMap& conversations)
1508 : {
1509 8372 : std::ofstream file(path / "convInfo", std::ios::trunc | std::ios::binary);
1510 4186 : msgpack::pack(file, conversations);
1511 4186 : }
1512 :
1513 : ////////////////////////////////////////////////////////////////
1514 :
1515 677 : ConversationModule::ConversationModule(std::shared_ptr<JamiAccount> account,
1516 : std::shared_ptr<AccountManager> accountManager,
1517 : NeedsSyncingCb&& needsSyncingCb,
1518 : SengMsgCb&& sendMsgCb,
1519 : NeedSocketCb&& onNeedSocket,
1520 : NeedSocketCb&& onNeedSwarmSocket,
1521 : OneToOneRecvCb&& oneToOneRecvCb,
1522 677 : bool autoLoadConversations)
1523 677 : : pimpl_ {std::make_unique<Impl>(std::move(account),
1524 677 : std::move(accountManager),
1525 677 : std::move(needsSyncingCb),
1526 677 : std::move(sendMsgCb),
1527 677 : std::move(onNeedSocket),
1528 677 : std::move(onNeedSwarmSocket),
1529 677 : std::move(oneToOneRecvCb))}
1530 : {
1531 677 : if (autoLoadConversations) {
1532 677 : loadConversations();
1533 : }
1534 677 : }
1535 :
1536 : void
1537 16 : ConversationModule::setAccountManager(std::shared_ptr<AccountManager> accountManager)
1538 : {
1539 16 : std::unique_lock lk(pimpl_->conversationsMtx_);
1540 16 : pimpl_->accountManager_ = accountManager;
1541 16 : }
1542 :
1543 : #ifdef LIBJAMI_TESTABLE
1544 : void
1545 18 : ConversationModule::onBootstrapStatus(
1546 : const std::function<void(std::string, Conversation::BootstrapStatus)>& cb)
1547 : {
1548 18 : pimpl_->bootstrapCbTest_ = cb;
1549 19 : for (auto& c : pimpl_->getConversations())
1550 19 : c->onBootstrapStatus(pimpl_->bootstrapCbTest_);
1551 18 : }
1552 : #endif
1553 :
1554 : void
1555 689 : ConversationModule::loadConversations()
1556 : {
1557 689 : auto acc = pimpl_->account_.lock();
1558 689 : if (!acc)
1559 0 : return;
1560 2067 : JAMI_LOG("[Account {}] Start loading conversations…", pimpl_->accountId_);
1561 : auto conversationsRepositories = dhtnet::fileutils::readDirectory(
1562 1378 : fileutils::get_data_dir() / pimpl_->accountId_ / "conversations");
1563 :
1564 689 : std::unique_lock lk(pimpl_->conversationsMtx_);
1565 689 : auto contacts = pimpl_->accountManager_->getContacts(true); // Avoid to lock configurationMtx while conv Mtx is locked
1566 689 : std::unique_lock ilk(pimpl_->convInfosMtx_);
1567 689 : pimpl_->convInfos_ = convInfos(pimpl_->accountId_);
1568 689 : pimpl_->conversations_.clear();
1569 :
1570 : struct Ctx
1571 : {
1572 : std::mutex cvMtx;
1573 : std::condition_variable cv;
1574 : std::mutex toRmMtx;
1575 : std::set<std::string> toRm;
1576 : std::mutex convMtx;
1577 : size_t convNb;
1578 : std::vector<std::map<std::string, std::string>> contacts;
1579 : std::vector<std::tuple<std::string, std::string, std::string>> updateContactConv;
1580 : };
1581 689 : auto ctx = std::make_shared<Ctx>();
1582 689 : ctx->convNb = conversationsRepositories.size();
1583 689 : ctx->contacts = std::move(contacts);
1584 :
1585 707 : for (auto&& r : conversationsRepositories) {
1586 18 : dht::ThreadPool::io().run([this, ctx, repository=std::move(r), acc] {
1587 : try {
1588 18 : auto sconv = std::make_shared<SyncedConversation>(repository);
1589 18 : auto conv = std::make_shared<Conversation>(acc, repository);
1590 18 : conv->onMessageStatusChanged([this, repository](const auto& status) {
1591 6 : auto msg = std::make_shared<SyncMsg>();
1592 12 : msg->ms = {{repository, status}};
1593 6 : pimpl_->needsSyncingCb_(std::move(msg));
1594 6 : });
1595 54 : conv->onMembersChanged(
1596 36 : [w = pimpl_->weak_from_this(), repository](const auto& members) {
1597 : // Delay in another thread to avoid deadlocks
1598 10 : dht::ThreadPool::io().run([w, repository, members = std::move(members)] {
1599 10 : if (auto sthis = w.lock())
1600 5 : sthis->setConversationMembers(repository, members);
1601 : });
1602 5 : });
1603 18 : conv->onNeedSocket(pimpl_->onNeedSwarmSocket_);
1604 18 : auto members = conv->memberUris(acc->getUsername(), {});
1605 : // NOTE: The following if is here to protect against any incorrect state
1606 : // that can be introduced
1607 18 : if (conv->mode() == ConversationMode::ONE_TO_ONE && members.size() == 1) {
1608 : // If we got a 1:1 conversation, but not in the contact details, it's rather a
1609 : // duplicate or a weird state
1610 5 : auto otherUri = *members.begin();
1611 5 : auto itContact = std::find_if(ctx->contacts.cbegin(),
1612 5 : ctx->contacts.cend(),
1613 5 : [&](const auto& c) {
1614 5 : return c.at("id") == otherUri;
1615 : });
1616 5 : if (itContact == ctx->contacts.end()) {
1617 0 : JAMI_WARNING("Contact {} not found", otherUri);
1618 0 : std::lock_guard lkCv {ctx->cvMtx};
1619 0 : --ctx->convNb;
1620 0 : ctx->cv.notify_all();
1621 0 : return;
1622 0 : }
1623 5 : const std::string& convFromDetails = itContact->at("conversationId");
1624 5 : auto removed = std::stoul(itContact->at("removed"));
1625 5 : auto added = std::stoul(itContact->at("added"));
1626 5 : auto isRemoved = removed > added;
1627 5 : if (convFromDetails != repository) {
1628 3 : if (convFromDetails.empty()) {
1629 2 : if (isRemoved) {
1630 : // If details is empty, contact is removed and not banned.
1631 0 : JAMI_ERROR("Conversation {} detected for {} and should be removed",
1632 : repository,
1633 : otherUri);
1634 0 : std::lock_guard lkMtx {ctx->toRmMtx};
1635 0 : ctx->toRm.insert(repository);
1636 0 : } else {
1637 6 : JAMI_ERROR("No conversation detected for {} but one exists ({}). "
1638 : "Update details",
1639 : otherUri,
1640 : repository);
1641 2 : std::lock_guard lkMtx {ctx->toRmMtx};
1642 4 : ctx->updateContactConv.emplace_back(
1643 4 : std::make_tuple(otherUri, convFromDetails, repository));
1644 2 : }
1645 : } else {
1646 3 : JAMI_ERROR("Multiple conversation detected for {} but ({} & {})",
1647 : otherUri,
1648 : repository,
1649 : convFromDetails);
1650 1 : std::lock_guard lkMtx {ctx->toRmMtx};
1651 1 : ctx->toRm.insert(repository);
1652 1 : }
1653 : }
1654 5 : }
1655 : {
1656 18 : std::lock_guard lkMtx {ctx->convMtx};
1657 18 : auto convInfo = pimpl_->convInfos_.find(repository);
1658 18 : if (convInfo == pimpl_->convInfos_.end()) {
1659 9 : JAMI_ERROR("Missing conv info for {}. This is a bug!", repository);
1660 3 : sconv->info.created = std::time(nullptr);
1661 3 : sconv->info.lastDisplayed
1662 3 : = conv->infos()[ConversationMapKeys::LAST_DISPLAYED];
1663 : } else {
1664 15 : sconv->info = convInfo->second;
1665 15 : if (convInfo->second.isRemoved()) {
1666 : // A conversation was removed, but repository still exists
1667 1 : conv->setRemovingFlag();
1668 1 : std::lock_guard lkMtx {ctx->toRmMtx};
1669 1 : ctx->toRm.insert(repository);
1670 1 : }
1671 : }
1672 : // Even if we found the conversation in convInfos_, unable to assume that the list of members
1673 : // stored in `convInfo` is correct (https://git.jami.net/savoirfairelinux/jami-daemon/-/issues/1025).
1674 : // For this reason, we always use the list we got from the conversation repository to set
1675 : // the value of `sconv->info.members`.
1676 18 : members.emplace(acc->getUsername());
1677 18 : sconv->info.members = std::move(members);
1678 : // convInfosMtx_ is already locked
1679 18 : pimpl_->convInfos_[repository] = sconv->info;
1680 18 : }
1681 18 : auto commits = conv->commitsEndedCalls();
1682 :
1683 18 : if (!commits.empty()) {
1684 : // Note: here, this means that some calls were actives while the
1685 : // daemon finished (can be a crash).
1686 : // Notify other in the conversation that the call is finished
1687 0 : pimpl_->sendMessageNotification(*conv, true, *commits.rbegin());
1688 : }
1689 18 : sconv->conversation = conv;
1690 18 : std::lock_guard lkMtx {ctx->convMtx};
1691 18 : pimpl_->conversations_.emplace(repository, std::move(sconv));
1692 18 : } catch (const std::logic_error& e) {
1693 0 : JAMI_WARNING("[Account {}] Conversations not loaded: {}",
1694 : pimpl_->accountId_,
1695 : e.what());
1696 0 : }
1697 18 : std::lock_guard lkCv {ctx->cvMtx};
1698 18 : --ctx->convNb;
1699 18 : ctx->cv.notify_all();
1700 18 : });
1701 : }
1702 :
1703 689 : std::unique_lock lkCv(ctx->cvMtx);
1704 1396 : ctx->cv.wait(lkCv, [&] { return ctx->convNb == 0; });
1705 :
1706 : // Prune any invalid conversations without members and
1707 : // set the removed flag if needed
1708 689 : std::set<std::string> removed;
1709 721 : for (auto itInfo = pimpl_->convInfos_.begin(); itInfo != pimpl_->convInfos_.end();) {
1710 32 : const auto& info = itInfo->second;
1711 32 : if (info.members.empty()) {
1712 0 : itInfo = pimpl_->convInfos_.erase(itInfo);
1713 0 : continue;
1714 : }
1715 32 : if (info.isRemoved())
1716 2 : removed.insert(info.id);
1717 32 : auto itConv = pimpl_->conversations_.find(info.id);
1718 32 : if (itConv == pimpl_->conversations_.end()) {
1719 : // convInfos_ can contain a conversation that is not yet cloned
1720 : // so we need to add it there.
1721 14 : itConv = pimpl_->conversations_
1722 14 : .emplace(info.id, std::make_shared<SyncedConversation>(info))
1723 : .first;
1724 : }
1725 64 : if (itConv != pimpl_->conversations_.end() && itConv->second && itConv->second->conversation
1726 64 : && info.isRemoved())
1727 1 : itConv->second->conversation->setRemovingFlag();
1728 32 : if (!info.isRemoved() && itConv == pimpl_->conversations_.end()) {
1729 : // In this case, the conversation is not synced and we only know ourself
1730 0 : if (info.members.size() == 1 && *info.members.begin() == acc->getUsername()) {
1731 0 : JAMI_WARNING("[Account {:s}] Conversation {:s} seems not present/synced.",
1732 : pimpl_->accountId_,
1733 : info.id);
1734 0 : emitSignal<libjami::ConversationSignal::ConversationRemoved>(pimpl_->accountId_,
1735 0 : info.id);
1736 0 : itInfo = pimpl_->convInfos_.erase(itInfo);
1737 0 : continue;
1738 0 : }
1739 : }
1740 32 : ++itInfo;
1741 : }
1742 : // On oldest version, removeConversation didn't update "appdata/contacts"
1743 : // causing a potential incorrect state between "appdata/contacts" and "appdata/convInfos"
1744 689 : if (!removed.empty())
1745 2 : acc->unlinkConversations(removed);
1746 : // Save if we've removed some invalid entries
1747 689 : pimpl_->saveConvInfos();
1748 :
1749 689 : ilk.unlock();
1750 689 : lk.unlock();
1751 :
1752 2067 : dht::ThreadPool::io().run([w = pimpl_->weak(),
1753 : acc,
1754 689 : updateContactConv = std::move(ctx->updateContactConv),
1755 689 : toRm = std::move(ctx->toRm)]() {
1756 : // Will lock account manager
1757 689 : if (auto shared = w.lock())
1758 689 : shared->fixStructures(acc, updateContactConv, toRm);
1759 689 : });
1760 689 : }
1761 :
1762 : void
1763 0 : ConversationModule::loadSingleConversation(const std::string& convId)
1764 : {
1765 0 : auto acc = pimpl_->account_.lock();
1766 0 : if (!acc)
1767 0 : return;
1768 0 : JAMI_LOG("[Account {}] Start loading conversation {}", pimpl_->accountId_, convId);
1769 :
1770 0 : std::unique_lock lk(pimpl_->conversationsMtx_);
1771 0 : std::unique_lock ilk(pimpl_->convInfosMtx_);
1772 : // Load convInfos to retrieve requests that have been accepted but not yet synchronized.
1773 0 : pimpl_->convInfos_ = convInfos(pimpl_->accountId_);
1774 0 : pimpl_->conversations_.clear();
1775 :
1776 : try {
1777 0 : auto sconv = std::make_shared<SyncedConversation>(convId);
1778 :
1779 0 : auto conv = std::make_shared<Conversation>(acc, convId);
1780 :
1781 0 : conv->onNeedSocket(pimpl_->onNeedSwarmSocket_);
1782 :
1783 0 : sconv->conversation = conv;
1784 0 : pimpl_->conversations_.emplace(convId, std::move(sconv));
1785 0 : } catch (const std::logic_error& e) {
1786 0 : JAMI_WARNING("[Account {}] Conversations not loaded: {}", pimpl_->accountId_, e.what());
1787 0 : }
1788 :
1789 : // Add all other conversations as dummy conversations to indicate their existence so
1790 : // isConversation could detect conversations correctly.
1791 : auto conversationsRepositoryIds = dhtnet::fileutils::readDirectory(
1792 0 : fileutils::get_data_dir() / pimpl_->accountId_ / "conversations");
1793 0 : for (auto repositoryId : conversationsRepositoryIds) {
1794 0 : if (repositoryId != convId) {
1795 0 : auto conv = std::make_shared<SyncedConversation>(convId);
1796 0 : pimpl_->conversations_.emplace(repositoryId, conv);
1797 0 : }
1798 0 : }
1799 :
1800 : // Add conversations from convInfos_ so isConversation could detect conversations correctly.
1801 : // This includes conversations that have been accepted but are not yet synchronized.
1802 0 : for (auto itInfo = pimpl_->convInfos_.begin(); itInfo != pimpl_->convInfos_.end();) {
1803 0 : const auto& info = itInfo->second;
1804 0 : if (info.members.empty()) {
1805 0 : itInfo = pimpl_->convInfos_.erase(itInfo);
1806 0 : continue;
1807 : }
1808 0 : auto itConv = pimpl_->conversations_.find(info.id);
1809 0 : if (itConv == pimpl_->conversations_.end()) {
1810 : // convInfos_ can contain a conversation that is not yet cloned
1811 : // so we need to add it there.
1812 0 : pimpl_->conversations_.emplace(info.id, std::make_shared<SyncedConversation>(info)).first;
1813 : }
1814 0 : ++itInfo;
1815 : }
1816 :
1817 0 : ilk.unlock();
1818 0 : lk.unlock();
1819 0 : }
1820 :
1821 : void
1822 847 : ConversationModule::bootstrap(const std::string& convId)
1823 : {
1824 847 : pimpl_->bootstrap(convId);
1825 847 : }
1826 :
1827 : void
1828 0 : ConversationModule::monitor()
1829 : {
1830 0 : for (auto& conv : pimpl_->getConversations())
1831 0 : conv->monitor();
1832 0 : }
1833 :
1834 : void
1835 698 : ConversationModule::clearPendingFetch()
1836 : {
1837 : // Note: This is a workaround. convModule() is kept if account is disabled/re-enabled.
1838 : // iOS uses setAccountActive() a lot, and if for some reason the previous pending fetch
1839 : // is not erased (callback not called), it will block the new messages as it will not
1840 : // sync. The best way to debug this is to get logs from the last ICE connection for
1841 : // syncing the conversation. It may have been killed in some un-expected way avoiding to
1842 : // call the callbacks. This should never happen, but if it's the case, this will allow
1843 : // new messages to be synced correctly.
1844 726 : for (auto& conv : pimpl_->getSyncedConversations()) {
1845 28 : std::lock_guard lk(conv->mtx);
1846 28 : if (conv && conv->pending) {
1847 0 : JAMI_ERR("This is a bug, seems to still fetch to some device on initializing");
1848 0 : conv->pending.reset();
1849 : }
1850 726 : }
1851 698 : }
1852 :
1853 : void
1854 0 : ConversationModule::reloadRequests()
1855 : {
1856 0 : pimpl_->conversationsRequests_ = convRequests(pimpl_->accountId_);
1857 0 : }
1858 :
1859 : std::vector<std::string>
1860 12 : ConversationModule::getConversations() const
1861 : {
1862 12 : std::vector<std::string> result;
1863 12 : std::lock_guard lk(pimpl_->convInfosMtx_);
1864 12 : result.reserve(pimpl_->convInfos_.size());
1865 24 : for (const auto& [key, conv] : pimpl_->convInfos_) {
1866 12 : if (conv.isRemoved())
1867 4 : continue;
1868 8 : result.emplace_back(key);
1869 : }
1870 24 : return result;
1871 12 : }
1872 :
1873 : std::string
1874 636 : ConversationModule::getOneToOneConversation(const std::string& uri) const noexcept
1875 : {
1876 636 : return pimpl_->getOneToOneConversation(uri);
1877 : }
1878 :
1879 : bool
1880 8 : ConversationModule::updateConvForContact(const std::string& uri,
1881 : const std::string& oldConv,
1882 : const std::string& newConv)
1883 : {
1884 8 : return pimpl_->updateConvForContact(uri, oldConv, newConv);
1885 : }
1886 :
1887 : std::vector<std::map<std::string, std::string>>
1888 12 : ConversationModule::getConversationRequests() const
1889 : {
1890 12 : std::vector<std::map<std::string, std::string>> requests;
1891 12 : std::lock_guard lk(pimpl_->conversationsRequestsMtx_);
1892 12 : requests.reserve(pimpl_->conversationsRequests_.size());
1893 24 : for (const auto& [id, request] : pimpl_->conversationsRequests_) {
1894 12 : if (request.declined)
1895 6 : continue; // Do not add declined requests
1896 6 : requests.emplace_back(request.toMap());
1897 : }
1898 24 : return requests;
1899 12 : }
1900 :
1901 : void
1902 77 : ConversationModule::onTrustRequest(const std::string& uri,
1903 : const std::string& conversationId,
1904 : const std::vector<uint8_t>& payload,
1905 : time_t received)
1906 : {
1907 77 : auto oldConv = getOneToOneConversation(uri);
1908 77 : if (!oldConv.empty() && pimpl_->isConversation(oldConv)) {
1909 : // If there is already an active one to one conversation here, it's an active
1910 : // contact and the contact will reclone this activeConv, so ignore the request
1911 21 : JAMI_WARNING(
1912 : "Contact is sending a request for a non active conversation. Ignore. They will "
1913 : "clone the old one");
1914 7 : return;
1915 : }
1916 70 : std::unique_lock lk(pimpl_->conversationsRequestsMtx_);
1917 70 : ConversationRequest req;
1918 70 : req.from = uri;
1919 70 : req.conversationId = conversationId;
1920 70 : req.received = std::time(nullptr);
1921 210 : req.metadatas = ConversationRepository::infosFromVCard(vCard::utils::toMap(
1922 140 : std::string_view(reinterpret_cast<const char*>(payload.data()), payload.size())));
1923 70 : auto reqMap = req.toMap();
1924 70 : if (pimpl_->addConversationRequest(conversationId, std::move(req))) {
1925 62 : lk.unlock();
1926 62 : emitSignal<libjami::ConfigurationSignal::IncomingTrustRequest>(pimpl_->accountId_,
1927 : conversationId,
1928 : uri,
1929 : payload,
1930 : received);
1931 62 : emitSignal<libjami::ConversationSignal::ConversationRequestReceived>(pimpl_->accountId_,
1932 : conversationId,
1933 : reqMap);
1934 62 : pimpl_->needsSyncingCb_({});
1935 : } else {
1936 24 : JAMI_DEBUG("[Account {}] Received a request for a conversation "
1937 : "already existing. Ignore",
1938 : pimpl_->accountId_);
1939 : }
1940 77 : }
1941 :
1942 : void
1943 342 : ConversationModule::onConversationRequest(const std::string& from, const Json::Value& value)
1944 : {
1945 342 : ConversationRequest req(value);
1946 342 : auto isOneToOne = req.isOneToOne();
1947 342 : std::string oldConv;
1948 342 : if (isOneToOne) {
1949 14 : oldConv = pimpl_->getOneToOneConversation(from);
1950 : }
1951 342 : std::unique_lock lk(pimpl_->conversationsRequestsMtx_);
1952 1026 : JAMI_DEBUG("[Account {}] Receive a new conversation request for conversation {} from {}",
1953 : pimpl_->accountId_,
1954 : req.conversationId,
1955 : from);
1956 342 : auto convId = req.conversationId;
1957 :
1958 : // Already accepted request, do nothing
1959 342 : if (pimpl_->isConversation(convId))
1960 116 : return;
1961 226 : auto oldReq = pimpl_->getRequest(convId);
1962 226 : if (oldReq != std::nullopt) {
1963 261 : JAMI_DEBUG("[Account {}] Received a request for a conversation already existing. "
1964 : "Ignore. Declined: {}",
1965 : pimpl_->accountId_,
1966 : static_cast<int>(oldReq->declined));
1967 87 : return;
1968 : }
1969 :
1970 139 : if (!oldConv.empty()) {
1971 7 : lk.unlock();
1972 : // Already a conversation with the contact.
1973 : // If there is already an active one to one conversation here, it's an active
1974 : // contact and the contact will reclone this activeConv, so ignore the request
1975 21 : JAMI_WARNING(
1976 : "Contact is sending a request for a non active conversation. Ignore. They will "
1977 : "clone the old one");
1978 7 : return;
1979 : }
1980 :
1981 132 : req.received = std::time(nullptr);
1982 132 : req.from = from;
1983 132 : auto reqMap = req.toMap();
1984 132 : if (pimpl_->addConversationRequest(convId, std::move(req))) {
1985 132 : lk.unlock();
1986 : // Note: no need to sync here because other connected devices should receive
1987 : // the same conversation request. Will sync when the conversation will be added
1988 132 : if (isOneToOne)
1989 1 : pimpl_->oneToOneRecvCb_(convId, from);
1990 132 : emitSignal<libjami::ConversationSignal::ConversationRequestReceived>(pimpl_->accountId_,
1991 : convId,
1992 : reqMap);
1993 : }
1994 1066 : }
1995 :
1996 : std::string
1997 3 : ConversationModule::peerFromConversationRequest(const std::string& convId) const
1998 : {
1999 3 : std::lock_guard lk(pimpl_->conversationsRequestsMtx_);
2000 3 : auto it = pimpl_->conversationsRequests_.find(convId);
2001 3 : if (it != pimpl_->conversationsRequests_.end()) {
2002 3 : return it->second.from;
2003 : }
2004 0 : return {};
2005 3 : }
2006 :
2007 : void
2008 150 : ConversationModule::onNeedConversationRequest(const std::string& from,
2009 : const std::string& conversationId)
2010 : {
2011 150 : pimpl_->withConversation(conversationId, [&](auto& conversation) {
2012 149 : if (!conversation.isMember(from, true)) {
2013 0 : JAMI_WARNING("{} is asking a new invite for {}, but not a member", from, conversationId);
2014 0 : return;
2015 : }
2016 447 : JAMI_LOG("{} is asking a new invite for {}", from, conversationId);
2017 149 : pimpl_->sendMsgCb_(from, {}, conversation.generateInvitation(), 0);
2018 : });
2019 150 : }
2020 :
2021 : void
2022 190 : ConversationModule::acceptConversationRequest(const std::string& conversationId,
2023 : const std::string& deviceId)
2024 : {
2025 : // For all conversation members, try to open a git channel with this conversation ID
2026 190 : std::unique_lock lkCr(pimpl_->conversationsRequestsMtx_);
2027 190 : auto request = pimpl_->getRequest(conversationId);
2028 190 : if (request == std::nullopt) {
2029 21 : lkCr.unlock();
2030 21 : if (auto conv = pimpl_->getConversation(conversationId)) {
2031 12 : std::unique_lock lk(conv->mtx);
2032 12 : if (!conv->conversation) {
2033 4 : lk.unlock();
2034 4 : pimpl_->cloneConversationFrom(conv, deviceId);
2035 : }
2036 33 : }
2037 63 : JAMI_WARNING("[Account {}] Request not found for conversation {}",
2038 : pimpl_->accountId_,
2039 : conversationId);
2040 21 : return;
2041 : }
2042 169 : pimpl_->rmConversationRequest(conversationId);
2043 169 : lkCr.unlock();
2044 169 : pimpl_->accountManager_->acceptTrustRequest(request->from, true);
2045 169 : cloneConversationFrom(conversationId, request->from);
2046 211 : }
2047 :
2048 : void
2049 6 : ConversationModule::declineConversationRequest(const std::string& conversationId)
2050 : {
2051 6 : std::lock_guard lk(pimpl_->conversationsRequestsMtx_);
2052 6 : auto it = pimpl_->conversationsRequests_.find(conversationId);
2053 6 : if (it != pimpl_->conversationsRequests_.end()) {
2054 6 : it->second.declined = std::time(nullptr);
2055 6 : pimpl_->saveConvRequests();
2056 : }
2057 6 : pimpl_->syncingMetadatas_.erase(conversationId);
2058 6 : pimpl_->saveMetadatas();
2059 6 : emitSignal<libjami::ConversationSignal::ConversationRequestDeclined>(pimpl_->accountId_,
2060 : conversationId);
2061 6 : pimpl_->needsSyncingCb_({});
2062 6 : }
2063 :
2064 : std::string
2065 181 : ConversationModule::startConversation(ConversationMode mode, const dht::InfoHash& otherMember)
2066 : {
2067 181 : auto acc = pimpl_->account_.lock();
2068 181 : if (!acc)
2069 0 : return {};
2070 181 : std::vector<DeviceId> kd;
2071 1364 : for (const auto& [id, _] : acc->getKnownDevices())
2072 1364 : kd.emplace_back(id);
2073 : // Create the conversation object
2074 181 : std::shared_ptr<Conversation> conversation;
2075 : try {
2076 181 : conversation = std::make_shared<Conversation>(acc, mode, otherMember.toString());
2077 181 : auto conversationId = conversation->id();
2078 181 : conversation->onMessageStatusChanged([this, conversationId](const auto& status) {
2079 567 : auto msg = std::make_shared<SyncMsg>();
2080 1134 : msg->ms = {{conversationId, status}};
2081 567 : pimpl_->needsSyncingCb_(std::move(msg));
2082 567 : });
2083 181 : conversation->onMembersChanged([w=pimpl_->weak_from_this(), conversationId](const auto& members) {
2084 : // Delay in another thread to avoid deadlocks
2085 1136 : dht::ThreadPool::io().run([w, conversationId, members = std::move(members)] {
2086 1136 : if (auto sthis = w.lock())
2087 568 : sthis->setConversationMembers(conversationId, members);
2088 : });
2089 568 : });
2090 181 : conversation->onNeedSocket(pimpl_->onNeedSwarmSocket_);
2091 : #ifdef LIBJAMI_TESTABLE
2092 181 : conversation->onBootstrapStatus(pimpl_->bootstrapCbTest_);
2093 : #endif // LIBJAMI_TESTABLE
2094 362 : conversation->bootstrap(std::bind(&ConversationModule::Impl::bootstrapCb,
2095 181 : pimpl_.get(),
2096 : conversationId),
2097 : kd);
2098 181 : } catch (const std::exception& e) {
2099 0 : JAMI_ERROR("[Account {}] Error while generating a conversation {}",
2100 : pimpl_->accountId_, e.what());
2101 0 : return {};
2102 0 : }
2103 181 : auto convId = conversation->id();
2104 181 : auto conv = pimpl_->startConversation(convId);
2105 181 : std::unique_lock lk(conv->mtx);
2106 181 : conv->info.created = std::time(nullptr);
2107 181 : conv->info.members.emplace(pimpl_->username_);
2108 181 : if (otherMember)
2109 66 : conv->info.members.emplace(otherMember.toString());
2110 181 : conv->conversation = conversation;
2111 181 : addConvInfo(conv->info);
2112 181 : lk.unlock();
2113 :
2114 181 : pimpl_->needsSyncingCb_({});
2115 181 : emitSignal<libjami::ConversationSignal::ConversationReady>(pimpl_->accountId_, convId);
2116 181 : return convId;
2117 181 : }
2118 :
2119 : void
2120 176 : ConversationModule::cloneConversationFrom(const std::string& conversationId,
2121 : const std::string& uri,
2122 : const std::string& oldConvId)
2123 : {
2124 176 : pimpl_->cloneConversationFrom(conversationId, uri, oldConvId);
2125 176 : }
2126 :
2127 : // Message send/load
2128 : void
2129 92 : ConversationModule::sendMessage(const std::string& conversationId,
2130 : std::string message,
2131 : const std::string& replyTo,
2132 : const std::string& type,
2133 : bool announce,
2134 : OnCommitCb&& onCommit,
2135 : OnDoneCb&& cb)
2136 : {
2137 184 : pimpl_->sendMessage(conversationId,
2138 92 : std::move(message),
2139 : replyTo,
2140 : type,
2141 : announce,
2142 92 : std::move(onCommit),
2143 92 : std::move(cb));
2144 92 : }
2145 :
2146 : void
2147 14 : ConversationModule::sendMessage(const std::string& conversationId,
2148 : Json::Value&& value,
2149 : const std::string& replyTo,
2150 : bool announce,
2151 : OnCommitCb&& onCommit,
2152 : OnDoneCb&& cb)
2153 : {
2154 28 : pimpl_->sendMessage(conversationId,
2155 14 : std::move(value),
2156 : replyTo,
2157 : announce,
2158 14 : std::move(onCommit),
2159 14 : std::move(cb));
2160 14 : }
2161 :
2162 : void
2163 7 : ConversationModule::editMessage(const std::string& conversationId,
2164 : const std::string& newBody,
2165 : const std::string& editedId)
2166 : {
2167 7 : pimpl_->editMessage(conversationId, newBody, editedId);
2168 7 : }
2169 :
2170 : void
2171 3 : ConversationModule::reactToMessage(const std::string& conversationId,
2172 : const std::string& newBody,
2173 : const std::string& reactToId)
2174 : {
2175 : // Commit message edition
2176 3 : Json::Value json;
2177 3 : json["body"] = newBody;
2178 3 : json["react-to"] = reactToId;
2179 3 : json["type"] = "text/plain";
2180 3 : pimpl_->sendMessage(conversationId, std::move(json));
2181 3 : }
2182 :
2183 : void
2184 100 : ConversationModule::addCallHistoryMessage(const std::string& uri,
2185 : uint64_t duration_ms,
2186 : const std::string& reason)
2187 : {
2188 100 : auto finalUri = uri.substr(0, uri.find("@ring.dht"));
2189 100 : finalUri = finalUri.substr(0, uri.find("@jami.dht"));
2190 100 : auto convId = getOneToOneConversation(finalUri);
2191 100 : if (!convId.empty()) {
2192 3 : Json::Value value;
2193 3 : value["to"] = finalUri;
2194 3 : value["type"] = "application/call-history+json";
2195 3 : value["duration"] = std::to_string(duration_ms);
2196 3 : if (!reason.empty())
2197 3 : value["reason"] = reason;
2198 3 : sendMessage(convId, std::move(value));
2199 3 : }
2200 100 : }
2201 :
2202 : bool
2203 19 : ConversationModule::onMessageDisplayed(const std::string& peer,
2204 : const std::string& conversationId,
2205 : const std::string& interactionId)
2206 : {
2207 19 : if (auto conv = pimpl_->getConversation(conversationId)) {
2208 19 : std::unique_lock lk(conv->mtx);
2209 19 : if (auto conversation = conv->conversation) {
2210 19 : lk.unlock();
2211 19 : return conversation->setMessageDisplayed(peer, interactionId);
2212 19 : }
2213 38 : }
2214 0 : return false;
2215 : }
2216 :
2217 : std::map<std::string, std::map<std::string, std::map<std::string, std::string>>>
2218 210 : ConversationModule::convMessageStatus() const
2219 : {
2220 210 : std::map<std::string, std::map<std::string, std::map<std::string, std::string>>> messageStatus;
2221 301 : for (const auto& conv : pimpl_->getConversations()) {
2222 91 : auto d = conv->messageStatus();
2223 91 : if (!d.empty())
2224 71 : messageStatus[conv->id()] = std::move(d);
2225 301 : }
2226 210 : return messageStatus;
2227 0 : }
2228 :
2229 : uint32_t
2230 0 : ConversationModule::loadConversationMessages(const std::string& conversationId,
2231 : const std::string& fromMessage,
2232 : size_t n)
2233 : {
2234 0 : auto acc = pimpl_->account_.lock();
2235 0 : if (auto conv = pimpl_->getConversation(conversationId)) {
2236 0 : std::lock_guard lk(conv->mtx);
2237 0 : if (conv->conversation) {
2238 0 : const uint32_t id = std::uniform_int_distribution<uint32_t> {}(acc->rand);
2239 0 : LogOptions options;
2240 0 : options.from = fromMessage;
2241 0 : options.nbOfCommits = n;
2242 0 : conv->conversation->loadMessages(
2243 0 : [accountId = pimpl_->accountId_, conversationId, id](auto&& messages) {
2244 0 : emitSignal<libjami::ConversationSignal::ConversationLoaded>(id,
2245 0 : accountId,
2246 0 : conversationId,
2247 : messages);
2248 0 : },
2249 : options);
2250 0 : return id;
2251 0 : }
2252 0 : }
2253 0 : return 0;
2254 0 : }
2255 :
2256 : void
2257 0 : ConversationModule::clearCache(const std::string& conversationId)
2258 : {
2259 0 : if (auto conv = pimpl_->getConversation(conversationId)) {
2260 0 : std::lock_guard lk(conv->mtx);
2261 0 : if (conv->conversation) {
2262 0 : conv->conversation->clearCache();
2263 : }
2264 0 : }
2265 0 : }
2266 :
2267 : uint32_t
2268 2 : ConversationModule::loadConversation(const std::string& conversationId,
2269 : const std::string& fromMessage,
2270 : size_t n)
2271 : {
2272 2 : auto acc = pimpl_->account_.lock();
2273 2 : if (auto conv = pimpl_->getConversation(conversationId)) {
2274 2 : std::lock_guard lk(conv->mtx);
2275 2 : if (conv->conversation) {
2276 2 : const uint32_t id = std::uniform_int_distribution<uint32_t> {}(acc->rand);
2277 2 : LogOptions options;
2278 2 : options.from = fromMessage;
2279 2 : options.nbOfCommits = n;
2280 4 : conv->conversation->loadMessages2(
2281 2 : [accountId = pimpl_->accountId_, conversationId, id](auto&& messages) {
2282 4 : emitSignal<libjami::ConversationSignal::SwarmLoaded>(id,
2283 2 : accountId,
2284 2 : conversationId,
2285 : messages);
2286 2 : },
2287 : options);
2288 2 : return id;
2289 2 : }
2290 4 : }
2291 0 : return 0;
2292 2 : }
2293 :
2294 : uint32_t
2295 0 : ConversationModule::loadConversationUntil(const std::string& conversationId,
2296 : const std::string& fromMessage,
2297 : const std::string& toMessage)
2298 : {
2299 0 : auto acc = pimpl_->account_.lock();
2300 0 : if (auto conv = pimpl_->getConversation(conversationId)) {
2301 0 : std::lock_guard lk(conv->mtx);
2302 0 : if (conv->conversation) {
2303 0 : const uint32_t id = std::uniform_int_distribution<uint32_t> {}(acc->rand);
2304 0 : LogOptions options;
2305 0 : options.from = fromMessage;
2306 0 : options.to = toMessage;
2307 0 : options.includeTo = true;
2308 0 : conv->conversation->loadMessages(
2309 0 : [accountId = pimpl_->accountId_, conversationId, id](auto&& messages) {
2310 0 : emitSignal<libjami::ConversationSignal::ConversationLoaded>(id,
2311 0 : accountId,
2312 0 : conversationId,
2313 : messages);
2314 0 : },
2315 : options);
2316 0 : return id;
2317 0 : }
2318 0 : }
2319 0 : return 0;
2320 0 : }
2321 :
2322 : uint32_t
2323 0 : ConversationModule::loadSwarmUntil(const std::string& conversationId,
2324 : const std::string& fromMessage,
2325 : const std::string& toMessage)
2326 : {
2327 0 : auto acc = pimpl_->account_.lock();
2328 0 : if (auto conv = pimpl_->getConversation(conversationId)) {
2329 0 : std::lock_guard lk(conv->mtx);
2330 0 : if (conv->conversation) {
2331 0 : const uint32_t id = std::uniform_int_distribution<uint32_t> {}(acc->rand);
2332 0 : LogOptions options;
2333 0 : options.from = fromMessage;
2334 0 : options.to = toMessage;
2335 0 : options.includeTo = true;
2336 0 : conv->conversation->loadMessages2(
2337 0 : [accountId = pimpl_->accountId_, conversationId, id](auto&& messages) {
2338 0 : emitSignal<libjami::ConversationSignal::SwarmLoaded>(id,
2339 0 : accountId,
2340 0 : conversationId,
2341 : messages);
2342 0 : },
2343 : options);
2344 0 : return id;
2345 0 : }
2346 0 : }
2347 0 : return 0;
2348 0 : }
2349 :
2350 : std::shared_ptr<TransferManager>
2351 76 : ConversationModule::dataTransfer(const std::string& conversationId) const
2352 : {
2353 76 : return pimpl_->withConversation(conversationId,
2354 150 : [](auto& conversation) { return conversation.dataTransfer(); });
2355 : }
2356 :
2357 : bool
2358 13 : ConversationModule::onFileChannelRequest(const std::string& conversationId,
2359 : const std::string& member,
2360 : const std::string& fileId,
2361 : bool verifyShaSum) const
2362 : {
2363 13 : if (auto conv = pimpl_->getConversation(conversationId)) {
2364 13 : std::lock_guard lk(conv->mtx);
2365 13 : if (conv->conversation)
2366 13 : return conv->conversation->onFileChannelRequest(member, fileId, verifyShaSum);
2367 26 : }
2368 0 : return false;
2369 : }
2370 :
2371 : bool
2372 11 : ConversationModule::downloadFile(const std::string& conversationId,
2373 : const std::string& interactionId,
2374 : const std::string& fileId,
2375 : const std::string& path,
2376 : size_t start,
2377 : size_t end)
2378 : {
2379 11 : if (auto conv = pimpl_->getConversation(conversationId)) {
2380 11 : std::lock_guard lk(conv->mtx);
2381 11 : if (conv->conversation)
2382 11 : return conv->conversation->downloadFile(interactionId, fileId, path, "", "", start, end);
2383 22 : }
2384 0 : return false;
2385 : }
2386 :
2387 : void
2388 1367 : ConversationModule::syncConversations(const std::string& peer, const std::string& deviceId)
2389 : {
2390 : // Sync conversations where peer is member
2391 1367 : std::set<std::string> toFetch;
2392 1368 : std::set<std::string> toClone;
2393 2060 : for (const auto& conv : pimpl_->getSyncedConversations()) {
2394 692 : std::lock_guard lk(conv->mtx);
2395 692 : if (conv->conversation) {
2396 660 : if (!conv->conversation->isRemoving() && conv->conversation->isMember(peer, false)) {
2397 172 : toFetch.emplace(conv->info.id);
2398 : }
2399 32 : } else if (!conv->info.isRemoved()
2400 80 : && std::find(conv->info.members.begin(), conv->info.members.end(), peer)
2401 80 : != conv->info.members.end()) {
2402 : // In this case the conversation was never cloned (can be after an import)
2403 23 : toClone.emplace(conv->info.id);
2404 : }
2405 2060 : }
2406 1540 : for (const auto& cid : toFetch)
2407 172 : pimpl_->fetchNewCommits(peer, deviceId, cid);
2408 1391 : for (const auto& cid : toClone)
2409 23 : pimpl_->cloneConversation(deviceId, peer, cid);
2410 2736 : if (pimpl_->syncCnt.load() == 0)
2411 917 : emitSignal<libjami::ConversationSignal::ConversationSyncFinished>(pimpl_->accountId_);
2412 1368 : }
2413 :
2414 : void
2415 186 : ConversationModule::onSyncData(const SyncMsg& msg,
2416 : const std::string& peerId,
2417 : const std::string& deviceId)
2418 : {
2419 186 : std::vector<std::string> toClone;
2420 297 : for (const auto& [key, convInfo] : msg.c) {
2421 111 : const auto& convId = convInfo.id;
2422 : {
2423 111 : std::lock_guard lk(pimpl_->conversationsRequestsMtx_);
2424 111 : pimpl_->rmConversationRequest(convId);
2425 111 : }
2426 :
2427 111 : auto conv = pimpl_->startConversation(convInfo);
2428 111 : std::unique_lock lk(conv->mtx);
2429 : // Skip outdated info
2430 111 : if (std::max(convInfo.created, convInfo.removed)
2431 111 : < std::max(conv->info.created, conv->info.removed))
2432 4 : continue;
2433 107 : if (not convInfo.isRemoved()) {
2434 : // If multi devices, it can detect a conversation that was already
2435 : // removed, so just check if the convinfo contains a removed conv
2436 94 : if (conv->info.removed) {
2437 0 : if (conv->info.removed >= convInfo.created) {
2438 : // Only reclone if re-added, else the peer is not synced yet (could be
2439 : // offline before)
2440 0 : continue;
2441 : }
2442 0 : JAMI_DEBUG("Re-add previously removed conversation {:s}", convId);
2443 : }
2444 94 : conv->info = convInfo;
2445 94 : if (!conv->conversation) {
2446 44 : if (deviceId != "") {
2447 44 : pimpl_->cloneConversation(deviceId, peerId, conv);
2448 : } else {
2449 : // In this case, information is from JAMS
2450 : // JAMS does not store the conversation itself, so we
2451 : // must use information to clone the conversation
2452 0 : addConvInfo(convInfo);
2453 0 : toClone.emplace_back(convId);
2454 : }
2455 : }
2456 : } else {
2457 13 : if (conv->conversation && !conv->conversation->isRemoving()) {
2458 3 : emitSignal<libjami::ConversationSignal::ConversationRemoved>(pimpl_->accountId_,
2459 : convId);
2460 3 : conv->conversation->setRemovingFlag();
2461 : }
2462 13 : auto update = false;
2463 13 : if (!conv->info.removed) {
2464 2 : update = true;
2465 2 : conv->info.removed = std::time(nullptr);
2466 : }
2467 13 : if (convInfo.erased && !conv->info.erased) {
2468 2 : conv->info.erased = std::time(nullptr);
2469 2 : pimpl_->addConvInfo(conv->info);
2470 2 : pimpl_->removeRepositoryImpl(*conv, false);
2471 11 : } else if (update) {
2472 2 : pimpl_->addConvInfo(conv->info);
2473 : }
2474 : }
2475 115 : }
2476 :
2477 186 : for (const auto& cid : toClone) {
2478 0 : auto members = getConversationMembers(cid);
2479 0 : for (const auto& member : members) {
2480 0 : if (member.at("uri") != pimpl_->username_)
2481 0 : cloneConversationFrom(cid, member.at("uri"));
2482 : }
2483 0 : }
2484 :
2485 208 : for (const auto& [convId, req] : msg.cr) {
2486 22 : if (req.from == pimpl_->username_) {
2487 0 : JAMI_WARNING("Detected request from ourself, ignore {}.", convId);
2488 17 : continue;
2489 0 : }
2490 22 : std::unique_lock lk(pimpl_->conversationsRequestsMtx_);
2491 22 : if (pimpl_->isConversation(convId)) {
2492 : // Already handled request
2493 0 : pimpl_->rmConversationRequest(convId);
2494 0 : continue;
2495 : }
2496 :
2497 : // New request
2498 22 : if (!pimpl_->addConversationRequest(convId, req))
2499 12 : continue;
2500 10 : lk.unlock();
2501 :
2502 10 : if (req.declined != 0) {
2503 : // Request declined
2504 15 : JAMI_LOG("[Account {:s}] Declined request detected for conversation {:s} (device {:s})",
2505 : pimpl_->accountId_,
2506 : convId,
2507 : deviceId);
2508 5 : pimpl_->syncingMetadatas_.erase(convId);
2509 5 : pimpl_->saveMetadatas();
2510 5 : emitSignal<libjami::ConversationSignal::ConversationRequestDeclined>(pimpl_->accountId_,
2511 : convId);
2512 5 : continue;
2513 5 : }
2514 :
2515 15 : JAMI_LOG("[Account {:s}] New request detected for conversation {:s} (device {:s})",
2516 : pimpl_->accountId_,
2517 : convId,
2518 : deviceId);
2519 :
2520 5 : emitSignal<libjami::ConversationSignal::ConversationRequestReceived>(pimpl_->accountId_,
2521 : convId,
2522 10 : req.toMap());
2523 22 : }
2524 :
2525 : // Updates preferences for conversations
2526 190 : for (const auto& [convId, p] : msg.p) {
2527 4 : if (auto conv = pimpl_->getConversation(convId)) {
2528 4 : std::unique_lock lk(conv->mtx);
2529 4 : if (conv->conversation) {
2530 2 : auto conversation = conv->conversation;
2531 2 : lk.unlock();
2532 2 : conversation->updatePreferences(p);
2533 4 : } else if (conv->pending) {
2534 2 : conv->pending->preferences = p;
2535 : }
2536 8 : }
2537 : }
2538 :
2539 : // Updates displayed for conversations
2540 264 : for (const auto& [convId, ms] : msg.ms) {
2541 78 : if (auto conv = pimpl_->getConversation(convId)) {
2542 78 : std::unique_lock lk(conv->mtx);
2543 78 : if (conv->conversation) {
2544 36 : auto conversation = conv->conversation;
2545 36 : lk.unlock();
2546 36 : conversation->updateMessageStatus(ms);
2547 78 : } else if (conv->pending) {
2548 40 : conv->pending->status = ms;
2549 : }
2550 156 : }
2551 : }
2552 186 : }
2553 :
2554 : bool
2555 2 : ConversationModule::needsSyncingWith(const std::string& memberUri, const std::string& deviceId) const
2556 : {
2557 : // Check if a conversation needs to fetch remote or to be cloned
2558 2 : std::lock_guard lk(pimpl_->conversationsMtx_);
2559 2 : for (const auto& [key, ci] : pimpl_->conversations_) {
2560 1 : std::lock_guard lk(ci->mtx);
2561 1 : if (ci->conversation) {
2562 0 : if (ci->conversation->isRemoving() && ci->conversation->isMember(memberUri, false))
2563 0 : return true;
2564 1 : } else if (!ci->info.removed
2565 3 : && std::find(ci->info.members.begin(), ci->info.members.end(), memberUri)
2566 3 : != ci->info.members.end()) {
2567 : // In this case the conversation was never cloned (can be after an import)
2568 1 : return true;
2569 : }
2570 1 : }
2571 1 : return false;
2572 2 : }
2573 :
2574 : void
2575 1109 : ConversationModule::setFetched(const std::string& conversationId,
2576 : const std::string& deviceId,
2577 : const std::string& commitId)
2578 : {
2579 1109 : if (auto conv = pimpl_->getConversation(conversationId)) {
2580 1109 : std::lock_guard lk(conv->mtx);
2581 1109 : if (conv->conversation) {
2582 1109 : bool remove = conv->conversation->isRemoving();
2583 1109 : conv->conversation->hasFetched(deviceId, commitId);
2584 1109 : if (remove)
2585 1 : pimpl_->removeRepositoryImpl(*conv, true);
2586 : }
2587 2218 : }
2588 1109 : }
2589 :
2590 : void
2591 15520 : ConversationModule::fetchNewCommits(const std::string& peer,
2592 : const std::string& deviceId,
2593 : const std::string& conversationId,
2594 : const std::string& commitId)
2595 : {
2596 15520 : pimpl_->fetchNewCommits(peer, deviceId, conversationId, commitId);
2597 15520 : }
2598 :
2599 : void
2600 136 : ConversationModule::addConversationMember(const std::string& conversationId,
2601 : const dht::InfoHash& contactUri,
2602 : bool sendRequest)
2603 : {
2604 136 : auto conv = pimpl_->getConversation(conversationId);
2605 136 : if (not conv || not conv->conversation) {
2606 0 : JAMI_ERROR("Conversation {:s} does not exist", conversationId);
2607 0 : return;
2608 : }
2609 136 : std::unique_lock lk(conv->mtx);
2610 :
2611 136 : auto contactUriStr = contactUri.toString();
2612 136 : if (conv->conversation->isMember(contactUriStr, true)) {
2613 0 : JAMI_DEBUG("{:s} is already a member of {:s}, resend invite", contactUriStr, conversationId);
2614 : // Note: This should not be necessary, but if for whatever reason the other side didn't
2615 : // join we should not forbid new invites
2616 0 : auto invite = conv->conversation->generateInvitation();
2617 0 : lk.unlock();
2618 0 : pimpl_->sendMsgCb_(contactUriStr, {}, std::move(invite), 0);
2619 0 : return;
2620 0 : }
2621 :
2622 136 : conv->conversation->addMember(
2623 : contactUriStr,
2624 534 : [this, conv, conversationId, sendRequest, contactUriStr](bool ok, const std::string& commitId) {
2625 136 : if (ok) {
2626 134 : std::unique_lock lk(conv->mtx);
2627 134 : pimpl_->sendMessageNotification(*conv->conversation,
2628 : true,
2629 : commitId); // For the other members
2630 134 : if (sendRequest) {
2631 130 : auto invite = conv->conversation->generateInvitation();
2632 130 : lk.unlock();
2633 130 : pimpl_->sendMsgCb_(contactUriStr, {}, std::move(invite), 0);
2634 130 : }
2635 134 : }
2636 136 : });
2637 136 : }
2638 :
2639 : void
2640 14 : ConversationModule::removeConversationMember(const std::string& conversationId,
2641 : const dht::InfoHash& contactUri,
2642 : bool isDevice)
2643 : {
2644 14 : auto contactUriStr = contactUri.toString();
2645 14 : if (auto conv = pimpl_->getConversation(conversationId)) {
2646 14 : std::lock_guard lk(conv->mtx);
2647 14 : if (conv->conversation)
2648 14 : return conv->conversation->removeMember(
2649 26 : contactUriStr, isDevice, [this, conversationId](bool ok, const std::string& commitId) {
2650 14 : if (ok) {
2651 12 : pimpl_->sendMessageNotification(conversationId, true, commitId);
2652 : }
2653 28 : });
2654 28 : }
2655 14 : }
2656 :
2657 : std::vector<std::map<std::string, std::string>>
2658 86 : ConversationModule::getConversationMembers(const std::string& conversationId,
2659 : bool includeBanned) const
2660 : {
2661 86 : return pimpl_->getConversationMembers(conversationId, includeBanned);
2662 : }
2663 :
2664 : uint32_t
2665 4 : ConversationModule::countInteractions(const std::string& convId,
2666 : const std::string& toId,
2667 : const std::string& fromId,
2668 : const std::string& authorUri) const
2669 : {
2670 4 : if (auto conv = pimpl_->getConversation(convId)) {
2671 4 : std::lock_guard lk(conv->mtx);
2672 4 : if (conv->conversation)
2673 4 : return conv->conversation->countInteractions(toId, fromId, authorUri);
2674 8 : }
2675 0 : return 0;
2676 : }
2677 :
2678 : void
2679 4 : ConversationModule::search(uint32_t req, const std::string& convId, const Filter& filter) const
2680 : {
2681 4 : if (convId.empty()) {
2682 0 : auto convs = pimpl_->getConversations();
2683 0 : if (convs.empty()) {
2684 0 : emitSignal<libjami::ConversationSignal::MessagesFound>(
2685 : req,
2686 0 : pimpl_->accountId_,
2687 0 : std::string {},
2688 0 : std::vector<std::map<std::string, std::string>> {});
2689 0 : return;
2690 : }
2691 0 : auto finishedFlag = std::make_shared<std::atomic_int>(convs.size());
2692 0 : for (const auto& conv : convs) {
2693 0 : conv->search(req, filter, finishedFlag);
2694 : }
2695 4 : } else if (auto conv = pimpl_->getConversation(convId)) {
2696 4 : std::lock_guard lk(conv->mtx);
2697 4 : if (conv->conversation)
2698 4 : conv->conversation->search(req, filter, std::make_shared<std::atomic_int>(1));
2699 8 : }
2700 : }
2701 :
2702 : void
2703 9 : ConversationModule::updateConversationInfos(const std::string& conversationId,
2704 : const std::map<std::string, std::string>& infos,
2705 : bool sync)
2706 : {
2707 9 : auto conv = pimpl_->getConversation(conversationId);
2708 9 : if (not conv or not conv->conversation) {
2709 0 : JAMI_ERROR("Conversation {:s} does not exist", conversationId);
2710 0 : return;
2711 : }
2712 9 : std::lock_guard lk(conv->mtx);
2713 9 : conv->conversation
2714 9 : ->updateInfos(infos, [this, conversationId, sync](bool ok, const std::string& commitId) {
2715 9 : if (ok && sync) {
2716 8 : pimpl_->sendMessageNotification(conversationId, true, commitId);
2717 1 : } else if (sync)
2718 3 : JAMI_WARNING("Unable to update info on {:s}", conversationId);
2719 9 : });
2720 9 : }
2721 :
2722 : std::map<std::string, std::string>
2723 5 : ConversationModule::conversationInfos(const std::string& conversationId) const
2724 : {
2725 : {
2726 5 : std::lock_guard lk(pimpl_->conversationsRequestsMtx_);
2727 5 : auto itReq = pimpl_->conversationsRequests_.find(conversationId);
2728 5 : if (itReq != pimpl_->conversationsRequests_.end())
2729 1 : return itReq->second.metadatas;
2730 5 : }
2731 4 : if (auto conv = pimpl_->getConversation(conversationId)) {
2732 4 : std::lock_guard lk(conv->mtx);
2733 4 : std::map<std::string, std::string> md;
2734 : {
2735 4 : auto syncingMetadatasIt = pimpl_->syncingMetadatas_.find(conversationId);
2736 4 : if (syncingMetadatasIt != pimpl_->syncingMetadatas_.end()) {
2737 2 : if (conv->conversation) {
2738 0 : pimpl_->syncingMetadatas_.erase(syncingMetadatasIt);
2739 0 : pimpl_->saveMetadatas();
2740 : } else {
2741 2 : md = syncingMetadatasIt->second;
2742 : }
2743 : }
2744 : }
2745 4 : if (conv->conversation)
2746 2 : return conv->conversation->infos();
2747 : else
2748 2 : return md;
2749 8 : }
2750 0 : JAMI_ERROR("Conversation {:s} does not exist", conversationId);
2751 0 : return {};
2752 : }
2753 :
2754 : void
2755 5 : ConversationModule::setConversationPreferences(const std::string& conversationId,
2756 : const std::map<std::string, std::string>& prefs)
2757 : {
2758 5 : if (auto conv = pimpl_->getConversation(conversationId)) {
2759 5 : std::unique_lock lk(conv->mtx);
2760 5 : if (not conv->conversation) {
2761 0 : JAMI_ERROR("Conversation {:s} does not exist", conversationId);
2762 0 : return;
2763 : }
2764 5 : auto conversation = conv->conversation;
2765 5 : lk.unlock();
2766 5 : conversation->updatePreferences(prefs);
2767 5 : auto msg = std::make_shared<SyncMsg>();
2768 10 : msg->p = {{conversationId, conversation->preferences(true)}};
2769 5 : pimpl_->needsSyncingCb_(std::move(msg));
2770 10 : }
2771 : }
2772 :
2773 : std::map<std::string, std::string>
2774 14 : ConversationModule::getConversationPreferences(const std::string& conversationId,
2775 : bool includeCreated) const
2776 : {
2777 14 : if (auto conv = pimpl_->getConversation(conversationId)) {
2778 14 : std::lock_guard lk(conv->mtx);
2779 14 : if (conv->conversation)
2780 13 : return conv->conversation->preferences(includeCreated);
2781 28 : }
2782 1 : return {};
2783 : }
2784 :
2785 : std::map<std::string, std::map<std::string, std::string>>
2786 210 : ConversationModule::convPreferences() const
2787 : {
2788 210 : std::map<std::string, std::map<std::string, std::string>> p;
2789 301 : for (const auto& conv : pimpl_->getConversations()) {
2790 91 : auto prefs = conv->preferences(true);
2791 91 : if (!prefs.empty())
2792 3 : p[conv->id()] = std::move(prefs);
2793 301 : }
2794 210 : return p;
2795 0 : }
2796 :
2797 : std::vector<uint8_t>
2798 0 : ConversationModule::conversationVCard(const std::string& conversationId) const
2799 : {
2800 0 : if (auto conv = pimpl_->getConversation(conversationId)) {
2801 0 : std::lock_guard lk(conv->mtx);
2802 0 : if (conv->conversation)
2803 0 : return conv->conversation->vCard();
2804 0 : }
2805 0 : JAMI_ERROR("Conversation {:s} does not exist", conversationId);
2806 0 : return {};
2807 : }
2808 :
2809 : bool
2810 4394 : ConversationModule::isBanned(const std::string& convId, const std::string& uri) const
2811 : {
2812 4394 : if (auto conv = pimpl_->getConversation(convId)) {
2813 4379 : std::lock_guard lk(conv->mtx);
2814 4381 : if (!conv->conversation)
2815 57 : return true;
2816 4323 : if (conv->conversation->mode() != ConversationMode::ONE_TO_ONE)
2817 3683 : return conv->conversation->isBanned(uri);
2818 8772 : }
2819 : // If 1:1 we check the certificate status
2820 653 : std::lock_guard lk(pimpl_->conversationsMtx_);
2821 653 : return pimpl_->accountManager_->getCertificateStatus(uri) == dhtnet::tls::TrustStore::PermissionStatus::BANNED;
2822 653 : }
2823 :
2824 : void
2825 26 : ConversationModule::removeContact(const std::string& uri, bool banned)
2826 : {
2827 : // Remove linked conversation's requests
2828 : {
2829 26 : std::lock_guard lk(pimpl_->conversationsRequestsMtx_);
2830 26 : auto update = false;
2831 26 : for (auto it = pimpl_->conversationsRequests_.begin();
2832 30 : it != pimpl_->conversationsRequests_.end();
2833 4 : ++it) {
2834 4 : if (it->second.from == uri && !it->second.declined) {
2835 12 : JAMI_DEBUG("Declining conversation request {:s} from {:s}", it->first, uri);
2836 4 : pimpl_->syncingMetadatas_.erase(it->first);
2837 4 : pimpl_->saveMetadatas();
2838 4 : emitSignal<libjami::ConversationSignal::ConversationRequestDeclined>(
2839 4 : pimpl_->accountId_, it->first);
2840 4 : update = true;
2841 4 : it->second.declined = std::time(nullptr);
2842 : }
2843 : }
2844 26 : if (update) {
2845 4 : pimpl_->saveConvRequests();
2846 4 : pimpl_->needsSyncingCb_({});
2847 : }
2848 26 : }
2849 26 : if (banned) {
2850 7 : auto conversationId = getOneToOneConversation(uri);
2851 11 : pimpl_->withConversation(conversationId, [&](auto& conv) { conv.shutdownConnections(); });
2852 7 : return; // Keep the conversation in banned model but stop connections
2853 7 : }
2854 : // Remove related conversation
2855 19 : auto isSelf = uri == pimpl_->username_;
2856 19 : std::vector<std::string> toRm;
2857 15 : auto updateClient = [&](const auto& convId) {
2858 15 : pimpl_->updateConvForContact(uri, convId, "");
2859 15 : emitSignal<libjami::ConversationSignal::ConversationRemoved>(pimpl_->accountId_, convId);
2860 15 : };
2861 18 : auto removeConvInfo = [&](const auto& conv, const auto& members) {
2862 1 : if ((isSelf && members.size() == 1)
2863 19 : || (!isSelf && std::find(members.begin(), members.end(), uri) != members.end())) {
2864 : // Mark the conversation as removed if it wasn't already
2865 17 : if (!conv->info.isRemoved()) {
2866 15 : conv->info.removed = std::time(nullptr);
2867 15 : updateClient(conv->info.id);
2868 15 : pimpl_->addConvInfo(conv->info);
2869 15 : return true;
2870 : }
2871 : }
2872 3 : return false;
2873 19 : };
2874 : {
2875 19 : std::lock_guard lk(pimpl_->conversationsMtx_);
2876 37 : for (auto& [convId, conv] : pimpl_->conversations_) {
2877 18 : std::lock_guard lk(conv->mtx);
2878 18 : if (conv->conversation) {
2879 : try {
2880 : // Note it's important to check getUsername(), else
2881 : // removing self can remove all conversations
2882 15 : if (conv->conversation->mode() == ConversationMode::ONE_TO_ONE) {
2883 15 : auto initMembers = conv->conversation->getInitialMembers();
2884 15 : if (removeConvInfo(conv, initMembers))
2885 14 : toRm.emplace_back(convId);
2886 15 : }
2887 0 : } catch (const std::exception& e) {
2888 0 : JAMI_WARN("%s", e.what());
2889 0 : }
2890 : } else {
2891 3 : removeConvInfo(conv, conv->info.members);
2892 : }
2893 18 : }
2894 19 : }
2895 : // Note, if we ban the device, we don't send the leave cause the other peer will just
2896 : // never got the notifications, so just erase the datas
2897 33 : for (const auto& id : toRm)
2898 14 : pimpl_->removeRepository(id, true, true);
2899 19 : }
2900 :
2901 : bool
2902 9 : ConversationModule::removeConversation(const std::string& conversationId)
2903 : {
2904 9 : return pimpl_->removeConversation(conversationId);
2905 : }
2906 :
2907 : void
2908 7 : ConversationModule::initReplay(const std::string& oldConvId, const std::string& newConvId)
2909 : {
2910 7 : if (auto conv = pimpl_->getConversation(oldConvId)) {
2911 7 : std::lock_guard lk(conv->mtx);
2912 7 : if (conv->conversation) {
2913 7 : std::promise<bool> waitLoad;
2914 7 : std::future<bool> fut = waitLoad.get_future();
2915 : // we should wait for loadMessage, because it will be deleted after this.
2916 7 : conv->conversation->loadMessages(
2917 7 : [&](auto&& messages) {
2918 7 : std::reverse(messages.begin(),
2919 : messages.end()); // Log is inverted as we want to replay
2920 7 : std::lock_guard lk(pimpl_->replayMtx_);
2921 7 : pimpl_->replay_[newConvId] = std::move(messages);
2922 7 : waitLoad.set_value(true);
2923 7 : },
2924 : {});
2925 7 : fut.wait();
2926 7 : }
2927 14 : }
2928 7 : }
2929 :
2930 : bool
2931 64 : ConversationModule::isHosting(const std::string& conversationId, const std::string& confId) const
2932 : {
2933 64 : if (conversationId.empty()) {
2934 54 : std::lock_guard lk(pimpl_->conversationsMtx_);
2935 54 : return std::find_if(pimpl_->conversations_.cbegin(),
2936 54 : pimpl_->conversations_.cend(),
2937 10 : [&](const auto& conv) {
2938 10 : return conv.second->conversation
2939 10 : && conv.second->conversation->isHosting(confId);
2940 : })
2941 108 : != pimpl_->conversations_.cend();
2942 64 : } else if (auto conv = pimpl_->getConversation(conversationId)) {
2943 10 : if (conv->conversation) {
2944 10 : return conv->conversation->isHosting(confId);
2945 : }
2946 10 : }
2947 0 : return false;
2948 : }
2949 :
2950 : std::vector<std::map<std::string, std::string>>
2951 17 : ConversationModule::getActiveCalls(const std::string& conversationId) const
2952 : {
2953 34 : return pimpl_->withConversation(conversationId, [](const auto& conversation) {
2954 17 : return conversation.currentCalls();
2955 17 : });
2956 : }
2957 :
2958 : std::shared_ptr<SIPCall>
2959 22 : ConversationModule::call(const std::string& url,
2960 : const std::vector<libjami::MediaMap>& mediaList,
2961 : std::function<void(const std::string&, const DeviceId&, const std::shared_ptr<SIPCall>&)>&& cb)
2962 : {
2963 88 : std::string conversationId = "", confId = "", uri = "", deviceId = "";
2964 22 : if (url.find('/') == std::string::npos) {
2965 13 : conversationId = url;
2966 : } else {
2967 9 : auto parameters = jami::split_string(url, '/');
2968 9 : if (parameters.size() != 4) {
2969 0 : JAMI_ERROR("Incorrect url {:s}", url);
2970 0 : return {};
2971 : }
2972 9 : conversationId = parameters[0];
2973 9 : uri = parameters[1];
2974 9 : deviceId = parameters[2];
2975 9 : confId = parameters[3];
2976 9 : }
2977 :
2978 :
2979 22 : auto conv = pimpl_->getConversation(conversationId);
2980 22 : if (!conv)
2981 0 : return {};
2982 22 : std::unique_lock lk(conv->mtx);
2983 22 : if (!conv->conversation) {
2984 0 : JAMI_ERROR("Conversation {:s} not found", conversationId);
2985 0 : return {};
2986 : }
2987 :
2988 : // Check if we want to join a specific conference
2989 : // So, if confId is specified or if there is some activeCalls
2990 : // or if we are the default host.
2991 22 : auto activeCalls = conv->conversation->currentCalls();
2992 22 : auto infos = conv->conversation->infos();
2993 22 : auto itRdvAccount = infos.find("rdvAccount");
2994 22 : auto itRdvDevice = infos.find("rdvDevice");
2995 22 : auto sendCallRequest = false;
2996 22 : if (!confId.empty()) {
2997 9 : sendCallRequest = true;
2998 27 : JAMI_DEBUG("Calling self, join conference");
2999 13 : } else if (!activeCalls.empty()) {
3000 : // Else, we try to join active calls
3001 0 : sendCallRequest = true;
3002 0 : auto& ac = *activeCalls.rbegin();
3003 0 : confId = ac.at("id");
3004 0 : uri = ac.at("uri");
3005 0 : deviceId = ac.at("device");
3006 16 : } else if (itRdvAccount != infos.end() && itRdvDevice != infos.end()
3007 16 : && !itRdvAccount->second.empty()) {
3008 : // Else, creates "to" (accountId/deviceId/conversationId/confId) and ask remote host
3009 3 : sendCallRequest = true;
3010 3 : uri = itRdvAccount->second;
3011 3 : deviceId = itRdvDevice->second;
3012 3 : confId = "0";
3013 9 : JAMI_DEBUG("Remote host detected. Calling {:s} on device {:s}", uri, deviceId);
3014 : }
3015 22 : lk.unlock();
3016 :
3017 22 : auto account = pimpl_->account_.lock();
3018 22 : std::vector<libjami::MediaMap> mediaMap = mediaList.empty()
3019 : ? MediaAttribute::mediaAttributesToMediaMaps(
3020 41 : pimpl_->account_.lock()->createDefaultMediaList(
3021 60 : pimpl_->account_.lock()->isVideoEnabled()))
3022 41 : : mediaList;
3023 :
3024 44 : if (!sendCallRequest
3025 22 : || (uri == pimpl_->username_ && deviceId == pimpl_->deviceId_)) {
3026 11 : confId = confId == "0" ? Manager::instance().callFactory.getNewCallID() : confId;
3027 : // TODO attach host with media list
3028 11 : hostConference(conversationId, confId, "", mediaMap);
3029 11 : return {};
3030 : }
3031 :
3032 : // Else we need to create a call
3033 11 : auto& manager = Manager::instance();
3034 11 : std::shared_ptr<SIPCall> call = manager.callFactory.newSipCall(account, Call::CallType::OUTGOING, mediaMap);
3035 :
3036 11 : if (not call)
3037 0 : return {};
3038 :
3039 0 : auto callUri = fmt::format("{}/{}/{}/{}", conversationId, uri, deviceId, confId);
3040 11 : account->getIceOptions([call, accountId = account->getAccountID(), callUri, uri = std::move(uri), conversationId, deviceId, cb=std::move(cb)](auto&& opts) {
3041 11 : if (call->isIceEnabled()) {
3042 11 : if (not call->createIceMediaTransport(false)
3043 22 : or not call->initIceMediaTransport(true,
3044 : std::forward<dhtnet::IceTransportOptions>(opts))) {
3045 0 : return;
3046 : }
3047 : }
3048 33 : JAMI_DEBUG("New outgoing call with {}", uri);
3049 11 : call->setPeerNumber(uri);
3050 11 : call->setPeerUri("swarm:" + uri);
3051 :
3052 33 : JAMI_DEBUG("Calling: {:s}", callUri);
3053 11 : call->setState(Call::ConnectionState::TRYING);
3054 11 : call->setPeerNumber(callUri);
3055 11 : call->setPeerUri("rdv:" + callUri);
3056 11 : call->addStateListener([accountId, conversationId](Call::CallState call_state,
3057 : Call::ConnectionState cnx_state,
3058 : int) {
3059 62 : if (cnx_state == Call::ConnectionState::DISCONNECTED
3060 22 : && call_state == Call::CallState::MERROR) {
3061 1 : emitSignal<libjami::ConfigurationSignal::NeedsHost>(accountId, conversationId);
3062 1 : return true;
3063 : }
3064 61 : return true;
3065 : });
3066 11 : cb(callUri, DeviceId(deviceId), call);
3067 : });
3068 :
3069 11 : return call;
3070 22 : }
3071 :
3072 : void
3073 14 : ConversationModule::hostConference(const std::string& conversationId,
3074 : const std::string& confId,
3075 : const std::string& callId,
3076 : const std::vector<libjami::MediaMap>& mediaList)
3077 : {
3078 14 : auto acc = pimpl_->account_.lock();
3079 14 : if (!acc)
3080 0 : return;
3081 14 : auto conf = acc->getConference(confId);
3082 14 : auto createConf = !conf;
3083 14 : std::shared_ptr<SIPCall> call;
3084 14 : if (!callId.empty()) {
3085 3 : call = std::dynamic_pointer_cast<SIPCall>(acc->getCall(callId));
3086 3 : if (!call) {
3087 0 : JAMI_WARNING("No call with id {} found", callId);
3088 0 : return;
3089 : }
3090 : }
3091 14 : if (createConf) {
3092 14 : conf = std::make_shared<Conference>(acc, confId);
3093 14 : acc->attach(conf);
3094 : }
3095 :
3096 14 : if (!callId.empty())
3097 3 : conf->addSubCall(callId);
3098 :
3099 14 : if (callId.empty())
3100 11 : conf->attachHost(mediaList);
3101 :
3102 14 : if (createConf) {
3103 14 : emitSignal<libjami::CallSignal::ConferenceCreated>(acc->getAccountID(), conversationId, conf->getConfId());
3104 : } else {
3105 0 : conf->reportMediaNegotiationStatus();
3106 0 : emitSignal<libjami::CallSignal::ConferenceChanged>(acc->getAccountID(),
3107 0 : conf->getConfId(),
3108 : conf->getStateStr());
3109 0 : return;
3110 : }
3111 :
3112 14 : auto conv = pimpl_->getConversation(conversationId);
3113 14 : if (!conv)
3114 0 : return;
3115 14 : std::unique_lock lk(conv->mtx);
3116 14 : if (!conv->conversation) {
3117 0 : JAMI_ERROR("Conversation {} not found", conversationId);
3118 0 : return;
3119 : }
3120 : // Add commit to conversation
3121 14 : Json::Value value;
3122 14 : value["uri"] = pimpl_->username_;
3123 14 : value["device"] = pimpl_->deviceId_;
3124 14 : value["confId"] = conf->getConfId();
3125 14 : value["type"] = "application/call-history+json";
3126 28 : conv->conversation->hostConference(std::move(value),
3127 14 : [w = pimpl_->weak(),
3128 : conversationId](bool ok, const std::string& commitId) {
3129 14 : if (ok) {
3130 14 : if (auto shared = w.lock())
3131 14 : shared->sendMessageNotification(conversationId,
3132 : true,
3133 14 : commitId);
3134 : } else {
3135 0 : JAMI_ERR("Failed to send message to conversation %s",
3136 : conversationId.c_str());
3137 : }
3138 14 : });
3139 :
3140 : // When conf finished = remove host & commit
3141 : // Master call, so when it's stopped, the conference will be stopped (as we use the hold
3142 : // state for detaching the call)
3143 42 : conf->onShutdown(
3144 28 : [w = pimpl_->weak(), accountUri = pimpl_->username_, confId=conf->getConfId(), conversationId, conv](
3145 : int duration) {
3146 14 : auto shared = w.lock();
3147 14 : if (shared) {
3148 11 : Json::Value value;
3149 11 : value["uri"] = accountUri;
3150 11 : value["device"] = shared->deviceId_;
3151 11 : value["confId"] = confId;
3152 11 : value["type"] = "application/call-history+json";
3153 11 : value["duration"] = std::to_string(duration);
3154 :
3155 11 : std::lock_guard lk(conv->mtx);
3156 11 : if (!conv->conversation) {
3157 0 : JAMI_ERROR("Conversation {} not found", conversationId);
3158 0 : return;
3159 : }
3160 33 : conv->conversation->removeActiveConference(
3161 22 : std::move(value), [w, conversationId](bool ok, const std::string& commitId) {
3162 11 : if (ok) {
3163 11 : if (auto shared = w.lock()) {
3164 11 : shared->sendMessageNotification(conversationId, true, commitId);
3165 11 : }
3166 : } else {
3167 0 : JAMI_ERROR("Failed to send message to conversation {}", conversationId);
3168 : }
3169 11 : });
3170 11 : }
3171 14 : });
3172 14 : }
3173 :
3174 : std::map<std::string, ConvInfo>
3175 899 : ConversationModule::convInfos(const std::string& accountId)
3176 : {
3177 1798 : return convInfosFromPath(fileutils::get_data_dir() / accountId);
3178 : }
3179 :
3180 : std::map<std::string, ConvInfo>
3181 940 : ConversationModule::convInfosFromPath(const std::filesystem::path& path)
3182 : {
3183 940 : std::map<std::string, ConvInfo> convInfos;
3184 : try {
3185 : // read file
3186 1880 : std::lock_guard lock(dhtnet::fileutils::getFileLock(path / "convInfo"));
3187 940 : auto file = fileutils::loadFile("convInfo", path);
3188 : // load values
3189 940 : msgpack::unpacked result;
3190 940 : msgpack::unpack(result, (const char*) file.data(), file.size());
3191 937 : result.get().convert(convInfos);
3192 949 : } catch (const std::exception& e) {
3193 3 : JAMI_WARN("[convInfo] error loading convInfo: %s", e.what());
3194 3 : }
3195 940 : return convInfos;
3196 0 : }
3197 :
3198 : std::map<std::string, ConversationRequest>
3199 887 : ConversationModule::convRequests(const std::string& accountId)
3200 : {
3201 887 : auto path = fileutils::get_data_dir() / accountId;
3202 1773 : return convRequestsFromPath(path.string());
3203 887 : }
3204 :
3205 : std::map<std::string, ConversationRequest>
3206 928 : ConversationModule::convRequestsFromPath(const std::filesystem::path& path)
3207 : {
3208 928 : std::map<std::string, ConversationRequest> convRequests;
3209 : try {
3210 : // read file
3211 1855 : std::lock_guard lock(dhtnet::fileutils::getFileLock(path / "convRequests"));
3212 928 : auto file = fileutils::loadFile("convRequests", path);
3213 : // load values
3214 928 : msgpack::unpacked result;
3215 928 : msgpack::unpack(result, (const char*) file.data(), file.size(), 0);
3216 928 : result.get().convert(convRequests);
3217 928 : } catch (const std::exception& e) {
3218 0 : JAMI_WARN("[convInfo] error loading convInfo: %s", e.what());
3219 0 : }
3220 928 : return convRequests;
3221 0 : }
3222 :
3223 : void
3224 181 : ConversationModule::addConvInfo(const ConvInfo& info)
3225 : {
3226 181 : pimpl_->addConvInfo(info);
3227 181 : }
3228 :
3229 : void
3230 2176 : ConversationModule::Impl::setConversationMembers(const std::string& convId,
3231 : const std::set<std::string>& members)
3232 : {
3233 2176 : if (auto conv = getConversation(convId)) {
3234 2176 : std::lock_guard lk(conv->mtx);
3235 2176 : conv->info.members = members;
3236 2176 : addConvInfo(conv->info);
3237 4352 : }
3238 2176 : }
3239 :
3240 : std::shared_ptr<Conversation>
3241 0 : ConversationModule::getConversation(const std::string& convId)
3242 : {
3243 0 : if (auto conv = pimpl_->getConversation(convId)) {
3244 0 : std::lock_guard lk(conv->mtx);
3245 0 : return conv->conversation;
3246 0 : }
3247 0 : return nullptr;
3248 : }
3249 :
3250 : std::shared_ptr<dhtnet::ChannelSocket>
3251 5954 : ConversationModule::gitSocket(std::string_view deviceId, std::string_view convId) const
3252 : {
3253 5954 : if (auto conv = pimpl_->getConversation(convId)) {
3254 5952 : std::lock_guard lk(conv->mtx);
3255 5953 : if (conv->conversation)
3256 11120 : return conv->conversation->gitSocket(DeviceId(deviceId));
3257 393 : else if (conv->pending)
3258 393 : return conv->pending->socket;
3259 11906 : }
3260 1 : return nullptr;
3261 : }
3262 :
3263 : void
3264 0 : ConversationModule::addGitSocket(std::string_view deviceId,
3265 : std::string_view convId,
3266 : const std::shared_ptr<dhtnet::ChannelSocket>& channel)
3267 : {
3268 0 : if (auto conv = pimpl_->getConversation(convId)) {
3269 0 : std::lock_guard lk(conv->mtx);
3270 0 : conv->conversation->addGitSocket(DeviceId(deviceId), channel);
3271 0 : } else
3272 0 : JAMI_WARNING("addGitSocket: Unable to find conversation {:s}", convId);
3273 0 : }
3274 :
3275 : void
3276 812 : ConversationModule::removeGitSocket(std::string_view deviceId, std::string_view convId)
3277 : {
3278 1603 : pimpl_->withConversation(convId, [&](auto& conv) { conv.removeGitSocket(DeviceId(deviceId)); });
3279 812 : }
3280 :
3281 : void
3282 724 : ConversationModule::shutdownConnections()
3283 : {
3284 1144 : for (const auto& c : pimpl_->getSyncedConversations()) {
3285 420 : std::lock_guard lkc(c->mtx);
3286 420 : if (c->conversation)
3287 385 : c->conversation->shutdownConnections();
3288 420 : if (c->pending)
3289 16 : c->pending->socket = {};
3290 1144 : }
3291 724 : }
3292 : void
3293 1003 : ConversationModule::addSwarmChannel(const std::string& conversationId,
3294 : std::shared_ptr<dhtnet::ChannelSocket> channel)
3295 : {
3296 1003 : pimpl_->withConversation(conversationId,
3297 998 : [&](auto& conv) { conv.addSwarmChannel(std::move(channel)); });
3298 1003 : }
3299 :
3300 : void
3301 0 : ConversationModule::connectivityChanged()
3302 : {
3303 0 : for (const auto& conv : pimpl_->getConversations())
3304 0 : conv->connectivityChanged();
3305 0 : }
3306 :
3307 : std::shared_ptr<Typers>
3308 9 : ConversationModule::getTypers(const std::string& convId)
3309 : {
3310 9 : if (auto c = pimpl_->getConversation(convId)) {
3311 9 : std::lock_guard lk(c->mtx);
3312 9 : if (c->conversation)
3313 9 : return c->conversation->typers();
3314 18 : }
3315 0 : return nullptr;
3316 : }
3317 :
3318 : } // namespace jami
|