Line data Source code
1 : /*
2 : * Copyright (C) 2004-2025 Savoir-faire Linux Inc.
3 : *
4 : * This program is free software: you can redistribute it and/or modify
5 : * it under the terms of the GNU General Public License as published by
6 : * the Free Software Foundation, either version 3 of the License, or
7 : * (at your option) any later version.
8 : *
9 : * This program is distributed in the hope that it will be useful,
10 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 : * GNU General Public License for more details.
13 : *
14 : * You should have received a copy of the GNU General Public License
15 : * along with this program. If not, see <https://www.gnu.org/licenses/>.
16 : */
17 :
18 : #include "conversation_module.h"
19 :
20 : #include "account_const.h"
21 : #include "call.h"
22 : #include "client/ring_signal.h"
23 : #include "fileutils.h"
24 : #include "jamidht/account_manager.h"
25 : #include "jamidht/jamiaccount.h"
26 : #include "manager.h"
27 : #include "sip/sipcall.h"
28 : #include "vcard.h"
29 : #include "json_utils.h"
30 :
31 : #include <opendht/thread_pool.h>
32 : #include <dhtnet/certstore.h>
33 :
34 : #include <algorithm>
35 : #include <fstream>
36 :
37 : namespace jami {
38 :
39 : using ConvInfoMap = std::map<std::string, ConvInfo>;
40 :
41 : struct PendingConversationFetch
42 : {
43 : bool ready {false};
44 : bool cloning {false};
45 : std::string deviceId {};
46 : std::string removeId {};
47 : std::map<std::string, std::string> preferences {};
48 : std::map<std::string, std::map<std::string, std::string>> status {};
49 : std::set<std::string> connectingTo {};
50 : std::shared_ptr<dhtnet::ChannelSocket> socket {};
51 : };
52 :
53 : constexpr std::chrono::seconds MAX_FALLBACK {12 * 3600s};
54 :
55 : struct SyncedConversation
56 : {
57 : std::mutex mtx;
58 : std::unique_ptr<asio::steady_timer> fallbackClone;
59 : std::chrono::seconds fallbackTimer {5s};
60 : ConvInfo info;
61 : std::unique_ptr<PendingConversationFetch> pending;
62 : std::shared_ptr<Conversation> conversation;
63 :
64 277 : SyncedConversation(const std::string& convId)
65 277 : : info {convId}
66 : {
67 277 : fallbackClone = std::make_unique<asio::steady_timer>(*Manager::instance().ioContext());
68 277 : }
69 20 : SyncedConversation(const ConvInfo& info)
70 20 : : info {info}
71 : {
72 20 : fallbackClone = std::make_unique<asio::steady_timer>(*Manager::instance().ioContext());
73 20 : }
74 :
75 2390 : bool startFetch(const std::string& deviceId, bool checkIfConv = false)
76 : {
77 : // conversation mtx must be locked
78 2390 : if (checkIfConv && conversation)
79 7 : return false; // Already a conversation
80 2383 : if (pending) {
81 352 : if (pending->ready)
82 9 : return false; // Already doing stuff
83 : // if (pending->deviceId == deviceId)
84 : // return false; // Already fetching
85 343 : if (pending->connectingTo.find(deviceId) != pending->connectingTo.end())
86 186 : return false; // Already connecting to this device
87 : } else {
88 2031 : pending = std::make_unique<PendingConversationFetch>();
89 2031 : pending->connectingTo.insert(deviceId);
90 2031 : return true;
91 : }
92 157 : return true;
93 : }
94 :
95 1425 : void stopFetch(const std::string& deviceId)
96 : {
97 : // conversation mtx must be locked
98 1425 : if (!pending)
99 70 : return;
100 1355 : pending->connectingTo.erase(deviceId);
101 1355 : if (pending->connectingTo.empty())
102 1291 : pending.reset();
103 : }
104 :
105 96 : std::vector<std::map<std::string, std::string>> getMembers(bool includeLeft,
106 : bool includeBanned) const
107 : {
108 : // conversation mtx must be locked
109 96 : if (conversation)
110 80 : return conversation->getMembers(true, includeLeft, includeBanned);
111 : // If we're cloning, we can return the initial members
112 16 : std::vector<std::map<std::string, std::string>> result;
113 16 : result.reserve(info.members.size());
114 44 : for (const auto& uri : info.members) {
115 56 : result.emplace_back(std::map<std::string, std::string> {{"uri", uri}});
116 : }
117 16 : return result;
118 16 : }
119 : };
120 :
121 : class ConversationModule::Impl : public std::enable_shared_from_this<Impl>
122 : {
123 : public:
124 : Impl(std::shared_ptr<JamiAccount>&& account,
125 : std::shared_ptr<AccountManager>&& accountManager,
126 : NeedsSyncingCb&& needsSyncingCb,
127 : SengMsgCb&& sendMsgCb,
128 : NeedSocketCb&& onNeedSocket,
129 : NeedSocketCb&& onNeedSwarmSocket,
130 : OneToOneRecvCb&& oneToOneRecvCb);
131 :
132 : template<typename S, typename T>
133 96 : inline auto withConv(const S& convId, T&& cb) const
134 : {
135 192 : if (auto conv = getConversation(convId)) {
136 96 : std::lock_guard lk(conv->mtx);
137 96 : return cb(*conv);
138 96 : } else {
139 0 : JAMI_WARNING("Conversation {} not found", convId);
140 : }
141 0 : return decltype(cb(std::declval<SyncedConversation&>()))();
142 : }
143 : template<typename S, typename T>
144 1562 : inline auto withConversation(const S& convId, T&& cb)
145 : {
146 3124 : if (auto conv = getConversation(convId)) {
147 1559 : std::lock_guard lk(conv->mtx);
148 1559 : if (conv->conversation)
149 1544 : return cb(*conv->conversation);
150 1559 : } else {
151 9 : JAMI_WARNING("Conversation {} not found", convId);
152 : }
153 18 : return decltype(cb(std::declval<Conversation&>()))();
154 : }
155 :
156 : // Retrieving recent commits
157 : /**
158 : * Clone a conversation (initial) from device
159 : * @param deviceId
160 : * @param convId
161 : */
162 : void cloneConversation(const std::string& deviceId,
163 : const std::string& peer,
164 : const std::string& convId);
165 : void cloneConversation(const std::string& deviceId,
166 : const std::string& peer,
167 : const std::shared_ptr<SyncedConversation>& conv);
168 :
169 : /**
170 : * Pull remote device
171 : * @param peer Contact URI
172 : * @param deviceId Contact's device
173 : * @param conversationId
174 : * @param commitId (optional)
175 : */
176 : void fetchNewCommits(const std::string& peer,
177 : const std::string& deviceId,
178 : const std::string& conversationId,
179 : const std::string& commitId = "");
180 : /**
181 : * Handle events to receive new commits
182 : */
183 : void handlePendingConversation(const std::string& conversationId, const std::string& deviceId);
184 :
185 : // Requests
186 : std::optional<ConversationRequest> getRequest(const std::string& id) const;
187 :
188 : // Conversations
189 : /**
190 : * Get members
191 : * @param conversationId
192 : * @param includeBanned
193 : * @return a map of members with their role and details
194 : */
195 : std::vector<std::map<std::string, std::string>> getConversationMembers(
196 : const std::string& conversationId, bool includeBanned = false) const;
197 : void setConversationMembers(const std::string& convId, const std::set<std::string>& members);
198 :
199 : /**
200 : * Remove a repository and all files
201 : * @param convId
202 : * @param sync If we send an update to other account's devices
203 : * @param force True if ignore the removing flag
204 : */
205 : void removeRepository(const std::string& convId, bool sync, bool force = false);
206 : void removeRepositoryImpl(SyncedConversation& conv, bool sync, bool force = false);
207 : /**
208 : * Remove a conversation
209 : * @param conversationId
210 : */
211 : bool removeConversation(const std::string& conversationId);
212 : bool removeConversationImpl(SyncedConversation& conv);
213 :
214 : /**
215 : * Send a message notification to all members
216 : * @param conversation
217 : * @param commit
218 : * @param sync If we send an update to other account's devices
219 : * @param deviceId If we need to filter a specific device
220 : */
221 : void sendMessageNotification(const std::string& conversationId,
222 : bool sync,
223 : const std::string& commitId = "",
224 : const std::string& deviceId = "");
225 : void sendMessageNotification(Conversation& conversation,
226 : bool sync,
227 : const std::string& commitId = "",
228 : const std::string& deviceId = "");
229 :
230 : /**
231 : * @return if a convId is a valid conversation (repository cloned & usable)
232 : */
233 445 : bool isConversation(const std::string& convId) const
234 : {
235 445 : std::lock_guard lk(conversationsMtx_);
236 445 : auto c = conversations_.find(convId);
237 890 : return c != conversations_.end() && c->second;
238 445 : }
239 :
240 2361 : void addConvInfo(const ConvInfo& info)
241 : {
242 2361 : std::lock_guard lk(convInfosMtx_);
243 2361 : convInfos_[info.id] = info;
244 2361 : saveConvInfos();
245 2361 : }
246 :
247 : std::string getOneToOneConversation(const std::string& uri) const noexcept;
248 :
249 : bool updateConvForContact(const std::string& uri,
250 : const std::string& oldConv,
251 : const std::string& newConv);
252 :
253 96 : std::shared_ptr<SyncedConversation> getConversation(std::string_view convId) const
254 : {
255 96 : std::lock_guard lk(conversationsMtx_);
256 96 : auto c = conversations_.find(convId);
257 192 : return c != conversations_.end() ? c->second : nullptr;
258 96 : }
259 27546 : std::shared_ptr<SyncedConversation> getConversation(std::string_view convId)
260 : {
261 27546 : std::lock_guard lk(conversationsMtx_);
262 27546 : auto c = conversations_.find(convId);
263 55092 : return c != conversations_.end() ? c->second : nullptr;
264 27546 : }
265 297 : std::shared_ptr<SyncedConversation> startConversation(const std::string& convId)
266 : {
267 297 : std::lock_guard lk(conversationsMtx_);
268 297 : auto& c = conversations_[convId];
269 297 : if (!c)
270 269 : c = std::make_shared<SyncedConversation>(convId);
271 594 : return c;
272 297 : }
273 78 : std::shared_ptr<SyncedConversation> startConversation(const ConvInfo& info)
274 : {
275 78 : std::lock_guard lk(conversationsMtx_);
276 78 : auto& c = conversations_[info.id];
277 78 : if (!c)
278 8 : c = std::make_shared<SyncedConversation>(info);
279 156 : return c;
280 78 : }
281 1812 : std::vector<std::shared_ptr<SyncedConversation>> getSyncedConversations() const
282 : {
283 1812 : std::lock_guard lk(conversationsMtx_);
284 1812 : std::vector<std::shared_ptr<SyncedConversation>> result;
285 1812 : result.reserve(conversations_.size());
286 2757 : for (const auto& [_, c] : conversations_)
287 945 : result.emplace_back(c);
288 3624 : return result;
289 1812 : }
290 329 : std::vector<std::shared_ptr<Conversation>> getConversations() const
291 : {
292 329 : std::lock_guard lk(conversationsMtx_);
293 329 : std::vector<std::shared_ptr<Conversation>> result;
294 329 : result.reserve(conversations_.size());
295 521 : for (const auto& [_, sc] : conversations_) {
296 192 : if (auto c = sc->conversation)
297 192 : result.emplace_back(std::move(c));
298 : }
299 658 : return result;
300 329 : }
301 :
302 : // Message send/load
303 : void sendMessage(const std::string& conversationId,
304 : Json::Value&& value,
305 : const std::string& replyTo = "",
306 : bool announce = true,
307 : OnCommitCb&& onCommit = {},
308 : OnDoneCb&& cb = {});
309 :
310 : void sendMessage(const std::string& conversationId,
311 : std::string message,
312 : const std::string& replyTo = "",
313 : const std::string& type = "text/plain",
314 : bool announce = true,
315 : OnCommitCb&& onCommit = {},
316 : OnDoneCb&& cb = {});
317 :
318 : void editMessage(const std::string& conversationId,
319 : const std::string& newBody,
320 : const std::string& editedId);
321 :
322 : void bootstrapCb(std::string convId);
323 :
324 : // The following methods modify what is stored on the disk
325 : /**
326 : * @note convInfosMtx_ should be locked
327 : */
328 2948 : void saveConvInfos() const { ConversationModule::saveConvInfos(accountId_, convInfos_); }
329 : /**
330 : * @note conversationsRequestsMtx_ should be locked
331 : */
332 404 : void saveConvRequests() const
333 : {
334 404 : ConversationModule::saveConvRequests(accountId_, conversationsRequests_);
335 404 : }
336 : void declineOtherConversationWith(const std::string& uri);
337 211 : bool addConversationRequest(const std::string& id, const ConversationRequest& req)
338 : {
339 : // conversationsRequestsMtx_ MUST BE LOCKED
340 211 : if (isConversation(id))
341 2 : return false;
342 209 : auto it = conversationsRequests_.find(id);
343 209 : if (it != conversationsRequests_.end()) {
344 : // We only remove requests (if accepted) or change .declined
345 39 : if (!req.declined)
346 35 : return false;
347 170 : } else if (req.isOneToOne()) {
348 : // Check that we're not adding a second one to one trust request
349 : // NOTE: If a new one to one request is received, we can decline the previous one.
350 62 : declineOtherConversationWith(req.from);
351 : }
352 522 : JAMI_DEBUG("[Account {}] [Conversation {}] Adding conversation request from {}", accountId_, id, req.from);
353 174 : conversationsRequests_[id] = req;
354 174 : saveConvRequests();
355 174 : return true;
356 : }
357 220 : void rmConversationRequest(const std::string& id)
358 : {
359 : // conversationsRequestsMtx_ MUST BE LOCKED
360 220 : auto it = conversationsRequests_.find(id);
361 220 : if (it != conversationsRequests_.end()) {
362 145 : auto& md = syncingMetadatas_[id];
363 145 : md = it->second.metadatas;
364 145 : md["syncing"] = "true";
365 145 : md["created"] = std::to_string(it->second.received);
366 : }
367 220 : saveMetadata();
368 220 : conversationsRequests_.erase(id);
369 220 : saveConvRequests();
370 220 : }
371 :
372 : std::weak_ptr<JamiAccount> account_;
373 : std::shared_ptr<AccountManager> accountManager_;
374 : const std::string accountId_ {};
375 : NeedsSyncingCb needsSyncingCb_;
376 : SengMsgCb sendMsgCb_;
377 : NeedSocketCb onNeedSocket_;
378 : NeedSocketCb onNeedSwarmSocket_;
379 : OneToOneRecvCb oneToOneRecvCb_;
380 :
381 : std::string deviceId_ {};
382 : std::string username_ {};
383 :
384 : // Requests
385 : mutable std::mutex conversationsRequestsMtx_;
386 : std::map<std::string, ConversationRequest> conversationsRequests_;
387 :
388 : // Conversations
389 : mutable std::mutex conversationsMtx_ {};
390 : std::map<std::string, std::shared_ptr<SyncedConversation>, std::less<>> conversations_;
391 :
392 : // The following information are stored on the disk
393 : mutable std::mutex convInfosMtx_; // Note, should be locked after conversationsMtx_ if needed
394 : std::map<std::string, ConvInfo> convInfos_;
395 :
396 : // When sending a new message, we need to send the notification to some peers of the
397 : // conversation However, the conversation may be not bootstraped, so the list will be empty.
398 : // notSyncedNotification_ will store the notifiaction to announce until we have peers to sync
399 : // with.
400 : std::mutex notSyncedNotificationMtx_;
401 : std::map<std::string, std::string> notSyncedNotification_;
402 :
403 3099 : std::weak_ptr<Impl> weak() { return std::static_pointer_cast<Impl>(shared_from_this()); }
404 :
405 : // Replay conversations (after erasing/re-adding)
406 : std::mutex replayMtx_;
407 : std::map<std::string, std::vector<std::map<std::string, std::string>>> replay_;
408 : std::map<std::string, uint64_t> refreshMessage;
409 : std::atomic_int syncCnt {0};
410 :
411 : #ifdef LIBJAMI_TEST
412 : std::function<void(std::string, Conversation::BootstrapStatus)> bootstrapCbTest_;
413 : #endif
414 :
415 : void fixStructures(
416 : std::shared_ptr<JamiAccount> account,
417 : const std::vector<std::tuple<std::string, std::string, std::string>>& updateContactConv,
418 : const std::set<std::string>& toRm);
419 :
420 : void cloneConversationFrom(const std::shared_ptr<SyncedConversation> conv,
421 : const std::string& deviceId,
422 : const std::string& oldConvId = "");
423 : void bootstrap(const std::string& convId);
424 : void fallbackClone(const asio::error_code& ec, const std::string& conversationId);
425 : void cloneConversationFrom(const std::string& conversationId,
426 : const std::string& uri,
427 : const std::string& oldConvId = "");
428 :
429 : // While syncing, we do not want to lose metadata (avatar/title and mode)
430 : std::map<std::string, std::map<std::string, std::string>> syncingMetadatas_;
431 379 : void saveMetadata()
432 : {
433 379 : auto path = fileutils::get_data_dir() / accountId_;
434 758 : std::ofstream file(path / "syncingMetadatas", std::ios::trunc | std::ios::binary);
435 379 : msgpack::pack(file, syncingMetadatas_);
436 379 : }
437 :
438 568 : void loadMetadata()
439 : {
440 : try {
441 : // read file
442 568 : auto path = fileutils::get_data_dir() / accountId_;
443 1136 : std::lock_guard lock(dhtnet::fileutils::getFileLock(path / "syncingMetadatas"));
444 1136 : auto file = fileutils::loadFile("syncingMetadatas", path);
445 : // load values
446 0 : msgpack::unpacked result;
447 0 : msgpack::unpack(result, (const char*) file.data(), file.size(), 0);
448 0 : result.get().convert(syncingMetadatas_);
449 1704 : } catch (const std::exception& e) {
450 1704 : JAMI_WARNING("[Account {}] [ConversationModule] unable to load syncing metadata: {}",
451 : accountId_,
452 : e.what());
453 568 : }
454 568 : }
455 : };
456 :
457 568 : ConversationModule::Impl::Impl(std::shared_ptr<JamiAccount>&& account,
458 : std::shared_ptr<AccountManager>&& accountManager,
459 : NeedsSyncingCb&& needsSyncingCb,
460 : SengMsgCb&& sendMsgCb,
461 : NeedSocketCb&& onNeedSocket,
462 : NeedSocketCb&& onNeedSwarmSocket,
463 568 : OneToOneRecvCb&& oneToOneRecvCb)
464 568 : : account_(account)
465 568 : , accountManager_(accountManager)
466 568 : , accountId_(account->getAccountID())
467 568 : , needsSyncingCb_(needsSyncingCb)
468 568 : , sendMsgCb_(sendMsgCb)
469 568 : , onNeedSocket_(onNeedSocket)
470 568 : , onNeedSwarmSocket_(onNeedSwarmSocket)
471 1136 : , oneToOneRecvCb_(oneToOneRecvCb)
472 : {
473 568 : if (auto accm = account->accountManager())
474 568 : if (const auto* info = accm->getInfo()) {
475 568 : deviceId_ = info->deviceId;
476 568 : username_ = info->accountId;
477 568 : }
478 568 : conversationsRequests_ = convRequests(accountId_);
479 568 : loadMetadata();
480 568 : }
481 :
482 : void
483 10 : ConversationModule::Impl::cloneConversation(const std::string& deviceId,
484 : const std::string& peerUri,
485 : const std::string& convId)
486 : {
487 30 : JAMI_DEBUG("[Account {}] [Conversation {}] [device {}] Cloning conversation", accountId_, convId, deviceId);
488 :
489 10 : auto conv = startConversation(convId);
490 10 : std::unique_lock lk(conv->mtx);
491 10 : cloneConversation(deviceId, peerUri, conv);
492 10 : }
493 :
494 : void
495 57 : ConversationModule::Impl::cloneConversation(const std::string& deviceId,
496 : const std::string& peerUri,
497 : const std::shared_ptr<SyncedConversation>& conv)
498 : {
499 : // conv->mtx must be locked
500 57 : if (!conv->conversation) {
501 : // Note: here we don't return and connect to all members
502 : // the first that will successfully connect will be used for
503 : // cloning.
504 : // This avoid the case when we try to clone from convInfos + sync message
505 : // at the same time.
506 57 : if (!conv->startFetch(deviceId, true)) {
507 69 : JAMI_WARNING("[Account {}] [Conversation {}] [device {}] Already fetching conversation", accountId_, conv->info.id, deviceId);
508 23 : addConvInfo(conv->info);
509 23 : return;
510 : }
511 68 : onNeedSocket_(
512 34 : conv->info.id,
513 : deviceId,
514 34 : [w = weak(), conv, deviceId](const auto& channel) {
515 34 : std::lock_guard lk(conv->mtx);
516 34 : if (conv->pending && !conv->pending->ready) {
517 34 : if (channel) {
518 34 : conv->pending->ready = true;
519 34 : conv->pending->deviceId = channel->deviceId().toString();
520 34 : conv->pending->socket = channel;
521 34 : if (!conv->pending->cloning) {
522 34 : conv->pending->cloning = true;
523 68 : dht::ThreadPool::io().run(
524 68 : [w, convId = conv->info.id, deviceId = conv->pending->deviceId]() {
525 68 : if (auto sthis = w.lock())
526 34 : sthis->handlePendingConversation(convId, deviceId);
527 : });
528 : }
529 34 : return true;
530 : } else {
531 0 : conv->stopFetch(deviceId);
532 : }
533 : }
534 0 : return false;
535 34 : },
536 : MIME_TYPE_GIT);
537 :
538 102 : JAMI_LOG("[Account {}] [Conversation {}] [device {}] Requesting device",
539 : accountId_,
540 : conv->info.id,
541 : deviceId);
542 34 : conv->info.members.emplace(username_);
543 34 : conv->info.members.emplace(peerUri);
544 34 : addConvInfo(conv->info);
545 : } else {
546 0 : JAMI_DEBUG("[Account {}] [Conversation {}] Conversation already cloned", accountId_, conv->info.id);
547 : }
548 : }
549 :
550 : void
551 13355 : ConversationModule::Impl::fetchNewCommits(const std::string& peer,
552 : const std::string& deviceId,
553 : const std::string& conversationId,
554 : const std::string& commitId)
555 : {
556 : {
557 13355 : std::lock_guard lk(convInfosMtx_);
558 13355 : auto itConv = convInfos_.find(conversationId);
559 13355 : if (itConv != convInfos_.end() && itConv->second.isRemoved()) {
560 : // If the conversation is removed and we receives a new commit,
561 : // it means that the contact was removed but not banned.
562 : // If he wants a new conversation, they must removes/re-add the contact who declined.
563 12 : JAMI_WARNING("[Account {:s}] [Conversation {}] Received a commit, but conversation is removed",
564 : accountId_,
565 : conversationId);
566 4 : return;
567 : }
568 13355 : }
569 13351 : std::optional<ConversationRequest> oldReq;
570 : {
571 13351 : std::lock_guard lk(conversationsRequestsMtx_);
572 13351 : oldReq = getRequest(conversationId);
573 13351 : if (oldReq != std::nullopt && oldReq->declined) {
574 0 : JAMI_DEBUG("[Account {}] [Conversation {}] Received a request for a conversation already declined.",
575 : accountId_, conversationId);
576 0 : return;
577 : }
578 13351 : }
579 40053 : JAMI_DEBUG("[Account {:s}] [Conversation {}] [device {}] fetching '{:s}'",
580 : accountId_,
581 : conversationId,
582 : deviceId,
583 : commitId);
584 :
585 13351 : const bool shouldRequestInvite = username_ != peer;
586 13351 : auto conv = getConversation(conversationId);
587 13351 : if (!conv) {
588 115 : if (oldReq == std::nullopt && shouldRequestInvite) {
589 : // We didn't find a conversation or a request with the given ID.
590 : // This suggests that someone tried to send us an invitation but
591 : // that we didn't receive it, so we ask for a new one.
592 318 : JAMI_WARNING("[Account {}] [Conversation {}] Unable to find conversation, asking for an invite",
593 : accountId_,
594 : conversationId);
595 106 : sendMsgCb_(peer,
596 : {},
597 318 : std::map<std::string, std::string> {{MIME_TYPE_INVITE, conversationId}},
598 : 0);
599 : }
600 115 : return;
601 : }
602 13236 : std::unique_lock lk(conv->mtx);
603 :
604 13236 : if (conv->conversation) {
605 : // Check if we already have the commit
606 13189 : if (not commitId.empty() && conv->conversation->getCommit(commitId) != std::nullopt) {
607 11156 : return;
608 : }
609 2167 : if (conv->conversation->isRemoving()) {
610 0 : JAMI_WARNING("[Account {}] [Conversation {}] conversaton is being removed",
611 : accountId_,
612 : conversationId);
613 0 : return;
614 : }
615 2167 : if (!conv->conversation->isMember(peer, true)) {
616 3 : JAMI_WARNING("[Account {}] [Conversation {}] {} is not a membe", accountId_, conversationId, peer);
617 1 : return;
618 : }
619 2166 : if (conv->conversation->isBanned(deviceId)) {
620 0 : JAMI_WARNING("[Account {}] [Conversation {}] device {} is banned",
621 : accountId_,
622 : conversationId,
623 : deviceId);
624 0 : return;
625 : }
626 :
627 : // Retrieve current last message
628 2166 : auto lastMessageId = conv->conversation->lastCommitId();
629 2166 : if (lastMessageId.empty()) {
630 0 : JAMI_ERROR("[Account {}] [Conversation {}] No message detected. This is a bug", accountId_, conversationId);
631 0 : return;
632 : }
633 :
634 2166 : if (!conv->startFetch(deviceId)) {
635 399 : JAMI_WARNING("[Account {}] [Conversation {}] Already fetching", accountId_, conversationId);
636 133 : return;
637 : }
638 :
639 2033 : syncCnt.fetch_add(1);
640 6099 : onNeedSocket_(
641 : conversationId,
642 : deviceId,
643 3398 : [w = weak(),
644 : conv,
645 : conversationId,
646 2033 : peer = std::move(peer),
647 : deviceId,
648 2033 : commitId = std::move(commitId)](const auto& channel) {
649 3398 : auto sthis = w.lock();
650 3398 : auto acc = sthis ? sthis->account_.lock() : nullptr;
651 3398 : std::unique_lock lk(conv->mtx);
652 3398 : auto conversation = conv->conversation;
653 3398 : if (!channel || !acc || !conversation) {
654 1416 : conv->stopFetch(deviceId);
655 1416 : if (sthis)
656 1416 : sthis->syncCnt.fetch_sub(1);
657 1416 : return false;
658 : }
659 1982 : conversation->addGitSocket(channel->deviceId(), channel);
660 1982 : lk.unlock();
661 7928 : conversation->sync(
662 1982 : peer,
663 1982 : deviceId,
664 1982 : [w,
665 1982 : conv,
666 1982 : conversationId = std::move(conversationId),
667 1982 : peer,
668 1982 : deviceId,
669 1982 : commitId](bool ok) {
670 1982 : auto shared = w.lock();
671 1982 : if (!shared)
672 0 : return;
673 1982 : if (!ok) {
674 1962 : JAMI_WARNING("[Account {}] [Conversation {}] Unable to fetch new commit from "
675 : "{}, other peer may be disconnected",
676 : shared->accountId_,
677 : conversationId,
678 : deviceId);
679 1962 : JAMI_LOG("[Account {}] [Conversation {}] Relaunch sync with {}",
680 : shared->accountId_,
681 : conversationId,
682 : deviceId);
683 : }
684 :
685 : {
686 1982 : std::lock_guard lk(conv->mtx);
687 1982 : conv->pending.reset();
688 : // Notify peers that a new commit is there (DRT)
689 1982 : if (not commitId.empty() && ok) {
690 1916 : shared->sendMessageNotification(*conv->conversation,
691 : false,
692 958 : commitId,
693 958 : deviceId);
694 : }
695 1982 : }
696 3964 : if (shared->syncCnt.fetch_sub(1) == 1) {
697 211 : emitSignal<libjami::ConversationSignal::ConversationSyncFinished>(
698 211 : shared->accountId_);
699 : }
700 1982 : },
701 1982 : commitId);
702 1982 : return true;
703 3398 : },
704 : "");
705 2166 : } else {
706 47 : if (oldReq != std::nullopt)
707 0 : return;
708 47 : if (conv->pending)
709 32 : return;
710 15 : bool clone = !conv->info.isRemoved();
711 15 : if (clone) {
712 15 : cloneConversation(deviceId, peer, conv);
713 15 : return;
714 : }
715 0 : if (!shouldRequestInvite)
716 0 : return;
717 0 : lk.unlock();
718 0 : JAMI_WARNING("[Account {}] [Conversation {}] Unable to find conversation, asking for an invite",
719 : accountId_,
720 : conversationId);
721 0 : sendMsgCb_(peer,
722 : {},
723 0 : std::map<std::string, std::string> {{MIME_TYPE_INVITE, conversationId}},
724 : 0);
725 : }
726 35872 : }
727 :
728 : // Clone and store conversation
729 : void
730 144 : ConversationModule::Impl::handlePendingConversation(const std::string& conversationId,
731 : const std::string& deviceId)
732 : {
733 144 : auto acc = account_.lock();
734 144 : if (!acc)
735 0 : return;
736 144 : std::vector<DeviceId> kd;
737 : {
738 144 : std::unique_lock lk(conversationsMtx_);
739 144 : const auto& devices = accountManager_->getKnownDevices();
740 144 : kd.reserve(devices.size());
741 316 : for (const auto& [id, _] : devices)
742 172 : kd.emplace_back(id);
743 144 : }
744 144 : auto conv = getConversation(conversationId);
745 144 : if (!conv)
746 0 : return;
747 144 : std::unique_lock lk(conv->mtx, std::defer_lock);
748 287 : auto erasePending = [&] {
749 287 : std::string toRm;
750 287 : if (conv->pending && !conv->pending->removeId.empty())
751 0 : toRm = std::move(conv->pending->removeId);
752 287 : conv->pending.reset();
753 287 : lk.unlock();
754 287 : if (!toRm.empty())
755 0 : removeConversation(toRm);
756 287 : };
757 : try {
758 144 : auto conversation = std::make_shared<Conversation>(acc, deviceId, conversationId);
759 143 : conversation->onMembersChanged([w = weak_from_this(), conversationId](const auto& members) {
760 : // Delay in another thread to avoid deadlocks
761 2746 : dht::ThreadPool::io().run([w, conversationId, members = std::move(members)] {
762 2746 : if (auto sthis = w.lock())
763 1373 : sthis->setConversationMembers(conversationId, members);
764 : });
765 1373 : });
766 143 : conversation->onMessageStatusChanged([this, conversationId](const auto& status) {
767 458 : auto msg = std::make_shared<SyncMsg>();
768 916 : msg->ms = {{conversationId, status}};
769 458 : needsSyncingCb_(std::move(msg));
770 458 : });
771 143 : conversation->onNeedSocket(onNeedSwarmSocket_);
772 143 : if (!conversation->isMember(username_, true)) {
773 0 : JAMI_ERROR("[Account {}] [Conversation {}] Conversation cloned but we do not seem to be a valid member",
774 : accountId_,
775 : conversationId);
776 0 : conversation->erase();
777 0 : lk.lock();
778 0 : erasePending();
779 0 : return;
780 : }
781 :
782 : // Make sure that the list of members stored in convInfos_ matches the
783 : // one from the conversation's repository.
784 : // (https://git.jami.net/savoirfairelinux/jami-daemon/-/issues/1026)
785 143 : setConversationMembers(conversationId, conversation->memberUris("", {}));
786 :
787 143 : lk.lock();
788 :
789 143 : if (conv->pending && conv->pending->socket)
790 143 : conversation->addGitSocket(DeviceId(deviceId), std::move(conv->pending->socket));
791 143 : auto removeRepo = false;
792 : // Note: a removeContact while cloning. In this case, the conversation
793 : // must not be announced and removed.
794 143 : if (conv->info.isRemoved())
795 0 : removeRepo = true;
796 143 : std::map<std::string, std::string> preferences;
797 143 : std::map<std::string, std::map<std::string, std::string>> status;
798 143 : if (conv->pending) {
799 143 : preferences = std::move(conv->pending->preferences);
800 143 : status = std::move(conv->pending->status);
801 : }
802 143 : conv->conversation = conversation;
803 143 : if (removeRepo) {
804 0 : removeRepositoryImpl(*conv, false, true);
805 0 : erasePending();
806 0 : return;
807 : }
808 :
809 143 : auto commitId = conversation->join();
810 143 : std::vector<std::map<std::string, std::string>> messages;
811 : {
812 143 : std::lock_guard lk(replayMtx_);
813 143 : auto replayIt = replay_.find(conversationId);
814 143 : if (replayIt != replay_.end()) {
815 0 : messages = std::move(replayIt->second);
816 0 : replay_.erase(replayIt);
817 : }
818 143 : }
819 143 : if (!commitId.empty())
820 121 : sendMessageNotification(*conversation, false, commitId);
821 143 : erasePending(); // Will unlock
822 :
823 : #ifdef LIBJAMI_TEST
824 143 : conversation->onBootstrapStatus(bootstrapCbTest_);
825 : #endif
826 286 : conversation->bootstrap(std::bind(&ConversationModule::Impl::bootstrapCb,
827 143 : this,
828 286 : conversation->id()),
829 : kd);
830 :
831 143 : if (!preferences.empty())
832 0 : conversation->updatePreferences(preferences);
833 143 : if (!status.empty())
834 18 : conversation->updateMessageStatus(status);
835 143 : syncingMetadatas_.erase(conversationId);
836 143 : saveMetadata();
837 :
838 : // Inform user that the conversation is ready
839 143 : emitSignal<libjami::ConversationSignal::ConversationReady>(accountId_, conversationId);
840 143 : needsSyncingCb_({});
841 143 : std::vector<Json::Value> values;
842 143 : values.reserve(messages.size());
843 143 : for (const auto& message : messages) {
844 : // For now, only replay text messages.
845 : // File transfers will need more logic, and don't care about calls for now.
846 0 : if (message.at("type") == "text/plain" && message.at("author") == username_) {
847 0 : Json::Value json;
848 0 : json["body"] = message.at("body");
849 0 : json["type"] = "text/plain";
850 0 : values.emplace_back(std::move(json));
851 0 : }
852 : }
853 143 : if (!values.empty())
854 0 : conversation->sendMessages(std::move(values),
855 0 : [w = weak(), conversationId](const auto& commits) {
856 0 : auto shared = w.lock();
857 0 : if (shared and not commits.empty())
858 0 : shared->sendMessageNotification(conversationId,
859 : true,
860 0 : *commits.rbegin());
861 0 : });
862 : // Download members profile on first sync
863 143 : auto isOneOne = conversation->mode() == ConversationMode::ONE_TO_ONE;
864 143 : auto askForProfile = isOneOne;
865 143 : if (!isOneOne) {
866 : // If not 1:1 only download profiles from self (to avoid non checked files)
867 102 : auto cert = acc->certStore().getCertificate(deviceId);
868 102 : askForProfile = cert && cert->issuer && cert->issuer->getId().toString() == username_;
869 102 : }
870 143 : if (askForProfile) {
871 90 : for (const auto& member : conversation->memberUris(username_)) {
872 39 : acc->askForProfile(conversationId, deviceId, member);
873 51 : }
874 : }
875 144 : } catch (const std::exception& e) {
876 3 : JAMI_WARNING("[Account {}] [Conversation {}] Something went wrong when cloning conversation: {}. Re-clone in {}s",
877 : accountId_,
878 : conversationId,
879 : e.what(),
880 : conv->fallbackTimer.count());
881 1 : conv->fallbackClone->expires_at(std::chrono::steady_clock::now() + conv->fallbackTimer);
882 1 : conv->fallbackTimer *= 2;
883 1 : if (conv->fallbackTimer > MAX_FALLBACK)
884 0 : conv->fallbackTimer = MAX_FALLBACK;
885 2 : conv->fallbackClone->async_wait(std::bind(&ConversationModule::Impl::fallbackClone,
886 2 : shared_from_this(),
887 : std::placeholders::_1,
888 : conversationId));
889 1 : }
890 144 : lk.lock();
891 144 : erasePending();
892 144 : }
893 :
894 : std::optional<ConversationRequest>
895 13664 : ConversationModule::Impl::getRequest(const std::string& id) const
896 : {
897 : // ConversationsRequestsMtx MUST BE LOCKED
898 13664 : auto it = conversationsRequests_.find(id);
899 13664 : if (it != conversationsRequests_.end())
900 174 : return it->second;
901 13490 : return std::nullopt;
902 : }
903 :
904 : std::string
905 576 : ConversationModule::Impl::getOneToOneConversation(const std::string& uri) const noexcept
906 : {
907 576 : if (auto details = accountManager_->getContactInfo(uri)) {
908 : // If contact is removed there is no conversation
909 : // If banned, conversation is still on disk
910 266 : if (details->removed != 0 && details->banned == 0) {
911 : // Check if contact is removed
912 18 : if (details->removed > details->added)
913 16 : return {};
914 : }
915 250 : return details->conversationId;
916 576 : }
917 310 : return {};
918 : }
919 :
920 : bool
921 1 : ConversationModule::Impl::updateConvForContact(const std::string& uri,
922 : const std::string& oldConv,
923 : const std::string& newConv)
924 : {
925 1 : if (newConv != oldConv) {
926 1 : auto conversation = getOneToOneConversation(uri);
927 1 : if (conversation != oldConv) {
928 0 : JAMI_DEBUG("[Account {}] [Conversation {}] Old conversation is not found in details {} - found: {}",
929 : accountId_,
930 : newConv,
931 : oldConv,
932 : conversation);
933 0 : return false;
934 : }
935 1 : accountManager_->updateContactConversation(uri, newConv);
936 1 : return true;
937 1 : }
938 0 : return false;
939 : }
940 :
941 : void
942 62 : ConversationModule::Impl::declineOtherConversationWith(const std::string& uri)
943 : {
944 : // conversationsRequestsMtx_ MUST BE LOCKED
945 63 : for (auto& [id, request] : conversationsRequests_) {
946 1 : if (request.declined)
947 0 : continue; // Ignore already declined requests
948 1 : if (request.isOneToOne() && request.from == uri) {
949 3 : JAMI_WARNING("[Account {}] [Conversation {}] Decline conversation request from {}",
950 : accountId_, id, uri);
951 1 : request.declined = std::time(nullptr);
952 1 : syncingMetadatas_.erase(id);
953 1 : saveMetadata();
954 1 : emitSignal<libjami::ConversationSignal::ConversationRequestDeclined>(accountId_, id);
955 : }
956 : }
957 62 : }
958 :
959 : std::vector<std::map<std::string, std::string>>
960 90 : ConversationModule::Impl::getConversationMembers(const std::string& conversationId,
961 : bool includeBanned) const
962 : {
963 : return withConv(conversationId,
964 180 : [&](const auto& conv) { return conv.getMembers(true, includeBanned); });
965 : }
966 :
967 : void
968 11 : ConversationModule::Impl::removeRepository(const std::string& conversationId, bool sync, bool force)
969 : {
970 11 : auto conv = getConversation(conversationId);
971 11 : if (!conv)
972 0 : return;
973 11 : std::unique_lock lk(conv->mtx);
974 11 : removeRepositoryImpl(*conv, sync, force);
975 11 : }
976 :
977 : void
978 17 : ConversationModule::Impl::removeRepositoryImpl(SyncedConversation& conv, bool sync, bool force)
979 : {
980 17 : if (conv.conversation && (force || conv.conversation->isRemoving())) {
981 : // Stop fetch!
982 16 : conv.pending.reset();
983 :
984 48 : JAMI_LOG("[Account {}] [Conversation {}] Remove conversation", accountId_, conv.info.id);
985 : try {
986 16 : if (conv.conversation->mode() == ConversationMode::ONE_TO_ONE) {
987 33 : for (const auto& member : conv.conversation->getInitialMembers()) {
988 22 : if (member != username_) {
989 : // Note: this can happen while re-adding a contact.
990 : // In this case, check that we are removing the linked conversation.
991 11 : if (conv.info.id == getOneToOneConversation(member)) {
992 0 : accountManager_->removeContactConversation(member);
993 : }
994 : }
995 11 : }
996 : }
997 0 : } catch (const std::exception& e) {
998 0 : JAMI_ERR() << e.what();
999 0 : }
1000 16 : conv.conversation->erase();
1001 16 : conv.conversation.reset();
1002 :
1003 16 : if (!sync)
1004 1 : return;
1005 :
1006 15 : conv.info.erased = std::time(nullptr);
1007 15 : needsSyncingCb_({});
1008 15 : addConvInfo(conv.info);
1009 : }
1010 : }
1011 :
1012 : bool
1013 6 : ConversationModule::Impl::removeConversation(const std::string& conversationId)
1014 : {
1015 12 : return withConv(conversationId, [this](auto& conv) { return removeConversationImpl(conv); });
1016 : }
1017 :
1018 : bool
1019 6 : ConversationModule::Impl::removeConversationImpl(SyncedConversation& conv)
1020 : {
1021 6 : auto members = conv.getMembers(false, false);
1022 6 : auto isSyncing = !conv.conversation;
1023 6 : auto hasMembers = !isSyncing // If syncing there is no member to inform
1024 10 : && std::find_if(members.begin(),
1025 : members.end(),
1026 6 : [&](const auto& member) {
1027 6 : return member.at("uri") == username_;
1028 : })
1029 10 : != members.end() // We must be still a member
1030 11 : && members.size() != 1; // If there is only ourself
1031 6 : conv.info.removed = std::time(nullptr);
1032 6 : if (isSyncing)
1033 1 : conv.info.erased = std::time(nullptr);
1034 : // Sync now, because it can take some time to really removes the datas
1035 6 : needsSyncingCb_({});
1036 6 : addConvInfo(conv.info);
1037 6 : emitSignal<libjami::ConversationSignal::ConversationRemoved>(accountId_, conv.info.id);
1038 6 : if (isSyncing)
1039 1 : return true;
1040 5 : if (conv.conversation->mode() != ConversationMode::ONE_TO_ONE) {
1041 : // For one to one, we do not notify the leave. The other can still generate request
1042 : // and this is managed by the banned part. If we re-accept, the old conversation will be
1043 : // retrieved
1044 4 : auto commitId = conv.conversation->leave();
1045 4 : if (hasMembers) {
1046 3 : JAMI_LOG("Wait that someone sync that user left conversation {}", conv.info.id);
1047 : // Commit that we left
1048 1 : if (!commitId.empty()) {
1049 : // Do not sync as it's synched by convInfos
1050 1 : sendMessageNotification(*conv.conversation, false, commitId);
1051 : } else {
1052 0 : JAMI_ERROR("Failed to send message to conversation {}", conv.info.id);
1053 : }
1054 : // In this case, we wait that another peer sync the conversation
1055 : // to definitely remove it from the device. This is to inform the
1056 : // peer that we left the conversation and never want to receive
1057 : // any messages
1058 1 : return true;
1059 : }
1060 4 : } else {
1061 3 : for (const auto& m : members)
1062 2 : if (username_ != m.at("uri"))
1063 1 : updateConvForContact(m.at("uri"), conv.info.id, "");
1064 : }
1065 : // Else we are the last member, so we can remove
1066 4 : removeRepositoryImpl(conv, true);
1067 4 : return true;
1068 6 : }
1069 :
1070 : void
1071 377 : ConversationModule::Impl::sendMessageNotification(const std::string& conversationId,
1072 : bool sync,
1073 : const std::string& commitId,
1074 : const std::string& deviceId)
1075 : {
1076 377 : if (auto conv = getConversation(conversationId)) {
1077 377 : std::lock_guard lk(conv->mtx);
1078 377 : if (conv->conversation)
1079 375 : sendMessageNotification(*conv->conversation, sync, commitId, deviceId);
1080 754 : }
1081 377 : }
1082 :
1083 : void
1084 1563 : ConversationModule::Impl::sendMessageNotification(Conversation& conversation,
1085 : bool sync,
1086 : const std::string& commitId,
1087 : const std::string& deviceId)
1088 : {
1089 1563 : auto acc = account_.lock();
1090 1563 : if (!acc)
1091 0 : return;
1092 1563 : Json::Value message;
1093 1563 : auto commit = commitId == "" ? conversation.lastCommitId() : commitId;
1094 1563 : message["id"] = conversation.id();
1095 1563 : message["commit"] = commit;
1096 1563 : message["deviceId"] = deviceId_;
1097 1563 : const auto text = json::toString(message);
1098 :
1099 : // Send message notification will announce the new commit in 3 steps.
1100 :
1101 : // First, because our account can have several devices, announce to other devices
1102 1563 : if (sync) {
1103 : // Announce to our devices
1104 483 : refreshMessage[username_] = sendMsgCb_(username_,
1105 : {},
1106 1932 : std::map<std::string, std::string> {
1107 966 : {MIME_TYPE_GIT, text}},
1108 483 : refreshMessage[username_]);
1109 : }
1110 :
1111 : // Then, we announce to 2 random members in the conversation that aren't in the DRT
1112 : // This allow new devices without the ability to sync to their other devices to sync with us.
1113 : // Or they can also use an old backup.
1114 1563 : std::vector<std::string> nonConnectedMembers;
1115 1563 : std::vector<NodeId> devices;
1116 : {
1117 1563 : std::lock_guard lk(notSyncedNotificationMtx_);
1118 1563 : devices = conversation.peersToSyncWith();
1119 3126 : auto members = conversation.memberUris(username_, {MemberRole::BANNED});
1120 1563 : std::vector<std::string> connectedMembers;
1121 : // print all members
1122 14310 : for (const auto& device : devices) {
1123 12747 : auto cert = acc->certStore().getCertificate(device.toString());
1124 12747 : if (cert && cert->issuer)
1125 12747 : connectedMembers.emplace_back(cert->issuer->getId().toString());
1126 12747 : }
1127 1563 : std::sort(std::begin(connectedMembers), std::end(connectedMembers));
1128 1563 : std::set_difference(members.begin(),
1129 : members.end(),
1130 : connectedMembers.begin(),
1131 : connectedMembers.end(),
1132 : std::inserter(nonConnectedMembers, nonConnectedMembers.begin()));
1133 1563 : std::shuffle(nonConnectedMembers.begin(), nonConnectedMembers.end(), acc->rand);
1134 1563 : if (nonConnectedMembers.size() > 2)
1135 385 : nonConnectedMembers.resize(2);
1136 1563 : if (!conversation.isBootstraped()) {
1137 732 : JAMI_DEBUG("[Conversation {}] Not yet bootstraped, save notification",
1138 : conversation.id());
1139 : // Because we can get some git channels but not bootstraped, we should keep this
1140 : // to refresh when bootstraped.
1141 244 : notSyncedNotification_[conversation.id()] = commit;
1142 : }
1143 1563 : }
1144 :
1145 3107 : for (const auto& member : nonConnectedMembers) {
1146 1544 : refreshMessage[member] = sendMsgCb_(member,
1147 : {},
1148 6176 : std::map<std::string, std::string> {
1149 3088 : {MIME_TYPE_GIT, text}},
1150 1544 : refreshMessage[member]);
1151 : }
1152 :
1153 : // Finally we send to devices that the DRT choose.
1154 14310 : for (const auto& device : devices) {
1155 12747 : auto deviceIdStr = device.toString();
1156 12747 : auto memberUri = conversation.uriFromDevice(deviceIdStr);
1157 12747 : if (memberUri.empty() || deviceIdStr == deviceId)
1158 955 : continue;
1159 11792 : refreshMessage[deviceIdStr] = sendMsgCb_(memberUri,
1160 : device,
1161 47168 : std::map<std::string, std::string> {
1162 23584 : {MIME_TYPE_GIT, text}},
1163 11792 : refreshMessage[deviceIdStr]);
1164 13702 : }
1165 1563 : }
1166 :
1167 : void
1168 56 : ConversationModule::Impl::sendMessage(const std::string& conversationId,
1169 : std::string message,
1170 : const std::string& replyTo,
1171 : const std::string& type,
1172 : bool announce,
1173 : OnCommitCb&& onCommit,
1174 : OnDoneCb&& cb)
1175 : {
1176 56 : Json::Value json;
1177 56 : json["body"] = std::move(message);
1178 56 : json["type"] = type;
1179 56 : sendMessage(conversationId,
1180 56 : std::move(json),
1181 : replyTo,
1182 : announce,
1183 56 : std::move(onCommit),
1184 56 : std::move(cb));
1185 56 : }
1186 :
1187 : void
1188 73 : ConversationModule::Impl::sendMessage(const std::string& conversationId,
1189 : Json::Value&& value,
1190 : const std::string& replyTo,
1191 : bool announce,
1192 : OnCommitCb&& onCommit,
1193 : OnDoneCb&& cb)
1194 : {
1195 73 : if (auto conv = getConversation(conversationId)) {
1196 73 : std::lock_guard lk(conv->mtx);
1197 73 : if (conv->conversation)
1198 73 : conv->conversation
1199 219 : ->sendMessage(std::move(value),
1200 : replyTo,
1201 73 : std::move(onCommit),
1202 73 : [this,
1203 : conversationId,
1204 : announce,
1205 73 : cb = std::move(cb)](bool ok, const std::string& commitId) {
1206 73 : if (cb)
1207 0 : cb(ok, commitId);
1208 73 : if (!announce)
1209 0 : return;
1210 73 : if (ok)
1211 72 : sendMessageNotification(conversationId, true, commitId);
1212 : else
1213 1 : JAMI_ERR("Failed to send message to conversation %s",
1214 : conversationId.c_str());
1215 : });
1216 146 : }
1217 73 : }
1218 :
1219 : void
1220 1 : ConversationModule::Impl::editMessage(const std::string& conversationId,
1221 : const std::string& newBody,
1222 : const std::string& editedId)
1223 : {
1224 : // Check that editedId is a valid commit, from ourself and plain/text
1225 1 : auto validCommit = false;
1226 1 : std::string type, tid;
1227 1 : if (auto conv = getConversation(conversationId)) {
1228 1 : std::lock_guard lk(conv->mtx);
1229 1 : if (conv->conversation) {
1230 1 : auto commit = conv->conversation->getCommit(editedId);
1231 1 : if (commit != std::nullopt) {
1232 1 : type = commit->at("type");
1233 1 : if (type == "application/data-transfer+json")
1234 1 : tid = commit->at("tid");
1235 3 : validCommit = commit->at("author") == username_
1236 3 : && (type == "text/plain" || type == "application/data-transfer+json");
1237 : }
1238 1 : }
1239 2 : }
1240 1 : if (!validCommit) {
1241 0 : JAMI_ERROR("Unable to edit commit {:s}", editedId);
1242 0 : return;
1243 : }
1244 : // Commit message edition
1245 1 : Json::Value json;
1246 1 : if (type == "application/data-transfer+json") {
1247 1 : json["tid"] = "";
1248 : // Remove file!
1249 2 : auto path = fileutils::get_data_dir() / accountId_ / "conversation_data" / conversationId
1250 3 : / fmt::format("{}_{}", editedId, tid);
1251 1 : dhtnet::fileutils::remove(path, true);
1252 1 : } else {
1253 0 : json["body"] = newBody;
1254 : }
1255 1 : json["edit"] = editedId;
1256 1 : json["type"] = type;
1257 1 : sendMessage(conversationId, std::move(json));
1258 1 : }
1259 :
1260 : void
1261 265 : ConversationModule::Impl::bootstrapCb(std::string convId)
1262 : {
1263 265 : std::string commitId;
1264 : {
1265 265 : std::lock_guard lk(notSyncedNotificationMtx_);
1266 265 : auto it = notSyncedNotification_.find(convId);
1267 265 : if (it != notSyncedNotification_.end()) {
1268 170 : commitId = it->second;
1269 170 : notSyncedNotification_.erase(it);
1270 : }
1271 265 : }
1272 795 : JAMI_DEBUG("[Account {}] [Conversation {}] Resend last message notification", accountId_, convId);
1273 265 : dht::ThreadPool::io().run([w = weak(), convId, commitId = std::move(commitId)] {
1274 265 : if (auto sthis = w.lock())
1275 265 : sthis->sendMessageNotification(convId, true, commitId);
1276 265 : });
1277 265 : }
1278 :
1279 : void
1280 587 : ConversationModule::Impl::fixStructures(
1281 : std::shared_ptr<JamiAccount> acc,
1282 : const std::vector<std::tuple<std::string, std::string, std::string>>& updateContactConv,
1283 : const std::set<std::string>& toRm)
1284 : {
1285 587 : for (const auto& [uri, oldConv, newConv] : updateContactConv) {
1286 0 : updateConvForContact(uri, oldConv, newConv);
1287 : }
1288 : ////////////////////////////////////////////////////////////////
1289 : // Note: This is only to homogenize trust and convRequests
1290 587 : std::vector<std::string> invalidPendingRequests;
1291 : {
1292 587 : auto requests = acc->getTrustRequests();
1293 587 : std::lock_guard lk(conversationsRequestsMtx_);
1294 588 : for (const auto& request : requests) {
1295 1 : auto itConvId = request.find(libjami::Account::TrustRequest::CONVERSATIONID);
1296 1 : auto itConvFrom = request.find(libjami::Account::TrustRequest::FROM);
1297 1 : if (itConvId != request.end() && itConvFrom != request.end()) {
1298 : // Check if requests exists or is declined.
1299 1 : auto itReq = conversationsRequests_.find(itConvId->second);
1300 1 : auto declined = itReq == conversationsRequests_.end() || itReq->second.declined;
1301 1 : if (declined) {
1302 3 : JAMI_WARNING("Invalid trust request found: {:s}", itConvId->second);
1303 1 : invalidPendingRequests.emplace_back(itConvFrom->second);
1304 : }
1305 : }
1306 : }
1307 587 : auto requestRemoved = false;
1308 589 : for (auto it = conversationsRequests_.begin(); it != conversationsRequests_.end();) {
1309 2 : if (it->second.from == username_) {
1310 0 : JAMI_WARNING("Detected request from ourself, this makes no sense. Remove {}",
1311 : it->first);
1312 0 : it = conversationsRequests_.erase(it);
1313 : } else {
1314 2 : ++it;
1315 : }
1316 : }
1317 587 : if (requestRemoved) {
1318 0 : saveConvRequests();
1319 : }
1320 587 : }
1321 588 : for (const auto& invalidPendingRequest : invalidPendingRequests)
1322 1 : acc->discardTrustRequest(invalidPendingRequest);
1323 :
1324 : ////////////////////////////////////////////////////////////////
1325 587 : for (const auto& conv : toRm) {
1326 0 : JAMI_ERROR("[Account {}] Remove conversation ({})", accountId_, conv);
1327 0 : removeConversation(conv);
1328 : }
1329 1761 : JAMI_DEBUG("[Account {}] Conversations loaded!", accountId_);
1330 587 : }
1331 :
1332 : void
1333 167 : ConversationModule::Impl::cloneConversationFrom(const std::shared_ptr<SyncedConversation> conv,
1334 : const std::string& deviceId,
1335 : const std::string& oldConvId)
1336 : {
1337 167 : std::lock_guard lk(conv->mtx);
1338 167 : const auto& conversationId = conv->info.id;
1339 167 : if (!conv->startFetch(deviceId, true)) {
1340 138 : JAMI_WARNING("[Account {}] [Conversation {}] Already fetching", accountId_, conversationId);
1341 46 : return;
1342 : }
1343 :
1344 121 : onNeedSocket_(
1345 : conversationId,
1346 : deviceId,
1347 121 : [wthis = weak_from_this(), conv, conversationId, oldConvId, deviceId](const auto& channel) {
1348 121 : std::lock_guard lk(conv->mtx);
1349 121 : if (conv->pending && !conv->pending->ready) {
1350 119 : conv->pending->removeId = oldConvId;
1351 119 : if (channel) {
1352 110 : conv->pending->ready = true;
1353 110 : conv->pending->deviceId = channel->deviceId().toString();
1354 110 : conv->pending->socket = channel;
1355 110 : if (!conv->pending->cloning) {
1356 110 : conv->pending->cloning = true;
1357 220 : dht::ThreadPool::io().run(
1358 220 : [wthis, conversationId, deviceId = conv->pending->deviceId]() {
1359 220 : if (auto sthis = wthis.lock())
1360 110 : sthis->handlePendingConversation(conversationId, deviceId);
1361 : });
1362 : }
1363 110 : return true;
1364 18 : } else if (auto sthis = wthis.lock()) {
1365 9 : conv->stopFetch(deviceId);
1366 27 : JAMI_WARNING("[Account {}] [Conversation {}] Clone failed. Re-clone in {}s", sthis->accountId_, conversationId, conv->fallbackTimer.count());
1367 18 : conv->fallbackClone->expires_at(std::chrono::steady_clock::now()
1368 9 : + conv->fallbackTimer);
1369 9 : conv->fallbackTimer *= 2;
1370 9 : if (conv->fallbackTimer > MAX_FALLBACK)
1371 0 : conv->fallbackTimer = MAX_FALLBACK;
1372 18 : conv->fallbackClone->async_wait(
1373 : std::bind(&ConversationModule::Impl::fallbackClone,
1374 : sthis,
1375 : std::placeholders::_1,
1376 9 : conversationId));
1377 : }
1378 : }
1379 11 : return false;
1380 121 : },
1381 : MIME_TYPE_GIT);
1382 167 : }
1383 :
1384 : void
1385 7 : ConversationModule::Impl::fallbackClone(const asio::error_code& ec,
1386 : const std::string& conversationId)
1387 : {
1388 7 : if (ec == asio::error::operation_aborted)
1389 2 : return;
1390 6 : auto conv = getConversation(conversationId);
1391 6 : if (!conv || conv->conversation)
1392 1 : return;
1393 5 : auto members = getConversationMembers(conversationId);
1394 15 : for (const auto& member : members)
1395 10 : if (member.at("uri") != username_)
1396 5 : cloneConversationFrom(conversationId, member.at("uri"));
1397 6 : }
1398 :
1399 : void
1400 1271 : ConversationModule::Impl::bootstrap(const std::string& convId)
1401 : {
1402 1271 : std::vector<DeviceId> kd;
1403 : {
1404 1271 : std::unique_lock lk(conversationsMtx_);
1405 1271 : const auto& devices = accountManager_->getKnownDevices();
1406 1271 : kd.reserve(devices.size());
1407 2619 : for (const auto& [id, _] : devices)
1408 1348 : kd.emplace_back(id);
1409 1271 : }
1410 134 : auto bootstrap = [&](auto& conv) {
1411 134 : if (conv) {
1412 : #ifdef LIBJAMI_TEST
1413 134 : conv->onBootstrapStatus(bootstrapCbTest_);
1414 : #endif
1415 134 : conv->bootstrap(std::bind(&ConversationModule::Impl::bootstrapCb, this, conv->id()), kd);
1416 : }
1417 134 : };
1418 1271 : std::vector<std::string> toClone;
1419 1271 : std::vector<std::shared_ptr<Conversation>> conversations;
1420 1271 : if (convId.empty()) {
1421 1135 : std::lock_guard lk(convInfosMtx_);
1422 1188 : for (const auto& [conversationId, convInfo] : convInfos_) {
1423 53 : auto conv = getConversation(conversationId);
1424 53 : if (!conv)
1425 0 : return;
1426 53 : if (!conv->conversation && !conv->info.isRemoved()) {
1427 : // Because we're not tracking contact presence in order to sync now,
1428 : // we need to ask to clone requests when bootstraping all conversations
1429 : // else it can stay syncing
1430 9 : toClone.emplace_back(conversationId);
1431 44 : } else if (conv->conversation) {
1432 42 : conversations.emplace_back(conv->conversation);
1433 : }
1434 53 : }
1435 1271 : } else if (auto conv = getConversation(convId)) {
1436 92 : std::lock_guard lk(conv->mtx);
1437 92 : if (conv->conversation)
1438 92 : conversations.emplace_back(conv->conversation);
1439 228 : }
1440 :
1441 1405 : for (const auto& conversation : conversations)
1442 134 : bootstrap(conversation);
1443 1280 : for (const auto& cid : toClone) {
1444 9 : auto members = getConversationMembers(cid);
1445 23 : for (const auto& member : members) {
1446 14 : if (member.at("uri") != username_)
1447 5 : cloneConversationFrom(cid, member.at("uri"));
1448 : }
1449 9 : }
1450 1271 : }
1451 :
1452 : void
1453 152 : ConversationModule::Impl::cloneConversationFrom(const std::string& conversationId,
1454 : const std::string& uri,
1455 : const std::string& oldConvId)
1456 : {
1457 152 : auto memberHash = dht::InfoHash(uri);
1458 152 : if (!memberHash) {
1459 0 : JAMI_WARNING("Invalid member detected: {}", uri);
1460 0 : return;
1461 : }
1462 152 : auto conv = startConversation(conversationId);
1463 152 : std::lock_guard lk(conv->mtx);
1464 152 : conv->info = {conversationId};
1465 152 : conv->info.created = std::time(nullptr);
1466 152 : conv->info.members.emplace(username_);
1467 152 : conv->info.members.emplace(uri);
1468 152 : accountManager_->forEachDevice(memberHash,
1469 144 : [w = weak(), conv, conversationId, oldConvId](
1470 : const std::shared_ptr<dht::crypto::PublicKey>& pk) {
1471 144 : auto sthis = w.lock();
1472 144 : auto deviceId = pk->getLongId().toString();
1473 144 : if (!sthis or deviceId == sthis->deviceId_)
1474 0 : return;
1475 144 : sthis->cloneConversationFrom(conv, deviceId, oldConvId);
1476 144 : });
1477 152 : addConvInfo(conv->info);
1478 152 : }
1479 :
1480 : ////////////////////////////////////////////////////////////////
1481 :
1482 : void
1483 404 : ConversationModule::saveConvRequests(
1484 : const std::string& accountId,
1485 : const std::map<std::string, ConversationRequest>& conversationsRequests)
1486 : {
1487 404 : auto path = fileutils::get_data_dir() / accountId;
1488 404 : saveConvRequestsToPath(path, conversationsRequests);
1489 404 : }
1490 :
1491 : void
1492 1034 : ConversationModule::saveConvRequestsToPath(
1493 : const std::filesystem::path& path,
1494 : const std::map<std::string, ConversationRequest>& conversationsRequests)
1495 : {
1496 1034 : auto p = path / "convRequests";
1497 1034 : std::lock_guard lock(dhtnet::fileutils::getFileLock(p));
1498 1034 : std::ofstream file(p, std::ios::trunc | std::ios::binary);
1499 1034 : msgpack::pack(file, conversationsRequests);
1500 1034 : }
1501 :
1502 : void
1503 2948 : ConversationModule::saveConvInfos(const std::string& accountId, const ConvInfoMap& conversations)
1504 : {
1505 2948 : auto path = fileutils::get_data_dir() / accountId;
1506 2948 : saveConvInfosToPath(path, conversations);
1507 2948 : }
1508 :
1509 : void
1510 3578 : ConversationModule::saveConvInfosToPath(const std::filesystem::path& path,
1511 : const ConvInfoMap& conversations)
1512 : {
1513 7156 : std::ofstream file(path / "convInfo", std::ios::trunc | std::ios::binary);
1514 3578 : msgpack::pack(file, conversations);
1515 3578 : }
1516 :
1517 : ////////////////////////////////////////////////////////////////
1518 :
1519 568 : ConversationModule::ConversationModule(std::shared_ptr<JamiAccount> account,
1520 : std::shared_ptr<AccountManager> accountManager,
1521 : NeedsSyncingCb&& needsSyncingCb,
1522 : SengMsgCb&& sendMsgCb,
1523 : NeedSocketCb&& onNeedSocket,
1524 : NeedSocketCb&& onNeedSwarmSocket,
1525 : OneToOneRecvCb&& oneToOneRecvCb,
1526 568 : bool autoLoadConversations)
1527 568 : : pimpl_ {std::make_unique<Impl>(std::move(account),
1528 568 : std::move(accountManager),
1529 568 : std::move(needsSyncingCb),
1530 568 : std::move(sendMsgCb),
1531 568 : std::move(onNeedSocket),
1532 568 : std::move(onNeedSwarmSocket),
1533 568 : std::move(oneToOneRecvCb))}
1534 : {
1535 568 : if (autoLoadConversations) {
1536 568 : loadConversations();
1537 : }
1538 568 : }
1539 :
1540 : void
1541 13 : ConversationModule::setAccountManager(std::shared_ptr<AccountManager> accountManager)
1542 : {
1543 13 : std::unique_lock lk(pimpl_->conversationsMtx_);
1544 13 : pimpl_->accountManager_ = accountManager;
1545 13 : }
1546 :
1547 : #ifdef LIBJAMI_TEST
1548 : void
1549 17 : ConversationModule::onBootstrapStatus(
1550 : const std::function<void(std::string, Conversation::BootstrapStatus)>& cb)
1551 : {
1552 17 : pimpl_->bootstrapCbTest_ = cb;
1553 18 : for (auto& c : pimpl_->getConversations())
1554 18 : c->onBootstrapStatus(pimpl_->bootstrapCbTest_);
1555 17 : }
1556 : #endif
1557 :
1558 : void
1559 587 : ConversationModule::loadConversations()
1560 : {
1561 587 : auto acc = pimpl_->account_.lock();
1562 587 : if (!acc)
1563 0 : return;
1564 1761 : JAMI_LOG("[Account {}] Start loading conversations…", pimpl_->accountId_);
1565 : auto conversationsRepositories = dhtnet::fileutils::readDirectory(
1566 1174 : fileutils::get_data_dir() / pimpl_->accountId_ / "conversations");
1567 :
1568 587 : std::unique_lock lk(pimpl_->conversationsMtx_);
1569 587 : auto contacts = pimpl_->accountManager_->getContacts(
1570 587 : true); // Avoid to lock configurationMtx while conv Mtx is locked
1571 587 : std::unique_lock ilk(pimpl_->convInfosMtx_);
1572 587 : pimpl_->convInfos_ = convInfos(pimpl_->accountId_);
1573 587 : pimpl_->conversations_.clear();
1574 :
1575 : struct Ctx
1576 : {
1577 : std::mutex cvMtx;
1578 : std::condition_variable cv;
1579 : std::mutex toRmMtx;
1580 : std::set<std::string> toRm;
1581 : std::mutex convMtx;
1582 : size_t convNb;
1583 : std::vector<std::map<std::string, std::string>> contacts;
1584 : std::vector<std::tuple<std::string, std::string, std::string>> updateContactConv;
1585 : };
1586 587 : auto ctx = std::make_shared<Ctx>();
1587 587 : ctx->convNb = conversationsRepositories.size();
1588 587 : ctx->contacts = std::move(contacts);
1589 :
1590 595 : for (auto&& r : conversationsRepositories) {
1591 8 : dht::ThreadPool::io().run([this, ctx, repository = std::move(r), acc] {
1592 : try {
1593 8 : auto sconv = std::make_shared<SyncedConversation>(repository);
1594 8 : auto conv = std::make_shared<Conversation>(acc, repository);
1595 8 : conv->onMessageStatusChanged([this, repository](const auto& status) {
1596 4 : auto msg = std::make_shared<SyncMsg>();
1597 8 : msg->ms = {{repository, status}};
1598 4 : pimpl_->needsSyncingCb_(std::move(msg));
1599 4 : });
1600 24 : conv->onMembersChanged(
1601 16 : [w = pimpl_->weak_from_this(), repository](const auto& members) {
1602 : // Delay in another thread to avoid deadlocks
1603 0 : dht::ThreadPool::io().run([w, repository, members = std::move(members)] {
1604 0 : if (auto sthis = w.lock())
1605 0 : sthis->setConversationMembers(repository, members);
1606 : });
1607 0 : });
1608 8 : conv->onNeedSocket(pimpl_->onNeedSwarmSocket_);
1609 8 : auto members = conv->memberUris(acc->getUsername(), {});
1610 : // NOTE: The following if is here to protect against any incorrect state
1611 : // that can be introduced
1612 8 : if (conv->mode() == ConversationMode::ONE_TO_ONE && members.size() == 1) {
1613 : // If we got a 1:1 conversation, but not in the contact details, it's rather a
1614 : // duplicate or a weird state
1615 1 : auto otherUri = *members.begin();
1616 1 : auto itContact = std::find_if(ctx->contacts.cbegin(),
1617 1 : ctx->contacts.cend(),
1618 1 : [&](const auto& c) {
1619 1 : return c.at("id") == otherUri;
1620 : });
1621 1 : if (itContact == ctx->contacts.end()) {
1622 0 : JAMI_WARNING("Contact {} not found", otherUri);
1623 0 : std::lock_guard lkCv {ctx->cvMtx};
1624 0 : --ctx->convNb;
1625 0 : ctx->cv.notify_all();
1626 0 : return;
1627 0 : }
1628 1 : const std::string& convFromDetails = itContact->at("conversationId");
1629 1 : auto removed = std::stoul(itContact->at("removed"));
1630 1 : auto added = std::stoul(itContact->at("added"));
1631 1 : auto isRemoved = removed > added;
1632 1 : if (convFromDetails != repository) {
1633 0 : if (convFromDetails.empty()) {
1634 0 : if (isRemoved) {
1635 : // If details is empty, contact is removed and not banned.
1636 0 : JAMI_ERROR("Conversation {} detected for {} and should be removed",
1637 : repository,
1638 : otherUri);
1639 0 : std::lock_guard lkMtx {ctx->toRmMtx};
1640 0 : ctx->toRm.insert(repository);
1641 0 : } else {
1642 0 : JAMI_ERROR("No conversation detected for {} but one exists ({}). "
1643 : "Update details",
1644 : otherUri,
1645 : repository);
1646 0 : std::lock_guard lkMtx {ctx->toRmMtx};
1647 0 : ctx->updateContactConv.emplace_back(
1648 0 : std::make_tuple(otherUri, convFromDetails, repository));
1649 0 : }
1650 : }
1651 : }
1652 1 : }
1653 : {
1654 8 : std::lock_guard lkMtx {ctx->convMtx};
1655 8 : auto convInfo = pimpl_->convInfos_.find(repository);
1656 8 : if (convInfo == pimpl_->convInfos_.end()) {
1657 3 : JAMI_ERROR("Missing conv info for {}. This is a bug!", repository);
1658 1 : sconv->info.created = std::time(nullptr);
1659 1 : sconv->info.lastDisplayed
1660 1 : = conv->infos()[ConversationMapKeys::LAST_DISPLAYED];
1661 : } else {
1662 7 : sconv->info = convInfo->second;
1663 7 : if (convInfo->second.isRemoved()) {
1664 : // A conversation was removed, but repository still exists
1665 0 : conv->setRemovingFlag();
1666 0 : std::lock_guard lkMtx {ctx->toRmMtx};
1667 0 : ctx->toRm.insert(repository);
1668 0 : }
1669 : }
1670 : // Even if we found the conversation in convInfos_, unable to assume that the
1671 : // list of members stored in `convInfo` is correct
1672 : // (https://git.jami.net/savoirfairelinux/jami-daemon/-/issues/1025). For this
1673 : // reason, we always use the list we got from the conversation repository to set
1674 : // the value of `sconv->info.members`.
1675 8 : members.emplace(acc->getUsername());
1676 8 : sconv->info.members = std::move(members);
1677 : // convInfosMtx_ is already locked
1678 8 : pimpl_->convInfos_[repository] = sconv->info;
1679 8 : }
1680 8 : auto commits = conv->commitsEndedCalls();
1681 :
1682 8 : if (!commits.empty()) {
1683 : // Note: here, this means that some calls were actives while the
1684 : // daemon finished (can be a crash).
1685 : // Notify other in the conversation that the call is finished
1686 0 : pimpl_->sendMessageNotification(*conv, true, *commits.rbegin());
1687 : }
1688 8 : sconv->conversation = conv;
1689 8 : std::lock_guard lkMtx {ctx->convMtx};
1690 8 : pimpl_->conversations_.emplace(repository, std::move(sconv));
1691 8 : } catch (const std::logic_error& e) {
1692 0 : JAMI_WARNING("[Account {}] Conversations not loaded: {}",
1693 : pimpl_->accountId_,
1694 : e.what());
1695 0 : }
1696 8 : std::lock_guard lkCv {ctx->cvMtx};
1697 8 : --ctx->convNb;
1698 8 : ctx->cv.notify_all();
1699 8 : });
1700 : }
1701 :
1702 587 : std::unique_lock lkCv(ctx->cvMtx);
1703 1182 : ctx->cv.wait(lkCv, [&] { return ctx->convNb == 0; });
1704 :
1705 : // Prune any invalid conversations without members and
1706 : // set the removed flag if needed
1707 587 : std::set<std::string> removed;
1708 607 : for (auto itInfo = pimpl_->convInfos_.begin(); itInfo != pimpl_->convInfos_.end();) {
1709 20 : const auto& info = itInfo->second;
1710 20 : if (info.members.empty()) {
1711 0 : itInfo = pimpl_->convInfos_.erase(itInfo);
1712 0 : continue;
1713 : }
1714 20 : if (info.isRemoved())
1715 0 : removed.insert(info.id);
1716 20 : auto itConv = pimpl_->conversations_.find(info.id);
1717 20 : if (itConv == pimpl_->conversations_.end()) {
1718 : // convInfos_ can contain a conversation that is not yet cloned
1719 : // so we need to add it there.
1720 12 : itConv = pimpl_->conversations_
1721 12 : .emplace(info.id, std::make_shared<SyncedConversation>(info))
1722 : .first;
1723 : }
1724 40 : if (itConv != pimpl_->conversations_.end() && itConv->second && itConv->second->conversation
1725 40 : && info.isRemoved())
1726 0 : itConv->second->conversation->setRemovingFlag();
1727 20 : if (!info.isRemoved() && itConv == pimpl_->conversations_.end()) {
1728 : // In this case, the conversation is not synced and we only know ourself
1729 0 : if (info.members.size() == 1 && *info.members.begin() == acc->getUsername()) {
1730 0 : JAMI_WARNING("[Account {:s}] Conversation {:s} seems not present/synced.",
1731 : pimpl_->accountId_,
1732 : info.id);
1733 0 : emitSignal<libjami::ConversationSignal::ConversationRemoved>(pimpl_->accountId_,
1734 0 : info.id);
1735 0 : itInfo = pimpl_->convInfos_.erase(itInfo);
1736 0 : continue;
1737 0 : }
1738 : }
1739 20 : ++itInfo;
1740 : }
1741 : // On oldest version, removeConversation didn't update "appdata/contacts"
1742 : // causing a potential incorrect state between "appdata/contacts" and "appdata/convInfos"
1743 587 : if (!removed.empty())
1744 0 : acc->unlinkConversations(removed);
1745 : // Save if we've removed some invalid entries
1746 587 : pimpl_->saveConvInfos();
1747 :
1748 587 : ilk.unlock();
1749 587 : lk.unlock();
1750 :
1751 1761 : dht::ThreadPool::io().run([w = pimpl_->weak(),
1752 : acc,
1753 587 : updateContactConv = std::move(ctx->updateContactConv),
1754 587 : toRm = std::move(ctx->toRm)]() {
1755 : // Will lock account manager
1756 587 : if (auto shared = w.lock())
1757 587 : shared->fixStructures(acc, updateContactConv, toRm);
1758 587 : });
1759 587 : }
1760 :
1761 : void
1762 0 : ConversationModule::loadSingleConversation(const std::string& convId)
1763 : {
1764 0 : auto acc = pimpl_->account_.lock();
1765 0 : if (!acc)
1766 0 : return;
1767 0 : JAMI_LOG("[Account {}] Start loading conversation {}", pimpl_->accountId_, convId);
1768 :
1769 0 : std::unique_lock lk(pimpl_->conversationsMtx_);
1770 0 : std::unique_lock ilk(pimpl_->convInfosMtx_);
1771 : // Load convInfos to retrieve requests that have been accepted but not yet synchronized.
1772 0 : pimpl_->convInfos_ = convInfos(pimpl_->accountId_);
1773 0 : pimpl_->conversations_.clear();
1774 :
1775 : try {
1776 0 : auto sconv = std::make_shared<SyncedConversation>(convId);
1777 :
1778 0 : auto conv = std::make_shared<Conversation>(acc, convId);
1779 :
1780 0 : conv->onNeedSocket(pimpl_->onNeedSwarmSocket_);
1781 :
1782 0 : sconv->conversation = conv;
1783 0 : pimpl_->conversations_.emplace(convId, std::move(sconv));
1784 0 : } catch (const std::logic_error& e) {
1785 0 : JAMI_WARNING("[Account {}] Conversations not loaded: {}", pimpl_->accountId_, e.what());
1786 0 : }
1787 :
1788 : // Add all other conversations as dummy conversations to indicate their existence so
1789 : // isConversation could detect conversations correctly.
1790 : auto conversationsRepositoryIds = dhtnet::fileutils::readDirectory(
1791 0 : fileutils::get_data_dir() / pimpl_->accountId_ / "conversations");
1792 0 : for (auto repositoryId : conversationsRepositoryIds) {
1793 0 : if (repositoryId != convId) {
1794 0 : auto conv = std::make_shared<SyncedConversation>(repositoryId);
1795 0 : pimpl_->conversations_.emplace(repositoryId, conv);
1796 0 : }
1797 0 : }
1798 :
1799 : // Add conversations from convInfos_ so isConversation could detect conversations correctly.
1800 : // This includes conversations that have been accepted but are not yet synchronized.
1801 0 : for (auto itInfo = pimpl_->convInfos_.begin(); itInfo != pimpl_->convInfos_.end();) {
1802 0 : const auto& info = itInfo->second;
1803 0 : if (info.members.empty()) {
1804 0 : itInfo = pimpl_->convInfos_.erase(itInfo);
1805 0 : continue;
1806 : }
1807 0 : auto itConv = pimpl_->conversations_.find(info.id);
1808 0 : if (itConv == pimpl_->conversations_.end()) {
1809 : // convInfos_ can contain a conversation that is not yet cloned
1810 : // so we need to add it there.
1811 0 : pimpl_->conversations_.emplace(info.id, std::make_shared<SyncedConversation>(info));
1812 : }
1813 0 : ++itInfo;
1814 : }
1815 :
1816 0 : ilk.unlock();
1817 0 : lk.unlock();
1818 0 : }
1819 :
1820 : void
1821 1271 : ConversationModule::bootstrap(const std::string& convId)
1822 : {
1823 1271 : pimpl_->bootstrap(convId);
1824 1271 : }
1825 :
1826 : void
1827 0 : ConversationModule::monitor()
1828 : {
1829 0 : for (auto& conv : pimpl_->getConversations())
1830 0 : conv->monitor();
1831 0 : }
1832 :
1833 : void
1834 587 : ConversationModule::clearPendingFetch()
1835 : {
1836 : // Note: This is a workaround. convModule() is kept if account is disabled/re-enabled.
1837 : // iOS uses setAccountActive() a lot, and if for some reason the previous pending fetch
1838 : // is not erased (callback not called), it will block the new messages as it will not
1839 : // sync. The best way to debug this is to get logs from the last ICE connection for
1840 : // syncing the conversation. It may have been killed in some un-expected way avoiding to
1841 : // call the callbacks. This should never happen, but if it's the case, this will allow
1842 : // new messages to be synced correctly.
1843 609 : for (auto& conv : pimpl_->getSyncedConversations()) {
1844 22 : std::lock_guard lk(conv->mtx);
1845 22 : if (conv && conv->pending) {
1846 0 : JAMI_ERR("This is a bug, seems to still fetch to some device on initializing");
1847 0 : conv->pending.reset();
1848 : }
1849 609 : }
1850 587 : }
1851 :
1852 : void
1853 0 : ConversationModule::reloadRequests()
1854 : {
1855 0 : pimpl_->conversationsRequests_ = convRequests(pimpl_->accountId_);
1856 0 : }
1857 :
1858 : std::vector<std::string>
1859 7 : ConversationModule::getConversations() const
1860 : {
1861 7 : std::vector<std::string> result;
1862 7 : std::lock_guard lk(pimpl_->convInfosMtx_);
1863 7 : result.reserve(pimpl_->convInfos_.size());
1864 14 : for (const auto& [key, conv] : pimpl_->convInfos_) {
1865 7 : if (conv.isRemoved())
1866 3 : continue;
1867 4 : result.emplace_back(key);
1868 : }
1869 14 : return result;
1870 7 : }
1871 :
1872 : std::string
1873 562 : ConversationModule::getOneToOneConversation(const std::string& uri) const noexcept
1874 : {
1875 562 : return pimpl_->getOneToOneConversation(uri);
1876 : }
1877 :
1878 : bool
1879 0 : ConversationModule::updateConvForContact(const std::string& uri,
1880 : const std::string& oldConv,
1881 : const std::string& newConv)
1882 : {
1883 0 : return pimpl_->updateConvForContact(uri, oldConv, newConv);
1884 : }
1885 :
1886 : std::vector<std::map<std::string, std::string>>
1887 11 : ConversationModule::getConversationRequests() const
1888 : {
1889 11 : std::vector<std::map<std::string, std::string>> requests;
1890 11 : std::lock_guard lk(pimpl_->conversationsRequestsMtx_);
1891 11 : requests.reserve(pimpl_->conversationsRequests_.size());
1892 22 : for (const auto& [id, request] : pimpl_->conversationsRequests_) {
1893 11 : if (request.declined)
1894 6 : continue; // Do not add declined requests
1895 5 : requests.emplace_back(request.toMap());
1896 : }
1897 22 : return requests;
1898 11 : }
1899 :
1900 : void
1901 91 : ConversationModule::onTrustRequest(const std::string& uri,
1902 : const std::string& conversationId,
1903 : const std::vector<uint8_t>& payload,
1904 : time_t received)
1905 : {
1906 91 : auto oldConv = getOneToOneConversation(uri);
1907 91 : if (!oldConv.empty() && pimpl_->isConversation(oldConv)) {
1908 : // If there is already an active one to one conversation here, it's an active
1909 : // contact and the contact will reclone this activeConv, so ignore the request
1910 24 : JAMI_WARNING(
1911 : "Contact is sending a request for a non active conversation. Ignore. They will "
1912 : "clone the old one");
1913 8 : return;
1914 : }
1915 83 : std::unique_lock lk(pimpl_->conversationsRequestsMtx_);
1916 83 : ConversationRequest req;
1917 83 : req.from = uri;
1918 83 : req.conversationId = conversationId;
1919 83 : req.received = std::time(nullptr);
1920 249 : req.metadatas = ConversationRepository::infosFromVCard(vCard::utils::toMap(
1921 166 : std::string_view(reinterpret_cast<const char*>(payload.data()), payload.size())));
1922 83 : auto reqMap = req.toMap();
1923 83 : if (pimpl_->addConversationRequest(conversationId, std::move(req))) {
1924 58 : lk.unlock();
1925 58 : emitSignal<libjami::ConfigurationSignal::IncomingTrustRequest>(pimpl_->accountId_,
1926 : conversationId,
1927 : uri,
1928 : payload,
1929 : received);
1930 58 : emitSignal<libjami::ConversationSignal::ConversationRequestReceived>(pimpl_->accountId_,
1931 : conversationId,
1932 : reqMap);
1933 58 : pimpl_->needsSyncingCb_({});
1934 : } else {
1935 75 : JAMI_DEBUG("[Account {}] Received a request for a conversation "
1936 : "already existing. Ignore",
1937 : pimpl_->accountId_);
1938 : }
1939 91 : }
1940 :
1941 : void
1942 207 : ConversationModule::onConversationRequest(const std::string& from, const Json::Value& value)
1943 : {
1944 207 : ConversationRequest req(value);
1945 207 : auto isOneToOne = req.isOneToOne();
1946 207 : std::string oldConv;
1947 207 : if (isOneToOne) {
1948 2 : oldConv = pimpl_->getOneToOneConversation(from);
1949 : }
1950 207 : std::unique_lock lk(pimpl_->conversationsRequestsMtx_);
1951 621 : JAMI_DEBUG("[Account {}] Receive a new conversation request for conversation {} from {}",
1952 : pimpl_->accountId_,
1953 : req.conversationId,
1954 : from);
1955 207 : auto convId = req.conversationId;
1956 :
1957 : // Already accepted request, do nothing
1958 207 : if (pimpl_->isConversation(convId))
1959 75 : return;
1960 132 : auto oldReq = pimpl_->getRequest(convId);
1961 132 : if (oldReq != std::nullopt) {
1962 66 : JAMI_DEBUG("[Account {}] Received a request for a conversation already existing. "
1963 : "Ignore. Declined: {}",
1964 : pimpl_->accountId_,
1965 : static_cast<int>(oldReq->declined));
1966 22 : return;
1967 : }
1968 :
1969 110 : if (!oldConv.empty()) {
1970 1 : lk.unlock();
1971 : // Already a conversation with the contact.
1972 : // If there is already an active one to one conversation here, it's an active
1973 : // contact and the contact will reclone this activeConv, so ignore the request
1974 3 : JAMI_WARNING(
1975 : "Contact is sending a request for a non active conversation. Ignore. They will "
1976 : "clone the old one");
1977 1 : return;
1978 : }
1979 :
1980 109 : req.received = std::time(nullptr);
1981 109 : req.from = from;
1982 109 : auto reqMap = req.toMap();
1983 109 : if (pimpl_->addConversationRequest(convId, std::move(req))) {
1984 107 : lk.unlock();
1985 : // Note: no need to sync here because other connected devices should receive
1986 : // the same conversation request. Will sync when the conversation will be added
1987 107 : if (isOneToOne)
1988 1 : pimpl_->oneToOneRecvCb_(convId, from);
1989 107 : emitSignal<libjami::ConversationSignal::ConversationRequestReceived>(pimpl_->accountId_,
1990 : convId,
1991 : reqMap);
1992 : }
1993 524 : }
1994 :
1995 : std::string
1996 3 : ConversationModule::peerFromConversationRequest(const std::string& convId) const
1997 : {
1998 3 : std::lock_guard lk(pimpl_->conversationsRequestsMtx_);
1999 3 : auto it = pimpl_->conversationsRequests_.find(convId);
2000 3 : if (it != pimpl_->conversationsRequests_.end()) {
2001 3 : return it->second.from;
2002 : }
2003 0 : return {};
2004 3 : }
2005 :
2006 : void
2007 106 : ConversationModule::onNeedConversationRequest(const std::string& from,
2008 : const std::string& conversationId)
2009 : {
2010 106 : pimpl_->withConversation(conversationId, [&](auto& conversation) {
2011 106 : if (!conversation.isMember(from, true)) {
2012 0 : JAMI_WARNING("{} is asking a new invite for {}, but not a member", from, conversationId);
2013 0 : return;
2014 : }
2015 318 : JAMI_LOG("{} is asking a new invite for {}", from, conversationId);
2016 106 : pimpl_->sendMsgCb_(from, {}, conversation.generateInvitation(), 0);
2017 : });
2018 106 : }
2019 :
2020 : void
2021 181 : ConversationModule::acceptConversationRequest(const std::string& conversationId,
2022 : const std::string& deviceId)
2023 : {
2024 : // For all conversation members, try to open a git channel with this conversation ID
2025 181 : std::unique_lock lkCr(pimpl_->conversationsRequestsMtx_);
2026 181 : auto request = pimpl_->getRequest(conversationId);
2027 181 : if (request == std::nullopt) {
2028 39 : lkCr.unlock();
2029 39 : if (auto conv = pimpl_->getConversation(conversationId)) {
2030 33 : std::unique_lock lk(conv->mtx);
2031 33 : if (!conv->conversation) {
2032 23 : lk.unlock();
2033 23 : pimpl_->cloneConversationFrom(conv, deviceId);
2034 : }
2035 72 : }
2036 117 : JAMI_WARNING("[Account {}] Request not found for conversation {}",
2037 : pimpl_->accountId_,
2038 : conversationId);
2039 39 : return;
2040 : }
2041 142 : pimpl_->rmConversationRequest(conversationId);
2042 142 : lkCr.unlock();
2043 142 : pimpl_->accountManager_->acceptTrustRequest(request->from, true);
2044 142 : cloneConversationFrom(conversationId, request->from);
2045 220 : }
2046 :
2047 : void
2048 6 : ConversationModule::declineConversationRequest(const std::string& conversationId)
2049 : {
2050 6 : std::lock_guard lk(pimpl_->conversationsRequestsMtx_);
2051 6 : auto it = pimpl_->conversationsRequests_.find(conversationId);
2052 6 : if (it != pimpl_->conversationsRequests_.end()) {
2053 6 : it->second.declined = std::time(nullptr);
2054 6 : pimpl_->saveConvRequests();
2055 : }
2056 6 : pimpl_->syncingMetadatas_.erase(conversationId);
2057 6 : pimpl_->saveMetadata();
2058 6 : emitSignal<libjami::ConversationSignal::ConversationRequestDeclined>(pimpl_->accountId_,
2059 : conversationId);
2060 6 : pimpl_->needsSyncingCb_({});
2061 6 : }
2062 :
2063 : std::string
2064 135 : ConversationModule::startConversation(ConversationMode mode, const dht::InfoHash& otherMember)
2065 : {
2066 135 : auto acc = pimpl_->account_.lock();
2067 135 : if (!acc)
2068 0 : return {};
2069 135 : std::vector<DeviceId> kd;
2070 272 : for (const auto& [id, _] : acc->getKnownDevices())
2071 272 : kd.emplace_back(id);
2072 : // Create the conversation object
2073 135 : std::shared_ptr<Conversation> conversation;
2074 : try {
2075 135 : conversation = std::make_shared<Conversation>(acc, mode, otherMember.toString());
2076 135 : auto conversationId = conversation->id();
2077 135 : conversation->onMessageStatusChanged([this, conversationId](const auto& status) {
2078 517 : auto msg = std::make_shared<SyncMsg>();
2079 1034 : msg->ms = {{conversationId, status}};
2080 517 : pimpl_->needsSyncingCb_(std::move(msg));
2081 517 : });
2082 270 : conversation->onMembersChanged(
2083 135 : [w = pimpl_->weak_from_this(), conversationId](const auto& members) {
2084 : // Delay in another thread to avoid deadlocks
2085 932 : dht::ThreadPool::io().run([w, conversationId, members = std::move(members)] {
2086 932 : if (auto sthis = w.lock())
2087 466 : sthis->setConversationMembers(conversationId, members);
2088 : });
2089 466 : });
2090 135 : conversation->onNeedSocket(pimpl_->onNeedSwarmSocket_);
2091 : #ifdef LIBJAMI_TEST
2092 135 : conversation->onBootstrapStatus(pimpl_->bootstrapCbTest_);
2093 : #endif
2094 270 : conversation->bootstrap(std::bind(&ConversationModule::Impl::bootstrapCb,
2095 135 : pimpl_.get(),
2096 : conversationId),
2097 : kd);
2098 135 : } catch (const std::exception& e) {
2099 0 : JAMI_ERROR("[Account {}] Error while generating a conversation {}",
2100 : pimpl_->accountId_,
2101 : e.what());
2102 0 : return {};
2103 0 : }
2104 135 : auto convId = conversation->id();
2105 135 : auto conv = pimpl_->startConversation(convId);
2106 135 : std::unique_lock lk(conv->mtx);
2107 135 : conv->info.created = std::time(nullptr);
2108 135 : conv->info.members.emplace(pimpl_->username_);
2109 135 : if (otherMember)
2110 54 : conv->info.members.emplace(otherMember.toString());
2111 135 : conv->conversation = conversation;
2112 135 : addConvInfo(conv->info);
2113 135 : lk.unlock();
2114 :
2115 135 : pimpl_->needsSyncingCb_({});
2116 135 : emitSignal<libjami::ConversationSignal::ConversationReady>(pimpl_->accountId_, convId);
2117 135 : return convId;
2118 135 : }
2119 :
2120 : void
2121 142 : ConversationModule::cloneConversationFrom(const std::string& conversationId,
2122 : const std::string& uri,
2123 : const std::string& oldConvId)
2124 : {
2125 142 : pimpl_->cloneConversationFrom(conversationId, uri, oldConvId);
2126 142 : }
2127 :
2128 : // Message send/load
2129 : void
2130 56 : ConversationModule::sendMessage(const std::string& conversationId,
2131 : std::string message,
2132 : const std::string& replyTo,
2133 : const std::string& type,
2134 : bool announce,
2135 : OnCommitCb&& onCommit,
2136 : OnDoneCb&& cb)
2137 : {
2138 112 : pimpl_->sendMessage(conversationId,
2139 56 : std::move(message),
2140 : replyTo,
2141 : type,
2142 : announce,
2143 56 : std::move(onCommit),
2144 56 : std::move(cb));
2145 56 : }
2146 :
2147 : void
2148 16 : ConversationModule::sendMessage(const std::string& conversationId,
2149 : Json::Value&& value,
2150 : const std::string& replyTo,
2151 : bool announce,
2152 : OnCommitCb&& onCommit,
2153 : OnDoneCb&& cb)
2154 : {
2155 32 : pimpl_->sendMessage(conversationId,
2156 16 : std::move(value),
2157 : replyTo,
2158 : announce,
2159 16 : std::move(onCommit),
2160 16 : std::move(cb));
2161 16 : }
2162 :
2163 : void
2164 1 : ConversationModule::editMessage(const std::string& conversationId,
2165 : const std::string& newBody,
2166 : const std::string& editedId)
2167 : {
2168 1 : pimpl_->editMessage(conversationId, newBody, editedId);
2169 1 : }
2170 :
2171 : void
2172 0 : ConversationModule::reactToMessage(const std::string& conversationId,
2173 : const std::string& newBody,
2174 : const std::string& reactToId)
2175 : {
2176 : // Commit message edition
2177 0 : Json::Value json;
2178 0 : json["body"] = newBody;
2179 0 : json["react-to"] = reactToId;
2180 0 : json["type"] = "text/plain";
2181 0 : pimpl_->sendMessage(conversationId, std::move(json));
2182 0 : }
2183 :
2184 : void
2185 99 : ConversationModule::addCallHistoryMessage(const std::string& uri,
2186 : uint64_t duration_ms,
2187 : const std::string& reason)
2188 : {
2189 99 : auto finalUri = uri.substr(0, uri.find("@ring.dht"));
2190 99 : finalUri = finalUri.substr(0, uri.find("@jami.dht"));
2191 99 : auto convId = getOneToOneConversation(finalUri);
2192 99 : if (!convId.empty()) {
2193 3 : Json::Value value;
2194 3 : value["to"] = finalUri;
2195 3 : value["type"] = "application/call-history+json";
2196 3 : value["duration"] = std::to_string(duration_ms);
2197 3 : if (!reason.empty())
2198 3 : value["reason"] = reason;
2199 3 : sendMessage(convId, std::move(value));
2200 3 : }
2201 99 : }
2202 :
2203 : bool
2204 12 : ConversationModule::onMessageDisplayed(const std::string& peer,
2205 : const std::string& conversationId,
2206 : const std::string& interactionId)
2207 : {
2208 12 : if (auto conv = pimpl_->getConversation(conversationId)) {
2209 12 : std::unique_lock lk(conv->mtx);
2210 12 : if (auto conversation = conv->conversation) {
2211 12 : lk.unlock();
2212 12 : return conversation->setMessageDisplayed(peer, interactionId);
2213 12 : }
2214 24 : }
2215 0 : return false;
2216 : }
2217 :
2218 : std::map<std::string, std::map<std::string, std::map<std::string, std::string>>>
2219 156 : ConversationModule::convMessageStatus() const
2220 : {
2221 156 : std::map<std::string, std::map<std::string, std::map<std::string, std::string>>> messageStatus;
2222 219 : for (const auto& conv : pimpl_->getConversations()) {
2223 63 : auto d = conv->messageStatus();
2224 63 : if (!d.empty())
2225 54 : messageStatus[conv->id()] = std::move(d);
2226 219 : }
2227 156 : return messageStatus;
2228 0 : }
2229 :
2230 : uint32_t
2231 0 : ConversationModule::loadConversationMessages(const std::string& conversationId,
2232 : const std::string& fromMessage,
2233 : size_t n)
2234 : {
2235 0 : auto acc = pimpl_->account_.lock();
2236 0 : if (auto conv = pimpl_->getConversation(conversationId)) {
2237 0 : std::lock_guard lk(conv->mtx);
2238 0 : if (conv->conversation) {
2239 0 : const uint32_t id = std::uniform_int_distribution<uint32_t> {1}(acc->rand);
2240 0 : LogOptions options;
2241 0 : options.from = fromMessage;
2242 0 : options.nbOfCommits = n;
2243 0 : conv->conversation->loadMessages(
2244 0 : [accountId = pimpl_->accountId_, conversationId, id](auto&& messages) {
2245 0 : emitSignal<libjami::ConversationSignal::ConversationLoaded>(id,
2246 0 : accountId,
2247 0 : conversationId,
2248 : messages);
2249 0 : },
2250 : options);
2251 0 : return id;
2252 0 : }
2253 0 : }
2254 0 : return 0;
2255 0 : }
2256 :
2257 : void
2258 0 : ConversationModule::clearCache(const std::string& conversationId)
2259 : {
2260 0 : if (auto conv = pimpl_->getConversation(conversationId)) {
2261 0 : std::lock_guard lk(conv->mtx);
2262 0 : if (conv->conversation) {
2263 0 : conv->conversation->clearCache();
2264 : }
2265 0 : }
2266 0 : }
2267 :
2268 : uint32_t
2269 2 : ConversationModule::loadConversation(const std::string& conversationId,
2270 : const std::string& fromMessage,
2271 : size_t n)
2272 : {
2273 2 : auto acc = pimpl_->account_.lock();
2274 2 : if (auto conv = pimpl_->getConversation(conversationId)) {
2275 2 : std::lock_guard lk(conv->mtx);
2276 2 : if (conv->conversation) {
2277 2 : const uint32_t id = std::uniform_int_distribution<uint32_t> {1}(acc->rand);
2278 2 : LogOptions options;
2279 2 : options.from = fromMessage;
2280 2 : options.nbOfCommits = n;
2281 4 : conv->conversation->loadMessages2(
2282 2 : [accountId = pimpl_->accountId_, conversationId, id](auto&& messages) {
2283 4 : emitSignal<libjami::ConversationSignal::SwarmLoaded>(id,
2284 2 : accountId,
2285 2 : conversationId,
2286 : messages);
2287 2 : },
2288 : options);
2289 2 : return id;
2290 2 : }
2291 4 : }
2292 0 : return 0;
2293 2 : }
2294 :
2295 : uint32_t
2296 0 : ConversationModule::loadConversationUntil(const std::string& conversationId,
2297 : const std::string& fromMessage,
2298 : const std::string& toMessage)
2299 : {
2300 0 : auto acc = pimpl_->account_.lock();
2301 0 : if (auto conv = pimpl_->getConversation(conversationId)) {
2302 0 : std::lock_guard lk(conv->mtx);
2303 0 : if (conv->conversation) {
2304 0 : const uint32_t id = std::uniform_int_distribution<uint32_t> {1}(acc->rand);
2305 0 : LogOptions options;
2306 0 : options.from = fromMessage;
2307 0 : options.to = toMessage;
2308 0 : options.includeTo = true;
2309 0 : conv->conversation->loadMessages(
2310 0 : [accountId = pimpl_->accountId_, conversationId, id](auto&& messages) {
2311 0 : emitSignal<libjami::ConversationSignal::ConversationLoaded>(id,
2312 0 : accountId,
2313 0 : conversationId,
2314 : messages);
2315 0 : },
2316 : options);
2317 0 : return id;
2318 0 : }
2319 0 : }
2320 0 : return 0;
2321 0 : }
2322 :
2323 : uint32_t
2324 0 : ConversationModule::loadSwarmUntil(const std::string& conversationId,
2325 : const std::string& fromMessage,
2326 : const std::string& toMessage)
2327 : {
2328 0 : auto acc = pimpl_->account_.lock();
2329 0 : if (auto conv = pimpl_->getConversation(conversationId)) {
2330 0 : std::lock_guard lk(conv->mtx);
2331 0 : if (conv->conversation) {
2332 0 : const uint32_t id = std::uniform_int_distribution<uint32_t> {}(acc->rand);
2333 0 : LogOptions options;
2334 0 : options.from = fromMessage;
2335 0 : options.to = toMessage;
2336 0 : options.includeTo = true;
2337 0 : conv->conversation->loadMessages2(
2338 0 : [accountId = pimpl_->accountId_, conversationId, id](auto&& messages) {
2339 0 : emitSignal<libjami::ConversationSignal::SwarmLoaded>(id,
2340 0 : accountId,
2341 0 : conversationId,
2342 : messages);
2343 0 : },
2344 : options);
2345 0 : return id;
2346 0 : }
2347 0 : }
2348 0 : return 0;
2349 0 : }
2350 :
2351 : std::shared_ptr<TransferManager>
2352 70 : ConversationModule::dataTransfer(const std::string& conversationId) const
2353 : {
2354 70 : return pimpl_->withConversation(conversationId,
2355 138 : [](auto& conversation) { return conversation.dataTransfer(); });
2356 : }
2357 :
2358 : bool
2359 13 : ConversationModule::onFileChannelRequest(const std::string& conversationId,
2360 : const std::string& member,
2361 : const std::string& fileId,
2362 : bool verifyShaSum) const
2363 : {
2364 13 : if (auto conv = pimpl_->getConversation(conversationId)) {
2365 13 : std::filesystem::path path;
2366 13 : std::string sha3sum;
2367 13 : std::unique_lock lk(conv->mtx);
2368 13 : if (!conv->conversation)
2369 0 : return false;
2370 13 : if (!conv->conversation->onFileChannelRequest(member, fileId, path, sha3sum))
2371 0 : return false;
2372 :
2373 : // Release the lock here to prevent the sha3 calculation from blocking other threads.
2374 13 : lk.unlock();
2375 13 : if (!std::filesystem::is_regular_file(path)) {
2376 0 : JAMI_WARNING("[Account {:s}] [Conversation {}] {:s} asked for non existing file {}",
2377 : pimpl_->accountId_,
2378 : conversationId,
2379 : member,
2380 : fileId);
2381 0 : return false;
2382 : }
2383 : // Check that our file is correct before sending
2384 13 : if (verifyShaSum && sha3sum != fileutils::sha3File(path)) {
2385 3 : JAMI_WARNING("[Account {:s}] [Conversation {}] {:s} asked for file {:s}, but our version is not "
2386 : "complete or corrupted",
2387 : pimpl_->accountId_,
2388 : conversationId,
2389 : member,
2390 : fileId);
2391 1 : return false;
2392 : }
2393 12 : return true;
2394 26 : }
2395 0 : return false;
2396 : }
2397 :
2398 : bool
2399 13 : ConversationModule::downloadFile(const std::string& conversationId,
2400 : const std::string& interactionId,
2401 : const std::string& fileId,
2402 : const std::string& path)
2403 : {
2404 13 : if (auto conv = pimpl_->getConversation(conversationId)) {
2405 13 : std::lock_guard lk(conv->mtx);
2406 13 : if (conv->conversation)
2407 13 : return conv->conversation->downloadFile(interactionId, fileId, path, "", "");
2408 26 : }
2409 0 : return false;
2410 : }
2411 :
2412 : void
2413 636 : ConversationModule::syncConversations(const std::string& peer, const std::string& deviceId)
2414 : {
2415 : // Sync conversations where peer is member
2416 636 : std::set<std::string> toFetch;
2417 636 : std::set<std::string> toClone;
2418 1247 : for (const auto& conv : pimpl_->getSyncedConversations()) {
2419 611 : std::lock_guard lk(conv->mtx);
2420 611 : if (conv->conversation) {
2421 598 : if (!conv->conversation->isRemoving() && conv->conversation->isMember(peer, false)) {
2422 436 : toFetch.emplace(conv->info.id);
2423 : }
2424 13 : } else if (!conv->info.isRemoved()
2425 33 : && std::find(conv->info.members.begin(), conv->info.members.end(), peer)
2426 33 : != conv->info.members.end()) {
2427 : // In this case the conversation was never cloned (can be after an import)
2428 10 : toClone.emplace(conv->info.id);
2429 : }
2430 1247 : }
2431 1072 : for (const auto& cid : toFetch)
2432 436 : pimpl_->fetchNewCommits(peer, deviceId, cid);
2433 646 : for (const auto& cid : toClone)
2434 10 : pimpl_->cloneConversation(deviceId, peer, cid);
2435 1272 : if (pimpl_->syncCnt.load() == 0)
2436 178 : emitSignal<libjami::ConversationSignal::ConversationSyncFinished>(pimpl_->accountId_);
2437 636 : }
2438 :
2439 : void
2440 127 : ConversationModule::onSyncData(const SyncMsg& msg,
2441 : const std::string& peerId,
2442 : const std::string& deviceId)
2443 : {
2444 127 : std::vector<std::string> toClone;
2445 205 : for (const auto& [key, convInfo] : msg.c) {
2446 78 : const auto& convId = convInfo.id;
2447 : {
2448 78 : std::lock_guard lk(pimpl_->conversationsRequestsMtx_);
2449 78 : pimpl_->rmConversationRequest(convId);
2450 78 : }
2451 :
2452 78 : auto conv = pimpl_->startConversation(convInfo);
2453 78 : std::unique_lock lk(conv->mtx);
2454 : // Skip outdated info
2455 78 : if (std::max(convInfo.created, convInfo.removed)
2456 78 : < std::max(conv->info.created, conv->info.removed))
2457 3 : continue;
2458 75 : if (not convInfo.isRemoved()) {
2459 : // If multi devices, it can detect a conversation that was already
2460 : // removed, so just check if the convinfo contains a removed conv
2461 67 : if (conv->info.removed) {
2462 0 : if (conv->info.removed >= convInfo.created) {
2463 : // Only reclone if re-added, else the peer is not synced yet (could be
2464 : // offline before)
2465 0 : continue;
2466 : }
2467 0 : JAMI_DEBUG("Re-add previously removed conversation {:s}", convId);
2468 : }
2469 67 : conv->info = convInfo;
2470 67 : if (!conv->conversation) {
2471 32 : if (deviceId != "") {
2472 32 : pimpl_->cloneConversation(deviceId, peerId, conv);
2473 : } else {
2474 : // In this case, information is from JAMS
2475 : // JAMS does not store the conversation itself, so we
2476 : // must use information to clone the conversation
2477 0 : addConvInfo(convInfo);
2478 0 : toClone.emplace_back(convId);
2479 : }
2480 : }
2481 : } else {
2482 8 : if (conv->conversation && !conv->conversation->isRemoving()) {
2483 1 : emitSignal<libjami::ConversationSignal::ConversationRemoved>(pimpl_->accountId_,
2484 : convId);
2485 1 : conv->conversation->setRemovingFlag();
2486 : }
2487 8 : auto update = false;
2488 8 : if (!conv->info.removed) {
2489 1 : update = true;
2490 1 : conv->info.removed = std::time(nullptr);
2491 : }
2492 8 : if (convInfo.erased && !conv->info.erased) {
2493 1 : conv->info.erased = std::time(nullptr);
2494 1 : pimpl_->addConvInfo(conv->info);
2495 1 : pimpl_->removeRepositoryImpl(*conv, false);
2496 7 : } else if (update) {
2497 1 : pimpl_->addConvInfo(conv->info);
2498 : }
2499 : }
2500 81 : }
2501 :
2502 127 : for (const auto& cid : toClone) {
2503 0 : auto members = getConversationMembers(cid);
2504 0 : for (const auto& member : members) {
2505 0 : if (member.at("uri") != pimpl_->username_)
2506 0 : cloneConversationFrom(cid, member.at("uri"));
2507 : }
2508 0 : }
2509 :
2510 146 : for (const auto& [convId, req] : msg.cr) {
2511 19 : if (req.from == pimpl_->username_) {
2512 0 : JAMI_WARNING("Detected request from ourself, ignore {}.", convId);
2513 15 : continue;
2514 0 : }
2515 19 : std::unique_lock lk(pimpl_->conversationsRequestsMtx_);
2516 19 : if (pimpl_->isConversation(convId)) {
2517 : // Already handled request
2518 0 : pimpl_->rmConversationRequest(convId);
2519 0 : continue;
2520 : }
2521 :
2522 : // New request
2523 19 : if (!pimpl_->addConversationRequest(convId, req))
2524 10 : continue;
2525 9 : lk.unlock();
2526 :
2527 9 : if (req.declined != 0) {
2528 : // Request declined
2529 15 : JAMI_LOG("[Account {:s}] Declined request detected for conversation {:s} (device {:s})",
2530 : pimpl_->accountId_,
2531 : convId,
2532 : deviceId);
2533 5 : pimpl_->syncingMetadatas_.erase(convId);
2534 5 : pimpl_->saveMetadata();
2535 5 : emitSignal<libjami::ConversationSignal::ConversationRequestDeclined>(pimpl_->accountId_,
2536 : convId);
2537 5 : continue;
2538 5 : }
2539 :
2540 12 : JAMI_LOG("[Account {:s}] New request detected for conversation {:s} (device {:s})",
2541 : pimpl_->accountId_,
2542 : convId,
2543 : deviceId);
2544 :
2545 4 : emitSignal<libjami::ConversationSignal::ConversationRequestReceived>(pimpl_->accountId_,
2546 : convId,
2547 8 : req.toMap());
2548 19 : }
2549 :
2550 : // Updates preferences for conversations
2551 127 : for (const auto& [convId, p] : msg.p) {
2552 0 : if (auto conv = pimpl_->getConversation(convId)) {
2553 0 : std::unique_lock lk(conv->mtx);
2554 0 : if (conv->conversation) {
2555 0 : auto conversation = conv->conversation;
2556 0 : lk.unlock();
2557 0 : conversation->updatePreferences(p);
2558 0 : } else if (conv->pending) {
2559 0 : conv->pending->preferences = p;
2560 : }
2561 0 : }
2562 : }
2563 :
2564 : // Updates displayed for conversations
2565 184 : for (const auto& [convId, ms] : msg.ms) {
2566 57 : if (auto conv = pimpl_->getConversation(convId)) {
2567 57 : std::unique_lock lk(conv->mtx);
2568 57 : if (conv->conversation) {
2569 23 : auto conversation = conv->conversation;
2570 23 : lk.unlock();
2571 23 : conversation->updateMessageStatus(ms);
2572 57 : } else if (conv->pending) {
2573 31 : conv->pending->status = ms;
2574 : }
2575 114 : }
2576 : }
2577 127 : }
2578 :
2579 : bool
2580 2 : ConversationModule::needsSyncingWith(const std::string& memberUri, const std::string& deviceId) const
2581 : {
2582 : // Check if a conversation needs to fetch remote or to be cloned
2583 2 : std::lock_guard lk(pimpl_->conversationsMtx_);
2584 2 : for (const auto& [key, ci] : pimpl_->conversations_) {
2585 1 : std::lock_guard lk(ci->mtx);
2586 1 : if (ci->conversation) {
2587 0 : if (ci->conversation->isRemoving() && ci->conversation->isMember(memberUri, false))
2588 0 : return true;
2589 1 : } else if (!ci->info.removed
2590 3 : && std::find(ci->info.members.begin(), ci->info.members.end(), memberUri)
2591 3 : != ci->info.members.end()) {
2592 : // In this case the conversation was never cloned (can be after an import)
2593 1 : return true;
2594 : }
2595 1 : }
2596 1 : return false;
2597 2 : }
2598 :
2599 : void
2600 1004 : ConversationModule::setFetched(const std::string& conversationId,
2601 : const std::string& deviceId,
2602 : const std::string& commitId)
2603 : {
2604 1004 : if (auto conv = pimpl_->getConversation(conversationId)) {
2605 1004 : std::lock_guard lk(conv->mtx);
2606 1004 : if (conv->conversation) {
2607 1004 : bool remove = conv->conversation->isRemoving();
2608 1004 : conv->conversation->hasFetched(deviceId, commitId);
2609 1004 : if (remove)
2610 1 : pimpl_->removeRepositoryImpl(*conv, true);
2611 : }
2612 2008 : }
2613 1004 : }
2614 :
2615 : void
2616 12919 : ConversationModule::fetchNewCommits(const std::string& peer,
2617 : const std::string& deviceId,
2618 : const std::string& conversationId,
2619 : const std::string& commitId)
2620 : {
2621 12919 : pimpl_->fetchNewCommits(peer, deviceId, conversationId, commitId);
2622 12919 : }
2623 :
2624 : void
2625 110 : ConversationModule::addConversationMember(const std::string& conversationId,
2626 : const dht::InfoHash& contactUri,
2627 : bool sendRequest)
2628 : {
2629 110 : auto conv = pimpl_->getConversation(conversationId);
2630 110 : if (not conv || not conv->conversation) {
2631 0 : JAMI_ERROR("Conversation {:s} does not exist", conversationId);
2632 0 : return;
2633 : }
2634 110 : std::unique_lock lk(conv->mtx);
2635 :
2636 110 : auto contactUriStr = contactUri.toString();
2637 110 : if (conv->conversation->isMember(contactUriStr, true)) {
2638 0 : JAMI_DEBUG("{:s} is already a member of {:s}, resend invite", contactUriStr, conversationId);
2639 : // Note: This should not be necessary, but if for whatever reason the other side didn't
2640 : // join we should not forbid new invites
2641 0 : auto invite = conv->conversation->generateInvitation();
2642 0 : lk.unlock();
2643 0 : pimpl_->sendMsgCb_(contactUriStr, {}, std::move(invite), 0);
2644 0 : return;
2645 0 : }
2646 :
2647 110 : conv->conversation->addMember(
2648 : contactUriStr,
2649 110 : [this, conv, conversationId, sendRequest, contactUriStr](bool ok,
2650 321 : const std::string& commitId) {
2651 110 : if (ok) {
2652 108 : std::unique_lock lk(conv->mtx);
2653 108 : pimpl_->sendMessageNotification(*conv->conversation,
2654 : true,
2655 : commitId); // For the other members
2656 108 : if (sendRequest) {
2657 105 : auto invite = conv->conversation->generateInvitation();
2658 105 : lk.unlock();
2659 105 : pimpl_->sendMsgCb_(contactUriStr, {}, std::move(invite), 0);
2660 105 : }
2661 108 : }
2662 110 : });
2663 110 : }
2664 :
2665 : void
2666 13 : ConversationModule::removeConversationMember(const std::string& conversationId,
2667 : const dht::InfoHash& contactUri,
2668 : bool isDevice)
2669 : {
2670 13 : auto contactUriStr = contactUri.toString();
2671 13 : if (auto conv = pimpl_->getConversation(conversationId)) {
2672 13 : std::lock_guard lk(conv->mtx);
2673 13 : if (conv->conversation)
2674 13 : return conv->conversation
2675 13 : ->removeMember(contactUriStr,
2676 : isDevice,
2677 24 : [this, conversationId](bool ok, const std::string& commitId) {
2678 13 : if (ok) {
2679 11 : pimpl_->sendMessageNotification(conversationId,
2680 : true,
2681 : commitId);
2682 : }
2683 26 : });
2684 26 : }
2685 13 : }
2686 :
2687 : std::vector<std::map<std::string, std::string>>
2688 76 : ConversationModule::getConversationMembers(const std::string& conversationId,
2689 : bool includeBanned) const
2690 : {
2691 76 : return pimpl_->getConversationMembers(conversationId, includeBanned);
2692 : }
2693 :
2694 : uint32_t
2695 0 : ConversationModule::countInteractions(const std::string& convId,
2696 : const std::string& toId,
2697 : const std::string& fromId,
2698 : const std::string& authorUri) const
2699 : {
2700 0 : if (auto conv = pimpl_->getConversation(convId)) {
2701 0 : std::lock_guard lk(conv->mtx);
2702 0 : if (conv->conversation)
2703 0 : return conv->conversation->countInteractions(toId, fromId, authorUri);
2704 0 : }
2705 0 : return 0;
2706 : }
2707 :
2708 : void
2709 0 : ConversationModule::search(uint32_t req, const std::string& convId, const Filter& filter) const
2710 : {
2711 0 : if (convId.empty()) {
2712 0 : auto convs = pimpl_->getConversations();
2713 0 : if (convs.empty()) {
2714 0 : emitSignal<libjami::ConversationSignal::MessagesFound>(
2715 : req,
2716 0 : pimpl_->accountId_,
2717 0 : std::string {},
2718 0 : std::vector<std::map<std::string, std::string>> {});
2719 0 : return;
2720 : }
2721 0 : auto finishedFlag = std::make_shared<std::atomic_int>(convs.size());
2722 0 : for (const auto& conv : convs) {
2723 0 : conv->search(req, filter, finishedFlag);
2724 : }
2725 0 : } else if (auto conv = pimpl_->getConversation(convId)) {
2726 0 : std::lock_guard lk(conv->mtx);
2727 0 : if (conv->conversation)
2728 0 : conv->conversation->search(req, filter, std::make_shared<std::atomic_int>(1));
2729 0 : }
2730 : }
2731 :
2732 : void
2733 4 : ConversationModule::updateConversationInfos(const std::string& conversationId,
2734 : const std::map<std::string, std::string>& infos,
2735 : bool sync)
2736 : {
2737 4 : auto conv = pimpl_->getConversation(conversationId);
2738 4 : if (not conv or not conv->conversation) {
2739 0 : JAMI_ERROR("Conversation {:s} does not exist", conversationId);
2740 0 : return;
2741 : }
2742 4 : std::lock_guard lk(conv->mtx);
2743 4 : conv->conversation
2744 4 : ->updateInfos(infos, [this, conversationId, sync](bool ok, const std::string& commitId) {
2745 4 : if (ok && sync) {
2746 4 : pimpl_->sendMessageNotification(conversationId, true, commitId);
2747 0 : } else if (sync)
2748 0 : JAMI_WARNING("Unable to update info on {:s}", conversationId);
2749 4 : });
2750 4 : }
2751 :
2752 : std::map<std::string, std::string>
2753 1 : ConversationModule::conversationInfos(const std::string& conversationId) const
2754 : {
2755 : {
2756 1 : std::lock_guard lk(pimpl_->conversationsRequestsMtx_);
2757 1 : auto itReq = pimpl_->conversationsRequests_.find(conversationId);
2758 1 : if (itReq != pimpl_->conversationsRequests_.end())
2759 0 : return itReq->second.metadatas;
2760 1 : }
2761 1 : if (auto conv = pimpl_->getConversation(conversationId)) {
2762 1 : std::lock_guard lk(conv->mtx);
2763 1 : std::map<std::string, std::string> md;
2764 : {
2765 1 : auto syncingMetadatasIt = pimpl_->syncingMetadatas_.find(conversationId);
2766 1 : if (syncingMetadatasIt != pimpl_->syncingMetadatas_.end()) {
2767 1 : if (conv->conversation) {
2768 0 : pimpl_->syncingMetadatas_.erase(syncingMetadatasIt);
2769 0 : pimpl_->saveMetadata();
2770 : } else {
2771 1 : md = syncingMetadatasIt->second;
2772 : }
2773 : }
2774 : }
2775 1 : if (conv->conversation)
2776 0 : return conv->conversation->infos();
2777 : else
2778 1 : return md;
2779 2 : }
2780 0 : JAMI_ERROR("Conversation {:s} does not exist", conversationId);
2781 0 : return {};
2782 : }
2783 :
2784 : void
2785 1 : ConversationModule::setConversationPreferences(const std::string& conversationId,
2786 : const std::map<std::string, std::string>& prefs)
2787 : {
2788 1 : if (auto conv = pimpl_->getConversation(conversationId)) {
2789 1 : std::unique_lock lk(conv->mtx);
2790 1 : if (not conv->conversation) {
2791 0 : JAMI_ERROR("Conversation {:s} does not exist", conversationId);
2792 0 : return;
2793 : }
2794 1 : auto conversation = conv->conversation;
2795 1 : lk.unlock();
2796 1 : conversation->updatePreferences(prefs);
2797 1 : auto msg = std::make_shared<SyncMsg>();
2798 2 : msg->p = {{conversationId, conversation->preferences(true)}};
2799 1 : pimpl_->needsSyncingCb_(std::move(msg));
2800 2 : }
2801 : }
2802 :
2803 : std::map<std::string, std::string>
2804 10 : ConversationModule::getConversationPreferences(const std::string& conversationId,
2805 : bool includeCreated) const
2806 : {
2807 10 : if (auto conv = pimpl_->getConversation(conversationId)) {
2808 10 : std::lock_guard lk(conv->mtx);
2809 10 : if (conv->conversation)
2810 10 : return conv->conversation->preferences(includeCreated);
2811 20 : }
2812 0 : return {};
2813 : }
2814 :
2815 : std::map<std::string, std::map<std::string, std::string>>
2816 156 : ConversationModule::convPreferences() const
2817 : {
2818 156 : std::map<std::string, std::map<std::string, std::string>> p;
2819 219 : for (const auto& conv : pimpl_->getConversations()) {
2820 63 : auto prefs = conv->preferences(true);
2821 63 : if (!prefs.empty())
2822 0 : p[conv->id()] = std::move(prefs);
2823 219 : }
2824 156 : return p;
2825 0 : }
2826 :
2827 : std::vector<uint8_t>
2828 0 : ConversationModule::conversationVCard(const std::string& conversationId) const
2829 : {
2830 0 : if (auto conv = pimpl_->getConversation(conversationId)) {
2831 0 : std::lock_guard lk(conv->mtx);
2832 0 : if (conv->conversation)
2833 0 : return conv->conversation->vCard();
2834 0 : }
2835 0 : JAMI_ERROR("Conversation {:s} does not exist", conversationId);
2836 0 : return {};
2837 : }
2838 :
2839 : bool
2840 3551 : ConversationModule::isBanned(const std::string& convId, const std::string& uri) const
2841 : {
2842 : dhtnet::tls::TrustStore::PermissionStatus status;
2843 : {
2844 3551 : std::lock_guard lk(pimpl_->conversationsMtx_);
2845 3551 : status = pimpl_->accountManager_->getCertificateStatus(uri);
2846 3551 : }
2847 3551 : if (auto conv = pimpl_->getConversation(convId)) {
2848 3544 : std::lock_guard lk(conv->mtx);
2849 3544 : if (!conv->conversation)
2850 40 : return true;
2851 3504 : if (conv->conversation->mode() != ConversationMode::ONE_TO_ONE)
2852 2997 : return conv->conversation->isBanned(uri);
2853 : // If 1:1 we check the certificate status
2854 507 : return status == dhtnet::tls::TrustStore::PermissionStatus::BANNED;
2855 7095 : }
2856 7 : return true;
2857 : }
2858 :
2859 : void
2860 24 : ConversationModule::removeContact(const std::string& uri, bool banned)
2861 : {
2862 : // Remove linked conversation's requests
2863 : {
2864 24 : std::lock_guard lk(pimpl_->conversationsRequestsMtx_);
2865 24 : auto update = false;
2866 24 : for (auto it = pimpl_->conversationsRequests_.begin();
2867 28 : it != pimpl_->conversationsRequests_.end();
2868 4 : ++it) {
2869 4 : if (it->second.from == uri && !it->second.declined) {
2870 12 : JAMI_DEBUG("Declining conversation request {:s} from {:s}", it->first, uri);
2871 4 : pimpl_->syncingMetadatas_.erase(it->first);
2872 4 : pimpl_->saveMetadata();
2873 4 : emitSignal<libjami::ConversationSignal::ConversationRequestDeclined>(
2874 4 : pimpl_->accountId_, it->first);
2875 4 : update = true;
2876 4 : it->second.declined = std::time(nullptr);
2877 : }
2878 : }
2879 24 : if (update) {
2880 4 : pimpl_->saveConvRequests();
2881 4 : pimpl_->needsSyncingCb_({});
2882 : }
2883 24 : }
2884 24 : if (banned) {
2885 8 : auto conversationId = getOneToOneConversation(uri);
2886 13 : pimpl_->withConversation(conversationId, [&](auto& conv) { conv.shutdownConnections(); });
2887 8 : return; // Keep the conversation in banned model but stop connections
2888 8 : }
2889 :
2890 : // Removed contacts should not be linked to any conversation
2891 16 : pimpl_->accountManager_->updateContactConversation(uri, "");
2892 :
2893 : // Remove all one-to-one conversations with the removed contact
2894 16 : auto isSelf = uri == pimpl_->username_;
2895 16 : std::vector<std::string> toRm;
2896 15 : auto removeConvInfo = [&](const auto& conv, const auto& members) {
2897 1 : if ((isSelf && members.size() == 1)
2898 16 : || (!isSelf && std::find(members.begin(), members.end(), uri) != members.end())) {
2899 : // Mark the conversation as removed if it wasn't already
2900 14 : if (!conv->info.isRemoved()) {
2901 12 : conv->info.removed = std::time(nullptr);
2902 12 : emitSignal<libjami::ConversationSignal::ConversationRemoved>(pimpl_->accountId_,
2903 12 : conv->info.id);
2904 12 : pimpl_->addConvInfo(conv->info);
2905 12 : return true;
2906 : }
2907 : }
2908 3 : return false;
2909 16 : };
2910 : {
2911 16 : std::lock_guard lk(pimpl_->conversationsMtx_);
2912 31 : for (auto& [convId, conv] : pimpl_->conversations_) {
2913 15 : std::lock_guard lk(conv->mtx);
2914 15 : if (conv->conversation) {
2915 : try {
2916 : // Note it's important to check getUsername(), else
2917 : // removing self can remove all conversations
2918 12 : if (conv->conversation->mode() == ConversationMode::ONE_TO_ONE) {
2919 12 : auto initMembers = conv->conversation->getInitialMembers();
2920 12 : if (removeConvInfo(conv, initMembers))
2921 11 : toRm.emplace_back(convId);
2922 12 : }
2923 0 : } catch (const std::exception& e) {
2924 0 : JAMI_WARN("%s", e.what());
2925 0 : }
2926 : } else {
2927 3 : removeConvInfo(conv, conv->info.members);
2928 : }
2929 15 : }
2930 16 : }
2931 27 : for (const auto& id : toRm)
2932 11 : pimpl_->removeRepository(id, true, true);
2933 16 : }
2934 :
2935 : bool
2936 6 : ConversationModule::removeConversation(const std::string& conversationId)
2937 : {
2938 6 : return pimpl_->removeConversation(conversationId);
2939 : }
2940 :
2941 : void
2942 0 : ConversationModule::initReplay(const std::string& oldConvId, const std::string& newConvId)
2943 : {
2944 0 : if (auto conv = pimpl_->getConversation(oldConvId)) {
2945 0 : std::lock_guard lk(conv->mtx);
2946 0 : if (conv->conversation) {
2947 0 : std::promise<bool> waitLoad;
2948 0 : std::future<bool> fut = waitLoad.get_future();
2949 : // we should wait for loadMessage, because it will be deleted after this.
2950 0 : conv->conversation->loadMessages(
2951 0 : [&](auto&& messages) {
2952 0 : std::reverse(messages.begin(),
2953 : messages.end()); // Log is inverted as we want to replay
2954 0 : std::lock_guard lk(pimpl_->replayMtx_);
2955 0 : pimpl_->replay_[newConvId] = std::move(messages);
2956 0 : waitLoad.set_value(true);
2957 0 : },
2958 : {});
2959 0 : fut.wait();
2960 0 : }
2961 0 : }
2962 0 : }
2963 :
2964 : bool
2965 65 : ConversationModule::isHosting(const std::string& conversationId, const std::string& confId) const
2966 : {
2967 65 : if (conversationId.empty()) {
2968 55 : std::lock_guard lk(pimpl_->conversationsMtx_);
2969 55 : return std::find_if(pimpl_->conversations_.cbegin(),
2970 55 : pimpl_->conversations_.cend(),
2971 10 : [&](const auto& conv) {
2972 10 : return conv.second->conversation
2973 10 : && conv.second->conversation->isHosting(confId);
2974 : })
2975 110 : != pimpl_->conversations_.cend();
2976 65 : } else if (auto conv = pimpl_->getConversation(conversationId)) {
2977 10 : if (conv->conversation) {
2978 10 : return conv->conversation->isHosting(confId);
2979 : }
2980 10 : }
2981 0 : return false;
2982 : }
2983 :
2984 : std::vector<std::map<std::string, std::string>>
2985 17 : ConversationModule::getActiveCalls(const std::string& conversationId) const
2986 : {
2987 34 : return pimpl_->withConversation(conversationId, [](const auto& conversation) {
2988 17 : return conversation.currentCalls();
2989 17 : });
2990 : }
2991 :
2992 : std::shared_ptr<SIPCall>
2993 22 : ConversationModule::call(
2994 : const std::string& url,
2995 : const std::vector<libjami::MediaMap>& mediaList,
2996 : std::function<void(const std::string&, const DeviceId&, const std::shared_ptr<SIPCall>&)>&& cb)
2997 : {
2998 88 : std::string conversationId = "", confId = "", uri = "", deviceId = "";
2999 22 : if (url.find('/') == std::string::npos) {
3000 13 : conversationId = url;
3001 : } else {
3002 9 : auto parameters = jami::split_string(url, '/');
3003 9 : if (parameters.size() != 4) {
3004 0 : JAMI_ERROR("Incorrect url {:s}", url);
3005 0 : return {};
3006 : }
3007 9 : conversationId = parameters[0];
3008 9 : uri = parameters[1];
3009 9 : deviceId = parameters[2];
3010 9 : confId = parameters[3];
3011 9 : }
3012 :
3013 22 : auto conv = pimpl_->getConversation(conversationId);
3014 22 : if (!conv)
3015 0 : return {};
3016 22 : std::unique_lock lk(conv->mtx);
3017 22 : if (!conv->conversation) {
3018 0 : JAMI_ERROR("Conversation {:s} not found", conversationId);
3019 0 : return {};
3020 : }
3021 :
3022 : // Check if we want to join a specific conference
3023 : // So, if confId is specified or if there is some activeCalls
3024 : // or if we are the default host.
3025 22 : auto activeCalls = conv->conversation->currentCalls();
3026 22 : auto infos = conv->conversation->infos();
3027 22 : auto itRdvAccount = infos.find("rdvAccount");
3028 22 : auto itRdvDevice = infos.find("rdvDevice");
3029 22 : auto sendCallRequest = false;
3030 22 : if (!confId.empty()) {
3031 9 : sendCallRequest = true;
3032 27 : JAMI_DEBUG("Calling self, join conference");
3033 13 : } else if (!activeCalls.empty()) {
3034 : // Else, we try to join active calls
3035 0 : sendCallRequest = true;
3036 0 : auto& ac = *activeCalls.rbegin();
3037 0 : confId = ac.at("id");
3038 0 : uri = ac.at("uri");
3039 0 : deviceId = ac.at("device");
3040 16 : } else if (itRdvAccount != infos.end() && itRdvDevice != infos.end()
3041 16 : && !itRdvAccount->second.empty()) {
3042 : // Else, creates "to" (accountId/deviceId/conversationId/confId) and ask remote host
3043 3 : sendCallRequest = true;
3044 3 : uri = itRdvAccount->second;
3045 3 : deviceId = itRdvDevice->second;
3046 3 : confId = "0";
3047 9 : JAMI_DEBUG("Remote host detected. Calling {:s} on device {:s}", uri, deviceId);
3048 : }
3049 22 : lk.unlock();
3050 :
3051 22 : auto account = pimpl_->account_.lock();
3052 : std::vector<libjami::MediaMap> mediaMap
3053 22 : = mediaList.empty() ? MediaAttribute::mediaAttributesToMediaMaps(
3054 41 : pimpl_->account_.lock()->createDefaultMediaList(
3055 60 : pimpl_->account_.lock()->isVideoEnabled()))
3056 41 : : mediaList;
3057 :
3058 22 : if (!sendCallRequest || (uri == pimpl_->username_ && deviceId == pimpl_->deviceId_)) {
3059 11 : confId = confId == "0" ? Manager::instance().callFactory.getNewCallID() : confId;
3060 : // TODO attach host with media list
3061 11 : hostConference(conversationId, confId, "", mediaMap);
3062 11 : return {};
3063 : }
3064 :
3065 : // Else we need to create a call
3066 11 : auto& manager = Manager::instance();
3067 : std::shared_ptr<SIPCall> call = manager.callFactory.newSipCall(account,
3068 : Call::CallType::OUTGOING,
3069 11 : mediaMap);
3070 :
3071 11 : if (not call)
3072 0 : return {};
3073 :
3074 0 : auto callUri = fmt::format("{}/{}/{}/{}", conversationId, uri, deviceId, confId);
3075 44 : account->getIceOptions([call,
3076 11 : accountId = account->getAccountID(),
3077 : callUri,
3078 11 : uri = std::move(uri),
3079 : conversationId,
3080 : deviceId,
3081 11 : cb = std::move(cb)](auto&& opts) {
3082 11 : if (call->isIceEnabled()) {
3083 11 : if (not call->createIceMediaTransport(false)
3084 22 : or not call->initIceMediaTransport(true,
3085 : std::forward<dhtnet::IceTransportOptions>(opts))) {
3086 0 : return;
3087 : }
3088 : }
3089 33 : JAMI_DEBUG("New outgoing call with {}", uri);
3090 11 : call->setPeerNumber(uri);
3091 11 : call->setPeerUri("swarm:" + uri);
3092 :
3093 33 : JAMI_DEBUG("Calling: {:s}", callUri);
3094 11 : call->setState(Call::ConnectionState::TRYING);
3095 11 : call->setPeerNumber(callUri);
3096 11 : call->setPeerUri("rdv:" + callUri);
3097 11 : call->addStateListener([accountId, conversationId](Call::CallState call_state,
3098 : Call::ConnectionState cnx_state,
3099 : int) {
3100 62 : if (cnx_state == Call::ConnectionState::DISCONNECTED
3101 22 : && call_state == Call::CallState::MERROR) {
3102 1 : emitSignal<libjami::ConfigurationSignal::NeedsHost>(accountId, conversationId);
3103 1 : return true;
3104 : }
3105 61 : return true;
3106 : });
3107 11 : cb(callUri, DeviceId(deviceId), call);
3108 : });
3109 :
3110 11 : return call;
3111 22 : }
3112 :
3113 : void
3114 14 : ConversationModule::hostConference(const std::string& conversationId,
3115 : const std::string& confId,
3116 : const std::string& callId,
3117 : const std::vector<libjami::MediaMap>& mediaList)
3118 : {
3119 14 : auto acc = pimpl_->account_.lock();
3120 14 : if (!acc)
3121 0 : return;
3122 14 : auto conf = acc->getConference(confId);
3123 14 : auto createConf = !conf;
3124 14 : std::shared_ptr<SIPCall> call;
3125 14 : if (!callId.empty()) {
3126 3 : call = std::dynamic_pointer_cast<SIPCall>(acc->getCall(callId));
3127 3 : if (!call) {
3128 0 : JAMI_WARNING("No call with id {} found", callId);
3129 0 : return;
3130 : }
3131 : }
3132 14 : if (createConf) {
3133 14 : conf = std::make_shared<Conference>(acc, confId);
3134 14 : acc->attach(conf);
3135 : }
3136 :
3137 14 : if (!callId.empty())
3138 3 : conf->addSubCall(callId);
3139 :
3140 14 : if (callId.empty())
3141 11 : conf->attachHost(mediaList);
3142 :
3143 14 : if (createConf) {
3144 14 : emitSignal<libjami::CallSignal::ConferenceCreated>(acc->getAccountID(),
3145 : conversationId,
3146 14 : conf->getConfId());
3147 : } else {
3148 0 : conf->reportMediaNegotiationStatus();
3149 0 : emitSignal<libjami::CallSignal::ConferenceChanged>(acc->getAccountID(),
3150 0 : conf->getConfId(),
3151 : conf->getStateStr());
3152 0 : return;
3153 : }
3154 :
3155 14 : auto conv = pimpl_->getConversation(conversationId);
3156 14 : if (!conv)
3157 0 : return;
3158 14 : std::unique_lock lk(conv->mtx);
3159 14 : if (!conv->conversation) {
3160 0 : JAMI_ERROR("Conversation {} not found", conversationId);
3161 0 : return;
3162 : }
3163 : // Add commit to conversation
3164 14 : Json::Value value;
3165 14 : value["uri"] = pimpl_->username_;
3166 14 : value["device"] = pimpl_->deviceId_;
3167 14 : value["confId"] = conf->getConfId();
3168 14 : value["type"] = "application/call-history+json";
3169 28 : conv->conversation->hostConference(std::move(value),
3170 14 : [w = pimpl_->weak(),
3171 : conversationId](bool ok, const std::string& commitId) {
3172 14 : if (ok) {
3173 14 : if (auto shared = w.lock())
3174 14 : shared->sendMessageNotification(conversationId,
3175 : true,
3176 14 : commitId);
3177 : } else {
3178 0 : JAMI_ERR("Failed to send message to conversation %s",
3179 : conversationId.c_str());
3180 : }
3181 14 : });
3182 :
3183 : // When conf finished = remove host & commit
3184 : // Master call, so when it's stopped, the conference will be stopped (as we use the hold
3185 : // state for detaching the call)
3186 28 : conf->onShutdown([w = pimpl_->weak(),
3187 14 : accountUri = pimpl_->username_,
3188 14 : confId = conf->getConfId(),
3189 : conversationId,
3190 : conv](int duration) {
3191 14 : auto shared = w.lock();
3192 14 : if (shared) {
3193 11 : Json::Value value;
3194 11 : value["uri"] = accountUri;
3195 11 : value["device"] = shared->deviceId_;
3196 11 : value["confId"] = confId;
3197 11 : value["type"] = "application/call-history+json";
3198 11 : value["duration"] = std::to_string(duration);
3199 :
3200 11 : std::lock_guard lk(conv->mtx);
3201 11 : if (!conv->conversation) {
3202 0 : JAMI_ERROR("Conversation {} not found", conversationId);
3203 0 : return;
3204 : }
3205 33 : conv->conversation->removeActiveConference(
3206 22 : std::move(value), [w, conversationId](bool ok, const std::string& commitId) {
3207 11 : if (ok) {
3208 11 : if (auto shared = w.lock()) {
3209 11 : shared->sendMessageNotification(conversationId, true, commitId);
3210 11 : }
3211 : } else {
3212 0 : JAMI_ERROR("Failed to send message to conversation {}", conversationId);
3213 : }
3214 11 : });
3215 11 : }
3216 14 : });
3217 14 : }
3218 :
3219 : std::map<std::string, ConvInfo>
3220 743 : ConversationModule::convInfos(const std::string& accountId)
3221 : {
3222 1486 : return convInfosFromPath(fileutils::get_data_dir() / accountId);
3223 : }
3224 :
3225 : std::map<std::string, ConvInfo>
3226 776 : ConversationModule::convInfosFromPath(const std::filesystem::path& path)
3227 : {
3228 776 : std::map<std::string, ConvInfo> convInfos;
3229 : try {
3230 : // read file
3231 1552 : std::lock_guard lock(dhtnet::fileutils::getFileLock(path / "convInfo"));
3232 776 : auto file = fileutils::loadFile("convInfo", path);
3233 : // load values
3234 776 : msgpack::unpacked result;
3235 776 : msgpack::unpack(result, (const char*) file.data(), file.size());
3236 773 : result.get().convert(convInfos);
3237 785 : } catch (const std::exception& e) {
3238 3 : JAMI_WARN("[convInfo] error loading convInfo: %s", e.what());
3239 3 : }
3240 776 : return convInfos;
3241 0 : }
3242 :
3243 : std::map<std::string, ConversationRequest>
3244 724 : ConversationModule::convRequests(const std::string& accountId)
3245 : {
3246 724 : auto path = fileutils::get_data_dir() / accountId;
3247 1448 : return convRequestsFromPath(path.string());
3248 724 : }
3249 :
3250 : std::map<std::string, ConversationRequest>
3251 757 : ConversationModule::convRequestsFromPath(const std::filesystem::path& path)
3252 : {
3253 757 : std::map<std::string, ConversationRequest> convRequests;
3254 : try {
3255 : // read file
3256 1514 : std::lock_guard lock(dhtnet::fileutils::getFileLock(path / "convRequests"));
3257 757 : auto file = fileutils::loadFile("convRequests", path);
3258 : // load values
3259 757 : msgpack::unpacked result;
3260 757 : msgpack::unpack(result, (const char*) file.data(), file.size(), 0);
3261 757 : result.get().convert(convRequests);
3262 757 : } catch (const std::exception& e) {
3263 0 : JAMI_WARN("[convInfo] error loading convInfo: %s", e.what());
3264 0 : }
3265 757 : return convRequests;
3266 0 : }
3267 :
3268 : void
3269 135 : ConversationModule::addConvInfo(const ConvInfo& info)
3270 : {
3271 135 : pimpl_->addConvInfo(info);
3272 135 : }
3273 :
3274 : void
3275 1982 : ConversationModule::Impl::setConversationMembers(const std::string& convId,
3276 : const std::set<std::string>& members)
3277 : {
3278 1982 : if (auto conv = getConversation(convId)) {
3279 1982 : std::lock_guard lk(conv->mtx);
3280 1982 : conv->info.members = members;
3281 1982 : addConvInfo(conv->info);
3282 3964 : }
3283 1982 : }
3284 :
3285 : std::shared_ptr<Conversation>
3286 0 : ConversationModule::getConversation(const std::string& convId)
3287 : {
3288 0 : if (auto conv = pimpl_->getConversation(convId)) {
3289 0 : std::lock_guard lk(conv->mtx);
3290 0 : return conv->conversation;
3291 0 : }
3292 0 : return nullptr;
3293 : }
3294 :
3295 : std::shared_ptr<dhtnet::ChannelSocket>
3296 4965 : ConversationModule::gitSocket(std::string_view deviceId, std::string_view convId) const
3297 : {
3298 4965 : if (auto conv = pimpl_->getConversation(convId)) {
3299 4965 : std::lock_guard lk(conv->mtx);
3300 4965 : if (conv->conversation)
3301 9328 : return conv->conversation->gitSocket(DeviceId(deviceId));
3302 301 : else if (conv->pending)
3303 299 : return conv->pending->socket;
3304 9930 : }
3305 2 : return nullptr;
3306 : }
3307 :
3308 : void
3309 0 : ConversationModule::addGitSocket(std::string_view deviceId,
3310 : std::string_view convId,
3311 : const std::shared_ptr<dhtnet::ChannelSocket>& channel)
3312 : {
3313 0 : if (auto conv = pimpl_->getConversation(convId)) {
3314 0 : std::lock_guard lk(conv->mtx);
3315 0 : conv->conversation->addGitSocket(DeviceId(deviceId), channel);
3316 0 : } else
3317 0 : JAMI_WARNING("addGitSocket: Unable to find conversation {:s}", convId);
3318 0 : }
3319 :
3320 : void
3321 762 : ConversationModule::removeGitSocket(std::string_view deviceId, std::string_view convId)
3322 : {
3323 1512 : pimpl_->withConversation(convId, [&](auto& conv) { conv.removeGitSocket(DeviceId(deviceId)); });
3324 762 : }
3325 :
3326 : void
3327 589 : ConversationModule::shutdownConnections()
3328 : {
3329 901 : for (const auto& c : pimpl_->getSyncedConversations()) {
3330 312 : std::lock_guard lkc(c->mtx);
3331 312 : if (c->conversation)
3332 285 : c->conversation->shutdownConnections();
3333 312 : if (c->pending)
3334 13 : c->pending->socket = {};
3335 901 : }
3336 589 : }
3337 : void
3338 599 : ConversationModule::addSwarmChannel(const std::string& conversationId,
3339 : std::shared_ptr<dhtnet::ChannelSocket> channel)
3340 : {
3341 599 : pimpl_->withConversation(conversationId,
3342 598 : [&](auto& conv) { conv.addSwarmChannel(std::move(channel)); });
3343 599 : }
3344 :
3345 : void
3346 0 : ConversationModule::connectivityChanged()
3347 : {
3348 0 : for (const auto& conv : pimpl_->getConversations())
3349 0 : conv->connectivityChanged();
3350 0 : }
3351 :
3352 : std::shared_ptr<Typers>
3353 9 : ConversationModule::getTypers(const std::string& convId)
3354 : {
3355 9 : if (auto c = pimpl_->getConversation(convId)) {
3356 9 : std::lock_guard lk(c->mtx);
3357 9 : if (c->conversation)
3358 9 : return c->conversation->typers();
3359 18 : }
3360 0 : return nullptr;
3361 : }
3362 :
3363 : } // namespace jami
|