Line data Source code
1 : /*
2 : * Copyright (C) 2004-2024 Savoir-faire Linux Inc.
3 : *
4 : * This program is free software: you can redistribute it and/or modify
5 : * it under the terms of the GNU General Public License as published by
6 : * the Free Software Foundation, either version 3 of the License, or
7 : * (at your option) any later version.
8 : *
9 : * This program is distributed in the hope that it will be useful,
10 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 : * GNU General Public License for more details.
13 : *
14 : * You should have received a copy of the GNU General Public License
15 : * along with this program. If not, see <https://www.gnu.org/licenses/>.
16 : */
17 : #include "conversation.h"
18 :
19 : #include "account_const.h"
20 : #include "fileutils.h"
21 : #include "jamiaccount.h"
22 : #include "client/ring_signal.h"
23 :
24 : #include <charconv>
25 : #include <json/json.h>
26 : #include <string_view>
27 : #include <opendht/thread_pool.h>
28 : #include <tuple>
29 : #include <optional>
30 : #include "swarm/swarm_manager.h"
31 : #ifdef ENABLE_PLUGIN
32 : #include "manager.h"
33 : #include "plugin/jamipluginmanager.h"
34 : #include "plugin/streamdata.h"
35 : #endif
36 : #include "jami/conversation_interface.h"
37 :
38 : namespace jami {
39 :
40 : static const char* const LAST_MODIFIED = "lastModified";
41 44 : static const auto jsonBuilder = [] {
42 44 : Json::StreamWriterBuilder wbuilder;
43 44 : wbuilder["commentStyle"] = "None";
44 44 : wbuilder["indentation"] = "";
45 44 : return wbuilder;
46 0 : }();
47 :
48 13 : ConvInfo::ConvInfo(const Json::Value& json)
49 : {
50 13 : id = json[ConversationMapKeys::ID].asString();
51 13 : created = json[ConversationMapKeys::CREATED].asLargestUInt();
52 13 : removed = json[ConversationMapKeys::REMOVED].asLargestUInt();
53 13 : erased = json[ConversationMapKeys::ERASED].asLargestUInt();
54 33 : for (const auto& v : json[ConversationMapKeys::MEMBERS]) {
55 20 : members.emplace(v["uri"].asString());
56 : }
57 13 : lastDisplayed = json[ConversationMapKeys::LAST_DISPLAYED].asString();
58 13 : }
59 :
60 : Json::Value
61 25 : ConvInfo::toJson() const
62 : {
63 25 : Json::Value json;
64 25 : json[ConversationMapKeys::ID] = id;
65 25 : json[ConversationMapKeys::CREATED] = Json::Int64(created);
66 25 : if (removed) {
67 1 : json[ConversationMapKeys::REMOVED] = Json::Int64(removed);
68 : }
69 25 : if (erased) {
70 1 : json[ConversationMapKeys::ERASED] = Json::Int64(erased);
71 : }
72 64 : for (const auto& m : members) {
73 39 : Json::Value member;
74 39 : member["uri"] = m;
75 39 : json[ConversationMapKeys::MEMBERS].append(member);
76 39 : }
77 25 : json[ConversationMapKeys::LAST_DISPLAYED] = lastDisplayed;
78 25 : return json;
79 0 : }
80 :
81 : // ConversationRequest
82 343 : ConversationRequest::ConversationRequest(const Json::Value& json)
83 : {
84 343 : received = json[ConversationMapKeys::RECEIVED].asLargestUInt();
85 343 : declined = json[ConversationMapKeys::DECLINED].asLargestUInt();
86 343 : from = json[ConversationMapKeys::FROM].asString();
87 343 : conversationId = json[ConversationMapKeys::CONVERSATIONID].asString();
88 343 : auto& md = json[ConversationMapKeys::METADATAS];
89 689 : for (const auto& member : md.getMemberNames()) {
90 346 : metadatas.emplace(member, md[member].asString());
91 343 : }
92 343 : }
93 :
94 : Json::Value
95 2 : ConversationRequest::toJson() const
96 : {
97 2 : Json::Value json;
98 2 : json[ConversationMapKeys::CONVERSATIONID] = conversationId;
99 2 : json[ConversationMapKeys::FROM] = from;
100 2 : json[ConversationMapKeys::RECEIVED] = static_cast<uint32_t>(received);
101 2 : if (declined)
102 0 : json[ConversationMapKeys::DECLINED] = static_cast<uint32_t>(declined);
103 4 : for (const auto& [key, value] : metadatas) {
104 2 : json[ConversationMapKeys::METADATAS][key] = value;
105 : }
106 2 : return json;
107 0 : }
108 :
109 : std::map<std::string, std::string>
110 213 : ConversationRequest::toMap() const
111 : {
112 213 : auto result = metadatas;
113 213 : result[ConversationMapKeys::ID] = conversationId;
114 213 : result[ConversationMapKeys::FROM] = from;
115 213 : if (declined)
116 0 : result[ConversationMapKeys::DECLINED] = std::to_string(declined);
117 213 : result[ConversationMapKeys::RECEIVED] = std::to_string(received);
118 213 : return result;
119 0 : }
120 :
121 : using MessageList = std::list<std::shared_ptr<libjami::SwarmMessage>>;
122 :
123 : struct History
124 : {
125 : MessageList messageList {};
126 : std::map<std::string, std::shared_ptr<libjami::SwarmMessage>> quickAccess {};
127 : std::map<std::string, std::list<std::shared_ptr<libjami::SwarmMessage>>> pendingEditions {};
128 : std::map<std::string, std::list<std::map<std::string, std::string>>> pendingReactions {};
129 : };
130 :
131 : class Conversation::Impl
132 : {
133 : public:
134 181 : Impl(const std::shared_ptr<JamiAccount>& account,
135 : ConversationMode mode,
136 : const std::string& otherMember = "")
137 181 : : repository_(ConversationRepository::createConversation(account, mode, otherMember))
138 181 : , account_(account)
139 181 : , accountId_(account->getAccountID())
140 181 : , userId_(account->getUsername())
141 543 : , deviceId_(account->currentDeviceId())
142 : {
143 181 : if (!repository_) {
144 0 : throw std::logic_error("Unable to create repository");
145 : }
146 181 : init(account);
147 181 : }
148 :
149 18 : Impl(const std::shared_ptr<JamiAccount>& account, const std::string& conversationId)
150 18 : : account_(account)
151 18 : , accountId_(account->getAccountID())
152 18 : , userId_(account->getUsername())
153 36 : , deviceId_(account->currentDeviceId())
154 : {
155 18 : repository_ = std::make_unique<ConversationRepository>(account, conversationId);
156 18 : if (!repository_) {
157 0 : throw std::logic_error("Unable to create repository");
158 : }
159 18 : init(account);
160 18 : }
161 :
162 190 : Impl(const std::shared_ptr<JamiAccount>& account,
163 : const std::string& remoteDevice,
164 : const std::string& conversationId)
165 190 : : account_(account)
166 190 : , accountId_(account->getAccountID())
167 190 : , userId_(account->getUsername())
168 380 : , deviceId_(account->currentDeviceId())
169 : {
170 190 : std::vector<ConversationCommit> commits;
171 380 : repository_ = ConversationRepository::cloneConversation(account,
172 : remoteDevice,
173 : conversationId,
174 184 : [&](auto c) {
175 184 : commits = std::move(c);
176 189 : });
177 189 : if (!repository_) {
178 10 : emitSignal<libjami::ConversationSignal::OnConversationError>(
179 5 : accountId_, conversationId, EFETCH, "Unable to clone repository");
180 5 : throw std::logic_error("Unable to clone repository");
181 : }
182 : // To detect current active calls, we need to check history
183 368 : conversationDataPath_ = fileutils::get_data_dir() / accountId_
184 552 : / "conversation_data" / conversationId;
185 184 : activeCallsPath_ = conversationDataPath_ / ConversationMapKeys::ACTIVE_CALLS;
186 184 : initActiveCalls(repository_->convCommitsToMap(commits));
187 184 : init(account);
188 382 : }
189 :
190 383 : void init(const std::shared_ptr<JamiAccount>& account) {
191 383 : ioContext_ = Manager::instance().ioContext();
192 383 : fallbackTimer_ = std::make_unique<asio::steady_timer>(*ioContext_);
193 : swarmManager_
194 766 : = std::make_shared<SwarmManager>(NodeId(deviceId_),
195 766 : Manager::instance().getSeededRandomEngine(),
196 383 : [account = account_](const DeviceId& deviceId) {
197 2572 : if (auto acc = account.lock()) {
198 2572 : return acc->isConnectedWith(deviceId);
199 2572 : }
200 0 : return false;
201 383 : });
202 383 : swarmManager_->setMobility(account->isMobile());
203 : transferManager_
204 383 : = std::make_shared<TransferManager>(accountId_,
205 : "",
206 383 : repository_->id(),
207 766 : Manager::instance().getSeededRandomEngine());
208 766 : conversationDataPath_ = fileutils::get_data_dir() / accountId_
209 1149 : / "conversation_data" / repository_->id();
210 383 : fetchedPath_ = conversationDataPath_ / "fetched";
211 383 : statusPath_ = conversationDataPath_ / "status";
212 383 : sendingPath_ = conversationDataPath_ / "sending";
213 383 : preferencesPath_ = conversationDataPath_ / ConversationMapKeys::PREFERENCES;
214 383 : activeCallsPath_ = conversationDataPath_ / ConversationMapKeys::ACTIVE_CALLS;
215 383 : hostedCallsPath_ = conversationDataPath_ / ConversationMapKeys::HOSTED_CALLS;
216 383 : loadActiveCalls();
217 383 : loadStatus();
218 383 : typers_ = std::make_shared<Typers>(account, repository_->id());
219 383 : }
220 :
221 1092 : const std::string& toString() const
222 : {
223 1092 : if (fmtStr_.empty()) {
224 372 : if (repository_->mode() == ConversationMode::ONE_TO_ONE) {
225 125 : auto peer = userId_;
226 374 : for (const auto& member : repository_->getInitialMembers()) {
227 249 : if (member != userId_) {
228 124 : peer = member;
229 : }
230 125 : }
231 125 : fmtStr_ = fmt::format("[Conversation (1:1) {}]", peer);
232 125 : } else {
233 494 : fmtStr_ = fmt::format("[Conversation {}]", repository_->id());
234 : }
235 : }
236 1092 : return fmtStr_;
237 : }
238 : mutable std::string fmtStr_;
239 :
240 383 : ~Impl()
241 : {
242 : try {
243 383 : if (fallbackTimer_)
244 383 : fallbackTimer_->cancel();
245 0 : } catch (const std::exception& e) {
246 0 : JAMI_ERROR("[Conversation {:s}] {:s}", toString(), e.what());
247 0 : }
248 383 : }
249 :
250 : /**
251 : * If, for whatever reason, the daemon is stopped while hosting a conference,
252 : * we need to announce the end of this call when restarting.
253 : * To avoid to keep active calls forever.
254 : */
255 : std::vector<std::string> commitsEndedCalls();
256 : bool isAdmin() const;
257 : std::filesystem::path repoPath() const;
258 :
259 280 : void announce(const std::string& commitId, bool commitFromSelf = false) const
260 : {
261 280 : std::vector<std::string> vec;
262 280 : if (!commitId.empty())
263 278 : vec.emplace_back(commitId);
264 280 : announce(vec, commitFromSelf);
265 280 : }
266 :
267 294 : void announce(const std::vector<std::string>& commits, bool commitFromSelf = false) const
268 : {
269 294 : std::vector<ConversationCommit> convcommits;
270 294 : convcommits.reserve(commits.size());
271 600 : for (const auto& cid : commits) {
272 306 : auto commit = repository_->getCommit(cid);
273 306 : if (commit != std::nullopt) {
274 306 : convcommits.emplace_back(*commit);
275 : }
276 306 : }
277 294 : announce(repository_->convCommitsToMap(convcommits), commitFromSelf);
278 294 : }
279 :
280 : /**
281 : * Initialize activeCalls_ from the list of commits in the repository
282 : * @param commits Commits in reverse chronological order (i.e. from newest to oldest)
283 : */
284 184 : void initActiveCalls(const std::vector<std::map<std::string, std::string>>& commits) const
285 : {
286 184 : std::unordered_set<std::string> invalidHostUris;
287 184 : std::unordered_set<std::string> invalidCallIds;
288 :
289 184 : std::lock_guard lk(activeCallsMtx_);
290 1163 : for (const auto& commit : commits) {
291 979 : if (commit.at("type") == "member") {
292 : // Each commit of type "member" has an "action" field whose value can be one
293 : // of the following: "add", "join", "remove", "ban", "unban"
294 : // In the case of "remove" and "ban", we need to add the member's URI to
295 : // invalidHostUris to ensure that any call they may have started in the past
296 : // is no longer considered active.
297 : // For the other actions, there's no harm in adding the member's URI anyway,
298 : // since it's not possible to start hosting a call before joining the swarm (or
299 : // before getting unbanned in the case of previously banned members).
300 774 : invalidHostUris.emplace(commit.at("uri"));
301 411 : } else if (commit.find("confId") != commit.end() && commit.find("uri") != commit.end()
302 411 : && commit.find("device") != commit.end()) {
303 : // The commit indicates either the end or the beginning of a call
304 : // (depending on whether there is a "duration" field or not).
305 1 : auto convId = repository_->id();
306 2 : auto confId = commit.at("confId");
307 2 : auto uri = commit.at("uri");
308 2 : auto device = commit.at("device");
309 :
310 2 : if (commit.find("duration") == commit.end()
311 1 : && invalidCallIds.find(confId) == invalidCallIds.end()
312 3 : && invalidHostUris.find(uri) == invalidHostUris.end()) {
313 1 : std::map<std::string, std::string> activeCall;
314 1 : activeCall["id"] = confId;
315 1 : activeCall["uri"] = uri;
316 1 : activeCall["device"] = device;
317 1 : activeCalls_.emplace_back(activeCall);
318 0 : fmt::print("swarm:{} new active call detected: {} (on device {}, account {})\n",
319 : convId,
320 : confId,
321 : device,
322 : uri);
323 1 : }
324 : // Even if the call was active, we still add its ID to invalidCallIds to make sure it
325 : // doesn't get added a second time. (This shouldn't happen normally, but in practice
326 : // there are sometimes multiple commits indicating the beginning of the same call.)
327 1 : invalidCallIds.emplace(confId);
328 1 : }
329 : }
330 184 : saveActiveCalls();
331 184 : emitSignal<libjami::ConfigurationSignal::ActiveCallsChanged>(accountId_,
332 184 : repository_->id(),
333 184 : activeCalls_);
334 184 : }
335 :
336 : /**
337 : * Update activeCalls_ via announced commits (in load or via new commits)
338 : * @param commit Commit to check
339 : * @param eraseOnly If we want to ignore added commits
340 : * @param emitSig If we want to emit to client
341 : * @note eraseOnly is used by loadMessages. This is a fail-safe, this SHOULD NOT happen
342 : */
343 80 : void updateActiveCalls(const std::map<std::string, std::string>& commit,
344 : bool eraseOnly = false,
345 : bool emitSig = true) const
346 : {
347 80 : if (!repository_)
348 0 : return;
349 80 : if (commit.at("type") == "member") {
350 : // In this case, we need to check if we are not removing a hosting member or device
351 20 : std::lock_guard lk(activeCallsMtx_);
352 20 : auto it = activeCalls_.begin();
353 20 : auto updateActives = false;
354 21 : while (it != activeCalls_.end()) {
355 1 : if (it->at("uri") == commit.at("uri") || it->at("device") == commit.at("uri")) {
356 3 : JAMI_DEBUG("Removing {:s} from the active calls, because {:s} left",
357 : it->at("id"),
358 : commit.at("uri"));
359 1 : it = activeCalls_.erase(it);
360 1 : updateActives = true;
361 : } else {
362 0 : ++it;
363 : }
364 : }
365 20 : if (updateActives) {
366 1 : saveActiveCalls();
367 1 : if (emitSig)
368 1 : emitSignal<libjami::ConfigurationSignal::ActiveCallsChanged>(accountId_,
369 1 : repository_->id(),
370 1 : activeCalls_);
371 : }
372 20 : return;
373 20 : }
374 : // Else, it's a call information
375 177 : if (commit.find("confId") != commit.end() && commit.find("uri") != commit.end()
376 177 : && commit.find("device") != commit.end()) {
377 57 : auto convId = repository_->id();
378 114 : auto confId = commit.at("confId");
379 114 : auto uri = commit.at("uri");
380 114 : auto device = commit.at("device");
381 57 : std::lock_guard lk(activeCallsMtx_);
382 57 : auto itActive = std::find_if(activeCalls_.begin(),
383 : activeCalls_.end(),
384 26 : [&](const auto& value) {
385 78 : return value.at("id") == confId
386 52 : && value.at("uri") == uri
387 130 : && value.at("device") == device;
388 : });
389 57 : if (commit.find("duration") == commit.end()) {
390 31 : if (itActive == activeCalls_.end() && !eraseOnly) {
391 93 : JAMI_DEBUG(
392 : "swarm:{:s} new current call detected: {:s} on device {:s}, account {:s}",
393 : convId,
394 : confId,
395 : device,
396 : uri);
397 31 : std::map<std::string, std::string> activeCall;
398 31 : activeCall["id"] = confId;
399 31 : activeCall["uri"] = uri;
400 31 : activeCall["device"] = device;
401 31 : activeCalls_.emplace_back(activeCall);
402 31 : saveActiveCalls();
403 31 : if (emitSig)
404 31 : emitSignal<libjami::ConfigurationSignal::ActiveCallsChanged>(accountId_,
405 : repository_
406 31 : ->id(),
407 31 : activeCalls_);
408 31 : }
409 : } else {
410 26 : if (itActive != activeCalls_.end()) {
411 26 : itActive = activeCalls_.erase(itActive);
412 : // Unlikely, but we must ensure that no duplicate exists
413 26 : while (itActive != activeCalls_.end()) {
414 0 : itActive = std::find_if(itActive, activeCalls_.end(), [&](const auto& value) {
415 0 : return value.at("id") == confId && value.at("uri") == uri
416 0 : && value.at("device") == device;
417 : });
418 0 : if (itActive != activeCalls_.end()) {
419 0 : JAMI_ERROR("Duplicate call found. (This is a bug)");
420 0 : itActive = activeCalls_.erase(itActive);
421 : }
422 : }
423 :
424 26 : if (eraseOnly) {
425 0 : JAMI_WARNING("previous swarm:{:s} call finished detected: {:s} on device "
426 : "{:s}, account {:s}",
427 : convId,
428 : confId,
429 : device,
430 : uri);
431 : } else {
432 78 : JAMI_DEBUG("swarm:{:s} call finished: {:s} on device {:s}, account {:s}",
433 : convId,
434 : confId,
435 : device,
436 : uri);
437 : }
438 : }
439 26 : saveActiveCalls();
440 26 : if (emitSig)
441 26 : emitSignal<libjami::ConfigurationSignal::ActiveCallsChanged>(accountId_,
442 26 : repository_->id(),
443 26 : activeCalls_);
444 : }
445 57 : }
446 : }
447 :
448 1196 : void announce(const std::vector<std::map<std::string, std::string>>& commits, bool commitFromSelf = false) const
449 : {
450 1196 : if (!repository_)
451 0 : return;
452 1196 : auto convId = repository_->id();
453 1196 : auto ok = !commits.empty();
454 3586 : auto lastId = ok ? commits.rbegin()->at(ConversationMapKeys::ID) : "";
455 1196 : addToHistory(commits, true, commitFromSelf);
456 1196 : if (ok) {
457 1194 : bool announceMember = false;
458 2454 : for (const auto& c : commits) {
459 : // Announce member events
460 1260 : if (c.at("type") == "member") {
461 935 : if (c.find("uri") != c.end() && c.find("action") != c.end()) {
462 935 : const auto& uri = c.at("uri");
463 935 : const auto& actionStr = c.at("action");
464 935 : auto action = -1;
465 935 : if (actionStr == "add")
466 440 : action = 0;
467 495 : else if (actionStr == "join")
468 474 : action = 1;
469 21 : else if (actionStr == "remove")
470 1 : action = 2;
471 20 : else if (actionStr == "ban")
472 19 : action = 3;
473 1 : else if (actionStr == "unban")
474 1 : action = 4;
475 935 : if (actionStr == "ban" || actionStr == "remove") {
476 : // In this case, a potential host was removed during a call.
477 20 : updateActiveCalls(c);
478 20 : typers_->removeTyper(uri);
479 : }
480 935 : if (action != -1) {
481 935 : announceMember = true;
482 1870 : emitSignal<libjami::ConversationSignal::ConversationMemberEvent>(
483 935 : accountId_, convId, uri, action);
484 : }
485 : }
486 325 : } else if (c.at("type") == "application/call-history+json") {
487 60 : updateActiveCalls(c);
488 : }
489 : #ifdef ENABLE_PLUGIN
490 : auto& pluginChatManager
491 1260 : = Manager::instance().getJamiPluginManager().getChatServicesManager();
492 1260 : if (pluginChatManager.hasHandlers()) {
493 4 : auto cm = std::make_shared<JamiMessage>(accountId_,
494 : convId,
495 8 : c.at("author") != userId_,
496 : c,
497 8 : false);
498 4 : cm->isSwarm = true;
499 4 : pluginChatManager.publishMessage(std::move(cm));
500 4 : }
501 : #endif
502 : // announce message
503 1260 : emitSignal<libjami::ConversationSignal::MessageReceived>(accountId_, convId, c);
504 : }
505 :
506 1194 : if (announceMember && onMembersChanged_) {
507 920 : onMembersChanged_(repository_->memberUris("", {}));
508 : }
509 : }
510 1196 : }
511 :
512 383 : void loadStatus()
513 : {
514 : try {
515 : // read file
516 764 : auto file = fileutils::loadFile(statusPath_);
517 : // load values
518 2 : msgpack::object_handle oh = msgpack::unpack((const char*) file.data(), file.size());
519 2 : std::lock_guard lk {messageStatusMtx_};
520 2 : oh.get().convert(messagesStatus_);
521 383 : } catch (const std::exception& e) {
522 381 : }
523 383 : }
524 1100 : void saveStatus()
525 : {
526 1100 : std::ofstream file(statusPath_, std::ios::trunc | std::ios::binary);
527 1100 : msgpack::pack(file, messagesStatus_);
528 1100 : }
529 :
530 401 : void loadActiveCalls() const
531 : {
532 : try {
533 : // read file
534 790 : auto file = fileutils::loadFile(activeCallsPath_);
535 : // load values
536 12 : msgpack::object_handle oh = msgpack::unpack((const char*) file.data(), file.size());
537 12 : std::lock_guard lk {activeCallsMtx_};
538 12 : oh.get().convert(activeCalls_);
539 401 : } catch (const std::exception& e) {
540 389 : return;
541 389 : }
542 : }
543 :
544 260 : void saveActiveCalls() const
545 : {
546 260 : std::ofstream file(activeCallsPath_, std::ios::trunc | std::ios::binary);
547 260 : msgpack::pack(file, activeCalls_);
548 260 : }
549 :
550 18 : void loadHostedCalls() const
551 : {
552 : try {
553 : // read file
554 30 : auto file = fileutils::loadFile(hostedCallsPath_);
555 : // load values
556 6 : msgpack::object_handle oh = msgpack::unpack((const char*) file.data(), file.size());
557 6 : std::lock_guard lk {activeCallsMtx_};
558 6 : oh.get().convert(hostedCalls_);
559 18 : } catch (const std::exception& e) {
560 12 : return;
561 12 : }
562 : }
563 :
564 43 : void saveHostedCalls() const
565 : {
566 43 : std::ofstream file(hostedCallsPath_, std::ios::trunc | std::ios::binary);
567 43 : msgpack::pack(file, hostedCalls_);
568 43 : }
569 :
570 : void voteUnban(const std::string& contactUri, const std::string_view type, const OnDoneCb& cb);
571 :
572 : std::vector<std::map<std::string, std::string>> getMembers(bool includeInvited,
573 : bool includeLeft,
574 : bool includeBanned) const;
575 :
576 7641 : std::string_view bannedType(const std::string& uri) const
577 : {
578 7641 : auto repo = repoPath();
579 0 : auto crt = fmt::format("{}.crt", uri);
580 15290 : auto bannedMember = repo / "banned" / "members" / crt;
581 7644 : if (std::filesystem::is_regular_file(bannedMember))
582 16 : return "members"sv;
583 15256 : auto bannedAdmin = repo / "banned" / "admins" / crt;
584 7628 : if (std::filesystem::is_regular_file(bannedAdmin))
585 0 : return "admins"sv;
586 15260 : auto bannedInvited = repo / "banned" / "invited" / uri;
587 7630 : if (std::filesystem::is_regular_file(bannedInvited))
588 0 : return "invited"sv;
589 15259 : auto bannedDevice = repo / "banned" / "devices" / crt;
590 7630 : if (std::filesystem::is_regular_file(bannedDevice))
591 0 : return "devices"sv;
592 7630 : return {};
593 7646 : }
594 :
595 5560 : std::shared_ptr<dhtnet::ChannelSocket> gitSocket(const DeviceId& deviceId) const
596 : {
597 5560 : auto deviceSockets = gitSocketList_.find(deviceId);
598 11120 : return (deviceSockets != gitSocketList_.end()) ? deviceSockets->second : nullptr;
599 : }
600 :
601 2605 : void addGitSocket(const DeviceId& deviceId, const std::shared_ptr<dhtnet::ChannelSocket>& socket)
602 : {
603 2605 : gitSocketList_[deviceId] = socket;
604 2606 : }
605 791 : void removeGitSocket(const DeviceId& deviceId)
606 : {
607 791 : auto deviceSockets = gitSocketList_.find(deviceId);
608 791 : if (deviceSockets != gitSocketList_.end())
609 434 : gitSocketList_.erase(deviceSockets);
610 791 : }
611 :
612 : /**
613 : * Remove all git sockets and all DRT nodes associated with the given peer.
614 : * This is used when a swarm member is banned to ensure that we stop syncing
615 : * with them or sending them message notifications.
616 : */
617 : void disconnectFromPeer(const std::string& peerUri);
618 :
619 : std::vector<std::map<std::string, std::string>> getMembers(bool includeInvited,
620 : bool includeLeft) const;
621 :
622 : std::mutex membersMtx_ {};
623 : std::set<std::string> checkedMembers_; // Store members we tried
624 : std::function<void()> bootstrapCb_;
625 : #ifdef LIBJAMI_TESTABLE
626 : std::function<void(std::string, BootstrapStatus)> bootstrapCbTest_;
627 : #endif
628 :
629 : std::mutex writeMtx_ {};
630 : std::unique_ptr<ConversationRepository> repository_;
631 : std::shared_ptr<SwarmManager> swarmManager_;
632 : std::weak_ptr<JamiAccount> account_;
633 : std::string accountId_ {};
634 : std::string userId_;
635 : std::string deviceId_;
636 : std::atomic_bool isRemoving_ {false};
637 : std::vector<std::map<std::string, std::string>> loadMessages(const LogOptions& options);
638 : std::vector<libjami::SwarmMessage> loadMessages2(const LogOptions& options,
639 : History* optHistory = nullptr);
640 : void pull(const std::string& deviceId);
641 : std::vector<std::map<std::string, std::string>> mergeHistory(const std::string& uri);
642 :
643 : // Avoid multiple fetch/merges at the same time.
644 : std::mutex pullcbsMtx_ {};
645 : std::map<std::string, std::deque<std::pair<std::string, OnPullCb>>> fetchingRemotes_ {}; // store current remote in fetch
646 : std::shared_ptr<TransferManager> transferManager_ {};
647 : std::filesystem::path conversationDataPath_ {};
648 : std::filesystem::path fetchedPath_ {};
649 :
650 : // Manage last message displayed and status
651 : std::filesystem::path sendingPath_ {};
652 : std::filesystem::path preferencesPath_ {};
653 : OnMembersChanged onMembersChanged_ {};
654 :
655 : // Manage hosted calls on this device
656 : std::filesystem::path hostedCallsPath_ {};
657 : mutable std::map<std::string, uint64_t /* start time */> hostedCalls_ {};
658 : // Manage active calls for this conversation (can be hosted by other devices)
659 : std::filesystem::path activeCallsPath_ {};
660 : mutable std::mutex activeCallsMtx_ {};
661 : mutable std::vector<std::map<std::string, std::string>> activeCalls_ {};
662 :
663 : GitSocketList gitSocketList_ {};
664 :
665 : // Bootstrap
666 : std::shared_ptr<asio::io_context> ioContext_;
667 : std::unique_ptr<asio::steady_timer> fallbackTimer_;
668 :
669 :
670 : /**
671 : * Loaded history represents the linearized history to show for clients
672 : */
673 : mutable History loadedHistory_ {};
674 : std::vector<std::shared_ptr<libjami::SwarmMessage>> addToHistory(
675 : const std::vector<std::map<std::string, std::string>>& commits,
676 : bool messageReceived = false,
677 : bool commitFromSelf = false,
678 : History* history = nullptr) const;
679 : // While loading the history, we need to avoid:
680 : // - reloading history (can just be ignored)
681 : // - adding new commits (should wait for history to be loaded)
682 : bool isLoadingHistory_ {false};
683 : mutable std::mutex historyMtx_ {};
684 : mutable std::condition_variable historyCv_ {};
685 :
686 : void handleReaction(History& history,
687 : const std::shared_ptr<libjami::SwarmMessage>& sharedCommit) const;
688 : void handleEdition(History& history,
689 : const std::shared_ptr<libjami::SwarmMessage>& sharedCommit,
690 : bool messageReceived) const;
691 : bool handleMessage(History& history,
692 : const std::shared_ptr<libjami::SwarmMessage>& sharedCommit,
693 : bool messageReceived) const;
694 : void rectifyStatus(const std::shared_ptr<libjami::SwarmMessage>& message,
695 : History& history) const;
696 : /**
697 : * {uri, {
698 : * {"fetch", "commitId"},
699 : * {"fetched_ts", "timestamp"},
700 : * {"read", "commitId"},
701 : * {"read_ts", "timestamp"}
702 : * }
703 : * }
704 : */
705 : mutable std::mutex messageStatusMtx_;
706 : std::function<void(const std::map<std::string, std::map<std::string, std::string>>&)> messageStatusCb_ {};
707 : std::filesystem::path statusPath_ {};
708 : mutable std::map<std::string, std::map<std::string, std::string>> messagesStatus_ {};
709 : /**
710 : * Status: 0 = commited, 1 = fetched, 2 = read
711 : * This cache the curent status to add in the messages
712 : */
713 : // Note: only store int32_t cause it's easy to pass to dbus this way
714 : // memberToStatus serves as a cache for loading messages
715 : mutable std::map<std::string, int32_t> memberToStatus;
716 :
717 :
718 : // futureStatus is used to store the status for receiving messages
719 : // (because we're not sure to fetch the commit before receiving a status change for this)
720 : mutable std::map<std::string, std::map<std::string, int32_t>> futureStatus;
721 : // Update internal structures regarding status
722 : void updateStatus(const std::string& uri,
723 : libjami::Account::MessageStates status,
724 : const std::string& commitId,
725 : const std::string& ts,
726 : bool emit = false);
727 :
728 :
729 : std::shared_ptr<Typers> typers_;
730 : };
731 :
732 : bool
733 17 : Conversation::Impl::isAdmin() const
734 : {
735 34 : auto adminsPath = repoPath() / "admins";
736 34 : return std::filesystem::is_regular_file(fileutils::getFullPath(adminsPath, userId_ + ".crt"));
737 17 : }
738 :
739 : void
740 17 : Conversation::Impl::disconnectFromPeer(const std::string& peerUri)
741 : {
742 : // Remove nodes from swarmManager
743 17 : const auto nodes = swarmManager_->getRoutingTable().getAllNodes();
744 17 : std::vector<NodeId> toRemove;
745 40 : for (const auto node : nodes)
746 23 : if (peerUri == repository_->uriFromDevice(node.toString()))
747 12 : toRemove.emplace_back(node);
748 17 : swarmManager_->deleteNode(toRemove);
749 :
750 : // Remove git sockets with this member
751 40 : for (auto it = gitSocketList_.begin(); it != gitSocketList_.end();) {
752 23 : if (peerUri == repository_->uriFromDevice(it->first.toString()))
753 12 : it = gitSocketList_.erase(it);
754 : else
755 11 : ++it;
756 : }
757 17 : }
758 :
759 : std::vector<std::map<std::string, std::string>>
760 423 : Conversation::Impl::getMembers(bool includeInvited, bool includeLeft, bool includeBanned) const
761 : {
762 423 : std::vector<std::map<std::string, std::string>> result;
763 423 : auto members = repository_->members();
764 423 : std::lock_guard lk(messageStatusMtx_);
765 1150 : for (const auto& member : members) {
766 727 : if (member.role == MemberRole::BANNED && !includeBanned) {
767 166 : continue;
768 : }
769 727 : if (member.role == MemberRole::INVITED && !includeInvited)
770 166 : continue;
771 561 : if (member.role == MemberRole::LEFT && !includeLeft)
772 0 : continue;
773 561 : auto mm = member.map();
774 561 : mm[ConversationMapKeys::LAST_DISPLAYED] = messagesStatus_[member.uri]["read"];
775 561 : result.emplace_back(std::move(mm));
776 561 : }
777 846 : return result;
778 423 : }
779 :
780 : std::vector<std::string>
781 18 : Conversation::Impl::commitsEndedCalls()
782 : {
783 : // Handle current calls
784 18 : std::vector<std::string> commits {};
785 18 : std::unique_lock lk(writeMtx_);
786 18 : std::unique_lock lkA(activeCallsMtx_);
787 18 : for (const auto& hostedCall : hostedCalls_) {
788 : // In this case, this means that we left
789 : // the conference while still hosting it, so activeCalls
790 : // will not be correctly updated
791 : // We don't need to send notifications there, as peers will sync with presence
792 0 : Json::Value value;
793 0 : value["uri"] = userId_;
794 0 : value["device"] = deviceId_;
795 0 : value["confId"] = hostedCall.first;
796 0 : value["type"] = "application/call-history+json";
797 0 : auto now = std::chrono::system_clock::now();
798 0 : auto nowConverted = std::chrono::duration_cast<std::chrono::seconds>(now.time_since_epoch())
799 0 : .count();
800 0 : value["duration"] = std::to_string((nowConverted - hostedCall.second) * 1000);
801 0 : auto itActive = std::find_if(activeCalls_.begin(),
802 : activeCalls_.end(),
803 0 : [this, confId = hostedCall.first](const auto& value) {
804 0 : return value.at("id") == confId && value.at("uri") == userId_
805 0 : && value.at("device") == deviceId_;
806 : });
807 0 : if (itActive != activeCalls_.end())
808 0 : activeCalls_.erase(itActive);
809 0 : commits.emplace_back(repository_->commitMessage(Json::writeString(jsonBuilder, value)));
810 :
811 0 : JAMI_DEBUG("Removing hosted conference... {:s}", hostedCall.first);
812 0 : }
813 18 : hostedCalls_.clear();
814 18 : saveActiveCalls();
815 18 : saveHostedCalls();
816 36 : return commits;
817 18 : }
818 :
819 : std::filesystem::path
820 11748 : Conversation::Impl::repoPath() const
821 : {
822 23502 : return fileutils::get_data_dir() / accountId_ / "conversations" / repository_->id();
823 : }
824 :
825 : std::vector<std::map<std::string, std::string>>
826 25 : Conversation::Impl::loadMessages(const LogOptions& options)
827 : {
828 25 : if (!repository_)
829 0 : return {};
830 25 : std::vector<ConversationCommit> commits;
831 25 : auto startLogging = options.from == "";
832 25 : auto breakLogging = false;
833 50 : repository_->log(
834 91 : [&](const auto& id, const auto& author, const auto& commit) {
835 91 : if (!commits.empty()) {
836 : // Set linearized parent
837 66 : commits.rbegin()->linearized_parent = id;
838 : }
839 91 : if (options.skipMerge && git_commit_parentcount(commit.get()) > 1) {
840 0 : return CallbackResult::Skip;
841 : }
842 91 : if ((options.nbOfCommits != 0 && commits.size() == options.nbOfCommits))
843 0 : return CallbackResult::Break; // Stop logging
844 91 : if (breakLogging)
845 0 : return CallbackResult::Break; // Stop logging
846 91 : if (id == options.to) {
847 18 : if (options.includeTo)
848 0 : breakLogging = true; // For the next commit
849 : else
850 18 : return CallbackResult::Break; // Stop logging
851 : }
852 :
853 73 : if (!startLogging && options.from != "" && options.from == id)
854 0 : startLogging = true;
855 73 : if (!startLogging)
856 0 : return CallbackResult::Skip; // Start logging after this one
857 :
858 73 : if (options.fastLog) {
859 0 : if (options.authorUri != "") {
860 0 : if (options.authorUri == repository_->uriFromDevice(author.email)) {
861 0 : return CallbackResult::Break; // Found author, stop
862 : }
863 : }
864 : // Used to only count commit
865 0 : commits.emplace(commits.end(), ConversationCommit {});
866 0 : return CallbackResult::Skip;
867 : }
868 :
869 73 : return CallbackResult::Ok; // Continue
870 : },
871 73 : [&](auto&& cc) { commits.emplace(commits.end(), std::forward<decltype(cc)>(cc)); },
872 73 : [](auto, auto, auto) { return false; },
873 25 : options.from,
874 25 : options.logIfNotFound);
875 25 : return repository_->convCommitsToMap(commits);
876 25 : }
877 :
878 : std::vector<libjami::SwarmMessage>
879 1570 : Conversation::Impl::loadMessages2(const LogOptions& options, History* optHistory)
880 : {
881 1570 : if (!optHistory) {
882 2 : std::lock_guard lock(historyMtx_);
883 2 : if (!repository_ || isLoadingHistory_)
884 0 : return {};
885 2 : isLoadingHistory_ = true;
886 2 : }
887 :
888 : // By convention, if options.nbOfCommits is zero, then we
889 : // don't impose a limit on the number of commits returned.
890 1570 : bool limitNbOfCommits = options.nbOfCommits > 0;
891 :
892 1570 : auto startLogging = options.from == "";
893 1570 : auto breakLogging = false;
894 1570 : auto currentHistorySize = loadedHistory_.messageList.size();
895 1570 : std::vector<std::string> replies;
896 1570 : std::vector<std::shared_ptr<libjami::SwarmMessage>> msgList;
897 3140 : repository_->log(
898 : /* preCondition */
899 11376 : [&](const auto& id, const auto& author, const auto& commit) {
900 11376 : if (options.skipMerge && git_commit_parentcount(commit.get()) > 1) {
901 2 : return CallbackResult::Skip;
902 : }
903 11374 : if (id == options.to) {
904 566 : if (options.includeTo)
905 0 : breakLogging = true; // For the next commit
906 : }
907 11374 : if (replies.empty()) { // This avoid load until
908 : // NOTE: in the future, we may want to add "Reply-Body" in commit to avoid to load
909 : // until this commit
910 22748 : if ((limitNbOfCommits
911 12175 : && (loadedHistory_.messageList.size() - currentHistorySize)
912 801 : == options.nbOfCommits))
913 3 : return CallbackResult::Break; // Stop logging
914 11371 : if (breakLogging)
915 0 : return CallbackResult::Break; // Stop logging
916 11371 : if (id == options.to && !options.includeTo) {
917 565 : return CallbackResult::Break; // Stop logging
918 : }
919 : }
920 :
921 10806 : if (!startLogging && options.from != "" && options.from == id)
922 1095 : startLogging = true;
923 10807 : if (!startLogging)
924 25 : return CallbackResult::Skip; // Start logging after this one
925 :
926 10782 : if (options.fastLog) {
927 9976 : if (options.authorUri != "") {
928 1 : if (options.authorUri == repository_->uriFromDevice(author.email)) {
929 1 : return CallbackResult::Break; // Found author, stop
930 : }
931 : }
932 : }
933 :
934 10781 : return CallbackResult::Ok; // Continue
935 : },
936 : /* emplaceCb */
937 10780 : [&](auto&& cc) {
938 10780 : if(limitNbOfCommits && (msgList.size() == options.nbOfCommits))
939 339 : return;
940 10442 : auto optMessage = repository_->convCommitToMap(cc);
941 10443 : if (!optMessage.has_value())
942 1 : return;
943 10441 : auto message = optMessage.value();
944 10442 : if (message.find("reply-to") != message.end()) {
945 1 : auto it = std::find(replies.begin(), replies.end(), message.at("reply-to"));
946 1 : if(it == replies.end()) {
947 1 : replies.emplace_back(message.at("reply-to"));
948 : }
949 : }
950 10441 : auto it = std::find(replies.begin(), replies.end(), message.at("id"));
951 10442 : if (it != replies.end()) {
952 1 : replies.erase(it);
953 : }
954 10442 : std::shared_ptr<libjami::SwarmMessage> firstMsg;
955 10442 : if (!optHistory && msgList.empty() && !loadedHistory_.messageList.empty()) {
956 0 : firstMsg = *loadedHistory_.messageList.rbegin();
957 : }
958 31322 : auto added = addToHistory({message}, false, false, optHistory);
959 10441 : if (!added.empty() && firstMsg) {
960 0 : emitSignal<libjami::ConversationSignal::SwarmMessageUpdated>(accountId_,
961 0 : repository_->id(),
962 0 : *firstMsg);
963 : }
964 10441 : msgList.insert(msgList.end(), added.begin(), added.end());
965 10443 : },
966 : /* postCondition */
967 10781 : [&](auto, auto, auto) {
968 : // Stop logging if there was a limit set on the number of commits
969 : // to return and we reached it. This isn't strictly necessary since
970 : // the check at the beginning of `emplaceCb` ensures that we won't
971 : // return too many messages, but it prevents us from needlessly
972 : // iterating over a (potentially) large number of commits.
973 10781 : return limitNbOfCommits && (msgList.size() == options.nbOfCommits);
974 : },
975 1570 : options.from,
976 1570 : options.logIfNotFound);
977 :
978 : // Convert for client (remove ptr)
979 1570 : std::vector<libjami::SwarmMessage> ret;
980 1570 : ret.reserve(msgList.size());
981 12003 : for (const auto& msg: msgList) {
982 10433 : ret.emplace_back(*msg);
983 : }
984 1570 : if (!optHistory) {
985 2 : std::lock_guard lock(historyMtx_);
986 2 : isLoadingHistory_ = false;
987 2 : historyCv_.notify_all();
988 2 : }
989 :
990 1570 : return ret;
991 1570 : }
992 :
993 : void
994 3 : Conversation::Impl::handleReaction(History& history,
995 : const std::shared_ptr<libjami::SwarmMessage>& sharedCommit) const
996 : {
997 3 : auto it = history.quickAccess.find(sharedCommit->body.at("react-to"));
998 3 : auto peditIt = history.pendingEditions.find(sharedCommit->id);
999 3 : if (peditIt != history.pendingEditions.end()) {
1000 0 : auto oldBody = sharedCommit->body;
1001 0 : sharedCommit->body["body"] = peditIt->second.front()->body["body"];
1002 0 : if (sharedCommit->body.at("body").empty())
1003 0 : return;
1004 0 : history.pendingEditions.erase(peditIt);
1005 0 : }
1006 3 : if (it != history.quickAccess.end()) {
1007 3 : it->second->reactions.emplace_back(sharedCommit->body);
1008 3 : emitSignal<libjami::ConversationSignal::ReactionAdded>(accountId_,
1009 3 : repository_->id(),
1010 3 : it->second->id,
1011 3 : sharedCommit->body);
1012 : } else {
1013 0 : history.pendingReactions[sharedCommit->body.at("react-to")].emplace_back(sharedCommit->body);
1014 : }
1015 : }
1016 :
1017 : void
1018 8 : Conversation::Impl::handleEdition(History& history,
1019 : const std::shared_ptr<libjami::SwarmMessage>& sharedCommit,
1020 : bool messageReceived) const
1021 : {
1022 16 : auto editId = sharedCommit->body.at("edit");
1023 8 : auto it = history.quickAccess.find(editId);
1024 8 : if (it != history.quickAccess.end()) {
1025 6 : auto baseCommit = it->second;
1026 6 : if (baseCommit) {
1027 6 : auto itReact = baseCommit->body.find("react-to");
1028 6 : std::string toReplace = (baseCommit->type == "application/data-transfer+json") ?
1029 12 : "tid" : "body";
1030 6 : auto body = sharedCommit->body.at(toReplace);
1031 : // Edit reaction
1032 6 : if (itReact != baseCommit->body.end()) {
1033 1 : baseCommit->body[toReplace] = body; // Replace body if pending
1034 1 : it = history.quickAccess.find(itReact->second);
1035 1 : auto itPending = history.pendingReactions.find(itReact->second);
1036 1 : if (it != history.quickAccess.end()) {
1037 1 : baseCommit = it->second; // Base commit
1038 1 : auto itPreviousReact = std::find_if(baseCommit->reactions.begin(),
1039 1 : baseCommit->reactions.end(),
1040 1 : [&](const auto& reaction) {
1041 1 : return reaction.at("id") == editId;
1042 : });
1043 1 : if (itPreviousReact != baseCommit->reactions.end()) {
1044 1 : (*itPreviousReact)[toReplace] = body;
1045 1 : if (body.empty()) {
1046 1 : baseCommit->reactions.erase(itPreviousReact);
1047 2 : emitSignal<libjami::ConversationSignal::ReactionRemoved>(accountId_,
1048 : repository_
1049 1 : ->id(),
1050 1 : baseCommit->id,
1051 : editId);
1052 : }
1053 : }
1054 0 : } else if (itPending != history.pendingReactions.end()) {
1055 : // Else edit if pending
1056 0 : auto itReaction = std::find_if(itPending->second.begin(),
1057 0 : itPending->second.end(),
1058 0 : [&](const auto& reaction) {
1059 0 : return reaction.at("id") == editId;
1060 : });
1061 0 : if (itReaction != itPending->second.end()) {
1062 0 : (*itReaction)[toReplace] = body;
1063 0 : if (body.empty())
1064 0 : itPending->second.erase(itReaction);
1065 : }
1066 : } else {
1067 : // Add to pending edtions
1068 0 : messageReceived ? history.pendingEditions[editId].emplace_front(sharedCommit)
1069 0 : : history.pendingEditions[editId].emplace_back(sharedCommit);
1070 : }
1071 : } else {
1072 : // Normal message
1073 5 : it->second->editions.emplace(it->second->editions.begin(), it->second->body);
1074 5 : it->second->body[toReplace] = sharedCommit->body[toReplace];
1075 5 : if (toReplace == "tid") {
1076 : // Avoid to replace fileId in client
1077 1 : it->second->body["fileId"] = "";
1078 : }
1079 : // Remove reactions
1080 5 : if (sharedCommit->body.at(toReplace).empty())
1081 2 : it->second->reactions.clear();
1082 5 : emitSignal<libjami::ConversationSignal::SwarmMessageUpdated>(accountId_, repository_->id(), *it->second);
1083 : }
1084 6 : }
1085 6 : } else {
1086 2 : messageReceived ? history.pendingEditions[editId].emplace_front(sharedCommit)
1087 2 : : history.pendingEditions[editId].emplace_back(sharedCommit);
1088 : }
1089 8 : }
1090 :
1091 : bool
1092 11671 : Conversation::Impl::handleMessage(History& history,
1093 : const std::shared_ptr<libjami::SwarmMessage>& sharedCommit,
1094 : bool messageReceived) const
1095 : {
1096 11671 : if (messageReceived) {
1097 : // For a received message, we place it at the beginning of the list
1098 1218 : if (!history.messageList.empty())
1099 974 : sharedCommit->linearizedParent = (*history.messageList.begin())->id;
1100 1218 : history.messageList.emplace_front(sharedCommit);
1101 : } else {
1102 : // For a loaded message, we load from newest to oldest
1103 : // So we change the parent of the last message.
1104 10453 : if (!history.messageList.empty())
1105 8893 : (*history.messageList.rbegin())->linearizedParent = sharedCommit->id;
1106 10452 : history.messageList.emplace_back(sharedCommit);
1107 : }
1108 : // Handle pending reactions/editions
1109 11669 : auto reactIt = history.pendingReactions.find(sharedCommit->id);
1110 11670 : if (reactIt != history.pendingReactions.end()) {
1111 0 : for (const auto& commitBody : reactIt->second)
1112 0 : sharedCommit->reactions.emplace_back(commitBody);
1113 0 : history.pendingReactions.erase(reactIt);
1114 : }
1115 11670 : auto peditIt = history.pendingEditions.find(sharedCommit->id);
1116 11671 : if (peditIt != history.pendingEditions.end()) {
1117 0 : auto oldBody = sharedCommit->body;
1118 0 : if (sharedCommit->type == "application/data-transfer+json") {
1119 0 : sharedCommit->body["tid"] = peditIt->second.front()->body["tid"];
1120 0 : sharedCommit->body["fileId"] = "";
1121 : } else {
1122 0 : sharedCommit->body["body"] = peditIt->second.front()->body["body"];
1123 : }
1124 0 : peditIt->second.pop_front();
1125 0 : for (const auto& commit : peditIt->second) {
1126 0 : sharedCommit->editions.emplace_back(commit->body);
1127 : }
1128 0 : sharedCommit->editions.emplace_back(oldBody);
1129 0 : history.pendingEditions.erase(peditIt);
1130 0 : }
1131 : // Announce to client
1132 11671 : if (messageReceived)
1133 2436 : emitSignal<libjami::ConversationSignal::SwarmMessageReceived>(accountId_,
1134 1218 : repository_->id(),
1135 1218 : *sharedCommit);
1136 11671 : return !messageReceived;
1137 : }
1138 :
1139 11682 : void Conversation::Impl::rectifyStatus(const std::shared_ptr<libjami::SwarmMessage>& message,
1140 : History& history) const
1141 : {
1142 :
1143 11682 : auto parentIt = history.quickAccess.find(message->linearizedParent);
1144 11682 : auto currentMessage = message;
1145 :
1146 23731 : while(parentIt != history.quickAccess.end()){
1147 12050 : const auto& parent = parentIt->second;
1148 12824 : for (const auto& [peer, value] : message->status) {
1149 11729 : auto parentStatusIt = parent->status.find(peer);
1150 11729 : if (parentStatusIt == parent->status.end() || parentStatusIt->second < value) {
1151 774 : parent->status[peer] = value;
1152 1548 : emitSignal<libjami::ConfigurationSignal::AccountMessageStatusChanged>(
1153 774 : accountId_,
1154 774 : repository_->id(),
1155 : peer,
1156 774 : parent->id,
1157 : value);
1158 : }
1159 10955 : else if(parentStatusIt->second >= value){
1160 10955 : break;
1161 : }
1162 : }
1163 12050 : currentMessage = parent;
1164 12050 : parentIt = history.quickAccess.find(parent->linearizedParent);
1165 : }
1166 11679 : }
1167 :
1168 :
1169 : std::vector<std::shared_ptr<libjami::SwarmMessage>>
1170 11658 : Conversation::Impl::addToHistory(const std::vector<std::map<std::string, std::string>>& commits,
1171 : bool messageReceived,
1172 : bool commitFromSelf,
1173 : History* optHistory) const
1174 : {
1175 11658 : auto acc = account_.lock();
1176 11658 : if (!acc)
1177 0 : return {};
1178 11658 : auto username = acc->getUsername();
1179 11658 : if (messageReceived && (!optHistory && isLoadingHistory_)) {
1180 0 : std::unique_lock lk(historyMtx_);
1181 0 : historyCv_.wait(lk, [&] { return !isLoadingHistory_; });
1182 0 : }
1183 11658 : std::vector<std::shared_ptr<libjami::SwarmMessage>> messages;
1184 11721 : auto addCommit = [&](const auto& commit) {
1185 11721 : auto* history = optHistory ? optHistory : &loadedHistory_;
1186 23442 : auto commitId = commit.at("id");
1187 11721 : if (history->quickAccess.find(commitId) != history->quickAccess.end())
1188 9 : return; // Already present
1189 11713 : auto typeIt = commit.find("type");
1190 11712 : auto reactToIt = commit.find("react-to");
1191 11713 : auto editIt = commit.find("edit");
1192 : // Nothing to show for the client, skip
1193 11713 : if (typeIt != commit.end() && typeIt->second == "merge")
1194 31 : return;
1195 :
1196 11681 : auto sharedCommit = std::make_shared<libjami::SwarmMessage>();
1197 11681 : sharedCommit->fromMapStringString(commit);
1198 : // Set message status based on cache (only on history for client)
1199 11681 : if (!commitFromSelf && optHistory == nullptr) {
1200 929 : std::lock_guard lk(messageStatusMtx_);
1201 13582 : for (const auto& member: repository_->members()) {
1202 : // If we have a status cached, use it
1203 12652 : auto itFuture = futureStatus.find(sharedCommit->id);
1204 12653 : if (itFuture != futureStatus.end()) {
1205 20 : sharedCommit->status = std::move(itFuture->second);
1206 20 : futureStatus.erase(itFuture);
1207 940 : continue;
1208 : }
1209 : // Else we need to compute the status.
1210 12634 : auto& cache = memberToStatus[member.uri];
1211 12634 : if (cache == 0) {
1212 : // Message is sending, sent or displayed
1213 1039 : cache = static_cast<int32_t>(libjami::Account::MessageStates::SENDING);
1214 : }
1215 12634 : if (!messageReceived) {
1216 : // For loading previous messages, there is 3 cases. Last value cached is displayed, so is every previous commits
1217 : // Else, if last value is sent, we can compare to the last read commit to update the cache
1218 : // Finally if it's sending, we check last fetched commit
1219 12 : if (cache == static_cast<int32_t>(libjami::Account::MessageStates::SENT)) {
1220 0 : if (messagesStatus_[member.uri]["read"] == sharedCommit->id) {
1221 0 : cache = static_cast<int32_t>(libjami::Account::MessageStates::DISPLAYED);
1222 : }
1223 12 : } else if (cache <= static_cast<int32_t>(libjami::Account::MessageStates::SENDING)) { // SENDING or UNKNOWN
1224 : // cache can be upgraded to displayed or sent
1225 9 : if (messagesStatus_[member.uri]["read"] == sharedCommit->id) {
1226 1 : cache = static_cast<int32_t>(libjami::Account::MessageStates::DISPLAYED);
1227 8 : } else if (messagesStatus_[member.uri]["fetched"] == sharedCommit->id) {
1228 0 : cache = static_cast<int32_t>(libjami::Account::MessageStates::SENT);
1229 : }
1230 : }
1231 12 : if(static_cast<int32_t>(cache) > sharedCommit->status[member.uri]){
1232 12 : sharedCommit->status[member.uri] = static_cast<int32_t>(cache);
1233 : }
1234 : } else {
1235 : // If member is author of the message received, they already saw it
1236 12622 : if (member.uri == commit.at("author")) {
1237 : // If member is the author of the commit, they are considered as displayed (same for all previous commits)
1238 920 : messagesStatus_[member.uri]["read"] = sharedCommit->id;
1239 920 : messagesStatus_[member.uri]["fetched"] = sharedCommit->id;
1240 920 : sharedCommit->status[commit.at("author")] = static_cast<int32_t>(libjami::Account::MessageStates::DISPLAYED);
1241 920 : cache = static_cast<int32_t>(libjami::Account::MessageStates::DISPLAYED);
1242 920 : continue;
1243 : }
1244 : // For receiving messages, every commit is considered as SENDING, unless we got a update
1245 11700 : auto status = static_cast<int32_t>(libjami::Account::MessageStates::SENDING);
1246 11700 : if (messagesStatus_[member.uri]["read"] == sharedCommit->id) {
1247 0 : status = static_cast<int32_t>(libjami::Account::MessageStates::DISPLAYED);
1248 11701 : } else if (messagesStatus_[member.uri]["fetched"] == sharedCommit->id) {
1249 0 : status = static_cast<int32_t>(libjami::Account::MessageStates::SENT);
1250 : }
1251 11701 : if(static_cast<int32_t>(status) > sharedCommit->status[member.uri]){
1252 11700 : sharedCommit->status[member.uri] = static_cast<int32_t>(status);
1253 : }
1254 : }
1255 : }
1256 929 : }
1257 11681 : history->quickAccess[commitId] = sharedCommit;
1258 :
1259 11680 : if (reactToIt != commit.end() && !reactToIt->second.empty()) {
1260 3 : handleReaction(*history, sharedCommit);
1261 11678 : } else if (editIt != commit.end() && !editIt->second.empty()) {
1262 8 : handleEdition(*history, sharedCommit, messageReceived);
1263 11669 : } else if (handleMessage(*history, sharedCommit, messageReceived)) {
1264 10452 : messages.emplace_back(sharedCommit);
1265 : }
1266 11681 : rectifyStatus(sharedCommit, *history);
1267 11719 : };
1268 11658 : std::for_each(commits.begin(), commits.end(), addCommit);
1269 :
1270 11657 : return messages;
1271 11657 : }
1272 :
1273 181 : Conversation::Conversation(const std::shared_ptr<JamiAccount>& account,
1274 : ConversationMode mode,
1275 181 : const std::string& otherMember)
1276 181 : : pimpl_ {new Impl {account, mode, otherMember}}
1277 181 : {}
1278 :
1279 18 : Conversation::Conversation(const std::shared_ptr<JamiAccount>& account,
1280 18 : const std::string& conversationId)
1281 18 : : pimpl_ {new Impl {account, conversationId}}
1282 18 : {}
1283 :
1284 190 : Conversation::Conversation(const std::shared_ptr<JamiAccount>& account,
1285 : const std::string& remoteDevice,
1286 190 : const std::string& conversationId)
1287 190 : : pimpl_ {new Impl {account, remoteDevice, conversationId}}
1288 190 : {}
1289 :
1290 383 : Conversation::~Conversation() {}
1291 :
1292 : std::string
1293 5535 : Conversation::id() const
1294 : {
1295 5535 : return pimpl_->repository_ ? pimpl_->repository_->id() : "";
1296 : }
1297 :
1298 : void
1299 136 : Conversation::addMember(const std::string& contactUri, const OnDoneCb& cb)
1300 : {
1301 : try {
1302 136 : if (mode() == ConversationMode::ONE_TO_ONE) {
1303 : // Only authorize to add left members
1304 1 : auto initialMembers = getInitialMembers();
1305 1 : auto it = std::find(initialMembers.begin(), initialMembers.end(), contactUri);
1306 1 : if (it == initialMembers.end()) {
1307 1 : JAMI_WARN("Unable to add new member in one to one conversation");
1308 1 : cb(false, "");
1309 1 : return;
1310 : }
1311 1 : }
1312 0 : } catch (const std::exception& e) {
1313 0 : JAMI_WARN("Unable to get mode: %s", e.what());
1314 0 : cb(false, "");
1315 0 : return;
1316 0 : }
1317 135 : if (isMember(contactUri, true)) {
1318 0 : JAMI_WARN("Unable to add member %s because it's already a member", contactUri.c_str());
1319 0 : cb(false, "");
1320 0 : return;
1321 : }
1322 135 : if (isBanned(contactUri)) {
1323 2 : if (pimpl_->isAdmin()) {
1324 3 : dht::ThreadPool::io().run(
1325 2 : [w = weak(), contactUri = std::move(contactUri), cb = std::move(cb)] {
1326 1 : if (auto sthis = w.lock()) {
1327 1 : auto members = sthis->pimpl_->repository_->members();
1328 1 : auto type = sthis->pimpl_->bannedType(contactUri);
1329 1 : if (type.empty()) {
1330 0 : cb(false, {});
1331 0 : return;
1332 : }
1333 1 : sthis->pimpl_->voteUnban(contactUri, type, cb);
1334 2 : }
1335 : });
1336 : } else {
1337 1 : JAMI_WARN("Unable to add member %s because this member is blocked", contactUri.c_str());
1338 1 : cb(false, "");
1339 : }
1340 2 : return;
1341 : }
1342 :
1343 133 : dht::ThreadPool::io().run([w = weak(), contactUri = std::move(contactUri), cb = std::move(cb)] {
1344 133 : if (auto sthis = w.lock()) {
1345 : // Add member files and commit
1346 133 : std::unique_lock lk(sthis->pimpl_->writeMtx_);
1347 133 : auto commit = sthis->pimpl_->repository_->addMember(contactUri);
1348 133 : sthis->pimpl_->announce(commit, true);
1349 133 : lk.unlock();
1350 133 : if (cb)
1351 133 : cb(!commit.empty(), commit);
1352 266 : }
1353 133 : });
1354 : }
1355 :
1356 : std::shared_ptr<dhtnet::ChannelSocket>
1357 5560 : Conversation::gitSocket(const DeviceId& deviceId) const
1358 : {
1359 5560 : return pimpl_->gitSocket(deviceId);
1360 : }
1361 :
1362 : void
1363 2606 : Conversation::addGitSocket(const DeviceId& deviceId,
1364 : const std::shared_ptr<dhtnet::ChannelSocket>& socket)
1365 : {
1366 2606 : pimpl_->addGitSocket(deviceId, socket);
1367 2605 : }
1368 :
1369 : void
1370 791 : Conversation::removeGitSocket(const DeviceId& deviceId)
1371 : {
1372 791 : pimpl_->removeGitSocket(deviceId);
1373 791 : }
1374 :
1375 : void
1376 389 : Conversation::shutdownConnections()
1377 : {
1378 389 : pimpl_->fallbackTimer_->cancel();
1379 389 : pimpl_->gitSocketList_.clear();
1380 389 : if (pimpl_->swarmManager_)
1381 389 : pimpl_->swarmManager_->shutdown();
1382 389 : std::lock_guard lk(pimpl_->membersMtx_);
1383 389 : pimpl_->checkedMembers_.clear();
1384 389 : }
1385 :
1386 : void
1387 0 : Conversation::connectivityChanged()
1388 : {
1389 0 : if (pimpl_->swarmManager_)
1390 0 : pimpl_->swarmManager_->maintainBuckets();
1391 0 : }
1392 :
1393 : std::vector<jami::DeviceId>
1394 0 : Conversation::getDeviceIdList() const
1395 : {
1396 0 : return pimpl_->swarmManager_->getRoutingTable().getAllNodes();
1397 : }
1398 :
1399 : std::shared_ptr<Typers>
1400 9 : Conversation::typers() const
1401 : {
1402 9 : return pimpl_->typers_;
1403 : }
1404 :
1405 : bool
1406 0 : Conversation::hasSwarmChannel(const std::string& deviceId)
1407 : {
1408 0 : if (!pimpl_->swarmManager_)
1409 0 : return false;
1410 0 : return pimpl_->swarmManager_->isConnectedWith(DeviceId(deviceId));
1411 : }
1412 :
1413 : void
1414 1 : Conversation::Impl::voteUnban(const std::string& contactUri,
1415 : const std::string_view type,
1416 : const OnDoneCb& cb)
1417 : {
1418 : // Check if admin
1419 1 : if (!isAdmin()) {
1420 0 : JAMI_WARN("You're not an admin of this repo. Unable to unblock %s", contactUri.c_str());
1421 0 : cb(false, {});
1422 0 : return;
1423 : }
1424 :
1425 : // Vote for removal
1426 1 : std::unique_lock lk(writeMtx_);
1427 1 : auto voteCommit = repository_->voteUnban(contactUri, type);
1428 1 : if (voteCommit.empty()) {
1429 0 : JAMI_WARN("Unbanning %s failed", contactUri.c_str());
1430 0 : cb(false, "");
1431 0 : return;
1432 : }
1433 :
1434 1 : auto lastId = voteCommit;
1435 1 : std::vector<std::string> commits;
1436 1 : commits.emplace_back(voteCommit);
1437 :
1438 : // If admin, check vote
1439 2 : auto resolveCommit = repository_->resolveVote(contactUri, type, "unban");
1440 1 : if (!resolveCommit.empty()) {
1441 1 : commits.emplace_back(resolveCommit);
1442 1 : lastId = resolveCommit;
1443 1 : JAMI_WARN("Vote solved for unbanning %s.", contactUri.c_str());
1444 : }
1445 1 : announce(commits, true);
1446 1 : lk.unlock();
1447 1 : if (cb)
1448 1 : cb(!lastId.empty(), lastId);
1449 1 : }
1450 :
1451 : void
1452 14 : Conversation::removeMember(const std::string& contactUri, bool isDevice, const OnDoneCb& cb)
1453 : {
1454 42 : dht::ThreadPool::io().run([w = weak(),
1455 14 : contactUri = std::move(contactUri),
1456 14 : isDevice = std::move(isDevice),
1457 14 : cb = std::move(cb)] {
1458 14 : if (auto sthis = w.lock()) {
1459 : // Check if admin
1460 14 : if (!sthis->pimpl_->isAdmin()) {
1461 1 : JAMI_WARN("You're not an admin of this repo. Unable to block %s", contactUri.c_str());
1462 1 : cb(false, {});
1463 2 : return;
1464 : }
1465 :
1466 : // Get current user type
1467 13 : std::string type;
1468 13 : if (isDevice) {
1469 0 : type = "devices";
1470 : } else {
1471 13 : auto members = sthis->pimpl_->repository_->members();
1472 28 : for (const auto& member : members) {
1473 28 : if (member.uri == contactUri) {
1474 13 : if (member.role == MemberRole::INVITED) {
1475 2 : type = "invited";
1476 11 : } else if (member.role == MemberRole::ADMIN) {
1477 1 : type = "admins";
1478 10 : } else if (member.role == MemberRole::MEMBER) {
1479 10 : type = "members";
1480 : }
1481 13 : break;
1482 : }
1483 : }
1484 13 : if (type.empty()) {
1485 0 : cb(false, {});
1486 0 : return;
1487 : }
1488 13 : }
1489 :
1490 : // Vote for removal
1491 13 : std::unique_lock lk(sthis->pimpl_->writeMtx_);
1492 13 : auto voteCommit = sthis->pimpl_->repository_->voteKick(contactUri, type);
1493 13 : if (voteCommit.empty()) {
1494 1 : JAMI_WARN("Kicking %s failed", contactUri.c_str());
1495 1 : cb(false, "");
1496 1 : return;
1497 : }
1498 :
1499 12 : auto lastId = voteCommit;
1500 12 : std::vector<std::string> commits;
1501 12 : commits.emplace_back(voteCommit);
1502 :
1503 : // If admin, check vote
1504 24 : auto resolveCommit = sthis->pimpl_->repository_->resolveVote(contactUri, type, "ban");
1505 12 : if (!resolveCommit.empty()) {
1506 12 : commits.emplace_back(resolveCommit);
1507 12 : lastId = resolveCommit;
1508 12 : JAMI_WARN("Vote solved for %s. %s banned",
1509 : contactUri.c_str(),
1510 : isDevice ? "Device" : "Member");
1511 12 : sthis->pimpl_->disconnectFromPeer(contactUri);
1512 : }
1513 :
1514 12 : sthis->pimpl_->announce(commits, true);
1515 12 : lk.unlock();
1516 12 : cb(!lastId.empty(), lastId);
1517 29 : }
1518 : });
1519 14 : }
1520 :
1521 : std::vector<std::map<std::string, std::string>>
1522 423 : Conversation::getMembers(bool includeInvited, bool includeLeft, bool includeBanned) const
1523 : {
1524 423 : return pimpl_->getMembers(includeInvited, includeLeft, includeBanned);
1525 : }
1526 :
1527 : std::set<std::string>
1528 2239 : Conversation::memberUris(std::string_view filter, const std::set<MemberRole>& filteredRoles) const
1529 : {
1530 2239 : return pimpl_->repository_->memberUris(filter, filteredRoles);
1531 : }
1532 :
1533 : std::vector<NodeId>
1534 1965 : Conversation::peersToSyncWith() const
1535 : {
1536 1965 : const auto& routingTable = pimpl_->swarmManager_->getRoutingTable();
1537 1965 : const auto& nodes = routingTable.getNodes();
1538 1965 : const auto& mobiles = routingTable.getMobileNodes();
1539 1965 : std::vector<NodeId> s;
1540 1965 : s.reserve(nodes.size() + mobiles.size());
1541 1965 : s.insert(s.end(), nodes.begin(), nodes.end());
1542 1965 : s.insert(s.end(), mobiles.begin(), mobiles.end());
1543 14335 : for (const auto& [deviceId, _] : pimpl_->gitSocketList_)
1544 12371 : if (std::find(s.cbegin(), s.cend(), deviceId) == s.cend())
1545 200 : s.emplace_back(deviceId);
1546 3930 : return s;
1547 1965 : }
1548 :
1549 : bool
1550 1965 : Conversation::isBootstraped() const
1551 : {
1552 1965 : const auto& routingTable = pimpl_->swarmManager_->getRoutingTable();
1553 1965 : return !routingTable.getNodes().empty();
1554 : }
1555 :
1556 : std::string
1557 18609 : Conversation::uriFromDevice(const std::string& deviceId) const
1558 : {
1559 18609 : return pimpl_->repository_->uriFromDevice(deviceId);
1560 : }
1561 :
1562 : void
1563 0 : Conversation::monitor()
1564 : {
1565 0 : pimpl_->swarmManager_->getRoutingTable().printRoutingTable();
1566 0 : }
1567 :
1568 : std::string
1569 184 : Conversation::join()
1570 : {
1571 184 : return pimpl_->repository_->join();
1572 : }
1573 :
1574 : bool
1575 4091 : Conversation::isMember(const std::string& uri, bool includeInvited) const
1576 : {
1577 4091 : auto repoPath = pimpl_->repoPath();
1578 4093 : auto invitedPath = repoPath / "invited";
1579 4092 : auto adminsPath = repoPath / "admins";
1580 4093 : auto membersPath = repoPath / "members";
1581 16370 : std::vector<std::filesystem::path> pathsToCheck = {adminsPath, membersPath};
1582 4092 : if (includeInvited)
1583 3419 : pathsToCheck.emplace_back(invitedPath);
1584 9419 : for (const auto& path : pathsToCheck) {
1585 39773 : for (const auto& certificate : dhtnet::fileutils::readDirectory(path)) {
1586 34446 : std::string_view crtUri = certificate;
1587 34445 : auto crtIt = crtUri.find(".crt");
1588 34425 : if (path != invitedPath && crtIt == std::string_view::npos) {
1589 0 : JAMI_WARNING("Incorrect file found: {}/{}", path, certificate);
1590 0 : continue;
1591 0 : }
1592 34457 : if (crtIt != std::string_view::npos)
1593 33507 : crtUri = crtUri.substr(0, crtIt);
1594 34444 : if (crtUri == uri)
1595 3331 : return true;
1596 8658 : }
1597 : }
1598 :
1599 762 : if (includeInvited && mode() == ConversationMode::ONE_TO_ONE) {
1600 3 : for (const auto& member : getInitialMembers()) {
1601 2 : if (member == uri)
1602 0 : return true;
1603 1 : }
1604 : }
1605 :
1606 762 : return false;
1607 4092 : }
1608 :
1609 : bool
1610 7642 : Conversation::isBanned(const std::string& uri) const
1611 : {
1612 7642 : return !pimpl_->bannedType(uri).empty();
1613 : }
1614 :
1615 : void
1616 0 : Conversation::sendMessage(std::string&& message,
1617 : const std::string& type,
1618 : const std::string& replyTo,
1619 : OnCommitCb&& onCommit,
1620 : OnDoneCb&& cb)
1621 : {
1622 0 : Json::Value json;
1623 0 : json["body"] = std::move(message);
1624 0 : json["type"] = type;
1625 0 : sendMessage(std::move(json), replyTo, std::move(onCommit), std::move(cb));
1626 0 : }
1627 :
1628 : void
1629 139 : Conversation::sendMessage(Json::Value&& value,
1630 : const std::string& replyTo,
1631 : OnCommitCb&& onCommit,
1632 : OnDoneCb&& cb)
1633 : {
1634 139 : if (!replyTo.empty()) {
1635 2 : auto commit = pimpl_->repository_->getCommit(replyTo);
1636 2 : if (commit == std::nullopt) {
1637 1 : JAMI_ERR("Replying to invalid commit %s", replyTo.c_str());
1638 1 : return;
1639 : }
1640 1 : value["reply-to"] = replyTo;
1641 2 : }
1642 552 : dht::ThreadPool::io().run(
1643 414 : [w = weak(), value = std::move(value), onCommit = std::move(onCommit), cb = std::move(cb)] {
1644 138 : if (auto sthis = w.lock()) {
1645 138 : std::unique_lock lk(sthis->pimpl_->writeMtx_);
1646 138 : auto commit = sthis->pimpl_->repository_->commitMessage(
1647 138 : Json::writeString(jsonBuilder, value));
1648 138 : lk.unlock();
1649 138 : if (onCommit)
1650 11 : onCommit(commit);
1651 138 : sthis->pimpl_->announce(commit, true);
1652 138 : if (cb)
1653 138 : cb(!commit.empty(), commit);
1654 276 : }
1655 138 : });
1656 : }
1657 :
1658 : void
1659 1 : Conversation::sendMessages(std::vector<Json::Value>&& messages, OnMultiDoneCb&& cb)
1660 : {
1661 1 : dht::ThreadPool::io().run([w = weak(), messages = std::move(messages), cb = std::move(cb)] {
1662 1 : if (auto sthis = w.lock()) {
1663 1 : std::vector<std::string> commits;
1664 1 : commits.reserve(messages.size());
1665 1 : std::unique_lock lk(sthis->pimpl_->writeMtx_);
1666 3 : for (const auto& message : messages) {
1667 2 : auto commit = sthis->pimpl_->repository_->commitMessage(
1668 2 : Json::writeString(jsonBuilder, message));
1669 2 : commits.emplace_back(std::move(commit));
1670 2 : }
1671 1 : lk.unlock();
1672 1 : sthis->pimpl_->announce(commits, true);
1673 1 : if (cb)
1674 1 : cb(commits);
1675 2 : }
1676 1 : });
1677 1 : }
1678 :
1679 : std::optional<std::map<std::string, std::string>>
1680 14851 : Conversation::getCommit(const std::string& commitId) const
1681 : {
1682 14851 : auto commit = pimpl_->repository_->getCommit(commitId);
1683 14851 : if (commit == std::nullopt)
1684 2642 : return std::nullopt;
1685 12208 : return pimpl_->repository_->convCommitToMap(*commit);
1686 14851 : }
1687 :
1688 : void
1689 7 : Conversation::loadMessages(OnLoadMessages cb, const LogOptions& options)
1690 : {
1691 7 : if (!cb)
1692 0 : return;
1693 7 : dht::ThreadPool::io().run([w = weak(), cb = std::move(cb), options] {
1694 7 : if (auto sthis = w.lock()) {
1695 7 : cb(sthis->pimpl_->loadMessages(options));
1696 7 : }
1697 7 : });
1698 : }
1699 :
1700 : void
1701 2 : Conversation::loadMessages2(const OnLoadMessages2& cb, const LogOptions& options)
1702 : {
1703 2 : if (!cb)
1704 0 : return;
1705 2 : dht::ThreadPool::io().run([w = weak(), cb = std::move(cb), options] {
1706 2 : if (auto sthis = w.lock()) {
1707 2 : cb(sthis->pimpl_->loadMessages2(options));
1708 2 : }
1709 2 : });
1710 : }
1711 :
1712 : void
1713 0 : Conversation::clearCache()
1714 : {
1715 0 : pimpl_->loadedHistory_.messageList.clear();
1716 0 : pimpl_->loadedHistory_.quickAccess.clear();
1717 0 : pimpl_->loadedHistory_.pendingEditions.clear();
1718 0 : pimpl_->loadedHistory_.pendingReactions.clear();
1719 0 : pimpl_->memberToStatus.clear();
1720 0 : }
1721 :
1722 : std::string
1723 3067 : Conversation::lastCommitId() const
1724 : {
1725 3067 : LogOptions options;
1726 3065 : options.nbOfCommits = 1;
1727 3065 : options.skipMerge = true;
1728 3065 : History optHistory;
1729 : {
1730 3064 : std::lock_guard lk(pimpl_->historyMtx_);
1731 3066 : if (!pimpl_->loadedHistory_.messageList.empty())
1732 5203 : return (*pimpl_->loadedHistory_.messageList.begin())->id;
1733 3066 : }
1734 :
1735 464 : std::lock_guard lk(pimpl_->writeMtx_);
1736 464 : auto res = pimpl_->loadMessages2(options, &optHistory);
1737 464 : if (res.empty())
1738 4 : return {};
1739 920 : return (*optHistory.messageList.begin())->id;
1740 3067 : }
1741 :
1742 : std::vector<std::map<std::string, std::string>>
1743 2249 : Conversation::Impl::mergeHistory(const std::string& uri)
1744 : {
1745 2249 : if (not repository_) {
1746 0 : JAMI_WARN("Invalid repo. Abort merge");
1747 0 : return {};
1748 : }
1749 4498 : auto remoteHead = repository_->remoteHead(uri);
1750 2249 : if (remoteHead.empty()) {
1751 0 : JAMI_WARN("Unable to get HEAD of %s", uri.c_str());
1752 0 : return {};
1753 : }
1754 :
1755 : // Validate commit
1756 2249 : auto [newCommits, err] = repository_->validFetch(uri);
1757 2249 : if (newCommits.empty()) {
1758 1347 : if (err)
1759 18 : JAMI_ERR("Unable to validate history with %s", uri.c_str());
1760 1347 : repository_->removeBranchWith(uri);
1761 1347 : return {};
1762 : }
1763 :
1764 : // If validated, merge
1765 902 : auto [ok, cid] = repository_->merge(remoteHead);
1766 902 : if (!ok) {
1767 0 : JAMI_ERR("Unable to merge history with %s", uri.c_str());
1768 0 : repository_->removeBranchWith(uri);
1769 0 : return {};
1770 : }
1771 902 : if (!cid.empty()) {
1772 : // A merge commit was generated, should be added in new commits
1773 16 : auto commit = repository_->getCommit(cid);
1774 16 : if (commit != std::nullopt)
1775 15 : newCommits.emplace_back(*commit);
1776 16 : }
1777 :
1778 2706 : JAMI_DEBUG("Successfully merge history with {:s}", uri);
1779 902 : auto result = repository_->convCommitsToMap(newCommits);
1780 1833 : for (auto& commit : result) {
1781 931 : auto it = commit.find("type");
1782 931 : if (it != commit.end() && it->second == "member") {
1783 770 : repository_->refreshMembers();
1784 :
1785 770 : if (commit["action"] == "ban")
1786 5 : disconnectFromPeer(commit["uri"]);
1787 : }
1788 : }
1789 901 : return result;
1790 2248 : }
1791 :
1792 : bool
1793 2421 : Conversation::pull(const std::string& deviceId, OnPullCb&& cb, std::string commitId)
1794 : {
1795 2421 : std::lock_guard lk(pimpl_->pullcbsMtx_);
1796 2421 : auto [it, notInProgress] = pimpl_->fetchingRemotes_.emplace(deviceId, std::deque<std::pair<std::string, OnPullCb>>());
1797 2422 : auto& pullcbs = it->second;
1798 2422 : auto itPull = std::find_if(pullcbs.begin(),
1799 0 : pullcbs.end(),
1800 4848 : [&](const auto& elem) { return std::get<0>(elem) == commitId; });
1801 2422 : if (itPull != pullcbs.end()) {
1802 15 : JAMI_DEBUG("Ignoring request to pull from {:s} with commit {:s}: pull already in progress", deviceId, commitId);
1803 5 : cb(false);
1804 5 : return false;
1805 : }
1806 7247 : JAMI_DEBUG("Pulling from {:s} with commit {:s}", deviceId, commitId);
1807 2417 : pullcbs.emplace_back(std::move(commitId), std::move(cb));
1808 2417 : if (notInProgress)
1809 2373 : dht::ThreadPool::io().run([w = weak(), deviceId] {
1810 2373 : if (auto sthis_ = w.lock())
1811 2373 : sthis_->pimpl_->pull(deviceId);
1812 2373 : });
1813 2417 : return true;
1814 2422 : }
1815 :
1816 : void
1817 2373 : Conversation::Impl::pull(const std::string& deviceId)
1818 : {
1819 2373 : auto& repo = repository_;
1820 :
1821 2373 : std::string commitId;
1822 2373 : OnPullCb cb;
1823 : while (true) {
1824 : {
1825 4790 : std::lock_guard lk(pullcbsMtx_);
1826 4790 : auto it = fetchingRemotes_.find(deviceId);
1827 4790 : if (it == fetchingRemotes_.end()) {
1828 0 : JAMI_ERROR("Could not find device {:s} in fetchingRemotes", deviceId);
1829 0 : break;
1830 : }
1831 4790 : auto& pullcbs = it->second;
1832 4790 : if (pullcbs.empty()) {
1833 2373 : fetchingRemotes_.erase(it);
1834 2373 : break;
1835 : }
1836 2417 : auto& elem = pullcbs.front();
1837 2417 : commitId = std::move(std::get<0>(elem));
1838 2417 : cb = std::move(std::get<1>(elem));
1839 2417 : pullcbs.pop_front();
1840 4790 : }
1841 : // If recently fetched, the commit can already be there, so no need to do complex operations
1842 2417 : if (commitId != "" && repo->getCommit(commitId, false) != std::nullopt) {
1843 163 : cb(true);
1844 168 : continue;
1845 : }
1846 : // Pull from remote
1847 2253 : auto fetched = repo->fetch(deviceId);
1848 2254 : if (!fetched) {
1849 5 : cb(false);
1850 5 : continue;
1851 : }
1852 2249 : auto oldHead = repo->getHead();
1853 2249 : std::string newHead = oldHead;
1854 2249 : std::unique_lock lk(writeMtx_);
1855 2249 : auto commits = mergeHistory(deviceId);
1856 2249 : if (!commits.empty()) {
1857 902 : newHead = commits.rbegin()->at("id");
1858 : // Note: Because clients needs to linearize the history, they need to know all commits
1859 : // that can be updated.
1860 : // In this case, all commits until the common merge base should be announced.
1861 : // The client ill need to update it's model after this.
1862 902 : std::string mergeBase = oldHead; // If fast-forward, the merge base is the previous head
1863 902 : auto newHeadCommit = repo->getCommit(newHead);
1864 902 : if (newHeadCommit != std::nullopt && newHeadCommit->parents.size() > 1) {
1865 18 : mergeBase = repo->mergeBase(newHeadCommit->parents[0], newHeadCommit->parents[1]);
1866 18 : LogOptions options;
1867 18 : options.to = mergeBase;
1868 18 : auto updatedCommits = loadMessages(options);
1869 : // We announce commits from oldest to update to newest. This generally avoid
1870 : // to get detached commits until they are all announced.
1871 18 : std::reverse(std::begin(updatedCommits), std::end(updatedCommits));
1872 18 : announce(updatedCommits);
1873 18 : } else {
1874 884 : announce(commits);
1875 : }
1876 902 : }
1877 2249 : lk.unlock();
1878 :
1879 2249 : bool commitFound = false;
1880 2249 : if (commitId != "") {
1881 : // If `commitId` is non-empty, then we were attempting to pull a specific commit.
1882 : // We need to check if we actually got it; the fact that the fetch above was
1883 : // successful doesn't guarantee that we did.
1884 2187 : for (const auto& commit : commits) {
1885 908 : if (commit.at("id") == commitId) {
1886 895 : commitFound = true;
1887 895 : break;
1888 : }
1889 : }
1890 : } else {
1891 75 : commitFound = true;
1892 : }
1893 2249 : if (!commitFound)
1894 3837 : JAMI_WARNING("Successfully fetched from device {} but didn't receive expected commit {}",
1895 : deviceId, commitId);
1896 : // WARNING: If its argument is `true`, this callback will attempt to send a message notification
1897 : // for commit `commitId` to other members of the swarm. It's important that we only
1898 : // send these notifications if we actually have the commit. Otherwise, we can end up
1899 : // in a situation where the members of the swarm keep sending notifications to each
1900 : // other for a commit that none of them have (note that we are unable to rule this out, as
1901 : // nothing prevents a malicious user from intentionally sending a notification with
1902 : // a fake commit ID).
1903 2249 : if (cb)
1904 2249 : cb(commitFound);
1905 : // Announce if profile changed
1906 2248 : if (oldHead != newHead) {
1907 902 : auto diffStats = repo->diffStats(newHead, oldHead);
1908 902 : auto changedFiles = repo->changedFiles(diffStats);
1909 902 : if (find(changedFiles.begin(), changedFiles.end(), "profile.vcf")
1910 1804 : != changedFiles.end()) {
1911 5 : emitSignal<libjami::ConversationSignal::ConversationProfileUpdated>(
1912 10 : accountId_, repo->id(), repo->infos());
1913 : }
1914 902 : }
1915 4665 : }
1916 2373 : }
1917 :
1918 : void
1919 2420 : Conversation::sync(const std::string& member,
1920 : const std::string& deviceId,
1921 : OnPullCb&& cb,
1922 : std::string commitId)
1923 : {
1924 2420 : pull(deviceId, std::move(cb), commitId);
1925 2422 : dht::ThreadPool::io().run([member, deviceId, w = weak_from_this()] {
1926 2422 : auto sthis = w.lock();
1927 : // For waiting request, downloadFile
1928 2422 : for (const auto& wr : sthis->dataTransfer()->waitingRequests()) {
1929 0 : auto path = fileutils::get_data_dir() / sthis->pimpl_->accountId_
1930 0 : / "conversation_data" / sthis->id() / wr.fileId;
1931 0 : auto start = fileutils::size(path);
1932 0 : if (start < 0)
1933 0 : start = 0;
1934 0 : sthis->downloadFile(wr.interactionId, wr.fileId, wr.path, member, deviceId, start);
1935 2422 : }
1936 2422 : });
1937 2422 : }
1938 :
1939 : std::map<std::string, std::string>
1940 279 : Conversation::generateInvitation() const
1941 : {
1942 : // Invite the new member to the conversation
1943 279 : Json::Value root;
1944 279 : auto& metadata = root[ConversationMapKeys::METADATAS];
1945 564 : for (const auto& [k, v] : infos()) {
1946 285 : if (v.size() >= 64000) {
1947 0 : JAMI_WARNING("Cutting invite because the SIP message will be too long");
1948 0 : continue;
1949 0 : }
1950 285 : metadata[k] = v;
1951 279 : }
1952 279 : root[ConversationMapKeys::CONVERSATIONID] = id();
1953 837 : return {{"application/invite+json", Json::writeString(jsonBuilder, root)}};
1954 279 : }
1955 :
1956 : std::string
1957 6 : Conversation::leave()
1958 : {
1959 6 : setRemovingFlag();
1960 6 : std::lock_guard lk(pimpl_->writeMtx_);
1961 12 : return pimpl_->repository_->leave();
1962 6 : }
1963 :
1964 : void
1965 11 : Conversation::setRemovingFlag()
1966 : {
1967 11 : pimpl_->isRemoving_ = true;
1968 11 : }
1969 :
1970 : bool
1971 4607 : Conversation::isRemoving()
1972 : {
1973 4607 : return pimpl_->isRemoving_;
1974 : }
1975 :
1976 : void
1977 24 : Conversation::erase()
1978 : {
1979 24 : if (pimpl_->conversationDataPath_ != "")
1980 24 : dhtnet::fileutils::removeAll(pimpl_->conversationDataPath_, true);
1981 24 : if (!pimpl_->repository_)
1982 0 : return;
1983 24 : std::lock_guard lk(pimpl_->writeMtx_);
1984 24 : pimpl_->repository_->erase();
1985 24 : }
1986 :
1987 : ConversationMode
1988 4990 : Conversation::mode() const
1989 : {
1990 4990 : return pimpl_->repository_->mode();
1991 : }
1992 :
1993 : std::vector<std::string>
1994 34 : Conversation::getInitialMembers() const
1995 : {
1996 34 : return pimpl_->repository_->getInitialMembers();
1997 : }
1998 :
1999 : bool
2000 0 : Conversation::isInitialMember(const std::string& uri) const
2001 : {
2002 0 : auto members = getInitialMembers();
2003 0 : return std::find(members.begin(), members.end(), uri) != members.end();
2004 0 : }
2005 :
2006 : void
2007 9 : Conversation::updateInfos(const std::map<std::string, std::string>& map, const OnDoneCb& cb)
2008 : {
2009 9 : dht::ThreadPool::io().run([w = weak(), map = std::move(map), cb = std::move(cb)] {
2010 9 : if (auto sthis = w.lock()) {
2011 9 : auto& repo = sthis->pimpl_->repository_;
2012 9 : std::unique_lock lk(sthis->pimpl_->writeMtx_);
2013 9 : auto commit = repo->updateInfos(map);
2014 9 : sthis->pimpl_->announce(commit, true);
2015 9 : lk.unlock();
2016 9 : if (cb)
2017 9 : cb(!commit.empty(), commit);
2018 9 : emitSignal<libjami::ConversationSignal::ConversationProfileUpdated>(
2019 18 : sthis->pimpl_->accountId_, repo->id(), repo->infos());
2020 18 : }
2021 9 : });
2022 9 : }
2023 :
2024 : std::map<std::string, std::string>
2025 326 : Conversation::infos() const
2026 : {
2027 326 : return pimpl_->repository_->infos();
2028 : }
2029 :
2030 : void
2031 8 : Conversation::updatePreferences(const std::map<std::string, std::string>& map)
2032 : {
2033 8 : auto filePath = pimpl_->conversationDataPath_ / "preferences";
2034 8 : auto prefs = map;
2035 8 : auto itLast = prefs.find(LAST_MODIFIED);
2036 8 : if (itLast != prefs.end()) {
2037 3 : if (std::filesystem::is_regular_file(filePath)) {
2038 1 : auto lastModified = fileutils::lastWriteTimeInSeconds(filePath);
2039 : try {
2040 1 : if (lastModified >= std::stoul(itLast->second))
2041 0 : return;
2042 0 : } catch (...) {
2043 0 : return;
2044 0 : }
2045 : }
2046 3 : prefs.erase(itLast);
2047 : }
2048 :
2049 8 : std::ofstream file(filePath, std::ios::trunc | std::ios::binary);
2050 8 : msgpack::pack(file, prefs);
2051 16 : emitSignal<libjami::ConversationSignal::ConversationPreferencesUpdated>(pimpl_->accountId_,
2052 16 : id(),
2053 8 : std::move(prefs));
2054 8 : }
2055 :
2056 : std::map<std::string, std::string>
2057 109 : Conversation::preferences(bool includeLastModified) const
2058 : {
2059 : try {
2060 109 : std::map<std::string, std::string> preferences;
2061 109 : auto filePath = pimpl_->conversationDataPath_ / "preferences";
2062 207 : auto file = fileutils::loadFile(filePath);
2063 11 : msgpack::object_handle oh = msgpack::unpack((const char*) file.data(), file.size());
2064 11 : oh.get().convert(preferences);
2065 11 : if (includeLastModified)
2066 8 : preferences[LAST_MODIFIED] = std::to_string(fileutils::lastWriteTimeInSeconds(filePath));
2067 11 : return preferences;
2068 305 : } catch (const std::exception& e) {
2069 98 : }
2070 98 : return {};
2071 : }
2072 :
2073 : std::vector<uint8_t>
2074 0 : Conversation::vCard() const
2075 : {
2076 : try {
2077 0 : return fileutils::loadFile(pimpl_->repoPath() / "profile.vcf");
2078 0 : } catch (...) {
2079 0 : }
2080 0 : return {};
2081 : }
2082 :
2083 : std::shared_ptr<TransferManager>
2084 2520 : Conversation::dataTransfer() const
2085 : {
2086 2520 : return pimpl_->transferManager_;
2087 : }
2088 :
2089 : bool
2090 13 : Conversation::onFileChannelRequest(const std::string& member,
2091 : const std::string& fileId,
2092 : bool verifyShaSum) const
2093 : {
2094 13 : if (!isMember(member))
2095 0 : return false;
2096 :
2097 13 : auto sep = fileId.find('_');
2098 13 : if (sep == std::string::npos)
2099 0 : return false;
2100 :
2101 13 : auto interactionId = fileId.substr(0, sep);
2102 13 : auto commit = getCommit(interactionId);
2103 26 : if (commit == std::nullopt || commit->find("type") == commit->end()
2104 26 : || commit->find("tid") == commit->end() || commit->find("sha3sum") == commit->end()
2105 26 : || commit->at("type") != "application/data-transfer+json") {
2106 0 : JAMI_WARNING("[Account {:s}] {} requested invalid file transfer commit {}", pimpl_->accountId_, member, interactionId);
2107 0 : return false;
2108 : }
2109 :
2110 13 : auto path = dataTransfer()->path(fileId);
2111 :
2112 13 : if (!std::filesystem::is_regular_file(path)) {
2113 : // Check if dangling symlink
2114 1 : if (std::filesystem::is_symlink(path)) {
2115 1 : dhtnet::fileutils::remove(path, true);
2116 : }
2117 3 : JAMI_WARNING("[Account {:s}] {:s} asked for non existing file {} in {:s}",
2118 : pimpl_->accountId_,
2119 : member,
2120 : fileId,
2121 : id());
2122 1 : return false;
2123 : }
2124 : // Check that our file is correct before sending
2125 12 : if (verifyShaSum && commit->at("sha3sum") != fileutils::sha3File(path)) {
2126 3 : JAMI_WARNING(
2127 : "[Account {:s}] {:s} asked for file {:s} in {:s}, but our version is not complete or corrupted",
2128 : pimpl_->accountId_,
2129 : member,
2130 : fileId,
2131 : id());
2132 1 : return false;
2133 : }
2134 11 : return true;
2135 13 : }
2136 :
2137 : bool
2138 11 : Conversation::downloadFile(const std::string& interactionId,
2139 : const std::string& fileId,
2140 : const std::string& path,
2141 : const std::string&,
2142 : const std::string& deviceId,
2143 : std::size_t start,
2144 : std::size_t end)
2145 : {
2146 11 : auto commit = getCommit(interactionId);
2147 11 : if (commit == std::nullopt || commit->at("type") != "application/data-transfer+json") {
2148 0 : JAMI_ERROR("Commit doesn't exists or is not a file transfer {} (Conversation: {}) ", interactionId, id());
2149 0 : return false;
2150 : }
2151 11 : auto tid = commit->find("tid");
2152 11 : auto sha3sum = commit->find("sha3sum");
2153 11 : auto size_str = commit->find("totalSize");
2154 :
2155 11 : if (tid == commit->end() || sha3sum == commit->end() || size_str == commit->end()) {
2156 0 : JAMI_ERROR("Invalid file transfer commit (missing tid, size or sha3)");
2157 0 : return false;
2158 : }
2159 :
2160 11 : auto totalSize = to_int<ssize_t>(size_str->second, (ssize_t) -1);
2161 11 : if (totalSize < 0) {
2162 0 : JAMI_ERROR("Invalid file size {}", totalSize);
2163 0 : return false;
2164 : }
2165 :
2166 : // Be sure to not lock conversation
2167 22 : dht::ThreadPool().io().run([w = weak(),
2168 : deviceId,
2169 : fileId,
2170 : interactionId,
2171 11 : sha3sum = sha3sum->second,
2172 : path,
2173 : totalSize,
2174 : start,
2175 22 : end] {
2176 11 : if (auto shared = w.lock()) {
2177 11 : auto acc = shared->pimpl_->account_.lock();
2178 11 : if (!acc)
2179 0 : return;
2180 11 : shared->dataTransfer()->waitForTransfer(fileId, interactionId, sha3sum, path, totalSize);
2181 11 : acc->askForFileChannel(shared->id(), deviceId, interactionId, fileId, start, end);
2182 22 : }
2183 : });
2184 11 : return true;
2185 11 : }
2186 :
2187 : void
2188 1109 : Conversation::hasFetched(const std::string& deviceId, const std::string& commitId)
2189 : {
2190 1109 : dht::ThreadPool::io().run([w = weak(), deviceId, commitId]() {
2191 1109 : auto sthis = w.lock();
2192 1109 : if (!sthis)
2193 0 : return;
2194 : // Update fetched for Uri
2195 1109 : auto uri = sthis->uriFromDevice(deviceId);
2196 1109 : if (uri.empty() || uri == sthis->pimpl_->userId_)
2197 44 : return;
2198 : // When a user fetches a commit, the message is sent for this person
2199 1065 : sthis->pimpl_->updateStatus(uri, libjami::Account::MessageStates::SENT, commitId, std::to_string(std::time(nullptr)), true);
2200 1153 : });
2201 1109 : }
2202 :
2203 :
2204 : void
2205 1108 : Conversation::Impl::updateStatus(const std::string& uri,
2206 : libjami::Account::MessageStates st,
2207 : const std::string& commitId,
2208 : const std::string& ts,
2209 : bool emit)
2210 : {
2211 : // This method can be called if peer send us a status or if another device sync. Emit will be true if a peer send us a status and will emit to other connected devices.
2212 1108 : LogOptions options;
2213 1108 : std::map<std::string, std::map<std::string, std::string>> newStatus;
2214 : {
2215 : // Update internal structures.
2216 1108 : std::lock_guard lk(messageStatusMtx_);
2217 1108 : auto& status = messagesStatus_[uri];
2218 1108 : auto& oldStatus = status[st == libjami::Account::MessageStates::SENT ? "fetched" : "read"];
2219 1108 : if (oldStatus == commitId)
2220 8 : return; // Nothing to do
2221 1100 : options.to = oldStatus;
2222 1100 : options.from = commitId;
2223 1100 : oldStatus = commitId;
2224 1100 : status[st == libjami::Account::MessageStates::SENT ? "fetched_ts" : "read_ts"] = ts;
2225 1099 : saveStatus();
2226 1100 : if (emit)
2227 1074 : newStatus[uri].insert(status.begin(), status.end());
2228 1108 : }
2229 1100 : if (emit && messageStatusCb_) {
2230 1074 : messageStatusCb_(newStatus);
2231 : }
2232 : // Update messages status for all commit between the old and new one
2233 1100 : options.logIfNotFound = false;
2234 1100 : options.fastLog = true;
2235 1100 : History optHistory;
2236 1100 : std::lock_guard lk(historyMtx_); // Avoid to announce messages while updating status.
2237 1100 : auto res = loadMessages2(options, &optHistory);
2238 1100 : if (res.size() == 0) {
2239 : // In this case, commit is not received yet, so we cache it
2240 8 : futureStatus[commitId][uri] = static_cast<int32_t>(st);
2241 : }
2242 11062 : for (const auto& [cid, _]: optHistory.quickAccess) {
2243 9962 : auto message = loadedHistory_.quickAccess.find(cid);
2244 9962 : if (message != loadedHistory_.quickAccess.end()) {
2245 : // Update message and emit to client,
2246 1864 : if(static_cast<int32_t>(st) > message->second->status[uri]){
2247 1860 : message->second->status[uri] = static_cast<int32_t>(st);
2248 3720 : emitSignal<libjami::ConfigurationSignal::AccountMessageStatusChanged>(
2249 1860 : accountId_,
2250 1860 : repository_->id(),
2251 : uri,
2252 : cid,
2253 : static_cast<int>(st));
2254 : }
2255 : } else {
2256 : // In this case, commit is not loaded by client, so we cache it
2257 : // No need to emit to client, they will get a correct status on load.
2258 8098 : futureStatus[cid][uri] = static_cast<int32_t>(st);
2259 : }
2260 : }
2261 1116 : }
2262 :
2263 : bool
2264 19 : Conversation::setMessageDisplayed(const std::string& uri, const std::string& interactionId)
2265 : {
2266 19 : std::lock_guard lk(pimpl_->messageStatusMtx_);
2267 19 : if (pimpl_->messagesStatus_[uri]["read"] == interactionId)
2268 2 : return false; // Nothing to do
2269 17 : dht::ThreadPool::io().run([w = weak(), uri, interactionId]() {
2270 17 : auto sthis = w.lock();
2271 17 : if (!sthis)
2272 0 : return;
2273 17 : sthis->pimpl_->updateStatus(uri, libjami::Account::MessageStates::DISPLAYED, interactionId, std::to_string(std::time(nullptr)), true);
2274 17 : });
2275 17 : return true;
2276 19 : }
2277 :
2278 : std::map<std::string, std::map<std::string, std::string>>
2279 91 : Conversation::messageStatus() const
2280 : {
2281 91 : std::lock_guard lk(pimpl_->messageStatusMtx_);
2282 182 : return pimpl_->messagesStatus_;
2283 91 : }
2284 :
2285 : void
2286 59 : Conversation::updateMessageStatus(const std::map<std::string, std::map<std::string, std::string>>& messageStatus)
2287 : {
2288 59 : std::unique_lock lk(pimpl_->messageStatusMtx_);
2289 59 : std::vector<std::tuple<libjami::Account::MessageStates, std::string, std::string, std::string>> stVec;
2290 132 : for (const auto& [uri, status] : messageStatus) {
2291 73 : auto& oldMs = pimpl_->messagesStatus_[uri];
2292 73 : if (status.find("fetched_ts") != status.end() && status.at("fetched") != oldMs["fetched"]) {
2293 23 : if (oldMs["fetched_ts"].empty() || std::stol(oldMs["fetched_ts"]) <= std::stol(status.at("fetched_ts"))) {
2294 23 : stVec.emplace_back(libjami::Account::MessageStates::SENT, uri, status.at("fetched"), status.at("fetched_ts"));
2295 : }
2296 : }
2297 73 : if (status.find("read_ts") != status.end() && status.at("read") != oldMs["read"]) {
2298 3 : if (oldMs["read_ts"].empty() || std::stol(oldMs["read_ts"]) <= std::stol(status.at("read_ts"))) {
2299 3 : stVec.emplace_back(libjami::Account::MessageStates::DISPLAYED, uri, status.at("read"), status.at("read_ts"));
2300 : }
2301 : }
2302 : }
2303 59 : lk.unlock();
2304 :
2305 85 : for (const auto& [status, uri, commitId, ts] : stVec) {
2306 26 : pimpl_->updateStatus(uri, status, commitId, ts);
2307 : }
2308 59 : }
2309 :
2310 : void
2311 383 : Conversation::onMessageStatusChanged(const std::function<void(const std::map<std::string, std::map<std::string, std::string>>&)>& cb)
2312 : {
2313 383 : std::unique_lock lk(pimpl_->messageStatusMtx_);
2314 383 : pimpl_->messageStatusCb_ = cb;
2315 383 : }
2316 :
2317 : #ifdef LIBJAMI_TESTABLE
2318 : void
2319 509 : Conversation::onBootstrapStatus(const std::function<void(std::string, BootstrapStatus)>& cb)
2320 : {
2321 509 : pimpl_->bootstrapCbTest_ = cb;
2322 509 : }
2323 : #endif
2324 :
2325 : void
2326 608 : Conversation::checkBootstrapMember(const asio::error_code& ec,
2327 : std::vector<std::map<std::string, std::string>> members)
2328 : {
2329 608 : if (ec == asio::error::operation_aborted)
2330 319 : return;
2331 581 : auto acc = pimpl_->account_.lock();
2332 581 : if (pimpl_->swarmManager_->getRoutingTable().getNodes().size() > 0 or not acc)
2333 13 : return;
2334 : // We bootstrap the DRT with devices who already wrote in the repository.
2335 : // However, in a conversation, a large number of devices may just watch
2336 : // the conversation, but never write any message.
2337 568 : std::unique_lock lock(pimpl_->membersMtx_);
2338 :
2339 568 : std::string uri;
2340 853 : while (!members.empty()) {
2341 292 : auto member = std::move(members.back());
2342 292 : members.pop_back();
2343 292 : uri = std::move(member.at("uri"));
2344 292 : if (uri != pimpl_->userId_
2345 292 : && pimpl_->checkedMembers_.find(uri) == pimpl_->checkedMembers_.end())
2346 7 : break;
2347 292 : }
2348 279 : auto fallbackFailed = [](auto sthis) {
2349 837 : JAMI_WARNING("{}[SwarmManager {}] Bootstrap: Fallback failed. Wait for remote connections.",
2350 : sthis->pimpl_->toString(),
2351 : fmt::ptr(sthis->pimpl_->swarmManager_.get()));
2352 : #ifdef LIBJAMI_TESTABLE
2353 279 : if (sthis->pimpl_->bootstrapCbTest_)
2354 8 : sthis->pimpl_->bootstrapCbTest_(sthis->id(), BootstrapStatus::FAILED);
2355 : #endif
2356 279 : };
2357 : // If members is empty, we finished the fallback un-successfully
2358 568 : if (members.empty() && uri.empty()) {
2359 279 : lock.unlock();
2360 279 : fallbackFailed(this);
2361 279 : return;
2362 : }
2363 :
2364 : // Fallback, check devices of a member (we didn't check yet) in the conversation
2365 289 : pimpl_->checkedMembers_.emplace(uri);
2366 289 : auto devices = std::make_shared<std::vector<NodeId>>();
2367 1156 : acc->forEachDevice(
2368 578 : dht::InfoHash(uri),
2369 287 : [w = weak(), devices](const std::shared_ptr<dht::crypto::PublicKey>& dev) {
2370 : // Test if already sent
2371 287 : if (auto sthis = w.lock()) {
2372 285 : if (!sthis->pimpl_->swarmManager_->getRoutingTable().hasKnownNode(dev->getLongId()))
2373 277 : devices->emplace_back(dev->getLongId());
2374 287 : }
2375 287 : },
2376 289 : [w = weak(), devices, members = std::move(members), uri, fallbackFailed=std::move(fallbackFailed)](bool ok) {
2377 289 : auto sthis = w.lock();
2378 289 : if (!sthis)
2379 4 : return;
2380 285 : auto checkNext = true;
2381 285 : if (ok && devices->size() != 0) {
2382 : #ifdef LIBJAMI_TESTABLE
2383 274 : if (sthis->pimpl_->bootstrapCbTest_)
2384 6 : sthis->pimpl_->bootstrapCbTest_(sthis->id(), BootstrapStatus::FALLBACK);
2385 : #endif
2386 822 : JAMI_WARNING("{}[SwarmManager {}] Bootstrap: Fallback with member: {}",
2387 : sthis->pimpl_->toString(),
2388 : fmt::ptr(sthis->pimpl_->swarmManager_),
2389 : uri);
2390 274 : if (sthis->pimpl_->swarmManager_->setKnownNodes(*devices))
2391 0 : checkNext = false;
2392 : }
2393 285 : if (checkNext) {
2394 : // Check next member
2395 285 : sthis->pimpl_->fallbackTimer_->expires_at(std::chrono::steady_clock::now());
2396 570 : sthis->pimpl_->fallbackTimer_->async_wait(
2397 570 : std::bind(&Conversation::checkBootstrapMember,
2398 : sthis,
2399 : std::placeholders::_1,
2400 285 : std::move(members)));
2401 : } else {
2402 : // In this case, all members are checked. Fallback failed
2403 0 : fallbackFailed(sthis);
2404 : }
2405 289 : });
2406 1139 : }
2407 :
2408 : void
2409 508 : Conversation::bootstrap(std::function<void()> onBootstraped,
2410 : const std::vector<DeviceId>& knownDevices)
2411 : {
2412 508 : if (!pimpl_ || !pimpl_->repository_ || !pimpl_->swarmManager_)
2413 0 : return;
2414 : // Here, we bootstrap the DRT with devices who already wrote in the conversation
2415 : // If this doesn't work, it will attempt to fallback with checkBootstrapMember
2416 : // If it works, the callback onConnectionChanged will be called with ok=true
2417 508 : pimpl_->bootstrapCb_ = std::move(onBootstraped);
2418 508 : std::vector<DeviceId> devices = knownDevices;
2419 1521 : for (const auto& [member, memberDevices] : pimpl_->repository_->devices()) {
2420 1013 : if (!isBanned(member))
2421 1013 : devices.insert(devices.end(), memberDevices.begin(), memberDevices.end());
2422 508 : }
2423 1524 : JAMI_DEBUG("{}[SwarmManager {}] Bootstrap with {} device(s)",
2424 : pimpl_->toString(),
2425 : fmt::ptr(pimpl_->swarmManager_),
2426 : devices.size());
2427 : // set callback
2428 323 : auto fallback = [](auto sthis, bool now = false) {
2429 : // Fallback
2430 323 : auto acc = sthis->pimpl_->account_.lock();
2431 323 : if (!acc)
2432 0 : return;
2433 323 : auto members = sthis->getMembers(false, false);
2434 323 : std::shuffle(members.begin(), members.end(), acc->rand);
2435 323 : if (now) {
2436 292 : sthis->pimpl_->fallbackTimer_->expires_at(std::chrono::steady_clock::now());
2437 : } else {
2438 31 : auto timeForBootstrap = std::min(static_cast<size_t>(8), members.size());
2439 93 : sthis->pimpl_->fallbackTimer_->expires_at(std::chrono::steady_clock::now() + 20s
2440 62 : - std::chrono::seconds(timeForBootstrap));
2441 93 : JAMI_DEBUG("{}[SwarmManager {}] Fallback in {} seconds",
2442 : sthis->pimpl_->toString(),
2443 : fmt::ptr(sthis->pimpl_->swarmManager_.get()),
2444 : (20 - timeForBootstrap));
2445 : }
2446 646 : sthis->pimpl_->fallbackTimer_->async_wait(std::bind(&Conversation::checkBootstrapMember,
2447 : sthis,
2448 : std::placeholders::_1,
2449 323 : std::move(members)));
2450 323 : };
2451 :
2452 508 : pimpl_->swarmManager_->onConnectionChanged([w = weak(), fallback](bool ok) {
2453 : // This will call methods from accounts, so trigger on another thread.
2454 496 : dht::ThreadPool::io().run([w, ok, fallback=std::move(fallback)] {
2455 496 : auto sthis = w.lock();
2456 496 : if (!sthis)
2457 0 : return;
2458 496 : if (ok) {
2459 : // Bootstrap succeeded!
2460 : {
2461 465 : std::lock_guard lock(sthis->pimpl_->membersMtx_);
2462 465 : sthis->pimpl_->checkedMembers_.clear();
2463 465 : }
2464 465 : if (sthis->pimpl_->bootstrapCb_)
2465 465 : sthis->pimpl_->bootstrapCb_();
2466 : #ifdef LIBJAMI_TESTABLE
2467 465 : if (sthis->pimpl_->bootstrapCbTest_)
2468 10 : sthis->pimpl_->bootstrapCbTest_(sthis->id(), BootstrapStatus::SUCCESS);
2469 : #endif
2470 465 : return;
2471 : }
2472 31 : fallback(sthis);
2473 496 : });
2474 496 : });
2475 : {
2476 508 : std::lock_guard lock(pimpl_->membersMtx_);
2477 508 : pimpl_->checkedMembers_.clear();
2478 508 : }
2479 : // If is shutdown, the conversation was re-added, causing no new nodes to be connected, but just a classic connectivity change
2480 508 : if (pimpl_->swarmManager_->isShutdown()) {
2481 19 : pimpl_->swarmManager_->restart();
2482 19 : pimpl_->swarmManager_->maintainBuckets();
2483 489 : } else if (!pimpl_->swarmManager_->setKnownNodes(devices)) {
2484 292 : fallback(this, true);
2485 : }
2486 508 : }
2487 :
2488 : std::vector<std::string>
2489 18 : Conversation::commitsEndedCalls()
2490 : {
2491 18 : pimpl_->loadActiveCalls();
2492 18 : pimpl_->loadHostedCalls();
2493 18 : auto commits = pimpl_->commitsEndedCalls();
2494 18 : if (!commits.empty()) {
2495 : // Announce to client
2496 0 : dht::ThreadPool::io().run([w = weak(), commits] {
2497 0 : if (auto sthis = w.lock())
2498 0 : sthis->pimpl_->announce(commits, true);
2499 0 : });
2500 : }
2501 18 : return commits;
2502 0 : }
2503 :
2504 : void
2505 383 : Conversation::onMembersChanged(OnMembersChanged&& cb)
2506 : {
2507 383 : pimpl_->onMembersChanged_ = std::move(cb);
2508 383 : pimpl_->repository_->onMembersChanged([w=weak()] (const std::set<std::string>& memberUris) {
2509 1072 : if (auto sthis = w.lock())
2510 1071 : sthis->pimpl_->onMembersChanged_(memberUris);
2511 1072 : });
2512 383 : }
2513 :
2514 : void
2515 383 : Conversation::onNeedSocket(NeedSocketCb needSocket)
2516 : {
2517 766 : pimpl_->swarmManager_->needSocketCb_ = [needSocket = std::move(needSocket),
2518 : w=weak()](const std::string& deviceId, ChannelCb&& cb) {
2519 832 : if (auto sthis = w.lock())
2520 830 : needSocket(sthis->id(), deviceId, std::move(cb), "application/im-gitmessage-id");
2521 1598 : };
2522 383 : }
2523 :
2524 : void
2525 998 : Conversation::addSwarmChannel(std::shared_ptr<dhtnet::ChannelSocket> channel)
2526 : {
2527 998 : auto deviceId = channel->deviceId();
2528 : // Transmit avatar if necessary
2529 : // We do this here, because at this point we know both sides are connected and in
2530 : // the same conversation
2531 : // addSwarmChannel is a bit more complex, but it should be the best moment to do this.
2532 998 : auto cert = channel->peerCertificate();
2533 998 : if (!cert || !cert->issuer)
2534 0 : return;
2535 998 : auto member = cert->issuer->getId().toString();
2536 998 : pimpl_->swarmManager_->addChannel(std::move(channel));
2537 997 : dht::ThreadPool::io().run([member, deviceId, a = pimpl_->account_, w = weak_from_this()] {
2538 996 : auto sthis = w.lock();
2539 996 : if (auto account = a.lock()) {
2540 998 : account->sendProfile(sthis->id(), member, deviceId.toString());
2541 998 : }
2542 998 : });
2543 998 : }
2544 :
2545 : uint32_t
2546 4 : Conversation::countInteractions(const std::string& toId,
2547 : const std::string& fromId,
2548 : const std::string& authorUri) const
2549 : {
2550 4 : LogOptions options;
2551 4 : options.to = toId;
2552 4 : options.from = fromId;
2553 4 : options.authorUri = authorUri;
2554 4 : options.logIfNotFound = false;
2555 4 : options.fastLog = true;
2556 4 : History history;
2557 4 : auto res = pimpl_->loadMessages2(options, &history);
2558 8 : return res.size();
2559 4 : }
2560 :
2561 : void
2562 4 : Conversation::search(uint32_t req,
2563 : const Filter& filter,
2564 : const std::shared_ptr<std::atomic_int>& flag) const
2565 : {
2566 : // Because logging a conversation can take quite some time,
2567 : // do it asynchronously
2568 4 : dht::ThreadPool::io().run([w = weak(), req, filter, flag] {
2569 4 : if (auto sthis = w.lock()) {
2570 4 : History history;
2571 4 : std::vector<std::map<std::string, std::string>> commits {};
2572 : // std::regex_constants::ECMAScript is the default flag.
2573 4 : auto re = std::regex(filter.regexSearch,
2574 4 : filter.caseSensitive ? std::regex_constants::ECMAScript
2575 4 : : std::regex_constants::icase);
2576 4 : sthis->pimpl_->repository_->log(
2577 20 : [&](const auto& id, const auto& author, auto& commit) {
2578 100 : if (!filter.author.empty()
2579 20 : && filter.author != sthis->uriFromDevice(author.email)) {
2580 : // Filter author
2581 0 : return CallbackResult::Skip;
2582 : }
2583 20 : auto commitTime = git_commit_time(commit.get());
2584 20 : if (filter.before && filter.before < commitTime) {
2585 : // Only get commits before this date
2586 0 : return CallbackResult::Skip;
2587 : }
2588 20 : if (filter.after && filter.after > commitTime) {
2589 : // Only get commits before this date
2590 0 : if (git_commit_parentcount(commit.get()) <= 1)
2591 0 : return CallbackResult::Break;
2592 : else
2593 0 : return CallbackResult::Skip; // Because we are sorting it with
2594 : // GIT_SORT_TOPOLOGICAL | GIT_SORT_TIME
2595 : }
2596 :
2597 20 : return CallbackResult::Ok; // Continue
2598 : },
2599 20 : [&](auto&& cc) {
2600 40 : if (auto optMessage = sthis->pimpl_->repository_->convCommitToMap(cc))
2601 40 : sthis->pimpl_->addToHistory({optMessage.value()}, false, false, &history);
2602 20 : },
2603 20 : [&](auto id, auto, auto) {
2604 20 : if (id == filter.lastId)
2605 0 : return true;
2606 20 : return false;
2607 : },
2608 : "",
2609 : false);
2610 : // Search on generated history
2611 24 : for (auto& message : history.messageList) {
2612 20 : auto contentType = message->type;
2613 20 : auto isSearchable = contentType == "text/plain"
2614 20 : || contentType == "application/data-transfer+json";
2615 20 : if (filter.type.empty() && !isSearchable) {
2616 : // Not searchable, at least for now
2617 8 : continue;
2618 12 : } else if (contentType == filter.type || filter.type.empty()) {
2619 12 : if (isSearchable) {
2620 : // If it's a text match the body, else the display name
2621 48 : auto body = contentType == "text/plain" ? message->body.at("body")
2622 36 : : message->body.at("displayName");
2623 12 : std::smatch body_match;
2624 12 : if (std::regex_search(body, body_match, re)) {
2625 5 : auto commit = message->body;
2626 5 : commit["id"] = message->id;
2627 5 : commit["type"] = message->type;
2628 5 : commits.emplace_back(commit);
2629 5 : }
2630 12 : } else {
2631 : // Matching type, just add it to the results
2632 0 : commits.emplace_back(message->body);
2633 : }
2634 :
2635 12 : if (filter.maxResult != 0 && commits.size() == filter.maxResult)
2636 0 : break;
2637 : }
2638 20 : }
2639 :
2640 4 : if (commits.size() > 0)
2641 9 : emitSignal<libjami::ConversationSignal::MessagesFound>(req,
2642 3 : sthis->pimpl_->accountId_,
2643 6 : sthis->id(),
2644 3 : std::move(commits));
2645 : // If we're the latest thread, inform client that the search is finished
2646 4 : if ((*flag)-- == 1 /* decrement return the old value */) {
2647 4 : emitSignal<libjami::ConversationSignal::MessagesFound>(
2648 : req,
2649 4 : sthis->pimpl_->accountId_,
2650 8 : std::string {},
2651 8 : std::vector<std::map<std::string, std::string>> {});
2652 : }
2653 8 : }
2654 4 : });
2655 4 : }
2656 :
2657 : void
2658 14 : Conversation::hostConference(Json::Value&& message, OnDoneCb&& cb)
2659 : {
2660 14 : if (!message.isMember("confId")) {
2661 0 : JAMI_ERR() << "Malformed commit";
2662 0 : return;
2663 : }
2664 :
2665 14 : auto now = std::chrono::system_clock::now();
2666 14 : auto nowSecs = std::chrono::duration_cast<std::chrono::seconds>(now.time_since_epoch()).count();
2667 : {
2668 14 : std::lock_guard lk(pimpl_->activeCallsMtx_);
2669 14 : pimpl_->hostedCalls_[message["confId"].asString()] = nowSecs;
2670 14 : pimpl_->saveHostedCalls();
2671 14 : }
2672 :
2673 14 : sendMessage(std::move(message), "", {}, std::move(cb));
2674 : }
2675 :
2676 : bool
2677 20 : Conversation::isHosting(const std::string& confId) const
2678 : {
2679 20 : auto info = infos();
2680 20 : if (info["rdvDevice"] == pimpl_->deviceId_ && info["rdvHost"] == pimpl_->userId_)
2681 0 : return true; // We are the current device Host
2682 20 : std::lock_guard lk(pimpl_->activeCallsMtx_);
2683 20 : return pimpl_->hostedCalls_.find(confId) != pimpl_->hostedCalls_.end();
2684 20 : }
2685 :
2686 : void
2687 11 : Conversation::removeActiveConference(Json::Value&& message, OnDoneCb&& cb)
2688 : {
2689 11 : if (!message.isMember("confId")) {
2690 0 : JAMI_ERR() << "Malformed commit";
2691 0 : return;
2692 : }
2693 :
2694 11 : auto erased = false;
2695 : {
2696 11 : std::lock_guard lk(pimpl_->activeCallsMtx_);
2697 11 : erased = pimpl_->hostedCalls_.erase(message["confId"].asString());
2698 11 : }
2699 11 : if (erased) {
2700 11 : pimpl_->saveHostedCalls();
2701 11 : sendMessage(std::move(message), "", {}, std::move(cb));
2702 : } else
2703 0 : cb(false, "");
2704 : }
2705 :
2706 : std::vector<std::map<std::string, std::string>>
2707 39 : Conversation::currentCalls() const
2708 : {
2709 39 : std::lock_guard lk(pimpl_->activeCallsMtx_);
2710 78 : return pimpl_->activeCalls_;
2711 39 : }
2712 : } // namespace jami
|