Line data Source code
1 : /*
2 : * Copyright (C) 2004-2024 Savoir-faire Linux Inc.
3 : *
4 : * This program is free software: you can redistribute it and/or modify
5 : * it under the terms of the GNU General Public License as published by
6 : * the Free Software Foundation, either version 3 of the License, or
7 : * (at your option) any later version.
8 : *
9 : * This program is distributed in the hope that it will be useful,
10 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 : * GNU General Public License for more details.
13 : *
14 : * You should have received a copy of the GNU General Public License
15 : * along with this program. If not, see <https://www.gnu.org/licenses/>.
16 : */
17 : #include "conversation.h"
18 :
19 : #include "account_const.h"
20 : #include "fileutils.h"
21 : #include "jamiaccount.h"
22 : #include "client/ring_signal.h"
23 :
24 : #include <charconv>
25 : #include <json/json.h>
26 : #include <string_view>
27 : #include <opendht/thread_pool.h>
28 : #include <tuple>
29 : #include <optional>
30 : #include "swarm/swarm_manager.h"
31 : #ifdef ENABLE_PLUGIN
32 : #include "manager.h"
33 : #include "plugin/jamipluginmanager.h"
34 : #include "plugin/streamdata.h"
35 : #endif
36 : #include "jami/conversation_interface.h"
37 :
38 : namespace jami {
39 :
40 : static const char* const LAST_MODIFIED = "lastModified";
41 44 : static const auto jsonBuilder = [] {
42 44 : Json::StreamWriterBuilder wbuilder;
43 44 : wbuilder["commentStyle"] = "None";
44 44 : wbuilder["indentation"] = "";
45 44 : return wbuilder;
46 0 : }();
47 :
48 13 : ConvInfo::ConvInfo(const Json::Value& json)
49 : {
50 13 : id = json[ConversationMapKeys::ID].asString();
51 13 : created = json[ConversationMapKeys::CREATED].asLargestUInt();
52 13 : removed = json[ConversationMapKeys::REMOVED].asLargestUInt();
53 13 : erased = json[ConversationMapKeys::ERASED].asLargestUInt();
54 33 : for (const auto& v : json[ConversationMapKeys::MEMBERS]) {
55 20 : members.emplace(v["uri"].asString());
56 : }
57 13 : lastDisplayed = json[ConversationMapKeys::LAST_DISPLAYED].asString();
58 13 : }
59 :
60 : Json::Value
61 25 : ConvInfo::toJson() const
62 : {
63 25 : Json::Value json;
64 25 : json[ConversationMapKeys::ID] = id;
65 25 : json[ConversationMapKeys::CREATED] = Json::Int64(created);
66 25 : if (removed) {
67 1 : json[ConversationMapKeys::REMOVED] = Json::Int64(removed);
68 : }
69 25 : if (erased) {
70 1 : json[ConversationMapKeys::ERASED] = Json::Int64(erased);
71 : }
72 64 : for (const auto& m : members) {
73 39 : Json::Value member;
74 39 : member["uri"] = m;
75 39 : json[ConversationMapKeys::MEMBERS].append(member);
76 39 : }
77 25 : json[ConversationMapKeys::LAST_DISPLAYED] = lastDisplayed;
78 25 : return json;
79 0 : }
80 :
81 : // ConversationRequest
82 9455 : ConversationRequest::ConversationRequest(const Json::Value& json)
83 : {
84 9455 : received = json[ConversationMapKeys::RECEIVED].asLargestUInt();
85 9455 : declined = json[ConversationMapKeys::DECLINED].asLargestUInt();
86 9455 : from = json[ConversationMapKeys::FROM].asString();
87 9455 : conversationId = json[ConversationMapKeys::CONVERSATIONID].asString();
88 9455 : auto& md = json[ConversationMapKeys::METADATAS];
89 18925 : for (const auto& member : md.getMemberNames()) {
90 9469 : metadatas.emplace(member, md[member].asString());
91 9455 : }
92 9454 : }
93 :
94 : Json::Value
95 2 : ConversationRequest::toJson() const
96 : {
97 2 : Json::Value json;
98 2 : json[ConversationMapKeys::CONVERSATIONID] = conversationId;
99 2 : json[ConversationMapKeys::FROM] = from;
100 2 : json[ConversationMapKeys::RECEIVED] = static_cast<uint32_t>(received);
101 2 : if (declined)
102 0 : json[ConversationMapKeys::DECLINED] = static_cast<uint32_t>(declined);
103 4 : for (const auto& [key, value] : metadatas) {
104 2 : json[ConversationMapKeys::METADATAS][key] = value;
105 : }
106 2 : return json;
107 0 : }
108 :
109 : std::map<std::string, std::string>
110 218 : ConversationRequest::toMap() const
111 : {
112 218 : auto result = metadatas;
113 218 : result[ConversationMapKeys::ID] = conversationId;
114 218 : result[ConversationMapKeys::FROM] = from;
115 218 : if (declined)
116 0 : result[ConversationMapKeys::DECLINED] = std::to_string(declined);
117 218 : result[ConversationMapKeys::RECEIVED] = std::to_string(received);
118 218 : return result;
119 0 : }
120 :
// List of swarm messages, each held through a shared pointer.
121 : using MessageList = std::list<std::shared_ptr<libjami::SwarmMessage>>;
122 :
// Containers grouped together to represent one message history.
// NOTE(review): the exact role of each field is defined by the code that fills
// them, which is not shown next to this declaration; the field notes below only
// restate the types and names.
123 : struct History
124 : {
// Messages kept as a list.
125 : MessageList messageList {};
// Messages indexed by a string key (presumably the commit id -- confirm).
126 : std::map<std::string, std::shared_ptr<libjami::SwarmMessage>> quickAccess {};
// Lists of messages grouped by a string key (presumably editions waiting for
// their target message -- confirm).
127 : std::map<std::string, std::list<std::shared_ptr<libjami::SwarmMessage>>> pendingEditions {};
// Lists of string maps grouped by a string key (presumably reactions waiting
// for their target message -- confirm).
128 : std::map<std::string, std::list<std::map<std::string, std::string>>> pendingReactions {};
129 : };
130 :
131 : class Conversation::Impl
132 : {
133 : public:
134 181 : Impl(const std::shared_ptr<JamiAccount>& account,
135 : ConversationMode mode,
136 : const std::string& otherMember = "")
137 181 : : repository_(ConversationRepository::createConversation(account, mode, otherMember))
138 181 : , account_(account)
139 181 : , accountId_(account->getAccountID())
140 181 : , userId_(account->getUsername())
141 543 : , deviceId_(account->currentDeviceId())
142 : {
143 181 : if (!repository_) {
144 0 : throw std::logic_error("Unable to create repository");
145 : }
146 181 : init(account);
147 181 : }
148 :
149 18 : Impl(const std::shared_ptr<JamiAccount>& account, const std::string& conversationId)
150 18 : : account_(account)
151 18 : , accountId_(account->getAccountID())
152 18 : , userId_(account->getUsername())
153 36 : , deviceId_(account->currentDeviceId())
154 : {
155 18 : repository_ = std::make_unique<ConversationRepository>(account, conversationId);
156 18 : if (!repository_) {
157 0 : throw std::logic_error("Unable to create repository");
158 : }
159 18 : init(account);
160 18 : }
161 :
162 193 : Impl(const std::shared_ptr<JamiAccount>& account,
163 : const std::string& remoteDevice,
164 : const std::string& conversationId)
165 193 : : account_(account)
166 193 : , accountId_(account->getAccountID())
167 193 : , userId_(account->getUsername())
168 386 : , deviceId_(account->currentDeviceId())
169 : {
170 193 : std::vector<ConversationCommit> commits;
171 386 : repository_ = ConversationRepository::cloneConversation(account,
172 : remoteDevice,
173 : conversationId,
174 186 : [&](auto c) {
175 186 : commits = std::move(c);
176 192 : });
177 192 : if (!repository_) {
178 12 : emitSignal<libjami::ConversationSignal::OnConversationError>(
179 6 : accountId_, conversationId, EFETCH, "Unable to clone repository");
180 6 : throw std::logic_error("Unable to clone repository");
181 : }
182 : // To detect current active calls, we need to check history
183 372 : conversationDataPath_ = fileutils::get_data_dir() / accountId_
184 558 : / "conversation_data" / conversationId;
185 186 : activeCallsPath_ = conversationDataPath_ / ConversationMapKeys::ACTIVE_CALLS;
186 186 : initActiveCalls(repository_->convCommitsToMap(commits));
187 186 : init(account);
188 417 : }
189 :
190 385 : void init(const std::shared_ptr<JamiAccount>& account) {
191 385 : ioContext_ = Manager::instance().ioContext();
192 385 : fallbackTimer_ = std::make_unique<asio::steady_timer>(*ioContext_);
193 : swarmManager_
194 770 : = std::make_shared<SwarmManager>(NodeId(deviceId_),
195 770 : Manager::instance().getSeededRandomEngine(),
196 385 : [account = account_](const DeviceId& deviceId) {
197 2579 : if (auto acc = account.lock()) {
198 2579 : return acc->isConnectedWith(deviceId);
199 2579 : }
200 0 : return false;
201 385 : });
202 385 : swarmManager_->setMobility(account->isMobile());
203 : transferManager_
204 385 : = std::make_shared<TransferManager>(accountId_,
205 : "",
206 385 : repository_->id(),
207 770 : Manager::instance().getSeededRandomEngine());
208 770 : conversationDataPath_ = fileutils::get_data_dir() / accountId_
209 1155 : / "conversation_data" / repository_->id();
210 385 : fetchedPath_ = conversationDataPath_ / "fetched";
211 385 : statusPath_ = conversationDataPath_ / "status";
212 385 : sendingPath_ = conversationDataPath_ / "sending";
213 385 : preferencesPath_ = conversationDataPath_ / ConversationMapKeys::PREFERENCES;
214 385 : activeCallsPath_ = conversationDataPath_ / ConversationMapKeys::ACTIVE_CALLS;
215 385 : hostedCallsPath_ = conversationDataPath_ / ConversationMapKeys::HOSTED_CALLS;
216 385 : loadActiveCalls();
217 385 : loadStatus();
218 385 : typers_ = std::make_shared<Typers>(account, repository_->id());
219 385 : }
220 :
221 1112 : const std::string& toString() const
222 : {
223 1112 : if (fmtStr_.empty()) {
224 374 : if (repository_->mode() == ConversationMode::ONE_TO_ONE) {
225 125 : auto peer = userId_;
226 374 : for (const auto& member : repository_->getInitialMembers()) {
227 249 : if (member != userId_) {
228 124 : peer = member;
229 : }
230 125 : }
231 125 : fmtStr_ = fmt::format("[Conversation (1:1) {}]", peer);
232 125 : } else {
233 498 : fmtStr_ = fmt::format("[Conversation {}]", repository_->id());
234 : }
235 : }
236 1112 : return fmtStr_;
237 : }
238 : mutable std::string fmtStr_;
239 :
240 385 : ~Impl()
241 : {
242 : try {
243 385 : if (fallbackTimer_)
244 385 : fallbackTimer_->cancel();
245 0 : } catch (const std::exception& e) {
246 0 : JAMI_ERROR("[Conversation {:s}] {:s}", toString(), e.what());
247 0 : }
248 385 : }
249 :
250 : /**
251 : * If, for whatever reason, the daemon is stopped while hosting a conference,
252 : * we need to announce the end of this call when restarting.
253 : * To avoid to keep active calls forever.
254 : */
255 : std::vector<std::string> commitsEndedCalls();
256 : bool isAdmin() const;
257 : std::filesystem::path repoPath() const;
258 :
259 280 : void announce(const std::string& commitId, bool commitFromSelf = false) const
260 : {
261 280 : std::vector<std::string> vec;
262 280 : if (!commitId.empty())
263 278 : vec.emplace_back(commitId);
264 280 : announce(vec, commitFromSelf);
265 280 : }
266 :
267 296 : void announce(const std::vector<std::string>& commits, bool commitFromSelf = false) const
268 : {
269 296 : std::vector<ConversationCommit> convcommits;
270 296 : convcommits.reserve(commits.size());
271 606 : for (const auto& cid : commits) {
272 310 : auto commit = repository_->getCommit(cid);
273 310 : if (commit != std::nullopt) {
274 310 : convcommits.emplace_back(*commit);
275 : }
276 310 : }
277 296 : announce(repository_->convCommitsToMap(convcommits), commitFromSelf);
278 296 : }
279 :
280 : /**
281 : * Initialize activeCalls_ from the list of commits in the repository
282 : * @param commits Commits in reverse chronological order (i.e. from newest to oldest)
283 : */
284 186 : void initActiveCalls(const std::vector<std::map<std::string, std::string>>& commits) const
285 : {
286 186 : std::unordered_set<std::string> invalidHostUris;
287 186 : std::unordered_set<std::string> invalidCallIds;
288 :
289 186 : std::lock_guard lk(activeCallsMtx_);
290 1179 : for (const auto& commit : commits) {
291 993 : if (commit.at("type") == "member") {
292 : // Each commit of type "member" has an "action" field whose value can be one
293 : // of the following: "add", "join", "remove", "ban", "unban"
294 : // In the case of "remove" and "ban", we need to add the member's URI to
295 : // invalidHostUris to ensure that any call they may have started in the past
296 : // is no longer considered active.
297 : // For the other actions, there's no harm in adding the member's URI anyway,
298 : // since it's not possible to start hosting a call before joining the swarm (or
299 : // before getting unbanned in the case of previously banned members).
300 782 : invalidHostUris.emplace(commit.at("uri"));
301 423 : } else if (commit.find("confId") != commit.end() && commit.find("uri") != commit.end()
302 423 : && commit.find("device") != commit.end()) {
303 : // The commit indicates either the end or the beginning of a call
304 : // (depending on whether there is a "duration" field or not).
305 1 : auto convId = repository_->id();
306 2 : auto confId = commit.at("confId");
307 2 : auto uri = commit.at("uri");
308 2 : auto device = commit.at("device");
309 :
310 2 : if (commit.find("duration") == commit.end()
311 1 : && invalidCallIds.find(confId) == invalidCallIds.end()
312 3 : && invalidHostUris.find(uri) == invalidHostUris.end()) {
313 1 : std::map<std::string, std::string> activeCall;
314 1 : activeCall["id"] = confId;
315 1 : activeCall["uri"] = uri;
316 1 : activeCall["device"] = device;
317 1 : activeCalls_.emplace_back(activeCall);
318 0 : fmt::print("swarm:{} new active call detected: {} (on device {}, account {})\n",
319 : convId,
320 : confId,
321 : device,
322 : uri);
323 1 : }
324 : // Even if the call was active, we still add its ID to invalidCallIds to make sure it
325 : // doesn't get added a second time. (This shouldn't happen normally, but in practice
326 : // there are sometimes multiple commits indicating the beginning of the same call.)
327 1 : invalidCallIds.emplace(confId);
328 1 : }
329 : }
330 186 : saveActiveCalls();
331 186 : emitSignal<libjami::ConfigurationSignal::ActiveCallsChanged>(accountId_,
332 186 : repository_->id(),
333 186 : activeCalls_);
334 186 : }
335 :
336 : /**
337 : * Update activeCalls_ via announced commits (in load or via new commits)
338 : * @param commit Commit to check
339 : * @param eraseOnly If we want to ignore added commits
340 : * @param emitSig If we want to emit to client
341 : * @note eraseOnly is used by loadMessages. This is a fail-safe, this SHOULD NOT happen
342 : */
343 81 : void updateActiveCalls(const std::map<std::string, std::string>& commit,
344 : bool eraseOnly = false,
345 : bool emitSig = true) const
346 : {
347 81 : if (!repository_)
348 0 : return;
349 81 : if (commit.at("type") == "member") {
350 : // In this case, we need to check if we are not removing a hosting member or device
351 21 : std::lock_guard lk(activeCallsMtx_);
352 21 : auto it = activeCalls_.begin();
353 21 : auto updateActives = false;
354 22 : while (it != activeCalls_.end()) {
355 1 : if (it->at("uri") == commit.at("uri") || it->at("device") == commit.at("uri")) {
356 3 : JAMI_DEBUG("Removing {:s} from the active calls, because {:s} left",
357 : it->at("id"),
358 : commit.at("uri"));
359 1 : it = activeCalls_.erase(it);
360 1 : updateActives = true;
361 : } else {
362 0 : ++it;
363 : }
364 : }
365 21 : if (updateActives) {
366 1 : saveActiveCalls();
367 1 : if (emitSig)
368 1 : emitSignal<libjami::ConfigurationSignal::ActiveCallsChanged>(accountId_,
369 1 : repository_->id(),
370 1 : activeCalls_);
371 : }
372 21 : return;
373 21 : }
374 : // Else, it's a call information
375 176 : if (commit.find("confId") != commit.end() && commit.find("uri") != commit.end()
376 176 : && commit.find("device") != commit.end()) {
377 56 : auto convId = repository_->id();
378 112 : auto confId = commit.at("confId");
379 112 : auto uri = commit.at("uri");
380 112 : auto device = commit.at("device");
381 56 : std::lock_guard lk(activeCallsMtx_);
382 56 : auto itActive = std::find_if(activeCalls_.begin(),
383 : activeCalls_.end(),
384 25 : [&](const auto& value) {
385 75 : return value.at("id") == confId
386 50 : && value.at("uri") == uri
387 125 : && value.at("device") == device;
388 : });
389 56 : if (commit.find("duration") == commit.end()) {
390 31 : if (itActive == activeCalls_.end() && !eraseOnly) {
391 93 : JAMI_DEBUG(
392 : "swarm:{:s} new current call detected: {:s} on device {:s}, account {:s}",
393 : convId,
394 : confId,
395 : device,
396 : uri);
397 31 : std::map<std::string, std::string> activeCall;
398 31 : activeCall["id"] = confId;
399 31 : activeCall["uri"] = uri;
400 31 : activeCall["device"] = device;
401 31 : activeCalls_.emplace_back(activeCall);
402 31 : saveActiveCalls();
403 31 : if (emitSig)
404 31 : emitSignal<libjami::ConfigurationSignal::ActiveCallsChanged>(accountId_,
405 : repository_
406 31 : ->id(),
407 31 : activeCalls_);
408 31 : }
409 : } else {
410 25 : if (itActive != activeCalls_.end()) {
411 25 : itActive = activeCalls_.erase(itActive);
412 : // Unlikely, but we must ensure that no duplicate exists
413 25 : while (itActive != activeCalls_.end()) {
414 0 : itActive = std::find_if(itActive, activeCalls_.end(), [&](const auto& value) {
415 0 : return value.at("id") == confId && value.at("uri") == uri
416 0 : && value.at("device") == device;
417 : });
418 0 : if (itActive != activeCalls_.end()) {
419 0 : JAMI_ERROR("Duplicate call found. (This is a bug)");
420 0 : itActive = activeCalls_.erase(itActive);
421 : }
422 : }
423 :
424 25 : if (eraseOnly) {
425 0 : JAMI_WARNING("previous swarm:{:s} call finished detected: {:s} on device "
426 : "{:s}, account {:s}",
427 : convId,
428 : confId,
429 : device,
430 : uri);
431 : } else {
432 75 : JAMI_DEBUG("swarm:{:s} call finished: {:s} on device {:s}, account {:s}",
433 : convId,
434 : confId,
435 : device,
436 : uri);
437 : }
438 : }
439 25 : saveActiveCalls();
440 25 : if (emitSig)
441 25 : emitSignal<libjami::ConfigurationSignal::ActiveCallsChanged>(accountId_,
442 25 : repository_->id(),
443 25 : activeCalls_);
444 : }
445 56 : }
446 : }
447 :
448 1197 : void announce(const std::vector<std::map<std::string, std::string>>& commits, bool commitFromSelf = false) const
449 : {
450 1197 : if (!repository_)
451 0 : return;
452 1197 : auto convId = repository_->id();
453 1197 : auto ok = !commits.empty();
454 3589 : auto lastId = ok ? commits.rbegin()->at(ConversationMapKeys::ID) : "";
455 1197 : addToHistory(commits, true, commitFromSelf);
456 1197 : if (ok) {
457 1195 : bool announceMember = false;
458 2456 : for (const auto& c : commits) {
459 : // Announce member events
460 1262 : if (c.at("type") == "member") {
461 931 : if (c.find("uri") != c.end() && c.find("action") != c.end()) {
462 931 : const auto& uri = c.at("uri");
463 931 : const auto& actionStr = c.at("action");
464 931 : auto action = -1;
465 931 : if (actionStr == "add")
466 440 : action = 0;
467 491 : else if (actionStr == "join")
468 468 : action = 1;
469 23 : else if (actionStr == "remove")
470 1 : action = 2;
471 22 : else if (actionStr == "ban")
472 20 : action = 3;
473 2 : else if (actionStr == "unban")
474 2 : action = 4;
475 931 : if (actionStr == "ban" || actionStr == "remove") {
476 : // In this case, a potential host was removed during a call.
477 21 : updateActiveCalls(c);
478 21 : typers_->removeTyper(uri);
479 : }
480 931 : if (action != -1) {
481 931 : announceMember = true;
482 1862 : emitSignal<libjami::ConversationSignal::ConversationMemberEvent>(
483 931 : accountId_, convId, uri, action);
484 : }
485 : }
486 331 : } else if (c.at("type") == "application/call-history+json") {
487 60 : updateActiveCalls(c);
488 : }
489 : #ifdef ENABLE_PLUGIN
490 : auto& pluginChatManager
491 1262 : = Manager::instance().getJamiPluginManager().getChatServicesManager();
492 1262 : if (pluginChatManager.hasHandlers()) {
493 4 : auto cm = std::make_shared<JamiMessage>(accountId_,
494 : convId,
495 8 : c.at("author") != userId_,
496 : c,
497 8 : false);
498 4 : cm->isSwarm = true;
499 4 : pluginChatManager.publishMessage(std::move(cm));
500 4 : }
501 : #endif
502 : // announce message
503 1262 : emitSignal<libjami::ConversationSignal::MessageReceived>(accountId_, convId, c);
504 : }
505 :
506 1195 : if (announceMember && onMembersChanged_) {
507 919 : onMembersChanged_(repository_->memberUris("", {}));
508 : }
509 : }
510 1197 : }
511 :
512 385 : void loadStatus()
513 : {
514 : try {
515 : // read file
516 768 : auto file = fileutils::loadFile(statusPath_);
517 : // load values
518 2 : msgpack::object_handle oh = msgpack::unpack((const char*) file.data(), file.size());
519 2 : std::lock_guard lk {messageStatusMtx_};
520 2 : oh.get().convert(messagesStatus_);
521 385 : } catch (const std::exception& e) {
522 383 : }
523 385 : }
524 1100 : void saveStatus()
525 : {
526 1100 : std::ofstream file(statusPath_, std::ios::trunc | std::ios::binary);
527 1100 : msgpack::pack(file, messagesStatus_);
528 1100 : }
529 :
530 403 : void loadActiveCalls() const
531 : {
532 : try {
533 : // read file
534 794 : auto file = fileutils::loadFile(activeCallsPath_);
535 : // load values
536 12 : msgpack::object_handle oh = msgpack::unpack((const char*) file.data(), file.size());
537 12 : std::lock_guard lk {activeCallsMtx_};
538 12 : oh.get().convert(activeCalls_);
539 403 : } catch (const std::exception& e) {
540 391 : return;
541 391 : }
542 : }
543 :
544 261 : void saveActiveCalls() const
545 : {
546 261 : std::ofstream file(activeCallsPath_, std::ios::trunc | std::ios::binary);
547 261 : msgpack::pack(file, activeCalls_);
548 261 : }
549 :
550 18 : void loadHostedCalls() const
551 : {
552 : try {
553 : // read file
554 30 : auto file = fileutils::loadFile(hostedCallsPath_);
555 : // load values
556 6 : msgpack::object_handle oh = msgpack::unpack((const char*) file.data(), file.size());
557 6 : std::lock_guard lk {activeCallsMtx_};
558 6 : oh.get().convert(hostedCalls_);
559 18 : } catch (const std::exception& e) {
560 12 : return;
561 12 : }
562 : }
563 :
564 43 : void saveHostedCalls() const
565 : {
566 43 : std::ofstream file(hostedCallsPath_, std::ios::trunc | std::ios::binary);
567 43 : msgpack::pack(file, hostedCalls_);
568 43 : }
569 :
570 : void voteUnban(const std::string& contactUri, const std::string_view type, const OnDoneCb& cb);
571 :
572 : std::vector<std::map<std::string, std::string>> getMembers(bool includeInvited,
573 : bool includeLeft,
574 : bool includeBanned) const;
575 :
576 7589 : std::string_view bannedType(const std::string& uri) const
577 : {
578 7589 : auto repo = repoPath();
579 0 : auto crt = fmt::format("{}.crt", uri);
580 15176 : auto bannedMember = repo / "banned" / "members" / crt;
581 7590 : if (std::filesystem::is_regular_file(bannedMember))
582 17 : return "members"sv;
583 15145 : auto bannedAdmin = repo / "banned" / "admins" / crt;
584 7573 : if (std::filesystem::is_regular_file(bannedAdmin))
585 0 : return "admins"sv;
586 15143 : auto bannedInvited = repo / "banned" / "invited" / uri;
587 7572 : if (std::filesystem::is_regular_file(bannedInvited))
588 2 : return "invited"sv;
589 15140 : auto bannedDevice = repo / "banned" / "devices" / crt;
590 7569 : if (std::filesystem::is_regular_file(bannedDevice))
591 0 : return "devices"sv;
592 7571 : return {};
593 7589 : }
594 :
595 5112 : std::shared_ptr<dhtnet::ChannelSocket> gitSocket(const DeviceId& deviceId) const
596 : {
597 5112 : auto deviceSockets = gitSocketList_.find(deviceId);
598 10226 : return (deviceSockets != gitSocketList_.end()) ? deviceSockets->second : nullptr;
599 : }
600 :
601 2349 : void addGitSocket(const DeviceId& deviceId, const std::shared_ptr<dhtnet::ChannelSocket>& socket)
602 : {
603 2349 : gitSocketList_[deviceId] = socket;
604 2349 : }
605 823 : void removeGitSocket(const DeviceId& deviceId)
606 : {
607 823 : auto deviceSockets = gitSocketList_.find(deviceId);
608 823 : if (deviceSockets != gitSocketList_.end())
609 428 : gitSocketList_.erase(deviceSockets);
610 823 : }
611 :
612 : /**
613 : * Remove all git sockets and all DRT nodes associated with the given peer.
614 : * This is used when a swarm member is banned to ensure that we stop syncing
615 : * with them or sending them message notifications.
616 : */
617 : void disconnectFromPeer(const std::string& peerUri);
618 :
619 : std::vector<std::map<std::string, std::string>> getMembers(bool includeInvited,
620 : bool includeLeft) const;
621 :
622 : std::mutex membersMtx_ {};
623 : std::set<std::string> checkedMembers_; // Store members we tried
624 : std::function<void()> bootstrapCb_;
625 : #ifdef LIBJAMI_TESTABLE
626 : std::function<void(std::string, BootstrapStatus)> bootstrapCbTest_;
627 : #endif
628 :
629 : std::mutex writeMtx_ {};
630 : std::unique_ptr<ConversationRepository> repository_;
631 : std::shared_ptr<SwarmManager> swarmManager_;
632 : std::weak_ptr<JamiAccount> account_;
633 : std::string accountId_ {};
634 : std::string userId_;
635 : std::string deviceId_;
636 : std::atomic_bool isRemoving_ {false};
637 : std::vector<std::map<std::string, std::string>> loadMessages(const LogOptions& options);
638 : std::vector<libjami::SwarmMessage> loadMessages2(const LogOptions& options,
639 : History* optHistory = nullptr);
640 : void pull(const std::string& deviceId);
641 : std::vector<std::map<std::string, std::string>> mergeHistory(const std::string& uri);
642 :
643 : // Avoid multiple fetch/merges at the same time.
644 : std::mutex pullcbsMtx_ {};
645 : std::map<std::string, std::deque<std::pair<std::string, OnPullCb>>> fetchingRemotes_ {}; // store current remote in fetch
646 : std::shared_ptr<TransferManager> transferManager_ {};
647 : std::filesystem::path conversationDataPath_ {};
648 : std::filesystem::path fetchedPath_ {};
649 :
650 : // Manage last message displayed and status
651 : std::filesystem::path sendingPath_ {};
652 : std::filesystem::path preferencesPath_ {};
653 : OnMembersChanged onMembersChanged_ {};
654 :
655 : // Manage hosted calls on this device
656 : std::filesystem::path hostedCallsPath_ {};
657 : mutable std::map<std::string, uint64_t /* start time */> hostedCalls_ {};
658 : // Manage active calls for this conversation (can be hosted by other devices)
659 : std::filesystem::path activeCallsPath_ {};
660 : mutable std::mutex activeCallsMtx_ {};
661 : mutable std::vector<std::map<std::string, std::string>> activeCalls_ {};
662 :
663 : GitSocketList gitSocketList_ {};
664 :
665 : // Bootstrap
666 : std::shared_ptr<asio::io_context> ioContext_;
667 : std::unique_ptr<asio::steady_timer> fallbackTimer_;
668 :
669 :
670 : /**
671 : * Loaded history represents the linearized history to show for clients
672 : */
673 : mutable History loadedHistory_ {};
674 : std::vector<std::shared_ptr<libjami::SwarmMessage>> addToHistory(
675 : const std::vector<std::map<std::string, std::string>>& commits,
676 : bool messageReceived = false,
677 : bool commitFromSelf = false,
678 : History* history = nullptr) const;
679 : // While loading the history, we need to avoid:
680 : // - reloading history (can just be ignored)
681 : // - adding new commits (should wait for history to be loaded)
682 : bool isLoadingHistory_ {false};
683 : mutable std::mutex historyMtx_ {};
684 : mutable std::condition_variable historyCv_ {};
685 :
686 : void handleReaction(History& history,
687 : const std::shared_ptr<libjami::SwarmMessage>& sharedCommit) const;
688 : void handleEdition(History& history,
689 : const std::shared_ptr<libjami::SwarmMessage>& sharedCommit,
690 : bool messageReceived) const;
691 : bool handleMessage(History& history,
692 : const std::shared_ptr<libjami::SwarmMessage>& sharedCommit,
693 : bool messageReceived) const;
694 : void rectifyStatus(const std::shared_ptr<libjami::SwarmMessage>& message,
695 : History& history) const;
696 : /**
697 : * {uri, {
698 : * {"fetch", "commitId"},
699 : * {"fetched_ts", "timestamp"},
700 : * {"read", "commitId"},
701 : * {"read_ts", "timestamp"}
702 : * }
703 : * }
704 : */
705 : mutable std::mutex messageStatusMtx_;
706 : std::function<void(const std::map<std::string, std::map<std::string, std::string>>&)> messageStatusCb_ {};
707 : std::filesystem::path statusPath_ {};
708 : mutable std::map<std::string, std::map<std::string, std::string>> messagesStatus_ {};
709 : /**
710 : * Status: 0 = commited, 1 = fetched, 2 = read
711 : * This cache the curent status to add in the messages
712 : */
713 : // Note: only store int32_t cause it's easy to pass to dbus this way
714 : // memberToStatus serves as a cache for loading messages
715 : mutable std::map<std::string, int32_t> memberToStatus;
716 :
717 :
718 : // futureStatus is used to store the status for receiving messages
719 : // (because we're not sure to fetch the commit before receiving a status change for this)
720 : mutable std::map<std::string, std::map<std::string, int32_t>> futureStatus;
721 : // Update internal structures regarding status
722 : void updateStatus(const std::string& uri,
723 : libjami::Account::MessageStates status,
724 : const std::string& commitId,
725 : const std::string& ts,
726 : bool emit = false);
727 :
728 :
729 : std::shared_ptr<Typers> typers_;
730 : };
731 :
732 : bool
733 20 : Conversation::Impl::isAdmin() const
734 : {
735 40 : auto adminsPath = repoPath() / "admins";
736 40 : return std::filesystem::is_regular_file(fileutils::getFullPath(adminsPath, userId_ + ".crt"));
737 20 : }
738 :
739 : void
740 18 : Conversation::Impl::disconnectFromPeer(const std::string& peerUri)
741 : {
742 : // Remove nodes from swarmManager
743 18 : const auto nodes = swarmManager_->getRoutingTable().getAllNodes();
744 18 : std::vector<NodeId> toRemove;
745 41 : for (const auto node : nodes)
746 23 : if (peerUri == repository_->uriFromDevice(node.toString()))
747 11 : toRemove.emplace_back(node);
748 18 : swarmManager_->deleteNode(toRemove);
749 :
750 : // Remove git sockets with this member
751 40 : for (auto it = gitSocketList_.begin(); it != gitSocketList_.end();) {
752 22 : if (peerUri == repository_->uriFromDevice(it->first.toString()))
753 11 : it = gitSocketList_.erase(it);
754 : else
755 11 : ++it;
756 : }
757 18 : }
758 :
759 : std::vector<std::map<std::string, std::string>>
760 431 : Conversation::Impl::getMembers(bool includeInvited, bool includeLeft, bool includeBanned) const
761 : {
762 431 : std::vector<std::map<std::string, std::string>> result;
763 431 : auto members = repository_->members();
764 431 : std::lock_guard lk(messageStatusMtx_);
765 1245 : for (const auto& member : members) {
766 814 : if (member.role == MemberRole::BANNED && !includeBanned) {
767 168 : continue;
768 : }
769 814 : if (member.role == MemberRole::INVITED && !includeInvited)
770 168 : continue;
771 646 : if (member.role == MemberRole::LEFT && !includeLeft)
772 0 : continue;
773 646 : auto mm = member.map();
774 646 : mm[ConversationMapKeys::LAST_DISPLAYED] = messagesStatus_[member.uri]["read"];
775 646 : result.emplace_back(std::move(mm));
776 646 : }
777 862 : return result;
778 431 : }
779 :
780 : std::vector<std::string>
781 18 : Conversation::Impl::commitsEndedCalls()
782 : {
783 : // Handle current calls
784 18 : std::vector<std::string> commits {};
785 18 : std::unique_lock lk(writeMtx_);
786 18 : std::unique_lock lkA(activeCallsMtx_);
787 18 : for (const auto& hostedCall : hostedCalls_) {
788 : // In this case, this means that we left
789 : // the conference while still hosting it, so activeCalls
790 : // will not be correctly updated
791 : // We don't need to send notifications there, as peers will sync with presence
792 0 : Json::Value value;
793 0 : value["uri"] = userId_;
794 0 : value["device"] = deviceId_;
795 0 : value["confId"] = hostedCall.first;
796 0 : value["type"] = "application/call-history+json";
797 0 : auto now = std::chrono::system_clock::now();
798 0 : auto nowConverted = std::chrono::duration_cast<std::chrono::seconds>(now.time_since_epoch())
799 0 : .count();
800 0 : value["duration"] = std::to_string((nowConverted - hostedCall.second) * 1000);
801 0 : auto itActive = std::find_if(activeCalls_.begin(),
802 : activeCalls_.end(),
803 0 : [this, confId = hostedCall.first](const auto& value) {
804 0 : return value.at("id") == confId && value.at("uri") == userId_
805 0 : && value.at("device") == deviceId_;
806 : });
807 0 : if (itActive != activeCalls_.end())
808 0 : activeCalls_.erase(itActive);
809 0 : commits.emplace_back(repository_->commitMessage(Json::writeString(jsonBuilder, value)));
810 :
811 0 : JAMI_DEBUG("Removing hosted conference... {:s}", hostedCall.first);
812 0 : }
813 18 : hostedCalls_.clear();
814 18 : saveActiveCalls();
815 18 : saveHostedCalls();
816 36 : return commits;
817 18 : }
818 :
819 : std::filesystem::path
820 14145 : Conversation::Impl::repoPath() const
821 : {
822 28292 : return fileutils::get_data_dir() / accountId_ / "conversations" / repository_->id();
823 : }
824 :
825 : std::vector<std::map<std::string, std::string>>
826 25 : Conversation::Impl::loadMessages(const LogOptions& options)
827 : {
828 25 : if (!repository_)
829 0 : return {};
830 25 : std::vector<ConversationCommit> commits;
831 25 : auto startLogging = options.from == "";
832 25 : auto breakLogging = false;
833 50 : repository_->log(
834 90 : [&](const auto& id, const auto& author, const auto& commit) {
835 90 : if (!commits.empty()) {
836 : // Set linearized parent
837 65 : commits.rbegin()->linearized_parent = id;
838 : }
839 90 : if (options.skipMerge && git_commit_parentcount(commit.get()) > 1) {
840 0 : return CallbackResult::Skip;
841 : }
842 90 : if ((options.nbOfCommits != 0 && commits.size() == options.nbOfCommits))
843 0 : return CallbackResult::Break; // Stop logging
844 90 : if (breakLogging)
845 0 : return CallbackResult::Break; // Stop logging
846 90 : if (id == options.to) {
847 18 : if (options.includeTo)
848 0 : breakLogging = true; // For the next commit
849 : else
850 18 : return CallbackResult::Break; // Stop logging
851 : }
852 :
853 72 : if (!startLogging && options.from != "" && options.from == id)
854 0 : startLogging = true;
855 72 : if (!startLogging)
856 0 : return CallbackResult::Skip; // Start logging after this one
857 :
858 72 : if (options.fastLog) {
859 0 : if (options.authorUri != "") {
860 0 : if (options.authorUri == repository_->uriFromDevice(author.email)) {
861 0 : return CallbackResult::Break; // Found author, stop
862 : }
863 : }
864 : // Used to only count commit
865 0 : commits.emplace(commits.end(), ConversationCommit {});
866 0 : return CallbackResult::Skip;
867 : }
868 :
869 72 : return CallbackResult::Ok; // Continue
870 : },
871 72 : [&](auto&& cc) { commits.emplace(commits.end(), std::forward<decltype(cc)>(cc)); },
872 72 : [](auto, auto, auto) { return false; },
873 25 : options.from,
874 25 : options.logIfNotFound);
875 25 : return repository_->convCommitsToMap(commits);
876 25 : }
877 :
878 : std::vector<libjami::SwarmMessage>
879 1555 : Conversation::Impl::loadMessages2(const LogOptions& options, History* optHistory)
880 : {
881 1555 : if (!optHistory) {
882 2 : std::lock_guard lock(historyMtx_);
883 2 : if (!repository_ || isLoadingHistory_)
884 0 : return {};
885 2 : isLoadingHistory_ = true;
886 2 : }
887 :
888 : // Reset variables if needed to get the correct status.
889 : // options.from stores a swarmMessageId; this option is used
890 : // to load the messages of a conversation from a specific message
891 1555 : if (options.from.empty()) {
892 455 : memberToStatus.clear();
893 : }
894 :
895 : // By convention, if options.nbOfCommits is zero, then we
896 : // don't impose a limit on the number of commits returned.
897 1555 : bool limitNbOfCommits = options.nbOfCommits > 0;
898 :
899 1555 : auto startLogging = options.from == "";
900 1555 : auto breakLogging = false;
901 1555 : auto currentHistorySize = loadedHistory_.messageList.size();
902 1555 : std::vector<std::string> replies;
903 1555 : std::vector<std::shared_ptr<libjami::SwarmMessage>> msgList;
904 3110 : repository_->log(
905 : /* preCondition */
906 11184 : [&](const auto& id, const auto& author, const auto& commit) {
907 11184 : if (options.skipMerge && git_commit_parentcount(commit.get()) > 1) {
908 0 : return CallbackResult::Skip;
909 : }
910 11184 : if (replies.empty()) { // This avoid load until
911 : // NOTE: in the future, we may want to add "Reply-Body" in commit to avoid to load
912 : // until this commit
913 22366 : if ((limitNbOfCommits
914 11954 : && (loadedHistory_.messageList.size() - currentHistorySize)
915 771 : == options.nbOfCommits))
916 4 : return CallbackResult::Break; // Stop logging
917 11179 : if (breakLogging)
918 0 : return CallbackResult::Break; // Stop logging
919 11179 : if (id == options.to) {
920 568 : if (options.includeTo)
921 0 : breakLogging = true; // For the next commit
922 : else
923 568 : return CallbackResult::Break; // Stop logging
924 : }
925 : }
926 :
927 10612 : if (!startLogging && options.from != "" && options.from == id)
928 1094 : startLogging = true;
929 10612 : if (!startLogging)
930 24 : return CallbackResult::Skip; // Start logging after this one
931 :
932 10588 : if (options.fastLog) {
933 9813 : if (options.authorUri != "") {
934 1 : if (options.authorUri == repository_->uriFromDevice(author.email)) {
935 1 : return CallbackResult::Break; // Found author, stop
936 : }
937 : }
938 : }
939 :
940 10587 : return CallbackResult::Ok; // Continue
941 : },
942 : /* emplaceCb */
943 10586 : [&](auto&& cc) {
944 10586 : if(limitNbOfCommits && (msgList.size() == options.nbOfCommits))
945 324 : return;
946 10263 : auto optMessage = repository_->convCommitToMap(cc);
947 10264 : if (!optMessage.has_value())
948 1 : return;
949 10263 : auto message = optMessage.value();
950 10263 : if (message.find("reply-to") != message.end()) {
951 1 : replies.emplace_back(message.at("reply-to"));
952 : }
953 10262 : auto it = std::find(replies.begin(), replies.end(), message.at("id"));
954 10263 : if (it != replies.end()) {
955 1 : replies.erase(it);
956 : }
957 10263 : std::shared_ptr<libjami::SwarmMessage> firstMsg;
958 10263 : if (!optHistory && msgList.empty() && !loadedHistory_.messageList.empty()) {
959 0 : firstMsg = *loadedHistory_.messageList.rbegin();
960 : }
961 30787 : auto added = addToHistory({message}, false, false, optHistory);
962 10263 : if (!added.empty() && firstMsg) {
963 0 : emitSignal<libjami::ConversationSignal::SwarmMessageUpdated>(accountId_,
964 0 : repository_->id(),
965 0 : *firstMsg);
966 : }
967 10263 : msgList.insert(msgList.end(), added.begin(), added.end());
968 10264 : },
969 : /* postCondition */
970 10587 : [&](auto, auto, auto) {
971 : // Stop logging if there was a limit set on the number of commits
972 : // to return and we reached it. This isn't strictly necessary since
973 : // the check at the beginning of `emplaceCb` ensures that we won't
974 : // return too many messages, but it prevents us from needlessly
975 : // iterating over a (potentially) large number of commits.
976 10587 : return limitNbOfCommits && (msgList.size() == options.nbOfCommits);
977 : },
978 1555 : options.from,
979 1555 : options.logIfNotFound);
980 :
981 : // Convert for client (remove ptr)
982 1555 : std::vector<libjami::SwarmMessage> ret;
983 1555 : ret.reserve(msgList.size());
984 11809 : for (const auto& msg: msgList) {
985 10254 : ret.emplace_back(*msg);
986 : }
987 1555 : if (!optHistory) {
988 2 : std::lock_guard lock(historyMtx_);
989 2 : isLoadingHistory_ = false;
990 2 : historyCv_.notify_all();
991 2 : }
992 :
993 1555 : return ret;
994 1555 : }
995 :
996 : void
997 3 : Conversation::Impl::handleReaction(History& history,
998 : const std::shared_ptr<libjami::SwarmMessage>& sharedCommit) const
999 : {
1000 3 : auto it = history.quickAccess.find(sharedCommit->body.at("react-to"));
1001 3 : auto peditIt = history.pendingEditions.find(sharedCommit->id);
1002 3 : if (peditIt != history.pendingEditions.end()) {
1003 0 : auto oldBody = sharedCommit->body;
1004 0 : sharedCommit->body["body"] = peditIt->second.front()->body["body"];
1005 0 : if (sharedCommit->body.at("body").empty())
1006 0 : return;
1007 0 : history.pendingEditions.erase(peditIt);
1008 0 : }
1009 3 : if (it != history.quickAccess.end()) {
1010 3 : it->second->reactions.emplace_back(sharedCommit->body);
1011 3 : emitSignal<libjami::ConversationSignal::ReactionAdded>(accountId_,
1012 3 : repository_->id(),
1013 3 : it->second->id,
1014 3 : sharedCommit->body);
1015 : } else {
1016 0 : history.pendingReactions[sharedCommit->body.at("react-to")].emplace_back(sharedCommit->body);
1017 : }
1018 : }
1019 :
1020 : void
1021 8 : Conversation::Impl::handleEdition(History& history,
1022 : const std::shared_ptr<libjami::SwarmMessage>& sharedCommit,
1023 : bool messageReceived) const
1024 : {
1025 16 : auto editId = sharedCommit->body.at("edit");
1026 8 : auto it = history.quickAccess.find(editId);
1027 8 : if (it != history.quickAccess.end()) {
1028 6 : auto baseCommit = it->second;
1029 6 : if (baseCommit) {
1030 6 : auto itReact = baseCommit->body.find("react-to");
1031 6 : std::string toReplace = (baseCommit->type == "application/data-transfer+json") ?
1032 12 : "tid" : "body";
1033 6 : auto body = sharedCommit->body.at(toReplace);
1034 : // Edit reaction
1035 6 : if (itReact != baseCommit->body.end()) {
1036 1 : baseCommit->body[toReplace] = body; // Replace body if pending
1037 1 : it = history.quickAccess.find(itReact->second);
1038 1 : auto itPending = history.pendingReactions.find(itReact->second);
1039 1 : if (it != history.quickAccess.end()) {
1040 1 : baseCommit = it->second; // Base commit
1041 1 : auto itPreviousReact = std::find_if(baseCommit->reactions.begin(),
1042 1 : baseCommit->reactions.end(),
1043 1 : [&](const auto& reaction) {
1044 1 : return reaction.at("id") == editId;
1045 : });
1046 1 : if (itPreviousReact != baseCommit->reactions.end()) {
1047 1 : (*itPreviousReact)[toReplace] = body;
1048 1 : if (body.empty()) {
1049 1 : baseCommit->reactions.erase(itPreviousReact);
1050 2 : emitSignal<libjami::ConversationSignal::ReactionRemoved>(accountId_,
1051 : repository_
1052 1 : ->id(),
1053 1 : baseCommit->id,
1054 : editId);
1055 : }
1056 : }
1057 0 : } else if (itPending != history.pendingReactions.end()) {
1058 : // Else edit if pending
1059 0 : auto itReaction = std::find_if(itPending->second.begin(),
1060 0 : itPending->second.end(),
1061 0 : [&](const auto& reaction) {
1062 0 : return reaction.at("id") == editId;
1063 : });
1064 0 : if (itReaction != itPending->second.end()) {
1065 0 : (*itReaction)[toReplace] = body;
1066 0 : if (body.empty())
1067 0 : itPending->second.erase(itReaction);
1068 : }
1069 : } else {
1070 : // Add to pending edtions
1071 0 : messageReceived ? history.pendingEditions[editId].emplace_front(sharedCommit)
1072 0 : : history.pendingEditions[editId].emplace_back(sharedCommit);
1073 : }
1074 : } else {
1075 : // Normal message
1076 5 : it->second->editions.emplace(it->second->editions.begin(), it->second->body);
1077 5 : it->second->body[toReplace] = sharedCommit->body[toReplace];
1078 5 : if (toReplace == "tid") {
1079 : // Avoid to replace fileId in client
1080 1 : it->second->body["fileId"] = "";
1081 : }
1082 : // Remove reactions
1083 5 : if (sharedCommit->body.at(toReplace).empty())
1084 2 : it->second->reactions.clear();
1085 5 : emitSignal<libjami::ConversationSignal::SwarmMessageUpdated>(accountId_, repository_->id(), *it->second);
1086 : }
1087 6 : }
1088 6 : } else {
1089 2 : messageReceived ? history.pendingEditions[editId].emplace_front(sharedCommit)
1090 2 : : history.pendingEditions[editId].emplace_back(sharedCommit);
1091 : }
1092 8 : }
1093 :
1094 : bool
1095 11491 : Conversation::Impl::handleMessage(History& history,
1096 : const std::shared_ptr<libjami::SwarmMessage>& sharedCommit,
1097 : bool messageReceived) const
1098 : {
1099 11491 : if (messageReceived) {
1100 : // For a received message, we place it at the beginning of the list
1101 1217 : if (!history.messageList.empty())
1102 976 : sharedCommit->linearizedParent = (*history.messageList.begin())->id;
1103 1217 : history.messageList.emplace_front(sharedCommit);
1104 : } else {
1105 : // For a loaded message, we load from newest to oldest
1106 : // So we change the parent of the last message.
1107 10274 : if (!history.messageList.empty())
1108 8731 : (*history.messageList.rbegin())->linearizedParent = sharedCommit->id;
1109 10274 : history.messageList.emplace_back(sharedCommit);
1110 : }
1111 : // Handle pending reactions/editions
1112 11489 : auto reactIt = history.pendingReactions.find(sharedCommit->id);
1113 11490 : if (reactIt != history.pendingReactions.end()) {
1114 0 : for (const auto& commitBody : reactIt->second)
1115 0 : sharedCommit->reactions.emplace_back(commitBody);
1116 0 : history.pendingReactions.erase(reactIt);
1117 : }
1118 11490 : auto peditIt = history.pendingEditions.find(sharedCommit->id);
1119 11491 : if (peditIt != history.pendingEditions.end()) {
1120 0 : auto oldBody = sharedCommit->body;
1121 0 : if (sharedCommit->type == "application/data-transfer+json") {
1122 0 : sharedCommit->body["tid"] = peditIt->second.front()->body["tid"];
1123 0 : sharedCommit->body["fileId"] = "";
1124 : } else {
1125 0 : sharedCommit->body["body"] = peditIt->second.front()->body["body"];
1126 : }
1127 0 : peditIt->second.pop_front();
1128 0 : for (const auto& commit : peditIt->second) {
1129 0 : sharedCommit->editions.emplace_back(commit->body);
1130 : }
1131 0 : sharedCommit->editions.emplace_back(oldBody);
1132 0 : history.pendingEditions.erase(peditIt);
1133 0 : }
1134 : // Announce to client
1135 11490 : if (messageReceived)
1136 2434 : emitSignal<libjami::ConversationSignal::SwarmMessageReceived>(accountId_,
1137 1217 : repository_->id(),
1138 1217 : *sharedCommit);
1139 11491 : return !messageReceived;
1140 : }
1141 :
1142 11502 : void Conversation::Impl::rectifyStatus(const std::shared_ptr<libjami::SwarmMessage>& message,
1143 : History& history) const
1144 : {
1145 :
1146 11502 : auto parentIt = history.quickAccess.find(message->linearizedParent);
1147 11502 : auto currentMessage = message;
1148 :
1149 23560 : while(parentIt != history.quickAccess.end()){
1150 12060 : const auto& parent = parentIt->second;
1151 13471 : for (const auto& [peer, value] : message->status) {
1152 12359 : auto parentStatusIt = parent->status.find(peer);
1153 12357 : if (parentStatusIt == parent->status.end() || parentStatusIt->second < value) {
1154 1412 : parent->status[peer] = value;
1155 2824 : emitSignal<libjami::ConfigurationSignal::AccountMessageStatusChanged>(
1156 1412 : accountId_,
1157 1412 : repository_->id(),
1158 : peer,
1159 1412 : parent->id,
1160 : value);
1161 : }
1162 10944 : else if(parentStatusIt->second >= value){
1163 10944 : break;
1164 : }
1165 : }
1166 12056 : currentMessage = parent;
1167 12054 : parentIt = history.quickAccess.find(parent->linearizedParent);
1168 : }
1169 11501 : }
1170 :
1171 :
1172 : std::vector<std::shared_ptr<libjami::SwarmMessage>>
1173 11480 : Conversation::Impl::addToHistory(const std::vector<std::map<std::string, std::string>>& commits,
1174 : bool messageReceived,
1175 : bool commitFromSelf,
1176 : History* optHistory) const
1177 : {
1178 11480 : auto acc = account_.lock();
1179 11479 : if (!acc)
1180 0 : return {};
1181 11479 : auto username = acc->getUsername();
1182 11480 : if (messageReceived && (!optHistory && isLoadingHistory_)) {
1183 0 : std::unique_lock lk(historyMtx_);
1184 0 : historyCv_.wait(lk, [&] { return !isLoadingHistory_; });
1185 0 : }
1186 11480 : std::vector<std::shared_ptr<libjami::SwarmMessage>> messages;
1187 11545 : auto addCommit = [&](const auto& commit) {
1188 11545 : auto* history = optHistory ? optHistory : &loadedHistory_;
1189 23090 : auto commitId = commit.at("id");
1190 11545 : if (history->quickAccess.find(commitId) != history->quickAccess.end())
1191 10 : return; // Already present
1192 11534 : auto typeIt = commit.find("type");
1193 11535 : auto reactToIt = commit.find("react-to");
1194 11534 : auto editIt = commit.find("edit");
1195 : // Nothing to show for the client, skip
1196 11535 : if (typeIt != commit.end() && typeIt->second == "merge")
1197 33 : return;
1198 :
1199 11502 : auto sharedCommit = std::make_shared<libjami::SwarmMessage>();
1200 11501 : sharedCommit->fromMapStringString(commit);
1201 : // Set message status based on cache (only on history for client)
1202 11501 : if (!commitFromSelf && optHistory == nullptr) {
1203 924 : std::lock_guard lk(messageStatusMtx_);
1204 13564 : for (const auto& member: repository_->members()) {
1205 : // If we have a status cached, use it
1206 12639 : auto itFuture = futureStatus.find(sharedCommit->id);
1207 12642 : if (itFuture != futureStatus.end()) {
1208 17 : sharedCommit->status = std::move(itFuture->second);
1209 17 : futureStatus.erase(itFuture);
1210 932 : continue;
1211 : }
1212 : // Else we need to compute the status.
1213 12626 : auto& cache = memberToStatus[member.uri];
1214 12624 : if (cache == 0) {
1215 : // Message is sending, sent or displayed
1216 1045 : cache = static_cast<int32_t>(libjami::Account::MessageStates::SENDING);
1217 : }
1218 12624 : if (!messageReceived) {
1219 : // For loading previous messages, there is 3 cases. Last value cached is displayed, so is every previous commits
1220 : // Else, if last value is sent, we can compare to the last read commit to update the cache
1221 : // Finally if it's sending, we check last fetched commit
1222 12 : if (cache == static_cast<int32_t>(libjami::Account::MessageStates::SENT)) {
1223 0 : if (messagesStatus_[member.uri]["read"] == sharedCommit->id) {
1224 0 : cache = static_cast<int32_t>(libjami::Account::MessageStates::DISPLAYED);
1225 : }
1226 12 : } else if (cache <= static_cast<int32_t>(libjami::Account::MessageStates::SENDING)) { // SENDING or UNKNOWN
1227 : // cache can be upgraded to displayed or sent
1228 9 : if (messagesStatus_[member.uri]["read"] == sharedCommit->id) {
1229 1 : cache = static_cast<int32_t>(libjami::Account::MessageStates::DISPLAYED);
1230 8 : } else if (messagesStatus_[member.uri]["fetched"] == sharedCommit->id) {
1231 0 : cache = static_cast<int32_t>(libjami::Account::MessageStates::SENT);
1232 : }
1233 : }
1234 12 : if(static_cast<int32_t>(cache) > sharedCommit->status[member.uri]){
1235 12 : sharedCommit->status[member.uri] = static_cast<int32_t>(cache);
1236 : }
1237 : } else {
1238 : // If member is author of the message received, they already saw it
1239 12612 : if (member.uri == commit.at("author")) {
1240 : // If member is the author of the commit, they are considered as displayed (same for all previous commits)
1241 915 : messagesStatus_[member.uri]["read"] = sharedCommit->id;
1242 915 : messagesStatus_[member.uri]["fetched"] = sharedCommit->id;
1243 915 : sharedCommit->status[commit.at("author")] = static_cast<int32_t>(libjami::Account::MessageStates::DISPLAYED);
1244 915 : cache = static_cast<int32_t>(libjami::Account::MessageStates::DISPLAYED);
1245 915 : continue;
1246 : }
1247 : // For receiving messages, every commit is considered as SENDING, unless we got a update
1248 11697 : auto status = static_cast<int32_t>(libjami::Account::MessageStates::SENDING);
1249 11697 : if (messagesStatus_[member.uri]["read"] == sharedCommit->id) {
1250 0 : status = static_cast<int32_t>(libjami::Account::MessageStates::DISPLAYED);
1251 11697 : } else if (messagesStatus_[member.uri]["fetched"] == sharedCommit->id) {
1252 0 : status = static_cast<int32_t>(libjami::Account::MessageStates::SENT);
1253 : }
1254 11695 : if(static_cast<int32_t>(status) > sharedCommit->status[member.uri]){
1255 11696 : sharedCommit->status[member.uri] = static_cast<int32_t>(status);
1256 : }
1257 : }
1258 : }
1259 924 : }
1260 11501 : history->quickAccess[commitId] = sharedCommit;
1261 :
1262 11500 : if (reactToIt != commit.end() && !reactToIt->second.empty()) {
1263 3 : handleReaction(*history, sharedCommit);
1264 11498 : } else if (editIt != commit.end() && !editIt->second.empty()) {
1265 8 : handleEdition(*history, sharedCommit, messageReceived);
1266 11491 : } else if (handleMessage(*history, sharedCommit, messageReceived)) {
1267 10274 : messages.emplace_back(sharedCommit);
1268 : }
1269 11501 : rectifyStatus(sharedCommit, *history);
1270 11543 : };
1271 11480 : std::for_each(commits.begin(), commits.end(), addCommit);
1272 :
1273 11479 : return messages;
1274 11480 : }
1275 :
1276 181 : Conversation::Conversation(const std::shared_ptr<JamiAccount>& account,
1277 : ConversationMode mode,
1278 181 : const std::string& otherMember)
1279 181 : : pimpl_ {new Impl {account, mode, otherMember}}
1280 181 : {}
1281 :
1282 18 : Conversation::Conversation(const std::shared_ptr<JamiAccount>& account,
1283 18 : const std::string& conversationId)
1284 18 : : pimpl_ {new Impl {account, conversationId}}
1285 18 : {}
1286 :
1287 193 : Conversation::Conversation(const std::shared_ptr<JamiAccount>& account,
1288 : const std::string& remoteDevice,
1289 193 : const std::string& conversationId)
1290 193 : : pimpl_ {new Impl {account, remoteDevice, conversationId}}
1291 193 : {}
1292 :
1293 385 : Conversation::~Conversation() {}
1294 :
1295 : std::string
1296 8137 : Conversation::id() const
1297 : {
1298 8137 : return pimpl_->repository_ ? pimpl_->repository_->id() : "";
1299 : }
1300 :
1301 : void
1302 137 : Conversation::addMember(const std::string& contactUri, const OnDoneCb& cb)
1303 : {
1304 : try {
1305 137 : if (mode() == ConversationMode::ONE_TO_ONE) {
1306 : // Only authorize to add left members
1307 1 : auto initialMembers = getInitialMembers();
1308 1 : auto it = std::find(initialMembers.begin(), initialMembers.end(), contactUri);
1309 1 : if (it == initialMembers.end()) {
1310 1 : JAMI_WARN("Unable to add new member in one to one conversation");
1311 1 : cb(false, "");
1312 1 : return;
1313 : }
1314 1 : }
1315 0 : } catch (const std::exception& e) {
1316 0 : JAMI_WARN("Unable to get mode: %s", e.what());
1317 0 : cb(false, "");
1318 0 : return;
1319 0 : }
1320 136 : if (isMember(contactUri, true)) {
1321 0 : JAMI_WARN("Unable to add member %s because it's already a member", contactUri.c_str());
1322 0 : cb(false, "");
1323 0 : return;
1324 : }
1325 136 : if (isBanned(contactUri)) {
1326 3 : if (pimpl_->isAdmin()) {
1327 6 : dht::ThreadPool::io().run(
1328 4 : [w = weak(), contactUri = std::move(contactUri), cb = std::move(cb)] {
1329 2 : if (auto sthis = w.lock()) {
1330 2 : auto members = sthis->pimpl_->repository_->members();
1331 2 : auto type = sthis->pimpl_->bannedType(contactUri);
1332 2 : if (type.empty()) {
1333 0 : cb(false, {});
1334 0 : return;
1335 : }
1336 2 : sthis->pimpl_->voteUnban(contactUri, type, cb);
1337 4 : }
1338 : });
1339 : } else {
1340 1 : JAMI_WARN("Unable to add member %s because this member is blocked", contactUri.c_str());
1341 1 : cb(false, "");
1342 : }
1343 3 : return;
1344 : }
1345 :
1346 133 : dht::ThreadPool::io().run([w = weak(), contactUri = std::move(contactUri), cb = std::move(cb)] {
1347 133 : if (auto sthis = w.lock()) {
1348 : // Add member files and commit
1349 133 : std::unique_lock lk(sthis->pimpl_->writeMtx_);
1350 133 : auto commit = sthis->pimpl_->repository_->addMember(contactUri);
1351 133 : sthis->pimpl_->announce(commit, true);
1352 133 : lk.unlock();
1353 133 : if (cb)
1354 133 : cb(!commit.empty(), commit);
1355 266 : }
1356 133 : });
1357 : }
1358 :
1359 : std::shared_ptr<dhtnet::ChannelSocket>
1360 5113 : Conversation::gitSocket(const DeviceId& deviceId) const
1361 : {
1362 5113 : return pimpl_->gitSocket(deviceId);
1363 : }
1364 :
1365 : void
1366 2349 : Conversation::addGitSocket(const DeviceId& deviceId,
1367 : const std::shared_ptr<dhtnet::ChannelSocket>& socket)
1368 : {
1369 2349 : pimpl_->addGitSocket(deviceId, socket);
1370 2349 : }
1371 :
1372 : void
1373 823 : Conversation::removeGitSocket(const DeviceId& deviceId)
1374 : {
1375 823 : pimpl_->removeGitSocket(deviceId);
1376 823 : }
1377 :
1378 : void
1379 394 : Conversation::shutdownConnections()
1380 : {
1381 394 : pimpl_->fallbackTimer_->cancel();
1382 394 : pimpl_->gitSocketList_.clear();
1383 394 : if (pimpl_->swarmManager_)
1384 394 : pimpl_->swarmManager_->shutdown();
1385 394 : std::lock_guard(pimpl_->membersMtx_);
1386 394 : pimpl_->checkedMembers_.clear();
1387 394 : }
1388 :
1389 : void
1390 0 : Conversation::connectivityChanged()
1391 : {
1392 0 : if (pimpl_->swarmManager_)
1393 0 : pimpl_->swarmManager_->maintainBuckets();
1394 0 : }
1395 :
1396 : std::vector<jami::DeviceId>
1397 0 : Conversation::getDeviceIdList() const
1398 : {
1399 0 : return pimpl_->swarmManager_->getRoutingTable().getAllNodes();
1400 : }
1401 :
1402 : std::shared_ptr<Typers>
1403 9 : Conversation::typers() const
1404 : {
1405 9 : return pimpl_->typers_;
1406 : }
1407 :
1408 : bool
1409 0 : Conversation::hasSwarmChannel(const std::string& deviceId)
1410 : {
1411 0 : if (!pimpl_->swarmManager_)
1412 0 : return false;
1413 0 : return pimpl_->swarmManager_->isConnectedWith(DeviceId(deviceId));
1414 : }
1415 :
1416 : void
1417 2 : Conversation::Impl::voteUnban(const std::string& contactUri,
1418 : const std::string_view type,
1419 : const OnDoneCb& cb)
1420 : {
1421 : // Check if admin
1422 2 : if (!isAdmin()) {
1423 0 : JAMI_WARN("You're not an admin of this repo. Unable to unblock %s", contactUri.c_str());
1424 0 : cb(false, {});
1425 0 : return;
1426 : }
1427 :
1428 : // Vote for removal
1429 2 : std::unique_lock lk(writeMtx_);
1430 2 : auto voteCommit = repository_->voteUnban(contactUri, type);
1431 2 : if (voteCommit.empty()) {
1432 0 : JAMI_WARN("Unbanning %s failed", contactUri.c_str());
1433 0 : cb(false, "");
1434 0 : return;
1435 : }
1436 :
1437 2 : auto lastId = voteCommit;
1438 2 : std::vector<std::string> commits;
1439 2 : commits.emplace_back(voteCommit);
1440 :
1441 : // If admin, check vote
1442 4 : auto resolveCommit = repository_->resolveVote(contactUri, type, "unban");
1443 2 : if (!resolveCommit.empty()) {
1444 2 : commits.emplace_back(resolveCommit);
1445 2 : lastId = resolveCommit;
1446 2 : JAMI_WARN("Vote solved for unbanning %s.", contactUri.c_str());
1447 : }
1448 2 : announce(commits, true);
1449 2 : lk.unlock();
1450 2 : if (cb)
1451 2 : cb(!lastId.empty(), lastId);
1452 2 : }
1453 :
1454 : void
1455 15 : Conversation::removeMember(const std::string& contactUri, bool isDevice, const OnDoneCb& cb)
1456 : {
1457 45 : dht::ThreadPool::io().run([w = weak(),
1458 15 : contactUri = std::move(contactUri),
1459 15 : isDevice = std::move(isDevice),
1460 15 : cb = std::move(cb)] {
1461 15 : if (auto sthis = w.lock()) {
1462 : // Check if admin
1463 15 : if (!sthis->pimpl_->isAdmin()) {
1464 1 : JAMI_WARN("You're not an admin of this repo. Unable to block %s", contactUri.c_str());
1465 1 : cb(false, {});
1466 2 : return;
1467 : }
1468 :
1469 : // Get current user type
1470 14 : std::string type;
1471 14 : if (isDevice) {
1472 0 : type = "devices";
1473 : } else {
1474 14 : auto members = sthis->pimpl_->repository_->members();
1475 30 : for (const auto& member : members) {
1476 30 : if (member.uri == contactUri) {
1477 14 : if (member.role == MemberRole::INVITED) {
1478 3 : type = "invited";
1479 11 : } else if (member.role == MemberRole::ADMIN) {
1480 1 : type = "admins";
1481 10 : } else if (member.role == MemberRole::MEMBER) {
1482 10 : type = "members";
1483 : }
1484 14 : break;
1485 : }
1486 : }
1487 14 : if (type.empty()) {
1488 0 : cb(false, {});
1489 0 : return;
1490 : }
1491 14 : }
1492 :
1493 : // Vote for removal
1494 14 : std::unique_lock lk(sthis->pimpl_->writeMtx_);
1495 14 : auto voteCommit = sthis->pimpl_->repository_->voteKick(contactUri, type);
1496 14 : if (voteCommit.empty()) {
1497 1 : JAMI_WARN("Kicking %s failed", contactUri.c_str());
1498 1 : cb(false, "");
1499 1 : return;
1500 : }
1501 :
1502 13 : auto lastId = voteCommit;
1503 13 : std::vector<std::string> commits;
1504 13 : commits.emplace_back(voteCommit);
1505 :
1506 : // If admin, check vote
1507 26 : auto resolveCommit = sthis->pimpl_->repository_->resolveVote(contactUri, type, "ban");
1508 13 : if (!resolveCommit.empty()) {
1509 13 : commits.emplace_back(resolveCommit);
1510 13 : lastId = resolveCommit;
1511 13 : JAMI_WARN("Vote solved for %s. %s banned",
1512 : contactUri.c_str(),
1513 : isDevice ? "Device" : "Member");
1514 13 : sthis->pimpl_->disconnectFromPeer(contactUri);
1515 : }
1516 :
1517 13 : sthis->pimpl_->announce(commits, true);
1518 13 : lk.unlock();
1519 13 : cb(!lastId.empty(), lastId);
1520 31 : }
1521 : });
1522 15 : }
1523 :
1524 : std::vector<std::map<std::string, std::string>>
1525 431 : Conversation::getMembers(bool includeInvited, bool includeLeft, bool includeBanned) const
1526 : {
1527 431 : return pimpl_->getMembers(includeInvited, includeLeft, includeBanned);
1528 : }
1529 :
1530 : std::set<std::string>
1531 2192 : Conversation::memberUris(std::string_view filter, const std::set<MemberRole>& filteredRoles) const
1532 : {
1533 2192 : return pimpl_->repository_->memberUris(filter, filteredRoles);
1534 : }
1535 :
1536 : std::vector<NodeId>
1537 1915 : Conversation::peersToSyncWith() const
1538 : {
1539 1915 : const auto& routingTable = pimpl_->swarmManager_->getRoutingTable();
1540 1915 : const auto& nodes = routingTable.getNodes();
1541 1915 : const auto& mobiles = routingTable.getMobileNodes();
1542 1915 : std::vector<NodeId> s;
1543 1915 : s.reserve(nodes.size() + mobiles.size());
1544 1915 : s.insert(s.end(), nodes.begin(), nodes.end());
1545 1915 : s.insert(s.end(), mobiles.begin(), mobiles.end());
1546 13952 : for (const auto& [deviceId, _] : pimpl_->gitSocketList_)
1547 12037 : if (std::find(s.cbegin(), s.cend(), deviceId) == s.cend())
1548 224 : s.emplace_back(deviceId);
1549 3830 : return s;
1550 1915 : }
1551 :
1552 : bool
1553 1914 : Conversation::isBootstraped() const
1554 : {
1555 1914 : const auto& routingTable = pimpl_->swarmManager_->getRoutingTable();
1556 1915 : return !routingTable.getNodes().empty();
1557 : }
1558 :
1559 : std::string
1560 17673 : Conversation::uriFromDevice(const std::string& deviceId) const
1561 : {
1562 17673 : return pimpl_->repository_->uriFromDevice(deviceId);
1563 : }
1564 :
1565 : void
1566 0 : Conversation::monitor()
1567 : {
1568 0 : pimpl_->swarmManager_->getRoutingTable().printRoutingTable();
1569 0 : }
1570 :
1571 : std::string
1572 186 : Conversation::join()
1573 : {
1574 186 : return pimpl_->repository_->join();
1575 : }
1576 :
1577 : bool
1578 6538 : Conversation::isMember(const std::string& uri, bool includeInvited) const
1579 : {
1580 6538 : auto repoPath = pimpl_->repoPath();
1581 6538 : auto invitedPath = repoPath / "invited";
1582 6538 : auto adminsPath = repoPath / "admins";
1583 6538 : auto membersPath = repoPath / "members";
1584 26151 : std::vector<std::filesystem::path> pathsToCheck = {adminsPath, membersPath};
1585 6538 : if (includeInvited)
1586 5867 : pathsToCheck.emplace_back(invitedPath);
1587 16938 : for (const auto& path : pathsToCheck) {
1588 80521 : for (const auto& certificate : dhtnet::fileutils::readDirectory(path)) {
1589 70114 : std::string_view crtUri = certificate;
1590 70117 : auto crtIt = crtUri.find(".crt");
1591 70099 : if (path != invitedPath && crtIt == std::string_view::npos) {
1592 0 : JAMI_WARNING("Incorrect file found: {}/{}", path, certificate);
1593 0 : continue;
1594 0 : }
1595 70143 : if (crtIt != std::string_view::npos)
1596 66372 : crtUri = crtUri.substr(0, crtIt);
1597 70131 : if (crtUri == uri)
1598 5778 : return true;
1599 16178 : }
1600 : }
1601 :
1602 760 : if (includeInvited && mode() == ConversationMode::ONE_TO_ONE) {
1603 3 : for (const auto& member : getInitialMembers()) {
1604 2 : if (member == uri)
1605 0 : return true;
1606 1 : }
1607 : }
1608 :
1609 760 : return false;
1610 6538 : }
1611 :
1612 : bool
1613 7588 : Conversation::isBanned(const std::string& uri) const
1614 : {
1615 7588 : return !pimpl_->bannedType(uri).empty();
1616 : }
1617 :
1618 : void
1619 0 : Conversation::sendMessage(std::string&& message,
1620 : const std::string& type,
1621 : const std::string& replyTo,
1622 : OnCommitCb&& onCommit,
1623 : OnDoneCb&& cb)
1624 : {
1625 0 : Json::Value json;
1626 0 : json["body"] = std::move(message);
1627 0 : json["type"] = type;
1628 0 : sendMessage(std::move(json), replyTo, std::move(onCommit), std::move(cb));
1629 0 : }
1630 :
1631 : void
1632 139 : Conversation::sendMessage(Json::Value&& value,
1633 : const std::string& replyTo,
1634 : OnCommitCb&& onCommit,
1635 : OnDoneCb&& cb)
1636 : {
1637 139 : if (!replyTo.empty()) {
1638 2 : auto commit = pimpl_->repository_->getCommit(replyTo);
1639 2 : if (commit == std::nullopt) {
1640 1 : JAMI_ERR("Replying to invalid commit %s", replyTo.c_str());
1641 1 : return;
1642 : }
1643 1 : value["reply-to"] = replyTo;
1644 2 : }
1645 552 : dht::ThreadPool::io().run(
1646 414 : [w = weak(), value = std::move(value), onCommit = std::move(onCommit), cb = std::move(cb)] {
1647 138 : if (auto sthis = w.lock()) {
1648 138 : std::unique_lock lk(sthis->pimpl_->writeMtx_);
1649 138 : auto commit = sthis->pimpl_->repository_->commitMessage(
1650 138 : Json::writeString(jsonBuilder, value));
1651 138 : lk.unlock();
1652 138 : if (onCommit)
1653 11 : onCommit(commit);
1654 138 : sthis->pimpl_->announce(commit, true);
1655 138 : if (cb)
1656 138 : cb(!commit.empty(), commit);
1657 276 : }
1658 138 : });
1659 : }
1660 :
1661 : void
1662 1 : Conversation::sendMessages(std::vector<Json::Value>&& messages, OnMultiDoneCb&& cb)
1663 : {
1664 1 : dht::ThreadPool::io().run([w = weak(), messages = std::move(messages), cb = std::move(cb)] {
1665 1 : if (auto sthis = w.lock()) {
1666 1 : std::vector<std::string> commits;
1667 1 : commits.reserve(messages.size());
1668 1 : std::unique_lock lk(sthis->pimpl_->writeMtx_);
1669 3 : for (const auto& message : messages) {
1670 2 : auto commit = sthis->pimpl_->repository_->commitMessage(
1671 2 : Json::writeString(jsonBuilder, message));
1672 2 : commits.emplace_back(std::move(commit));
1673 2 : }
1674 1 : lk.unlock();
1675 1 : sthis->pimpl_->announce(commits, true);
1676 1 : if (cb)
1677 1 : cb(commits);
1678 2 : }
1679 1 : });
1680 1 : }
1681 :
1682 : std::optional<std::map<std::string, std::string>>
1683 14665 : Conversation::getCommit(const std::string& commitId) const
1684 : {
1685 14665 : auto commit = pimpl_->repository_->getCommit(commitId);
1686 14666 : if (commit == std::nullopt)
1687 2446 : return std::nullopt;
1688 12220 : return pimpl_->repository_->convCommitToMap(*commit);
1689 14665 : }
1690 :
1691 : void
1692 7 : Conversation::loadMessages(OnLoadMessages cb, const LogOptions& options)
1693 : {
1694 7 : if (!cb)
1695 0 : return;
1696 7 : dht::ThreadPool::io().run([w = weak(), cb = std::move(cb), options] {
1697 7 : if (auto sthis = w.lock()) {
1698 7 : cb(sthis->pimpl_->loadMessages(options));
1699 7 : }
1700 7 : });
1701 : }
1702 :
1703 : void
1704 2 : Conversation::loadMessages2(const OnLoadMessages2& cb, const LogOptions& options)
1705 : {
1706 2 : if (!cb)
1707 0 : return;
1708 2 : dht::ThreadPool::io().run([w = weak(), cb = std::move(cb), options] {
1709 2 : if (auto sthis = w.lock()) {
1710 2 : cb(sthis->pimpl_->loadMessages2(options));
1711 2 : }
1712 2 : });
1713 : }
1714 :
1715 : void
1716 0 : Conversation::clearCache()
1717 : {
1718 0 : pimpl_->loadedHistory_.messageList.clear();
1719 0 : pimpl_->loadedHistory_.quickAccess.clear();
1720 0 : pimpl_->loadedHistory_.pendingEditions.clear();
1721 0 : pimpl_->loadedHistory_.pendingReactions.clear();
1722 0 : }
1723 :
1724 : std::string
1725 2861 : Conversation::lastCommitId() const
1726 : {
1727 2861 : LogOptions options;
1728 2861 : options.nbOfCommits = 1;
1729 2861 : options.skipMerge = true;
1730 2861 : History optHistory;
1731 : {
1732 2861 : std::lock_guard lk(pimpl_->historyMtx_);
1733 2861 : if (!pimpl_->loadedHistory_.messageList.empty())
1734 4824 : return (*pimpl_->loadedHistory_.messageList.begin())->id;
1735 2861 : }
1736 :
1737 449 : std::lock_guard lk(pimpl_->writeMtx_);
1738 449 : auto res = pimpl_->loadMessages2(options, &optHistory);
1739 449 : if (res.empty())
1740 5 : return {};
1741 888 : return (*optHistory.messageList.begin())->id;
1742 2860 : }
1743 :
1744 : std::vector<std::map<std::string, std::string>>
1745 2025 : Conversation::Impl::mergeHistory(const std::string& uri)
1746 : {
1747 2025 : if (not repository_) {
1748 0 : JAMI_WARN("Invalid repo. Abort merge");
1749 0 : return {};
1750 : }
1751 4050 : auto remoteHead = repository_->remoteHead(uri);
1752 2025 : if (remoteHead.empty()) {
1753 0 : JAMI_WARN("Unable to get HEAD of %s", uri.c_str());
1754 0 : return {};
1755 : }
1756 :
1757 : // Validate commit
1758 2025 : auto [newCommits, err] = repository_->validFetch(uri);
1759 2025 : if (newCommits.empty()) {
1760 1124 : if (err)
1761 17 : JAMI_ERR("Unable to validate history with %s", uri.c_str());
1762 1124 : repository_->removeBranchWith(uri);
1763 1124 : return {};
1764 : }
1765 :
1766 : // If validated, merge
1767 901 : auto [ok, cid] = repository_->merge(remoteHead);
1768 901 : if (!ok) {
1769 0 : JAMI_ERR("Unable to merge history with %s", uri.c_str());
1770 0 : repository_->removeBranchWith(uri);
1771 0 : return {};
1772 : }
1773 901 : if (!cid.empty()) {
1774 : // A merge commit was generated, should be added in new commits
1775 16 : auto commit = repository_->getCommit(cid);
1776 16 : if (commit != std::nullopt)
1777 16 : newCommits.emplace_back(*commit);
1778 16 : }
1779 :
1780 2703 : JAMI_DEBUG("Successfully merge history with {:s}", uri);
1781 901 : auto result = repository_->convCommitsToMap(newCommits);
1782 1832 : for (auto& commit : result) {
1783 931 : auto it = commit.find("type");
1784 931 : if (it != commit.end() && it->second == "member") {
1785 771 : repository_->refreshMembers();
1786 :
1787 771 : if (commit["action"] == "ban")
1788 5 : disconnectFromPeer(commit["uri"]);
1789 : }
1790 : }
1791 901 : return result;
1792 2025 : }
1793 :
1794 : bool
1795 2161 : Conversation::pull(const std::string& deviceId, OnPullCb&& cb, std::string commitId)
1796 : {
1797 2161 : std::lock_guard lk(pimpl_->pullcbsMtx_);
1798 2163 : auto [it, notInProgress] = pimpl_->fetchingRemotes_.emplace(deviceId, std::deque<std::pair<std::string, OnPullCb>>());
1799 2163 : auto& pullcbs = it->second;
1800 2163 : auto itPull = std::find_if(pullcbs.begin(),
1801 0 : pullcbs.end(),
1802 4337 : [&](const auto& elem) { return std::get<0>(elem) == commitId; });
1803 2163 : if (itPull != pullcbs.end()) {
1804 30 : JAMI_DEBUG("Ignoring request to pull from {:s} with commit {:s}: pull already in progress", deviceId, commitId);
1805 10 : cb(false);
1806 10 : return false;
1807 : }
1808 6458 : JAMI_DEBUG("Pulling from {:s} with commit {:s}", deviceId, commitId);
1809 2153 : pullcbs.emplace_back(std::move(commitId), std::move(cb));
1810 2153 : if (notInProgress)
1811 2116 : dht::ThreadPool::io().run([w = weak(), deviceId] {
1812 2116 : if (auto sthis_ = w.lock())
1813 2116 : sthis_->pimpl_->pull(deviceId);
1814 2116 : });
1815 2153 : return true;
1816 2163 : }
1817 :
1818 : void
1819 2116 : Conversation::Impl::pull(const std::string& deviceId)
1820 : {
1821 2116 : auto& repo = repository_;
1822 :
1823 2116 : std::string commitId;
1824 2116 : OnPullCb cb;
1825 : while (true) {
1826 : {
1827 4269 : std::lock_guard lk(pullcbsMtx_);
1828 4269 : auto it = fetchingRemotes_.find(deviceId);
1829 4269 : if (it == fetchingRemotes_.end()) {
1830 0 : JAMI_ERROR("Could not find device {:s} in fetchingRemotes", deviceId);
1831 0 : break;
1832 : }
1833 4269 : auto& pullcbs = it->second;
1834 4269 : if (pullcbs.empty()) {
1835 2116 : fetchingRemotes_.erase(it);
1836 2116 : break;
1837 : }
1838 2153 : auto& elem = pullcbs.front();
1839 2153 : commitId = std::move(std::get<0>(elem));
1840 2153 : cb = std::move(std::get<1>(elem));
1841 2153 : pullcbs.pop_front();
1842 4269 : }
1843 : // If recently fetched, the commit can already be there, so no need to do complex operations
1844 2153 : if (commitId != "" && repo->getCommit(commitId, false) != std::nullopt) {
1845 123 : cb(true);
1846 128 : continue;
1847 : }
1848 : // Pull from remote
1849 2030 : auto fetched = repo->fetch(deviceId);
1850 2030 : if (!fetched) {
1851 5 : cb(false);
1852 5 : continue;
1853 : }
1854 2025 : auto oldHead = repo->getHead();
1855 2025 : std::string newHead = oldHead;
1856 2025 : std::unique_lock lk(writeMtx_);
1857 2025 : auto commits = mergeHistory(deviceId);
1858 2025 : if (!commits.empty()) {
1859 901 : newHead = commits.rbegin()->at("id");
1860 : // Note: Because clients needs to linearize the history, they need to know all commits
1861 : // that can be updated.
1862 : // In this case, all commits until the common merge base should be announced.
1863 : // The client ill need to update it's model after this.
1864 901 : std::string mergeBase = oldHead; // If fast-forward, the merge base is the previous head
1865 901 : auto newHeadCommit = repo->getCommit(newHead);
1866 901 : if (newHeadCommit != std::nullopt && newHeadCommit->parents.size() > 1) {
1867 18 : mergeBase = repo->mergeBase(newHeadCommit->parents[0], newHeadCommit->parents[1]);
1868 18 : LogOptions options;
1869 18 : options.to = mergeBase;
1870 18 : auto updatedCommits = loadMessages(options);
1871 : // We announce commits from oldest to update to newest. This generally avoid
1872 : // to get detached commits until they are all announced.
1873 18 : std::reverse(std::begin(updatedCommits), std::end(updatedCommits));
1874 18 : announce(updatedCommits);
1875 18 : } else {
1876 883 : announce(commits);
1877 : }
1878 901 : }
1879 2025 : lk.unlock();
1880 :
1881 2025 : bool commitFound = false;
1882 2025 : if (commitId != "") {
1883 : // If `commitId` is non-empty, then we were trying to pull a specific commit.
1884 : // We need to check if we actually got it; the fact that the fetch above was
1885 : // successful doesn't guarantee that we did.
1886 1959 : for (const auto& commit : commits) {
1887 905 : if (commit.at("id") == commitId) {
1888 892 : commitFound = true;
1889 892 : break;
1890 : }
1891 : }
1892 : } else {
1893 79 : commitFound = true;
1894 : }
1895 2025 : if (!commitFound)
1896 3162 : JAMI_WARNING("Successfully fetched from device {} but didn't receive expected commit {}",
1897 : deviceId, commitId);
1898 : // WARNING: If its argument is `true`, this callback will try to send a message notification
1899 : // for commit `commitId` to other members of the swarm. It's important that we only
1900 : // send these notifications if we actually have the commit. Otherwise, we can end up
1901 : // in a situation where the members of the swarm keep sending notifications to each
1902 : // other for a commit that none of them have (note that we cannot rule this out, as
1903 : // nothing prevents a malicious user from intentionally sending a notification with
1904 : // a fake commit ID).
1905 2025 : if (cb)
1906 2025 : cb(commitFound);
1907 : // Announce if profile changed
1908 2025 : if (oldHead != newHead) {
1909 901 : auto diffStats = repo->diffStats(newHead, oldHead);
1910 901 : auto changedFiles = repo->changedFiles(diffStats);
1911 901 : if (find(changedFiles.begin(), changedFiles.end(), "profile.vcf")
1912 1802 : != changedFiles.end()) {
1913 5 : emitSignal<libjami::ConversationSignal::ConversationProfileUpdated>(
1914 10 : accountId_, repo->id(), repo->infos());
1915 : }
1916 901 : }
1917 4178 : }
1918 2116 : }
1919 :
1920 : void
1921 2161 : Conversation::sync(const std::string& member,
1922 : const std::string& deviceId,
1923 : OnPullCb&& cb,
1924 : std::string commitId)
1925 : {
1926 2161 : pull(deviceId, std::move(cb), commitId);
1927 2163 : dht::ThreadPool::io().run([member, deviceId, w = weak_from_this()] {
1928 2163 : auto sthis = w.lock();
1929 : // For waiting request, downloadFile
1930 2163 : for (const auto& wr : sthis->dataTransfer()->waitingRequests()) {
1931 0 : auto path = fileutils::get_data_dir() / sthis->pimpl_->accountId_
1932 0 : / "conversation_data" / sthis->id() / wr.fileId;
1933 0 : auto start = fileutils::size(path);
1934 0 : if (start < 0)
1935 0 : start = 0;
1936 0 : sthis->downloadFile(wr.interactionId, wr.fileId, wr.path, member, deviceId, start);
1937 2163 : }
1938 2163 : });
1939 2163 : }
1940 :
1941 : std::map<std::string, std::string>
1942 2916 : Conversation::generateInvitation() const
1943 : {
1944 : // Invite the new member to the conversation
1945 2916 : Json::Value root;
1946 2916 : auto& metadata = root[ConversationMapKeys::METADATAS];
1947 5840 : for (const auto& [k, v] : infos()) {
1948 2924 : if (v.size() >= 64000) {
1949 0 : JAMI_WARNING("Cutting invite because the SIP message will be too long");
1950 0 : continue;
1951 0 : }
1952 2924 : metadata[k] = v;
1953 2916 : }
1954 2916 : root[ConversationMapKeys::CONVERSATIONID] = id();
1955 8748 : return {{"application/invite+json", Json::writeString(jsonBuilder, root)}};
1956 2916 : }
1957 :
1958 : std::string
1959 6 : Conversation::leave()
1960 : {
1961 6 : setRemovingFlag();
1962 6 : std::lock_guard lk(pimpl_->writeMtx_);
1963 12 : return pimpl_->repository_->leave();
1964 6 : }
1965 :
1966 : void
1967 11 : Conversation::setRemovingFlag()
1968 : {
1969 11 : pimpl_->isRemoving_ = true;
1970 11 : }
1971 :
1972 : bool
1973 4411 : Conversation::isRemoving()
1974 : {
1975 4411 : return pimpl_->isRemoving_;
1976 : }
1977 :
1978 : void
1979 22 : Conversation::erase()
1980 : {
1981 22 : if (pimpl_->conversationDataPath_ != "")
1982 22 : dhtnet::fileutils::removeAll(pimpl_->conversationDataPath_, true);
1983 22 : if (!pimpl_->repository_)
1984 0 : return;
1985 22 : std::lock_guard lk(pimpl_->writeMtx_);
1986 22 : pimpl_->repository_->erase();
1987 22 : }
1988 :
1989 : ConversationMode
1990 5140 : Conversation::mode() const
1991 : {
1992 5140 : return pimpl_->repository_->mode();
1993 : }
1994 :
1995 : std::vector<std::string>
1996 32 : Conversation::getInitialMembers() const
1997 : {
1998 32 : return pimpl_->repository_->getInitialMembers();
1999 : }
2000 :
2001 : bool
2002 0 : Conversation::isInitialMember(const std::string& uri) const
2003 : {
2004 0 : auto members = getInitialMembers();
2005 0 : return std::find(members.begin(), members.end(), uri) != members.end();
2006 0 : }
2007 :
2008 : void
2009 9 : Conversation::updateInfos(const std::map<std::string, std::string>& map, const OnDoneCb& cb)
2010 : {
2011 9 : dht::ThreadPool::io().run([w = weak(), map = std::move(map), cb = std::move(cb)] {
2012 9 : if (auto sthis = w.lock()) {
2013 9 : auto& repo = sthis->pimpl_->repository_;
2014 9 : std::unique_lock lk(sthis->pimpl_->writeMtx_);
2015 9 : auto commit = repo->updateInfos(map);
2016 9 : sthis->pimpl_->announce(commit, true);
2017 9 : lk.unlock();
2018 9 : if (cb)
2019 9 : cb(!commit.empty(), commit);
2020 9 : emitSignal<libjami::ConversationSignal::ConversationProfileUpdated>(
2021 18 : sthis->pimpl_->accountId_, repo->id(), repo->infos());
2022 18 : }
2023 9 : });
2024 9 : }
2025 :
2026 : std::map<std::string, std::string>
2027 2963 : Conversation::infos() const
2028 : {
2029 2963 : return pimpl_->repository_->infos();
2030 : }
2031 :
2032 : void
2033 8 : Conversation::updatePreferences(const std::map<std::string, std::string>& map)
2034 : {
2035 8 : auto filePath = pimpl_->conversationDataPath_ / "preferences";
2036 8 : auto prefs = map;
2037 8 : auto itLast = prefs.find(LAST_MODIFIED);
2038 8 : if (itLast != prefs.end()) {
2039 3 : if (std::filesystem::is_regular_file(filePath)) {
2040 1 : auto lastModified = fileutils::lastWriteTimeInSeconds(filePath);
2041 : try {
2042 1 : if (lastModified >= std::stoul(itLast->second))
2043 0 : return;
2044 0 : } catch (...) {
2045 0 : return;
2046 0 : }
2047 : }
2048 3 : prefs.erase(itLast);
2049 : }
2050 :
2051 8 : std::ofstream file(filePath, std::ios::trunc | std::ios::binary);
2052 8 : msgpack::pack(file, prefs);
2053 16 : emitSignal<libjami::ConversationSignal::ConversationPreferencesUpdated>(pimpl_->accountId_,
2054 16 : id(),
2055 8 : std::move(prefs));
2056 8 : }
2057 :
2058 : std::map<std::string, std::string>
2059 107 : Conversation::preferences(bool includeLastModified) const
2060 : {
2061 : try {
2062 107 : std::map<std::string, std::string> preferences;
2063 107 : auto filePath = pimpl_->conversationDataPath_ / "preferences";
2064 203 : auto file = fileutils::loadFile(filePath);
2065 11 : msgpack::object_handle oh = msgpack::unpack((const char*) file.data(), file.size());
2066 11 : oh.get().convert(preferences);
2067 11 : if (includeLastModified)
2068 8 : preferences[LAST_MODIFIED] = std::to_string(fileutils::lastWriteTimeInSeconds(filePath));
2069 11 : return preferences;
2070 299 : } catch (const std::exception& e) {
2071 96 : }
2072 96 : return {};
2073 : }
2074 :
2075 : std::vector<uint8_t>
2076 0 : Conversation::vCard() const
2077 : {
2078 : try {
2079 0 : return fileutils::loadFile(pimpl_->repoPath() / "profile.vcf");
2080 0 : } catch (...) {
2081 0 : }
2082 0 : return {};
2083 : }
2084 :
2085 : std::shared_ptr<TransferManager>
2086 2262 : Conversation::dataTransfer() const
2087 : {
2088 2262 : return pimpl_->transferManager_;
2089 : }
2090 :
2091 : bool
2092 13 : Conversation::onFileChannelRequest(const std::string& member,
2093 : const std::string& fileId,
2094 : bool verifyShaSum) const
2095 : {
2096 13 : if (!isMember(member))
2097 0 : return false;
2098 :
2099 13 : auto sep = fileId.find('_');
2100 13 : if (sep == std::string::npos)
2101 0 : return false;
2102 :
2103 13 : auto interactionId = fileId.substr(0, sep);
2104 13 : auto commit = getCommit(interactionId);
2105 26 : if (commit == std::nullopt || commit->find("type") == commit->end()
2106 26 : || commit->find("tid") == commit->end() || commit->find("sha3sum") == commit->end()
2107 26 : || commit->at("type") != "application/data-transfer+json") {
2108 0 : JAMI_WARNING("[Account {:s}] {} requested invalid file transfer commit {}", pimpl_->accountId_, member, interactionId);
2109 0 : return false;
2110 : }
2111 :
2112 13 : auto path = dataTransfer()->path(fileId);
2113 :
2114 13 : if (!std::filesystem::is_regular_file(path)) {
2115 : // Check if dangling symlink
2116 1 : if (std::filesystem::is_symlink(path)) {
2117 1 : dhtnet::fileutils::remove(path, true);
2118 : }
2119 3 : JAMI_WARNING("[Account {:s}] {:s} asked for non existing file {} in {:s}",
2120 : pimpl_->accountId_,
2121 : member,
2122 : fileId,
2123 : id());
2124 1 : return false;
2125 : }
2126 : // Check that our file is correct before sending
2127 12 : if (verifyShaSum && commit->at("sha3sum") != fileutils::sha3File(path)) {
2128 3 : JAMI_WARNING(
2129 : "[Account {:s}] {:s} asked for file {:s} in {:s}, but our version is not complete or corrupted",
2130 : pimpl_->accountId_,
2131 : member,
2132 : fileId,
2133 : id());
2134 1 : return false;
2135 : }
2136 11 : return true;
2137 13 : }
2138 :
2139 : bool
2140 11 : Conversation::downloadFile(const std::string& interactionId,
2141 : const std::string& fileId,
2142 : const std::string& path,
2143 : const std::string&,
2144 : const std::string& deviceId,
2145 : std::size_t start,
2146 : std::size_t end)
2147 : {
2148 11 : auto commit = getCommit(interactionId);
2149 11 : if (commit == std::nullopt || commit->at("type") != "application/data-transfer+json") {
2150 0 : JAMI_ERROR("Commit doesn't exists or is not a file transfer {} (Conversation: {}) ", interactionId, id());
2151 0 : return false;
2152 : }
2153 11 : auto tid = commit->find("tid");
2154 11 : auto sha3sum = commit->find("sha3sum");
2155 11 : auto size_str = commit->find("totalSize");
2156 :
2157 11 : if (tid == commit->end() || sha3sum == commit->end() || size_str == commit->end()) {
2158 0 : JAMI_ERROR("Invalid file transfer commit (missing tid, size or sha3)");
2159 0 : return false;
2160 : }
2161 :
2162 11 : auto totalSize = to_int<ssize_t>(size_str->second, (ssize_t) -1);
2163 11 : if (totalSize < 0) {
2164 0 : JAMI_ERROR("Invalid file size {}", totalSize);
2165 0 : return false;
2166 : }
2167 :
2168 : // Be sure to not lock conversation
2169 22 : dht::ThreadPool().io().run([w = weak(),
2170 : deviceId,
2171 : fileId,
2172 : interactionId,
2173 11 : sha3sum = sha3sum->second,
2174 : path,
2175 : totalSize,
2176 : start,
2177 22 : end] {
2178 11 : if (auto shared = w.lock()) {
2179 11 : auto acc = shared->pimpl_->account_.lock();
2180 11 : if (!acc)
2181 0 : return;
2182 11 : shared->dataTransfer()->waitForTransfer(fileId, interactionId, sha3sum, path, totalSize);
2183 11 : acc->askForFileChannel(shared->id(), deviceId, interactionId, fileId, start, end);
2184 22 : }
2185 : });
2186 11 : return true;
2187 11 : }
2188 :
2189 : void
2190 1109 : Conversation::hasFetched(const std::string& deviceId, const std::string& commitId)
2191 : {
2192 1109 : dht::ThreadPool::io().run([w = weak(), deviceId, commitId]() {
2193 1109 : auto sthis = w.lock();
2194 1109 : if (!sthis)
2195 0 : return;
2196 : // Update fetched for Uri
2197 1109 : auto uri = sthis->uriFromDevice(deviceId);
2198 1109 : if (uri.empty() || uri == sthis->pimpl_->userId_)
2199 45 : return;
2200 : // When a user fetches a commit, the message is sent for this person
2201 1064 : sthis->pimpl_->updateStatus(uri, libjami::Account::MessageStates::SENT, commitId, std::to_string(std::time(nullptr)), true);
2202 1154 : });
2203 1109 : }
2204 :
2205 :
// Record that member `uri` has fetched (st == SENT) or read (any other
// state) the conversation up to `commitId`, then propagate that status to
// every message between the previously recorded commit and the new one.
//
// @param uri       member whose status changes
// @param st        new state; SENT selects the "fetched"/"fetched_ts" keys,
//                  anything else the "read"/"read_ts" keys
// @param commitId  last commit reached by the member
// @param ts        timestamp string stored alongside the commit id
// @param emit      when true, messageStatusCb_ (if set) is called with the
//                  updated status of `uri`
2206 : void
2207 1106 : Conversation::Impl::updateStatus(const std::string& uri,
2208 : libjami::Account::MessageStates st,
2209 : const std::string& commitId,
2210 : const std::string& ts,
2211 : bool emit)
2212 : {
2213 : // This method can be called if peer send us a status or if another device sync. Emit will be true if a peer send us a status and will emit to other connected devices.
2214 1106 : LogOptions options;
2215 1106 : std::map<std::string, std::map<std::string, std::string>> newStatus;
2216 : {
2217 : // Update internal structures.
2218 1106 : std::lock_guard lk(messageStatusMtx_);
2219 1106 : auto& status = messagesStatus_[uri];
2220 1106 : auto& oldStatus = status[st == libjami::Account::MessageStates::SENT ? "fetched" : "read"];
2221 1106 : if (oldStatus == commitId)
2222 6 : return; // Nothing to do
// The range to refresh goes from the new commit back to the previous one.
2223 1100 : options.to = oldStatus;
2224 1100 : options.from = commitId;
2225 1100 : oldStatus = commitId;
2226 1100 : status[st == libjami::Account::MessageStates::SENT ? "fetched_ts" : "read_ts"] = ts;
2227 1100 : saveStatus();
// Copy the status while still under the lock; the callback runs after release.
2228 1100 : if (emit)
2229 1074 : newStatus[uri].insert(status.begin(), status.end());
2230 1106 : }
2231 1100 : if (emit && messageStatusCb_) {
2232 1074 : messageStatusCb_(newStatus);
2233 : }
2234 : // Update messages status for all commit between the old and new one
2235 1100 : options.logIfNotFound = false;
2236 1100 : options.fastLog = true;
// optHistory is a scratch history filled by loadMessages2; only its
// quickAccess keys (commit ids) are used below.
2237 1100 : History optHistory;
2238 1100 : std::lock_guard lk(historyMtx_); // Avoid to announce messages while updating status.
2239 1100 : auto res = loadMessages2(options, &optHistory);
2240 1100 : if (res.size() == 0) {
2241 : // In this case, commit is not received yet, so we cache it
2242 9 : futureStatus[commitId][uri] = static_cast<int32_t>(st);
2243 : }
2244 10899 : for (const auto& [cid, _]: optHistory.quickAccess) {
2245 9799 : auto message = loadedHistory_.quickAccess.find(cid);
2246 9799 : if (message != loadedHistory_.quickAccess.end()) {
2247 : // Update message and emit to client,
// Status only moves forward: a lower or equal state is ignored.
2248 2124 : if(static_cast<int32_t>(st) > message->second->status[uri]){
2249 2120 : message->second->status[uri] = static_cast<int32_t>(st);
2250 4240 : emitSignal<libjami::ConfigurationSignal::AccountMessageStatusChanged>(
2251 2120 : accountId_,
2252 2120 : repository_->id(),
2253 : uri,
2254 : cid,
2255 : static_cast<int>(st));
2256 : }
2257 : } else {
2258 : // In this case, commit is not loaded by client, so we cache it
2259 : // No need to emit to client, they will get a correct status on load.
2260 7675 : futureStatus[cid][uri] = static_cast<int32_t>(st);
2261 : }
2262 : }
2263 1112 : }
2264 :
2265 : bool
2266 18 : Conversation::setMessageDisplayed(const std::string& uri, const std::string& interactionId)
2267 : {
2268 18 : std::lock_guard lk(pimpl_->messageStatusMtx_);
2269 18 : if (pimpl_->messagesStatus_[uri]["read"] == interactionId)
2270 2 : return false; // Nothing to do
2271 16 : dht::ThreadPool::io().run([w = weak(), uri, interactionId]() {
2272 16 : auto sthis = w.lock();
2273 16 : if (!sthis)
2274 0 : return;
2275 16 : sthis->pimpl_->updateStatus(uri, libjami::Account::MessageStates::DISPLAYED, interactionId, std::to_string(std::time(nullptr)), true);
2276 16 : });
2277 16 : return true;
2278 18 : }
2279 :
2280 : std::map<std::string, std::map<std::string, std::string>>
2281 89 : Conversation::messageStatus() const
2282 : {
2283 89 : std::lock_guard lk(pimpl_->messageStatusMtx_);
2284 178 : return pimpl_->messagesStatus_;
2285 89 : }
2286 :
2287 : void
2288 60 : Conversation::updateMessageStatus(const std::map<std::string, std::map<std::string, std::string>>& messageStatus)
2289 : {
2290 60 : std::unique_lock lk(pimpl_->messageStatusMtx_);
2291 60 : std::vector<std::tuple<libjami::Account::MessageStates, std::string, std::string, std::string>> stVec;
2292 132 : for (const auto& [uri, status] : messageStatus) {
2293 72 : auto& oldMs = pimpl_->messagesStatus_[uri];
2294 72 : if (status.find("fetched_ts") != status.end() && status.at("fetched") != oldMs["fetched"]) {
2295 23 : if (oldMs["fetched_ts"].empty() || std::stol(oldMs["fetched_ts"]) <= std::stol(status.at("fetched_ts"))) {
2296 23 : stVec.emplace_back(libjami::Account::MessageStates::SENT, uri, status.at("fetched"), status.at("fetched_ts"));
2297 : }
2298 : }
2299 72 : if (status.find("read_ts") != status.end() && status.at("read") != oldMs["read"]) {
2300 3 : if (oldMs["read_ts"].empty() || std::stol(oldMs["read_ts"]) <= std::stol(status.at("read_ts"))) {
2301 3 : stVec.emplace_back(libjami::Account::MessageStates::DISPLAYED, uri, status.at("read"), status.at("read_ts"));
2302 : }
2303 : }
2304 : }
2305 60 : lk.unlock();
2306 :
2307 86 : for (const auto& [status, uri, commitId, ts] : stVec) {
2308 26 : pimpl_->updateStatus(uri, status, commitId, ts);
2309 : }
2310 60 : }
2311 :
2312 : void
2313 385 : Conversation::onMessageStatusChanged(const std::function<void(const std::map<std::string, std::map<std::string, std::string>>&)>& cb)
2314 : {
2315 385 : std::unique_lock lk(pimpl_->messageStatusMtx_);
2316 385 : pimpl_->messageStatusCb_ = cb;
2317 385 : }
2318 :
#ifdef LIBJAMI_TESTABLE
// Test-only hook: install a callback notified with
// (conversation id, BootstrapStatus) during bootstrap.
void
Conversation::onBootstrapStatus(const std::function<void(std::string, BootstrapStatus)>& cb)
{
    auto& testHook = pimpl_->bootstrapCbTest_;
    testHook = cb;
}
#endif
2326 :
// Fallback step of the bootstrap: timer handler that tries to seed the
// swarm manager's routing table with the devices of one not-yet-checked
// member taken from the back of `members`.
//
// @param ec       timer result; an aborted timer makes this a no-op
// @param members  remaining candidate members (maps with a "uri" key)
//
// Does nothing when the routing table already has nodes or the account is
// gone. When no candidate is left, the fallback is reported as failed.
// Otherwise the member's devices are enumerated through the account and,
// depending on the outcome, this handler re-arms fallbackTimer_ to process
// the remaining members.
2327 : void
2328 618 : Conversation::checkBootstrapMember(const asio::error_code& ec,
2329 : std::vector<std::map<std::string, std::string>> members)
2330 : {
2331 618 : if (ec == asio::error::operation_aborted)
2332 330 : return;
2333 585 : auto acc = pimpl_->account_.lock();
2334 585 : if (pimpl_->swarmManager_->getRoutingTable().getNodes().size() > 0 or not acc)
2335 17 : return;
2336 : // We bootstrap the DRT with devices who already wrote in the repository.
2337 : // However, in a conversation, a large number of devices may just watch
2338 : // the conversation, but never write any message.
2339 568 : std::unique_lock lock(pimpl_->membersMtx_);
2340 :
// Pop candidates from the back until one is neither ourselves nor already
// in checkedMembers_; `uri` keeps the last popped value either way.
2341 568 : std::string uri;
2342 854 : while (!members.empty()) {
2343 292 : auto member = std::move(members.back());
2344 292 : members.pop_back();
2345 292 : uri = std::move(member.at("uri"));
2346 292 : if (uri != pimpl_->userId_
2347 292 : && pimpl_->checkedMembers_.find(uri) == pimpl_->checkedMembers_.end())
2348 6 : break;
2349 292 : }
// Generic lambda: it is called both with `this` and with a shared_ptr.
2350 281 : auto fallbackFailed = [](auto sthis) {
2351 843 : JAMI_WARNING("{}[SwarmManager {}] Bootstrap: Fallback failed. Wait for remote connections.",
2352 : sthis->pimpl_->toString(),
2353 : fmt::ptr(sthis->pimpl_->swarmManager_.get()));
2354 : #ifdef LIBJAMI_TESTABLE
2355 281 : if (sthis->pimpl_->bootstrapCbTest_)
2356 8 : sthis->pimpl_->bootstrapCbTest_(sthis->id(), BootstrapStatus::FAILED);
2357 : #endif
2358 281 : };
2359 : // If members is empty, we finished the fallback un-successfully
2360 568 : if (members.empty() && uri.empty()) {
2361 280 : lock.unlock();
2362 280 : fallbackFailed(this);
2363 280 : return;
2364 : }
2365 :
2366 : // Fallback, check devices of a member (we didn't check yet) in the conversation
2367 288 : pimpl_->checkedMembers_.emplace(uri);
// Shared between the per-device callback and the completion callback.
2368 288 : auto devices = std::make_shared<std::vector<NodeId>>();
2369 1152 : acc->forEachDevice(
2370 576 : dht::InfoHash(uri),
2371 287 : [w = weak(), devices](const std::shared_ptr<dht::crypto::PublicKey>& dev) {
2372 : // Test if already sent
2373 287 : if (auto sthis = w.lock()) {
2374 286 : if (!sthis->pimpl_->swarmManager_->getRoutingTable().hasKnownNode(dev->getLongId()))
2375 280 : devices->emplace_back(dev->getLongId());
2376 287 : }
2377 287 : },
2378 288 : [w = weak(), devices, members = std::move(members), uri, fallbackFailed=std::move(fallbackFailed)](bool ok) {
2379 288 : auto sthis = w.lock();
2380 288 : if (!sthis)
2381 1 : return;
2382 287 : auto checkNext = true;
2383 287 : if (ok && devices->size() != 0) {
2384 : #ifdef LIBJAMI_TESTABLE
2385 279 : if (sthis->pimpl_->bootstrapCbTest_)
2386 5 : sthis->pimpl_->bootstrapCbTest_(sthis->id(), BootstrapStatus::FALLBACK);
2387 : #endif
2388 837 : JAMI_WARNING("{}[SwarmManager {}] Bootstrap: Fallback with member: {}",
2389 : sthis->pimpl_->toString(),
2390 : fmt::ptr(sthis->pimpl_->swarmManager_),
2391 : uri);
2392 279 : if (sthis->pimpl_->swarmManager_->setKnownNodes(*devices))
2393 1 : checkNext = false;
2394 : }
2395 287 : if (checkNext) {
2396 : // Check next member
2397 286 : sthis->pimpl_->fallbackTimer_->expires_at(std::chrono::steady_clock::now());
2398 572 : sthis->pimpl_->fallbackTimer_->async_wait(
2399 572 : std::bind(&Conversation::checkBootstrapMember,
2400 : sthis,
2401 : std::placeholders::_1,
2402 286 : std::move(members)));
2403 : } else {
// NOTE(review): this branch is reached only when setKnownNodes() returned
// true (checkNext == false), which does not obviously match the comment
// below — confirm the intended meaning of setKnownNodes()'s return value.
2404 : // In this case, all members are checked. Fallback failed
2405 1 : fallbackFailed(sthis);
2406 : }
2407 288 : });
2408 1145 : }
2409 :
// Bootstrap the swarm manager's routing table for this conversation.
//
// @param onBootstraped  stored in bootstrapCb_ and called when the swarm
//                       manager reports a successful connection change
// @param knownDevices   devices to seed the routing table with, in
//                       addition to the repository devices of every
//                       non-banned member
//
// No-op when the implementation, repository or swarm manager is missing.
// When seeding fails, or when the swarm manager later reports ok == false,
// the member-by-member fallback (checkBootstrapMember) is scheduled on
// fallbackTimer_.
2410 : void
2411 511 : Conversation::bootstrap(std::function<void()> onBootstraped,
2412 : const std::vector<DeviceId>& knownDevices)
2413 : {
2414 511 : if (!pimpl_ || !pimpl_->repository_ || !pimpl_->swarmManager_)
2415 0 : return;
2416 : // Here, we bootstrap the DRT with devices who already wrote in the conversation
2417 : // If this doesn't work, it will try to fallback with checkBootstrapMember
2418 : // If it works, the callback onConnectionChanged will be called with ok=true
2419 511 : pimpl_->bootstrapCb_ = std::move(onBootstraped);
2420 511 : std::vector<DeviceId> devices = knownDevices;
2421 1529 : for (const auto& [member, memberDevices] : pimpl_->repository_->devices()) {
2422 1018 : if (!isBanned(member))
2423 1018 : devices.insert(devices.end(), memberDevices.begin(), memberDevices.end());
2424 511 : }
2425 1533 : JAMI_DEBUG("{}[SwarmManager {}] Bootstrap with {} devices",
2426 : pimpl_->toString(),
2427 : fmt::ptr(pimpl_->swarmManager_),
2428 : devices.size());
2429 : // set callback
// Generic lambda (called with `this` and with a shared_ptr): shuffles the
// members and arms fallbackTimer_ to run checkBootstrapMember, either
// immediately (now == true) or after 20s minus min(8, member count) seconds.
2430 332 : auto fallback = [](auto sthis, bool now = false) {
2431 : // Fallback
2432 332 : auto acc = sthis->pimpl_->account_.lock();
2433 332 : if (!acc)
2434 0 : return;
2435 332 : auto members = sthis->getMembers(false, false);
2436 332 : std::shuffle(members.begin(), members.end(), acc->rand);
2437 332 : if (now) {
2438 291 : sthis->pimpl_->fallbackTimer_->expires_at(std::chrono::steady_clock::now());
2439 : } else {
2440 41 : auto timeForBootstrap = std::min(static_cast<size_t>(8), members.size());
2441 123 : sthis->pimpl_->fallbackTimer_->expires_at(std::chrono::steady_clock::now() + 20s
2442 82 : - std::chrono::seconds(timeForBootstrap));
2443 123 : JAMI_DEBUG("{}[SwarmManager {}] Fallback in {} seconds",
2444 : sthis->pimpl_->toString(),
2445 : fmt::ptr(sthis->pimpl_->swarmManager_.get()),
2446 : (20 - timeForBootstrap));
2447 : }
2448 664 : sthis->pimpl_->fallbackTimer_->async_wait(std::bind(&Conversation::checkBootstrapMember,
2449 : sthis,
2450 : std::placeholders::_1,
2451 332 : std::move(members)));
2452 332 : };
2453 :
2454 511 : pimpl_->swarmManager_->onConnectionChanged([w = weak(), fallback](bool ok) {
2455 : // This will call methods from accounts, so trigger on another thread.
2456 497 : dht::ThreadPool::io().run([w, ok, fallback=std::move(fallback)] {
2457 497 : auto sthis = w.lock();
2458 497 : if (!sthis)
2459 0 : return;
2460 497 : if (ok) {
2461 : // Bootstrap succeeded!
2462 : {
2463 456 : std::lock_guard lock(sthis->pimpl_->membersMtx_);
2464 456 : sthis->pimpl_->checkedMembers_.clear();
2465 456 : }
2466 456 : if (sthis->pimpl_->bootstrapCb_)
2467 456 : sthis->pimpl_->bootstrapCb_();
2468 : #ifdef LIBJAMI_TESTABLE
2469 456 : if (sthis->pimpl_->bootstrapCbTest_)
2470 10 : sthis->pimpl_->bootstrapCbTest_(sthis->id(), BootstrapStatus::SUCCESS);
2471 : #endif
2472 456 : return;
2473 : }
// Connection lost or not established: schedule the delayed fallback.
2474 41 : fallback(sthis);
2475 497 : });
2476 497 : });
// Start from a clean slate: no member has been tried by the fallback yet.
2477 : {
2478 511 : std::lock_guard lock(pimpl_->membersMtx_);
2479 511 : pimpl_->checkedMembers_.clear();
2480 511 : }
2481 : // If is shutdown, the conversation was re-added, causing no new nodes to be connected, but just a classic connectivity change
2482 511 : if (pimpl_->swarmManager_->isShutdown()) {
2483 20 : pimpl_->swarmManager_->restart();
2484 20 : pimpl_->swarmManager_->maintainBuckets();
2485 491 : } else if (!pimpl_->swarmManager_->setKnownNodes(devices)) {
// Seeding with the known devices did not work: fall back right away.
2486 291 : fallback(this, true);
2487 : }
2488 511 : }
2489 :
2490 : std::vector<std::string>
2491 18 : Conversation::commitsEndedCalls()
2492 : {
2493 18 : pimpl_->loadActiveCalls();
2494 18 : pimpl_->loadHostedCalls();
2495 18 : auto commits = pimpl_->commitsEndedCalls();
2496 18 : if (!commits.empty()) {
2497 : // Announce to client
2498 0 : dht::ThreadPool::io().run([w = weak(), commits] {
2499 0 : if (auto sthis = w.lock())
2500 0 : sthis->pimpl_->announce(commits, true);
2501 0 : });
2502 : }
2503 18 : return commits;
2504 0 : }
2505 :
2506 : void
2507 385 : Conversation::onMembersChanged(OnMembersChanged&& cb)
2508 : {
2509 385 : pimpl_->onMembersChanged_ = std::move(cb);
2510 385 : pimpl_->repository_->onMembersChanged([w=weak()] (const std::set<std::string>& memberUris) {
2511 1075 : if (auto sthis = w.lock())
2512 1075 : sthis->pimpl_->onMembersChanged_(memberUris);
2513 1075 : });
2514 385 : }
2515 :
2516 : void
2517 385 : Conversation::onNeedSocket(NeedSocketCb needSocket)
2518 : {
2519 770 : pimpl_->swarmManager_->needSocketCb_ = [needSocket = std::move(needSocket),
2520 : w=weak()](const std::string& deviceId, ChannelCb&& cb) {
2521 849 : if (auto sthis = w.lock())
2522 849 : needSocket(sthis->id(), deviceId, std::move(cb), "application/im-gitmessage-id");
2523 1619 : };
2524 385 : }
2525 :
2526 : void
2527 992 : Conversation::addSwarmChannel(std::shared_ptr<dhtnet::ChannelSocket> channel)
2528 : {
2529 992 : auto deviceId = channel->deviceId();
2530 : // Transmit avatar if necessary
2531 : // We do this here, because at this point we know both sides are connected and in
2532 : // the same conversation
2533 : // addSwarmChannel is a bit more complex, but it should be the best moment to do this.
2534 992 : auto cert = channel->peerCertificate();
2535 992 : if (!cert || !cert->issuer)
2536 0 : return;
2537 991 : auto member = cert->issuer->getId().toString();
2538 992 : pimpl_->swarmManager_->addChannel(std::move(channel));
2539 992 : dht::ThreadPool::io().run([member, deviceId, a = pimpl_->account_, w = weak_from_this()] {
2540 992 : auto sthis = w.lock();
2541 992 : if (auto account = a.lock()) {
2542 991 : account->sendProfile(sthis->id(), member, deviceId.toString());
2543 992 : }
2544 992 : });
2545 992 : }
2546 :
2547 : uint32_t
2548 4 : Conversation::countInteractions(const std::string& toId,
2549 : const std::string& fromId,
2550 : const std::string& authorUri) const
2551 : {
2552 4 : LogOptions options;
2553 4 : options.to = toId;
2554 4 : options.from = fromId;
2555 4 : options.authorUri = authorUri;
2556 4 : options.logIfNotFound = false;
2557 4 : options.fastLog = true;
2558 4 : History history;
2559 4 : auto res = pimpl_->loadMessages2(options, &history);
2560 8 : return res.size();
2561 4 : }
2562 :
2563 : void
2564 4 : Conversation::search(uint32_t req,
2565 : const Filter& filter,
2566 : const std::shared_ptr<std::atomic_int>& flag) const
2567 : {
2568 : // Because logging a conversation can take quite some time,
2569 : // do it asynchronously
2570 4 : dht::ThreadPool::io().run([w = weak(), req, filter, flag] {
2571 4 : if (auto sthis = w.lock()) {
2572 4 : History history;
2573 4 : std::vector<std::map<std::string, std::string>> commits {};
2574 : // std::regex_constants::ECMAScript is the default flag.
2575 4 : auto re = std::regex(filter.regexSearch,
2576 4 : filter.caseSensitive ? std::regex_constants::ECMAScript
2577 4 : : std::regex_constants::icase);
2578 4 : sthis->pimpl_->repository_->log(
2579 20 : [&](const auto& id, const auto& author, auto& commit) {
2580 100 : if (!filter.author.empty()
2581 20 : && filter.author != sthis->uriFromDevice(author.email)) {
2582 : // Filter author
2583 0 : return CallbackResult::Skip;
2584 : }
2585 20 : auto commitTime = git_commit_time(commit.get());
2586 20 : if (filter.before && filter.before < commitTime) {
2587 : // Only get commits before this date
2588 0 : return CallbackResult::Skip;
2589 : }
2590 20 : if (filter.after && filter.after > commitTime) {
2591 : // Only get commits before this date
2592 0 : if (git_commit_parentcount(commit.get()) <= 1)
2593 0 : return CallbackResult::Break;
2594 : else
2595 0 : return CallbackResult::Skip; // Because we are sorting it with
2596 : // GIT_SORT_TOPOLOGICAL | GIT_SORT_TIME
2597 : }
2598 :
2599 20 : return CallbackResult::Ok; // Continue
2600 : },
2601 20 : [&](auto&& cc) {
2602 40 : if (auto optMessage = sthis->pimpl_->repository_->convCommitToMap(cc))
2603 40 : sthis->pimpl_->addToHistory({optMessage.value()}, false, false, &history);
2604 20 : },
2605 20 : [&](auto id, auto, auto) {
2606 20 : if (id == filter.lastId)
2607 0 : return true;
2608 20 : return false;
2609 : },
2610 : "",
2611 : false);
2612 : // Search on generated history
2613 24 : for (auto& message : history.messageList) {
2614 20 : auto contentType = message->type;
2615 20 : auto isSearchable = contentType == "text/plain"
2616 20 : || contentType == "application/data-transfer+json";
2617 20 : if (filter.type.empty() && !isSearchable) {
2618 : // Not searchable, at least for now
2619 8 : continue;
2620 12 : } else if (contentType == filter.type || filter.type.empty()) {
2621 12 : if (isSearchable) {
2622 : // If it's a text match the body, else the display name
2623 48 : auto body = contentType == "text/plain" ? message->body.at("body")
2624 36 : : message->body.at("displayName");
2625 12 : std::smatch body_match;
2626 12 : if (std::regex_search(body, body_match, re)) {
2627 5 : auto commit = message->body;
2628 5 : commit["id"] = message->id;
2629 5 : commit["type"] = message->type;
2630 5 : commits.emplace_back(commit);
2631 5 : }
2632 12 : } else {
2633 : // Matching type, just add it to the results
2634 0 : commits.emplace_back(message->body);
2635 : }
2636 :
2637 12 : if (filter.maxResult != 0 && commits.size() == filter.maxResult)
2638 0 : break;
2639 : }
2640 20 : }
2641 :
2642 4 : if (commits.size() > 0)
2643 9 : emitSignal<libjami::ConversationSignal::MessagesFound>(req,
2644 3 : sthis->pimpl_->accountId_,
2645 6 : sthis->id(),
2646 3 : std::move(commits));
2647 : // If we're the latest thread, inform client that the search is finished
2648 4 : if ((*flag)-- == 1 /* decrement return the old value */) {
2649 4 : emitSignal<libjami::ConversationSignal::MessagesFound>(
2650 : req,
2651 4 : sthis->pimpl_->accountId_,
2652 8 : std::string {},
2653 8 : std::vector<std::map<std::string, std::string>> {});
2654 : }
2655 8 : }
2656 4 : });
2657 4 : }
2658 :
2659 : void
2660 14 : Conversation::hostConference(Json::Value&& message, OnDoneCb&& cb)
2661 : {
2662 14 : if (!message.isMember("confId")) {
2663 0 : JAMI_ERR() << "Malformed commit";
2664 0 : return;
2665 : }
2666 :
2667 14 : auto now = std::chrono::system_clock::now();
2668 14 : auto nowSecs = std::chrono::duration_cast<std::chrono::seconds>(now.time_since_epoch()).count();
2669 : {
2670 14 : std::lock_guard lk(pimpl_->activeCallsMtx_);
2671 14 : pimpl_->hostedCalls_[message["confId"].asString()] = nowSecs;
2672 14 : pimpl_->saveHostedCalls();
2673 14 : }
2674 :
2675 14 : sendMessage(std::move(message), "", {}, std::move(cb));
2676 : }
2677 :
2678 : bool
2679 20 : Conversation::isHosting(const std::string& confId) const
2680 : {
2681 20 : auto info = infos();
2682 20 : if (info["rdvDevice"] == pimpl_->deviceId_ && info["rdvHost"] == pimpl_->userId_)
2683 0 : return true; // We are the current device Host
2684 20 : std::lock_guard lk(pimpl_->activeCallsMtx_);
2685 20 : return pimpl_->hostedCalls_.find(confId) != pimpl_->hostedCalls_.end();
2686 20 : }
2687 :
2688 : void
2689 11 : Conversation::removeActiveConference(Json::Value&& message, OnDoneCb&& cb)
2690 : {
2691 11 : if (!message.isMember("confId")) {
2692 0 : JAMI_ERR() << "Malformed commit";
2693 0 : return;
2694 : }
2695 :
2696 11 : auto erased = false;
2697 : {
2698 11 : std::lock_guard lk(pimpl_->activeCallsMtx_);
2699 11 : erased = pimpl_->hostedCalls_.erase(message["confId"].asString());
2700 11 : }
2701 11 : if (erased) {
2702 11 : pimpl_->saveHostedCalls();
2703 11 : sendMessage(std::move(message), "", {}, std::move(cb));
2704 : } else
2705 0 : cb(false, "");
2706 : }
2707 :
2708 : std::vector<std::map<std::string, std::string>>
2709 39 : Conversation::currentCalls() const
2710 : {
2711 39 : std::lock_guard lk(pimpl_->activeCallsMtx_);
2712 78 : return pimpl_->activeCalls_;
2713 39 : }
2714 : } // namespace jami
|