Line data Source code
1 : /*
2 : * Copyright (C) 2014-2024 Savoir-faire Linux Inc.
3 : *
4 : * Author: Adrien Béraud <adrien.beraud@savoirfairelinux.com>
5 : * Author: Sébastien Blin <sebastien.blin@savoirfairelinux.com>
6 : *
7 : * This program is free software; you can redistribute it and/or modify
8 : * it under the terms of the GNU General Public License as published by
9 : * the Free Software Foundation; either version 3 of the License, or
10 : * (at your option) any later version.
11 : *
12 : * This program is distributed in the hope that it will be useful,
13 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 : * GNU General Public License for more details.
16 : *
17 : * You should have received a copy of the GNU General Public License
18 : * along with this program. If not, see <https://www.gnu.org/licenses/>.
19 : */
20 : #include "conversation.h"
21 :
22 : #include "account_const.h"
23 : #include "fileutils.h"
24 : #include "jamiaccount.h"
25 : #include "client/ring_signal.h"
26 :
27 : #include <charconv>
28 : #include <json/json.h>
29 : #include <string_view>
30 : #include <opendht/thread_pool.h>
31 : #include <tuple>
32 : #include <optional>
33 : #include "swarm/swarm_manager.h"
34 : #ifdef ENABLE_PLUGIN
35 : #include "manager.h"
36 : #include "plugin/jamipluginmanager.h"
37 : #include "plugin/streamdata.h"
38 : #endif
39 : #include "jami/conversation_interface.h"
40 :
41 : namespace jami {
42 :
43 : static const char* const LAST_MODIFIED = "lastModified";
44 44 : static const auto jsonBuilder = [] {
45 44 : Json::StreamWriterBuilder wbuilder;
46 44 : wbuilder["commentStyle"] = "None";
47 44 : wbuilder["indentation"] = "";
48 44 : return wbuilder;
49 0 : }();
50 :
51 13 : ConvInfo::ConvInfo(const Json::Value& json)
52 : {
53 13 : id = json[ConversationMapKeys::ID].asString();
54 13 : created = json[ConversationMapKeys::CREATED].asLargestUInt();
55 13 : removed = json[ConversationMapKeys::REMOVED].asLargestUInt();
56 13 : erased = json[ConversationMapKeys::ERASED].asLargestUInt();
57 33 : for (const auto& v : json[ConversationMapKeys::MEMBERS]) {
58 20 : members.emplace(v["uri"].asString());
59 : }
60 13 : lastDisplayed = json[ConversationMapKeys::LAST_DISPLAYED].asString();
61 13 : }
62 :
63 : Json::Value
64 25 : ConvInfo::toJson() const
65 : {
66 25 : Json::Value json;
67 25 : json[ConversationMapKeys::ID] = id;
68 25 : json[ConversationMapKeys::CREATED] = Json::Int64(created);
69 25 : if (removed) {
70 1 : json[ConversationMapKeys::REMOVED] = Json::Int64(removed);
71 : }
72 25 : if (erased) {
73 1 : json[ConversationMapKeys::ERASED] = Json::Int64(erased);
74 : }
75 64 : for (const auto& m : members) {
76 39 : Json::Value member;
77 39 : member["uri"] = m;
78 39 : json[ConversationMapKeys::MEMBERS].append(member);
79 39 : }
80 25 : json[ConversationMapKeys::LAST_DISPLAYED] = lastDisplayed;
81 25 : return json;
82 0 : }
83 :
84 : // ConversationRequest
85 527 : ConversationRequest::ConversationRequest(const Json::Value& json)
86 : {
87 527 : received = json[ConversationMapKeys::RECEIVED].asLargestUInt();
88 527 : declined = json[ConversationMapKeys::DECLINED].asLargestUInt();
89 527 : from = json[ConversationMapKeys::FROM].asString();
90 527 : conversationId = json[ConversationMapKeys::CONVERSATIONID].asString();
91 527 : auto& md = json[ConversationMapKeys::METADATAS];
92 1057 : for (const auto& member : md.getMemberNames()) {
93 530 : metadatas.emplace(member, md[member].asString());
94 527 : }
95 527 : }
96 :
97 : Json::Value
98 2 : ConversationRequest::toJson() const
99 : {
100 2 : Json::Value json;
101 2 : json[ConversationMapKeys::CONVERSATIONID] = conversationId;
102 2 : json[ConversationMapKeys::FROM] = from;
103 2 : json[ConversationMapKeys::RECEIVED] = static_cast<uint32_t>(received);
104 2 : if (declined)
105 0 : json[ConversationMapKeys::DECLINED] = static_cast<uint32_t>(declined);
106 4 : for (const auto& [key, value] : metadatas) {
107 2 : json[ConversationMapKeys::METADATAS][key] = value;
108 : }
109 2 : return json;
110 0 : }
111 :
112 : std::map<std::string, std::string>
113 213 : ConversationRequest::toMap() const
114 : {
115 213 : auto result = metadatas;
116 213 : result[ConversationMapKeys::ID] = conversationId;
117 213 : result[ConversationMapKeys::FROM] = from;
118 213 : if (declined)
119 0 : result[ConversationMapKeys::DECLINED] = std::to_string(declined);
120 213 : result[ConversationMapKeys::RECEIVED] = std::to_string(received);
121 213 : return result;
122 0 : }
123 :
124 : using MessageList = std::list<std::shared_ptr<libjami::SwarmMessage>>;
125 :
126 : struct History
127 : {
128 : MessageList messageList {};
129 : std::map<std::string, std::shared_ptr<libjami::SwarmMessage>> quickAccess {};
130 : std::map<std::string, std::list<std::shared_ptr<libjami::SwarmMessage>>> pendingEditions {};
131 : std::map<std::string, std::list<std::map<std::string, std::string>>> pendingReactions {};
132 : };
133 :
134 : class Conversation::Impl
135 : {
136 : public:
137 178 : Impl(const std::shared_ptr<JamiAccount>& account,
138 : ConversationMode mode,
139 : const std::string& otherMember = "")
140 178 : : repository_(ConversationRepository::createConversation(account, mode, otherMember))
141 356 : , account_(account)
142 : {
143 178 : if (!repository_) {
144 0 : throw std::logic_error("Couldn't create repository");
145 : }
146 178 : init();
147 178 : }
148 :
149 17 : Impl(const std::shared_ptr<JamiAccount>& account, const std::string& conversationId)
150 17 : : account_(account)
151 : {
152 17 : repository_ = std::make_unique<ConversationRepository>(account, conversationId);
153 17 : if (!repository_) {
154 0 : throw std::logic_error("Couldn't create repository");
155 : }
156 17 : init();
157 17 : }
158 :
159 194 : Impl(const std::shared_ptr<JamiAccount>& account,
160 : const std::string& remoteDevice,
161 : const std::string& conversationId)
162 194 : : account_(account)
163 : {
164 194 : std::vector<ConversationCommit> commits;
165 388 : repository_ = ConversationRepository::cloneConversation(account,
166 : remoteDevice,
167 : conversationId,
168 185 : [&](auto c) {
169 185 : commits = std::move(c);
170 193 : });
171 193 : if (!repository_) {
172 16 : emitSignal<libjami::ConversationSignal::OnConversationError>(
173 8 : account->getAccountID(), conversationId, EFETCH, "Couldn't clone repository");
174 8 : throw std::logic_error("Couldn't clone repository");
175 : }
176 : // To detect current active calls, we need to check history
177 370 : conversationDataPath_ = fileutils::get_data_dir() / account->getAccountID()
178 555 : / "conversation_data" / conversationId;
179 185 : activeCallsPath_ = conversationDataPath_ / ConversationMapKeys::ACTIVE_CALLS;
180 1178 : for (const auto& c : repository_->convCommitsToMap(commits))
181 1178 : updateActiveCalls(c);
182 185 : init();
183 473 : }
184 :
185 380 : void init()
186 : {
187 380 : if (auto shared = account_.lock()) {
188 380 : ioContext_ = Manager::instance().ioContext();
189 380 : fallbackTimer_ = std::make_unique<asio::steady_timer>(*ioContext_);
190 : swarmManager_
191 760 : = std::make_shared<SwarmManager>(NodeId(shared->currentDeviceId()),
192 760 : Manager::instance().getSeededRandomEngine(),
193 380 : [account = account_](const DeviceId& deviceId) {
194 2573 : if (auto acc = account.lock()) {
195 2573 : return acc->isConnectedWith(deviceId);
196 2573 : }
197 0 : return false;
198 380 : });
199 380 : swarmManager_->setMobility(shared->isMobile());
200 380 : accountId_ = shared->getAccountID();
201 : transferManager_
202 760 : = std::make_shared<TransferManager>(shared->getAccountID(),
203 : "",
204 380 : repository_->id(),
205 760 : Manager::instance().getSeededRandomEngine());
206 760 : conversationDataPath_ = fileutils::get_data_dir() / shared->getAccountID()
207 1140 : / "conversation_data" / repository_->id();
208 380 : fetchedPath_ = conversationDataPath_ / "fetched";
209 380 : statusPath_ = conversationDataPath_ / "status";
210 380 : sendingPath_ = conversationDataPath_ / "sending";
211 380 : preferencesPath_ = conversationDataPath_ / ConversationMapKeys::PREFERENCES;
212 380 : activeCallsPath_ = conversationDataPath_ / ConversationMapKeys::ACTIVE_CALLS;
213 380 : hostedCallsPath_ = conversationDataPath_ / ConversationMapKeys::HOSTED_CALLS;
214 380 : loadActiveCalls();
215 380 : loadStatus();
216 380 : typers_ = std::make_shared<Typers>(shared, repository_->id());
217 380 : }
218 380 : }
219 :
220 1298 : const std::string& toString() const
221 : {
222 1298 : if (fmtStr_.empty()) {
223 367 : if (repository_->mode() == ConversationMode::ONE_TO_ONE) {
224 123 : if (auto acc = account_.lock()) {
225 123 : auto peer = acc->getUsername();
226 368 : for (const auto& member : repository_->getInitialMembers()) {
227 245 : if (member != acc->getUsername()) {
228 122 : peer = member;
229 : }
230 123 : }
231 123 : fmtStr_ = fmt::format("[Conversation (1:1) {}]", peer);
232 246 : }
233 : } else {
234 488 : fmtStr_ = fmt::format("[Conversation {}]", repository_->id());
235 : }
236 : }
237 1298 : return fmtStr_;
238 : }
239 : mutable std::string fmtStr_;
240 :
241 307 : ~Impl()
242 : {
243 : try {
244 307 : if (fallbackTimer_)
245 307 : fallbackTimer_->cancel();
246 0 : } catch (const std::exception& e) {
247 0 : JAMI_ERROR("[Conversation {:s}] {:s}", toString(), e.what());
248 0 : }
249 307 : }
250 :
251 : /**
252 : * If, for whatever reason, the daemon is stopped while hosting a conference,
253 : * we need to announce the end of this call when restarting.
254 : * This avoids keeping active calls around forever.
255 : */
256 : std::vector<std::string> commitsEndedCalls();
// True when a "<issuer id>.crt" file for the account's identity exists in the "admins" directory of the repository.
257 : bool isAdmin() const;
// <data dir>/<account id>/conversations/<conversation id>
258 : std::filesystem::path repoPath() const;
259 :
260 278 : void announce(const std::string& commitId, bool commitFromSelf = false) const
261 : {
262 278 : std::vector<std::string> vec;
263 278 : if (!commitId.empty())
264 276 : vec.emplace_back(commitId);
265 278 : announce(vec, commitFromSelf);
266 278 : }
267 :
268 293 : void announce(const std::vector<std::string>& commits, bool commitFromSelf = false) const
269 : {
270 293 : std::vector<ConversationCommit> convcommits;
271 293 : convcommits.reserve(commits.size());
272 599 : for (const auto& cid : commits) {
273 306 : auto commit = repository_->getCommit(cid);
274 306 : if (commit != std::nullopt) {
275 306 : convcommits.emplace_back(*commit);
276 : }
277 306 : }
278 293 : announce(repository_->convCommitsToMap(convcommits), commitFromSelf);
279 293 : }
280 :
281 : /**
282 : * Update activeCalls_ via announced commits (in load or via new commits)
283 : * @param commit Commit to check
284 : * @param eraseOnly If we want to ignore added commits
285 : * @param emitSig If we want to emit to client
286 : * @note eraseOnly is used by loadMessages. This is a fail-safe, this SHOULD NOT happen
287 : */
288 1074 : void updateActiveCalls(const std::map<std::string, std::string>& commit,
289 : bool eraseOnly = false,
290 : bool emitSig = true) const
291 : {
292 1074 : if (!repository_)
293 0 : return;
294 1074 : if (commit.at("type") == "member") {
295 : // In this case, we need to check if we are not removing a hosting member or device
296 803 : std::lock_guard lk(activeCallsMtx_);
297 803 : auto it = activeCalls_.begin();
298 803 : auto updateActives = false;
299 804 : while (it != activeCalls_.end()) {
300 1 : if (it->at("uri") == commit.at("uri") || it->at("device") == commit.at("uri")) {
301 3 : JAMI_DEBUG("Removing {:s} from the active calls, because {:s} left",
302 : it->at("id"),
303 : commit.at("uri"));
304 1 : it = activeCalls_.erase(it);
305 1 : updateActives = true;
306 : } else {
307 0 : ++it;
308 : }
309 : }
310 803 : if (updateActives) {
311 1 : saveActiveCalls();
312 1 : if (emitSig)
313 1 : emitSignal<libjami::ConfigurationSignal::ActiveCallsChanged>(accountId_,
314 1 : repository_->id(),
315 1 : activeCalls_);
316 : }
317 803 : return;
318 803 : }
319 : // Else, it's a call information
320 601 : if (commit.find("confId") != commit.end() && commit.find("uri") != commit.end()
321 601 : && commit.find("device") != commit.end()) {
322 59 : auto convId = repository_->id();
323 118 : auto confId = commit.at("confId");
324 118 : auto uri = commit.at("uri");
325 118 : auto device = commit.at("device");
326 59 : std::lock_guard lk(activeCallsMtx_);
327 59 : auto itActive = std::find_if(activeCalls_.begin(),
328 : activeCalls_.end(),
329 27 : [&](const auto& value) {
330 81 : return value.at("id") == confId
331 54 : && value.at("uri") == uri
332 135 : && value.at("device") == device;
333 : });
334 59 : if (commit.find("duration") == commit.end()) {
335 32 : if (itActive == activeCalls_.end() && !eraseOnly) {
336 96 : JAMI_DEBUG(
337 : "swarm:{:s} new current call detected: {:s} on device {:s}, account {:s}",
338 : convId,
339 : confId,
340 : device,
341 : uri);
342 32 : std::map<std::string, std::string> activeCall;
343 32 : activeCall["id"] = confId;
344 32 : activeCall["uri"] = uri;
345 32 : activeCall["device"] = device;
346 32 : activeCalls_.emplace_back(activeCall);
347 32 : saveActiveCalls();
348 32 : if (emitSig)
349 32 : emitSignal<libjami::ConfigurationSignal::ActiveCallsChanged>(accountId_,
350 : repository_
351 32 : ->id(),
352 32 : activeCalls_);
353 32 : }
354 : } else {
355 27 : if (itActive != activeCalls_.end()) {
356 27 : itActive = activeCalls_.erase(itActive);
357 : // Unlikely, but we must ensure that no duplicate exists
358 27 : while (itActive != activeCalls_.end()) {
359 0 : itActive = std::find_if(itActive, activeCalls_.end(), [&](const auto& value) {
360 0 : return value.at("id") == confId && value.at("uri") == uri
361 0 : && value.at("device") == device;
362 : });
363 0 : if (itActive != activeCalls_.end()) {
364 0 : JAMI_ERROR("Duplicate call found. (This is a bug)");
365 0 : itActive = activeCalls_.erase(itActive);
366 : }
367 : }
368 :
369 27 : if (eraseOnly) {
370 0 : JAMI_WARNING("previous swarm:{:s} call finished detected: {:s} on device "
371 : "{:s}, account {:s}",
372 : convId,
373 : confId,
374 : device,
375 : uri);
376 : } else {
377 81 : JAMI_DEBUG("swarm:{:s} call finished: {:s} on device {:s}, account {:s}",
378 : convId,
379 : confId,
380 : device,
381 : uri);
382 : }
383 : }
384 27 : saveActiveCalls();
385 27 : if (emitSig)
386 27 : emitSignal<libjami::ConfigurationSignal::ActiveCallsChanged>(accountId_,
387 27 : repository_->id(),
388 27 : activeCalls_);
389 : }
390 59 : }
391 : }
392 :
393 1190 : void announce(const std::vector<std::map<std::string, std::string>>& commits, bool commitFromSelf = false) const
394 : {
395 1190 : auto shared = account_.lock();
396 1190 : if (!shared or !repository_)
397 0 : return;
398 1190 : auto convId = repository_->id();
399 1190 : auto ok = !commits.empty();
400 3568 : auto lastId = ok ? commits.rbegin()->at(ConversationMapKeys::ID) : "";
401 1190 : addToHistory(commits, true, commitFromSelf);
402 1190 : if (ok) {
403 1188 : bool announceMember = false;
404 2423 : for (const auto& c : commits) {
405 : // Announce member events
406 1235 : if (c.at("type") == "member") {
407 919 : if (c.find("uri") != c.end() && c.find("action") != c.end()) {
408 919 : const auto& uri = c.at("uri");
409 919 : const auto& actionStr = c.at("action");
410 919 : auto action = -1;
411 919 : if (actionStr == "add")
412 438 : action = 0;
413 481 : else if (actionStr == "join")
414 459 : action = 1;
415 22 : else if (actionStr == "remove")
416 1 : action = 2;
417 21 : else if (actionStr == "ban")
418 19 : action = 3;
419 2 : else if (actionStr == "unban")
420 2 : action = 4;
421 919 : if (actionStr == "ban" || actionStr == "remove") {
422 : // In this case, a potential host was removed during a call.
423 20 : updateActiveCalls(c);
424 20 : typers_->removeTyper(uri);
425 : }
426 919 : if (action != -1) {
427 919 : announceMember = true;
428 1838 : emitSignal<libjami::ConversationSignal::ConversationMemberEvent>(
429 919 : accountId_, convId, uri, action);
430 : }
431 : }
432 316 : } else if (c.at("type") == "application/call-history+json") {
433 61 : updateActiveCalls(c);
434 : }
435 : #ifdef ENABLE_PLUGIN
436 : auto& pluginChatManager
437 1235 : = Manager::instance().getJamiPluginManager().getChatServicesManager();
438 1235 : if (pluginChatManager.hasHandlers()) {
439 4 : auto cm = std::make_shared<JamiMessage>(accountId_,
440 : convId,
441 8 : c.at("author") != shared->getUsername(),
442 : c,
443 8 : false);
444 4 : cm->isSwarm = true;
445 4 : pluginChatManager.publishMessage(std::move(cm));
446 4 : }
447 : #endif
448 : // announce message
449 1235 : emitSignal<libjami::ConversationSignal::MessageReceived>(accountId_, convId, c);
450 : }
451 :
452 1188 : if (announceMember && onMembersChanged_) {
453 914 : onMembersChanged_(repository_->memberUris("", {}));
454 : }
455 : }
456 1190 : }
457 :
458 380 : void loadStatus()
459 : {
460 : try {
461 : // read file
462 759 : auto file = fileutils::loadFile(statusPath_);
463 : // load values
464 1 : msgpack::object_handle oh = msgpack::unpack((const char*) file.data(), file.size());
465 1 : std::lock_guard lk {messageStatusMtx_};
466 1 : oh.get().convert(messagesStatus_);
467 380 : } catch (const std::exception& e) {
468 379 : }
469 380 : }
470 1097 : void saveStatus()
471 : {
472 1097 : std::ofstream file(statusPath_, std::ios::trunc | std::ios::binary);
473 1097 : msgpack::pack(file, messagesStatus_);
474 1097 : }
475 :
476 397 : void loadActiveCalls() const
477 : {
478 : try {
479 : // read file
480 782 : auto file = fileutils::loadFile(activeCallsPath_);
481 : // load values
482 12 : msgpack::object_handle oh = msgpack::unpack((const char*) file.data(), file.size());
483 12 : std::lock_guard lk {activeCallsMtx_};
484 12 : oh.get().convert(activeCalls_);
485 397 : } catch (const std::exception& e) {
486 385 : return;
487 385 : }
488 : }
489 :
490 77 : void saveActiveCalls() const
491 : {
492 77 : std::ofstream file(activeCallsPath_, std::ios::trunc | std::ios::binary);
493 77 : msgpack::pack(file, activeCalls_);
494 77 : }
495 :
496 17 : void loadHostedCalls() const
497 : {
498 : try {
499 : // read file
500 28 : auto file = fileutils::loadFile(hostedCallsPath_);
501 : // load values
502 6 : msgpack::object_handle oh = msgpack::unpack((const char*) file.data(), file.size());
503 6 : std::lock_guard lk {activeCallsMtx_};
504 6 : oh.get().convert(hostedCalls_);
505 17 : } catch (const std::exception& e) {
506 11 : return;
507 11 : }
508 : }
509 :
510 45 : void saveHostedCalls() const
511 : {
512 45 : std::ofstream file(hostedCallsPath_, std::ios::trunc | std::ios::binary);
513 45 : msgpack::pack(file, hostedCalls_);
514 45 : }
515 :
// NOTE(review): defined outside this chunk; "type" is presumably one of the values returned by bannedType() - confirm.
516 : void voteUnban(const std::string& contactUri, const std::string_view type, const OnDoneCb& cb);
517 :
// Members of the repository as string maps, each completed with the member's last displayed ("read") message.
// Invited, left and banned members are only listed when the matching flag is true.
518 : std::vector<std::map<std::string, std::string>> getMembers(bool includeInvited,
519 : bool includeLeft,
520 : bool includeBanned) const;
521 :
522 7256 : std::string_view bannedType(const std::string& uri) const
523 : {
524 0 : auto crt = fmt::format("{}.crt", uri);
525 14508 : auto bannedMember = repoPath() / "banned" / "members" / crt;
526 7256 : if (std::filesystem::is_regular_file(bannedMember))
527 11 : return "members"sv;
528 14491 : auto bannedAdmin = repoPath() / "banned" / "admins" / crt;
529 7245 : if (std::filesystem::is_regular_file(bannedAdmin))
530 0 : return "admins"sv;
531 14492 : auto bannedInvited = repoPath() / "banned" / "invited" / uri;
532 7246 : if (std::filesystem::is_regular_file(bannedInvited))
533 2 : return "invited"sv;
534 14490 : auto bannedDevice = repoPath() / "banned" / "devices" / crt;
535 7245 : if (std::filesystem::is_regular_file(bannedDevice))
536 0 : return "devices"sv;
537 7245 : return {};
538 7258 : }
539 :
540 5017 : std::shared_ptr<dhtnet::ChannelSocket> gitSocket(const DeviceId& deviceId) const
541 : {
542 5017 : auto deviceSockets = gitSocketList_.find(deviceId);
543 10036 : return (deviceSockets != gitSocketList_.end()) ? deviceSockets->second : nullptr;
544 : }
545 :
546 2544 : void addGitSocket(const DeviceId& deviceId, const std::shared_ptr<dhtnet::ChannelSocket>& socket)
547 : {
548 2544 : gitSocketList_[deviceId] = socket;
549 2546 : }
550 812 : void removeGitSocket(const DeviceId& deviceId)
551 : {
552 812 : auto deviceSockets = gitSocketList_.find(deviceId);
553 812 : if (deviceSockets != gitSocketList_.end())
554 419 : gitSocketList_.erase(deviceSockets);
555 812 : }
556 :
// NOTE(review): no definition of this two-argument overload is visible in this chunk - confirm it is still used.
557 : std::vector<std::map<std::string, std::string>> getMembers(bool includeInvited,
558 : bool includeLeft) const;
559 :
560 : std::set<std::string> checkedMembers_; // Store members we tried
561 : std::function<void()> bootstrapCb_;
562 : #ifdef LIBJAMI_TESTABLE
563 : std::function<void(std::string, BootstrapStatus)> bootstrapCbTest_;
564 : #endif
565 :
566 : std::mutex writeMtx_ {};
567 : std::unique_ptr<ConversationRepository> repository_;
568 : std::shared_ptr<SwarmManager> swarmManager_;
// Held weakly: every user must lock() it and handle the account being gone.
569 : std::weak_ptr<JamiAccount> account_;
570 : std::atomic_bool isRemoving_ {false};
// Walk the repository log according to options and return the commits as string maps.
571 : std::vector<std::map<std::string, std::string>> loadMessages(const LogOptions& options);
572 : std::vector<libjami::SwarmMessage> loadMessages2(const LogOptions& options,
573 : History* optHistory = nullptr);
574 : void pull();
575 : std::vector<std::map<std::string, std::string>> mergeHistory(const std::string& uri);
576 :
577 : // Avoid multiple fetch/merges at the same time.
578 : std::mutex pullcbsMtx_ {};
579 : std::set<std::string> fetchingRemotes_ {}; // store current remote in fetch
580 : std::deque<std::tuple<std::string, std::string, OnPullCb>> pullcbs_ {};
581 : std::shared_ptr<TransferManager> transferManager_ {};
// <data dir>/<account id>/conversation_data/<conversation id>, set by init()
582 : std::filesystem::path conversationDataPath_ {};
583 : std::filesystem::path fetchedPath_ {};
584 :
585 : // Manage last message displayed and status
586 : std::filesystem::path sendingPath_ {};
587 : // Manage last message displayed
588 : std::string accountId_ {};
589 : std::filesystem::path preferencesPath_ {};
// Invoked by announce() with the current member uris after a member event was announced.
590 : OnMembersChanged onMembersChanged_ {};
591 :
592 : // Manage hosted calls on this device
593 : std::filesystem::path hostedCallsPath_ {};
594 : mutable std::map<std::string, uint64_t /* start time */> hostedCalls_ {};
595 : // Manage active calls for this conversation (can be hosted by other devices)
596 : std::filesystem::path activeCallsPath_ {};
597 : mutable std::mutex activeCallsMtx_ {};
// Each entry holds the "id", "uri" and "device" keys (see updateActiveCalls).
598 : mutable std::vector<std::map<std::string, std::string>> activeCalls_ {};
599 :
600 : GitSocketList gitSocketList_ {};
601 :
602 : // Bootstrap
603 : std::shared_ptr<asio::io_context> ioContext_;
604 : std::unique_ptr<asio::steady_timer> fallbackTimer_;
605 :
606 : bool isMobile {false};
607 :
608 : /**
609 : * Loaded history represents the linearized history to show for clients
610 : */
611 : mutable History loadedHistory_ {};
612 : std::vector<std::shared_ptr<libjami::SwarmMessage>> addToHistory(
613 : const std::vector<std::map<std::string, std::string>>& commits,
614 : bool messageReceived = false,
615 : bool commitFromSelf = false,
616 : History* history = nullptr) const;
617 : // While loading the history, we need to avoid:
618 : // - reloading history (can just be ignored)
619 : // - adding new commits (should wait for history to be loaded)
620 : bool isLoadingHistory_ {false};
621 : mutable std::mutex historyMtx_ {};
622 : mutable std::condition_variable historyCv_ {};
623 :
624 : void handleReaction(History& history,
625 : const std::shared_ptr<libjami::SwarmMessage>& sharedCommit) const;
626 : void handleEdition(History& history,
627 : const std::shared_ptr<libjami::SwarmMessage>& sharedCommit,
628 : bool messageReceived) const;
629 : bool handleMessage(History& history,
630 : const std::shared_ptr<libjami::SwarmMessage>& sharedCommit,
631 : bool messageReceived) const;
632 :
633 : /**
634 : * {uri, {
635 : * {"fetch", "commitId"},
636 : * {"fetched_ts", "timestamp"},
637 : * {"read", "commitId"},
638 : * {"read_ts", "timestamp"}
639 : * }
640 : * }
641 : */
642 : mutable std::mutex messageStatusMtx_;
643 : std::function<void(const std::map<std::string, std::map<std::string, std::string>>&)> messageStatusCb_ {};
644 : std::filesystem::path statusPath_ {};
645 : mutable std::map<std::string, std::map<std::string, std::string>> messagesStatus_ {};
646 : /**
647 : * Status: 0 = committed, 1 = fetched, 2 = read
648 : * This caches the current status to add in the messages
649 : */
650 : // Note: only store int32_t because it's easy to pass to dbus this way
651 : // memberToStatus serves as a cache for loading messages
652 : mutable std::map<std::string, int32_t> memberToStatus;
653 : // futureStatus is used to store the status for receiving messages
654 : // (because we're not sure to fetch the commit before receiving a status change for this)
655 : mutable std::map<std::string, std::map<std::string, int32_t>> futureStatus;
656 : // Update internal structures regarding status
657 : void updateStatus(const std::string& uri,
658 : libjami::Account::MessageStates status,
659 : const std::string& commitId,
660 : const std::string& ts,
661 : bool emit = false);
662 :
663 :
// Created by init(); announce() removes banned/removed members from it.
664 : std::shared_ptr<Typers> typers_;
665 : };
666 :
667 : bool
668 19 : Conversation::Impl::isAdmin() const
669 : {
670 19 : auto shared = account_.lock();
671 19 : if (!shared)
672 0 : return false;
673 :
674 38 : auto adminsPath = repoPath() / "admins";
675 19 : auto cert = shared->identity().second;
676 19 : if (!cert->issuer)
677 0 : return false;
678 19 : auto uri = cert->issuer->getId().toString();
679 19 : return std::filesystem::is_regular_file(fileutils::getFullPath(adminsPath, uri + ".crt"));
680 19 : }
681 :
682 : std::vector<std::map<std::string, std::string>>
683 601 : Conversation::Impl::getMembers(bool includeInvited, bool includeLeft, bool includeBanned) const
684 : {
685 601 : std::vector<std::map<std::string, std::string>> result;
686 601 : auto members = repository_->members();
687 601 : std::lock_guard lk(messageStatusMtx_);
688 2829 : for (const auto& member : members) {
689 2228 : if (member.role == MemberRole::BANNED && !includeBanned) {
690 180 : continue;
691 : }
692 2224 : if (member.role == MemberRole::INVITED && !includeInvited)
693 176 : continue;
694 2048 : if (member.role == MemberRole::LEFT && !includeLeft)
695 0 : continue;
696 2048 : auto mm = member.map();
697 2048 : mm[ConversationMapKeys::LAST_DISPLAYED] = messagesStatus_[member.uri]["read"];
698 2048 : result.emplace_back(std::move(mm));
699 2048 : }
700 1202 : return result;
701 601 : }
702 :
703 : std::vector<std::string>
704 17 : Conversation::Impl::commitsEndedCalls()
705 : {
706 17 : auto shared = account_.lock();
707 : // Handle current calls
708 17 : std::vector<std::string> commits {};
709 17 : std::unique_lock lk(writeMtx_);
710 17 : std::unique_lock lkA(activeCallsMtx_);
711 17 : for (const auto& hostedCall : hostedCalls_) {
712 : // In this case, this means that we left
713 : // the conference while still hosting it, so activeCalls
714 : // will not be correctly updated
715 : // We don't need to send notifications there, as peers will sync with presence
716 0 : Json::Value value;
717 0 : auto uri = shared->getUsername();
718 0 : auto device = std::string(shared->currentDeviceId());
719 0 : value["uri"] = uri;
720 0 : value["device"] = device;
721 0 : value["confId"] = hostedCall.first;
722 0 : value["type"] = "application/call-history+json";
723 0 : auto now = std::chrono::system_clock::now();
724 0 : auto nowConverted = std::chrono::duration_cast<std::chrono::seconds>(now.time_since_epoch())
725 0 : .count();
726 0 : value["duration"] = std::to_string((nowConverted - hostedCall.second) * 1000);
727 0 : auto itActive = std::find_if(activeCalls_.begin(),
728 : activeCalls_.end(),
729 0 : [confId = hostedCall.first, uri, device](auto value) {
730 0 : return value.at("id") == confId && value.at("uri") == uri
731 0 : && value.at("device") == device;
732 : });
733 0 : if (itActive != activeCalls_.end())
734 0 : activeCalls_.erase(itActive);
735 0 : commits.emplace_back(repository_->commitMessage(Json::writeString(jsonBuilder, value)));
736 :
737 0 : JAMI_DEBUG("Removing hosted conference... {:s}", hostedCall.first);
738 0 : }
739 17 : hostedCalls_.clear();
740 17 : saveActiveCalls();
741 17 : saveHostedCalls();
742 34 : return commits;
743 17 : }
744 :
745 : std::filesystem::path
746 34228 : Conversation::Impl::repoPath() const
747 : {
748 68451 : return fileutils::get_data_dir() / accountId_ / "conversations" / repository_->id();
749 : }
750 :
751 : std::vector<std::map<std::string, std::string>>
752 17 : Conversation::Impl::loadMessages(const LogOptions& options)
753 : {
754 17 : if (!repository_)
755 0 : return {};
756 17 : std::vector<ConversationCommit> commits;
757 17 : auto startLogging = options.from == "";
758 17 : auto breakLogging = false;
759 34 : repository_->log(
760 56 : [&](const auto& id, const auto& author, const auto& commit) {
761 56 : if (!commits.empty()) {
762 : // Set linearized parent
763 39 : commits.rbegin()->linearized_parent = id;
764 : }
765 56 : if (options.skipMerge && git_commit_parentcount(commit.get()) > 1) {
766 0 : return CallbackResult::Skip;
767 : }
768 56 : if ((options.nbOfCommits != 0 && commits.size() == options.nbOfCommits))
769 0 : return CallbackResult::Break; // Stop logging
770 56 : if (breakLogging)
771 0 : return CallbackResult::Break; // Stop logging
772 56 : if (id == options.to) {
773 10 : if (options.includeTo)
774 0 : breakLogging = true; // For the next commit
775 : else
776 10 : return CallbackResult::Break; // Stop logging
777 : }
778 :
779 46 : if (!startLogging && options.from != "" && options.from == id)
780 0 : startLogging = true;
781 46 : if (!startLogging)
782 0 : return CallbackResult::Skip; // Start logging after this one
783 :
784 46 : if (options.fastLog) {
785 0 : if (options.authorUri != "") {
786 0 : if (options.authorUri == repository_->uriFromDevice(author.email)) {
787 0 : return CallbackResult::Break; // Found author, stop
788 : }
789 : }
790 : // Used to only count commit
791 0 : commits.emplace(commits.end(), ConversationCommit {});
792 0 : return CallbackResult::Skip;
793 : }
794 :
795 46 : return CallbackResult::Ok; // Continue
796 : },
797 46 : [&](auto&& cc) { commits.emplace(commits.end(), std::forward<decltype(cc)>(cc)); },
798 46 : [](auto, auto, auto) { return false; },
799 17 : options.from,
800 17 : options.logIfNotFound);
801 17 : return repository_->convCommitsToMap(commits);
802 17 : }
803 :
804 : std::vector<libjami::SwarmMessage>
805 1532 : Conversation::Impl::loadMessages2(const LogOptions& options, History* optHistory)
806 : {
807 1532 : if (!optHistory) {
808 2 : std::lock_guard lock(historyMtx_);
809 2 : if (!repository_ || isLoadingHistory_)
810 0 : return {};
811 2 : isLoadingHistory_ = true;
812 2 : }
813 :
814 1532 : auto startLogging = options.from == "";
815 1532 : auto breakLogging = false;
816 1532 : auto currentHistorySize = loadedHistory_.messageList.size();
817 1532 : std::vector<std::string> replies;
818 1532 : std::vector<std::shared_ptr<libjami::SwarmMessage>> msgList;
819 3064 : repository_->log(
820 14912 : [&](const auto& id, const auto& author, const auto& commit) {
821 14912 : if (options.skipMerge && git_commit_parentcount(commit.get()) > 1) {
822 0 : return CallbackResult::Skip;
823 : }
824 14912 : if (replies.empty()) { // This avoid load until
825 : // NOTE: in the future, we may want to add "Reply-Body" in commit to avoid to load
826 : // until this commit
827 29822 : if ((options.nbOfCommits != 0
828 20513 : && (loadedHistory_.messageList.size() - currentHistorySize)
829 5602 : == options.nbOfCommits))
830 4 : return CallbackResult::Break; // Stop logging
831 14907 : if (breakLogging)
832 0 : return CallbackResult::Break; // Stop logging
833 14907 : if (id == options.to) {
834 599 : if (options.includeTo)
835 0 : breakLogging = true; // For the next commit
836 : else
837 599 : return CallbackResult::Break; // Stop logging
838 : }
839 : }
840 :
841 14309 : if (!startLogging && options.from != "" && options.from == id)
842 1092 : startLogging = true;
843 14309 : if (!startLogging)
844 24 : return CallbackResult::Skip; // Start logging after this one
845 :
846 14285 : if (options.fastLog) {
847 8679 : if (options.authorUri != "") {
848 1 : if (options.authorUri == repository_->uriFromDevice(author.email)) {
849 1 : return CallbackResult::Break; // Found author, stop
850 : }
851 : }
852 : }
853 :
854 14286 : return CallbackResult::Ok; // Continue
855 : },
856 14284 : [&](auto&& cc) {
857 14284 : auto optMessage = repository_->convCommitToMap(cc);
858 14285 : if (!optMessage.has_value())
859 1 : return;
860 14284 : auto message = optMessage.value();
861 14284 : if (message.find("reply-to") != message.end()) {
862 1 : replies.emplace_back(message.at("reply-to"));
863 : }
864 14283 : auto it = std::find(replies.begin(), replies.end(), message.at("id"));
865 14285 : if (it != replies.end()) {
866 1 : replies.erase(it);
867 : }
868 14283 : std::shared_ptr<libjami::SwarmMessage> firstMsg;
869 14283 : if (!optHistory && msgList.empty() && !loadedHistory_.messageList.empty()) {
870 0 : firstMsg = *loadedHistory_.messageList.rbegin();
871 : }
872 42851 : auto added = addToHistory({message}, false, false, optHistory);
873 14284 : if (!added.empty() && firstMsg) {
874 0 : emitSignal<libjami::ConversationSignal::SwarmMessageUpdated>(accountId_,
875 0 : repository_->id(),
876 0 : *firstMsg);
877 : }
878 14284 : msgList.insert(msgList.end(), added.begin(), added.end());
879 14286 : },
880 14286 : [](auto, auto, auto) { return false; },
881 1532 : options.from,
882 1532 : options.logIfNotFound);
883 :
884 : // Convert for client (remove ptr)
885 1532 : std::vector<libjami::SwarmMessage> ret;
886 1532 : ret.reserve(msgList.size());
887 15812 : for (const auto& msg: msgList) {
888 14280 : ret.emplace_back(*msg);
889 : }
890 1532 : if (!optHistory) {
891 2 : std::lock_guard lock(historyMtx_);
892 2 : isLoadingHistory_ = false;
893 2 : historyCv_.notify_all();
894 2 : }
895 :
896 1532 : return ret;
897 1532 : }
898 :
899 : void
900 3 : Conversation::Impl::handleReaction(History& history,
901 : const std::shared_ptr<libjami::SwarmMessage>& sharedCommit) const
902 : {
903 3 : auto it = history.quickAccess.find(sharedCommit->body.at("react-to"));
904 3 : auto peditIt = history.pendingEditions.find(sharedCommit->id);
905 3 : if (peditIt != history.pendingEditions.end()) {
906 0 : auto oldBody = sharedCommit->body;
907 0 : sharedCommit->body["body"] = peditIt->second.front()->body["body"];
908 0 : if (sharedCommit->body.at("body").empty())
909 0 : return;
910 0 : history.pendingEditions.erase(peditIt);
911 0 : }
912 3 : if (it != history.quickAccess.end()) {
913 3 : it->second->reactions.emplace_back(sharedCommit->body);
914 3 : emitSignal<libjami::ConversationSignal::ReactionAdded>(accountId_,
915 3 : repository_->id(),
916 3 : it->second->id,
917 3 : sharedCommit->body);
918 : } else {
919 0 : history.pendingReactions[sharedCommit->body.at("react-to")].emplace_back(sharedCommit->body);
920 : }
921 : }
922 :
923 : void
924 7 : Conversation::Impl::handleEdition(History& history,
925 : const std::shared_ptr<libjami::SwarmMessage>& sharedCommit,
926 : bool messageReceived) const
927 : {
928 14 : auto editId = sharedCommit->body.at("edit");
929 7 : auto it = history.quickAccess.find(editId);
930 7 : if (it != history.quickAccess.end()) {
931 5 : auto baseCommit = it->second;
932 5 : if (baseCommit) {
933 5 : auto itReact = baseCommit->body.find("react-to");
934 10 : auto body = sharedCommit->body.at("body");
935 : // Edit reaction
936 5 : if (itReact != baseCommit->body.end()) {
937 1 : baseCommit->body["body"] = body; // Replace body if pending
938 1 : it = history.quickAccess.find(itReact->second);
939 1 : auto itPending = history.pendingReactions.find(itReact->second);
940 1 : if (it != history.quickAccess.end()) {
941 1 : baseCommit = it->second; // Base commit
942 1 : auto itPreviousReact = std::find_if(baseCommit->reactions.begin(),
943 1 : baseCommit->reactions.end(),
944 1 : [&](const auto& reaction) {
945 1 : return reaction.at("id") == editId;
946 : });
947 1 : if (itPreviousReact != baseCommit->reactions.end()) {
948 1 : (*itPreviousReact)["body"] = body;
949 1 : if (body.empty()) {
950 1 : baseCommit->reactions.erase(itPreviousReact);
951 2 : emitSignal<libjami::ConversationSignal::ReactionRemoved>(accountId_,
952 : repository_
953 1 : ->id(),
954 1 : baseCommit->id,
955 : editId);
956 : }
957 : }
958 0 : } else if (itPending != history.pendingReactions.end()) {
959 : // Else edit if pending
960 0 : auto itReaction = std::find_if(itPending->second.begin(),
961 0 : itPending->second.end(),
962 0 : [&](const auto& reaction) {
963 0 : return reaction.at("id") == editId;
964 : });
965 0 : if (itReaction != itPending->second.end()) {
966 0 : (*itReaction)["body"] = body;
967 0 : if (body.empty())
968 0 : itPending->second.erase(itReaction);
969 : }
970 : } else {
971 : // Add to pending edtions
972 0 : messageReceived ? history.pendingEditions[editId].emplace_front(sharedCommit)
973 0 : : history.pendingEditions[editId].emplace_back(sharedCommit);
974 : }
975 : } else {
976 : // Normal message
977 4 : it->second->editions.emplace(it->second->editions.begin(), it->second->body);
978 4 : it->second->body["body"] = sharedCommit->body["body"];
979 : // Remove reactions
980 4 : if (sharedCommit->body.at("body").empty())
981 1 : it->second->reactions.clear();
982 8 : emitSignal<libjami::ConversationSignal::SwarmMessageUpdated>(accountId_,
983 4 : repository_->id(),
984 4 : *it->second);
985 : }
986 5 : }
987 5 : } else {
988 2 : messageReceived ? history.pendingEditions[editId].emplace_front(sharedCommit)
989 2 : : history.pendingEditions[editId].emplace_back(sharedCommit);
990 : }
991 7 : }
992 :
993 : bool
994 15504 : Conversation::Impl::handleMessage(History& history,
995 : const std::shared_ptr<libjami::SwarmMessage>& sharedCommit,
996 : bool messageReceived) const
997 : {
998 15504 : if (messageReceived) {
999 : // For a received message, we place it at the beginning of the list
1000 1206 : if (!history.messageList.empty())
1001 972 : sharedCommit->linearizedParent = (*history.messageList.begin())->id;
1002 1206 : history.messageList.emplace_front(sharedCommit);
1003 : } else {
1004 : // For a loaded message, we load from newest to oldest
1005 : // So we change the parent of the last message.
1006 14298 : if (!history.messageList.empty())
1007 12775 : (*history.messageList.rbegin())->linearizedParent = sharedCommit->id;
1008 14300 : history.messageList.emplace_back(sharedCommit);
1009 : }
1010 : // Handle pending reactions/editions
1011 15506 : auto reactIt = history.pendingReactions.find(sharedCommit->id);
1012 15506 : if (reactIt != history.pendingReactions.end()) {
1013 0 : for (const auto& commitBody : reactIt->second)
1014 0 : sharedCommit->reactions.emplace_back(commitBody);
1015 0 : history.pendingReactions.erase(reactIt);
1016 : }
1017 15502 : auto peditIt = history.pendingEditions.find(sharedCommit->id);
1018 15505 : if (peditIt != history.pendingEditions.end()) {
1019 0 : auto oldBody = sharedCommit->body;
1020 0 : sharedCommit->body["body"] = peditIt->second.front()->body["body"];
1021 0 : peditIt->second.pop_front();
1022 0 : for (const auto& commit : peditIt->second) {
1023 0 : sharedCommit->editions.emplace_back(commit->body);
1024 : }
1025 0 : sharedCommit->editions.emplace_back(oldBody);
1026 0 : history.pendingEditions.erase(peditIt);
1027 0 : }
1028 : // Announce to client
1029 15504 : if (messageReceived)
1030 2412 : emitSignal<libjami::ConversationSignal::SwarmMessageReceived>(accountId_,
1031 1206 : repository_->id(),
1032 1206 : *sharedCommit);
1033 15506 : return !messageReceived;
1034 : }
1035 :
1036 : std::vector<std::shared_ptr<libjami::SwarmMessage>>
1037 15492 : Conversation::Impl::addToHistory(const std::vector<std::map<std::string, std::string>>& commits,
1038 : bool messageReceived,
1039 : bool commitFromSelf,
1040 : History* optHistory) const
1041 : {
1042 15492 : auto acc = account_.lock();
1043 15494 : if (!acc)
1044 0 : return {};
1045 15492 : auto username = acc->getUsername();
1046 15493 : if (messageReceived && (!optHistory && isLoadingHistory_)) {
1047 0 : std::unique_lock lk(historyMtx_);
1048 0 : historyCv_.wait(lk, [&] { return !isLoadingHistory_; });
1049 0 : }
1050 15493 : std::vector<std::shared_ptr<libjami::SwarmMessage>> messages;
1051 15540 : auto addCommit = [&](const auto& commit) {
1052 15540 : auto* history = optHistory ? optHistory : &loadedHistory_;
1053 31079 : auto commitId = commit.at("id");
1054 15539 : if (history->quickAccess.find(commitId) != history->quickAccess.end())
1055 7 : return; // Already present
1056 15530 : auto typeIt = commit.find("type");
1057 15529 : auto reactToIt = commit.find("react-to");
1058 15533 : auto editIt = commit.find("edit");
1059 : // Nothing to show for the client, skip
1060 15532 : if (typeIt != commit.end() && typeIt->second == "merge")
1061 17 : return;
1062 :
1063 15514 : auto sharedCommit = std::make_shared<libjami::SwarmMessage>();
1064 15515 : sharedCommit->fromMapStringString(commit);
1065 : // Set message status based on cache (only on history for client)
1066 15516 : if (!commitFromSelf && optHistory == nullptr) {
1067 916 : std::lock_guard lk(messageStatusMtx_);
1068 13533 : for (const auto& member: repository_->members()) {
1069 : // If we have a status cached, use it
1070 12619 : auto itFuture = futureStatus.find(sharedCommit->id);
1071 12621 : if (itFuture != futureStatus.end()) {
1072 13 : sharedCommit->status = std::move(itFuture->second);
1073 13 : futureStatus.erase(itFuture);
1074 920 : continue;
1075 : }
1076 : // Else we need to compute the status.
1077 12608 : auto& cache = memberToStatus[member.uri];
1078 12606 : if (cache == 0) {
1079 : // Message is sending, sent or displayed
1080 1032 : cache = static_cast<int32_t>(libjami::Account::MessageStates::SENDING);
1081 : }
1082 12606 : if (!messageReceived) {
1083 : // For loading previous messages, there is 3 cases. Last value cached is displayed, so is every previous commits
1084 : // Else, if last value is sent, we can compare to the last read commit to update the cache
1085 : // Finally if it's sending, we check last fetched commit
1086 12 : if (cache == static_cast<int32_t>(libjami::Account::MessageStates::SENT)) {
1087 0 : if (messagesStatus_[member.uri]["read"] == sharedCommit->id) {
1088 0 : cache = static_cast<int32_t>(libjami::Account::MessageStates::DISPLAYED);
1089 : }
1090 12 : } else if (cache <= static_cast<int32_t>(libjami::Account::MessageStates::SENDING)) { // SENDING or UNKNOWN
1091 : // cache can be upgraded to displayed or sent
1092 9 : if (messagesStatus_[member.uri]["read"] == sharedCommit->id) {
1093 1 : cache = static_cast<int32_t>(libjami::Account::MessageStates::DISPLAYED);
1094 8 : } else if (messagesStatus_[member.uri]["fetched"] == sharedCommit->id) {
1095 0 : cache = static_cast<int32_t>(libjami::Account::MessageStates::SENT);
1096 : }
1097 : }
1098 12 : sharedCommit->status[member.uri] = static_cast<int32_t>(cache);
1099 : } else {
1100 : // If member is author of the message received, they already saw it
1101 12594 : if (member.uri == commit.at("author")) {
1102 : // If member is the author of the commit, they are considered as displayed (same for all previous commits)
1103 907 : messagesStatus_[member.uri]["read"] = sharedCommit->id;
1104 907 : messagesStatus_[member.uri]["fetched"] = sharedCommit->id;
1105 907 : sharedCommit->status[commit.at("author")] = static_cast<int32_t>(libjami::Account::MessageStates::DISPLAYED);
1106 907 : cache = static_cast<int32_t>(libjami::Account::MessageStates::DISPLAYED);
1107 907 : continue;
1108 : }
1109 : // For receiving messages, every commit is considered as SENDING, unless we got a update
1110 11685 : auto status = static_cast<int32_t>(libjami::Account::MessageStates::SENDING);
1111 11685 : if (messagesStatus_[member.uri]["read"] == sharedCommit->id) {
1112 0 : status = static_cast<int32_t>(libjami::Account::MessageStates::DISPLAYED);
1113 11682 : } else if (messagesStatus_[member.uri]["fetched"] == sharedCommit->id) {
1114 0 : status = static_cast<int32_t>(libjami::Account::MessageStates::SENT);
1115 : }
1116 11688 : sharedCommit->status[member.uri] = static_cast<int32_t>(status);
1117 : }
1118 : }
1119 916 : }
1120 15516 : history->quickAccess[commitId] = sharedCommit;
1121 :
1122 15515 : if (reactToIt != commit.end() && !reactToIt->second.empty()) {
1123 3 : handleReaction(*history, sharedCommit);
1124 15511 : } else if (editIt != commit.end() && !editIt->second.empty()) {
1125 7 : handleEdition(*history, sharedCommit, messageReceived);
1126 15503 : } else if (handleMessage(*history, sharedCommit, messageReceived)) {
1127 14298 : messages.emplace_back(sharedCommit);
1128 : }
1129 15538 : };
1130 15494 : std::for_each(commits.begin(), commits.end(), addCommit);
1131 :
1132 15495 : return messages;
1133 15494 : }
1134 :
1135 178 : Conversation::Conversation(const std::shared_ptr<JamiAccount>& account,
1136 : ConversationMode mode,
1137 178 : const std::string& otherMember)
1138 178 : : pimpl_ {new Impl {account, mode, otherMember}}
1139 178 : {}
1140 :
1141 17 : Conversation::Conversation(const std::shared_ptr<JamiAccount>& account,
1142 17 : const std::string& conversationId)
1143 17 : : pimpl_ {new Impl {account, conversationId}}
1144 17 : {}
1145 :
1146 194 : Conversation::Conversation(const std::shared_ptr<JamiAccount>& account,
1147 : const std::string& remoteDevice,
1148 194 : const std::string& conversationId)
1149 194 : : pimpl_ {new Impl {account, remoteDevice, conversationId}}
1150 194 : {}
1151 :
1152 307 : Conversation::~Conversation() {}
1153 :
1154 : std::string
1155 9220 : Conversation::id() const
1156 : {
1157 9220 : return pimpl_->repository_ ? pimpl_->repository_->id() : "";
1158 : }
1159 :
1160 : void
1161 135 : Conversation::addMember(const std::string& contactUri, const OnDoneCb& cb)
1162 : {
1163 : try {
1164 135 : if (mode() == ConversationMode::ONE_TO_ONE) {
1165 : // Only authorize to add left members
1166 1 : auto initialMembers = getInitialMembers();
1167 1 : auto it = std::find(initialMembers.begin(), initialMembers.end(), contactUri);
1168 1 : if (it == initialMembers.end()) {
1169 1 : JAMI_WARN("Cannot add new member in one to one conversation");
1170 1 : cb(false, "");
1171 1 : return;
1172 : }
1173 1 : }
1174 0 : } catch (const std::exception& e) {
1175 0 : JAMI_WARN("Cannot get mode: %s", e.what());
1176 0 : cb(false, "");
1177 0 : return;
1178 0 : }
1179 134 : if (isMember(contactUri, true)) {
1180 0 : JAMI_WARN("Could not add member %s because it's already a member", contactUri.c_str());
1181 0 : cb(false, "");
1182 0 : return;
1183 : }
1184 134 : if (isBanned(contactUri)) {
1185 3 : if (pimpl_->isAdmin()) {
1186 6 : dht::ThreadPool::io().run(
1187 4 : [w = weak(), contactUri = std::move(contactUri), cb = std::move(cb)] {
1188 2 : if (auto sthis = w.lock()) {
1189 2 : auto members = sthis->pimpl_->repository_->members();
1190 2 : auto type = sthis->pimpl_->bannedType(contactUri);
1191 2 : if (type.empty()) {
1192 0 : cb(false, {});
1193 0 : return;
1194 : }
1195 2 : sthis->pimpl_->voteUnban(contactUri, type, cb);
1196 4 : }
1197 : });
1198 : } else {
1199 1 : JAMI_WARN("Could not add member %s because this member is banned", contactUri.c_str());
1200 1 : cb(false, "");
1201 : }
1202 3 : return;
1203 : }
1204 :
1205 131 : dht::ThreadPool::io().run([w = weak(), contactUri = std::move(contactUri), cb = std::move(cb)] {
1206 131 : if (auto sthis = w.lock()) {
1207 : // Add member files and commit
1208 131 : std::unique_lock lk(sthis->pimpl_->writeMtx_);
1209 131 : auto commit = sthis->pimpl_->repository_->addMember(contactUri);
1210 131 : sthis->pimpl_->announce(commit, true);
1211 131 : lk.unlock();
1212 131 : if (cb)
1213 131 : cb(!commit.empty(), commit);
1214 262 : }
1215 131 : });
1216 : }
1217 :
1218 : std::shared_ptr<dhtnet::ChannelSocket>
1219 5016 : Conversation::gitSocket(const DeviceId& deviceId) const
1220 : {
1221 5016 : return pimpl_->gitSocket(deviceId);
1222 : }
1223 :
1224 : void
1225 2544 : Conversation::addGitSocket(const DeviceId& deviceId,
1226 : const std::shared_ptr<dhtnet::ChannelSocket>& socket)
1227 : {
1228 2544 : pimpl_->addGitSocket(deviceId, socket);
1229 2546 : }
1230 :
1231 : void
1232 812 : Conversation::removeGitSocket(const DeviceId& deviceId)
1233 : {
1234 812 : pimpl_->removeGitSocket(deviceId);
1235 811 : }
1236 :
1237 : void
1238 390 : Conversation::shutdownConnections()
1239 : {
1240 390 : pimpl_->fallbackTimer_->cancel();
1241 390 : pimpl_->gitSocketList_.clear();
1242 390 : if (pimpl_->swarmManager_)
1243 390 : pimpl_->swarmManager_->shutdown();
1244 390 : pimpl_->checkedMembers_.clear();
1245 390 : }
1246 :
1247 : void
1248 0 : Conversation::connectivityChanged()
1249 : {
1250 0 : if (pimpl_->swarmManager_)
1251 0 : pimpl_->swarmManager_->maintainBuckets();
1252 0 : }
1253 :
1254 : std::vector<jami::DeviceId>
1255 0 : Conversation::getDeviceIdList() const
1256 : {
1257 0 : return pimpl_->swarmManager_->getRoutingTable().getAllNodes();
1258 : }
1259 :
1260 : std::shared_ptr<Typers>
1261 9 : Conversation::typers() const
1262 : {
1263 9 : return pimpl_->typers_;
1264 : }
1265 :
1266 : bool
1267 0 : Conversation::hasSwarmChannel(const std::string& deviceId)
1268 : {
1269 0 : if (!pimpl_->swarmManager_)
1270 0 : return false;
1271 0 : return pimpl_->swarmManager_->isConnectedWith(DeviceId(deviceId));
1272 : }
1273 :
1274 : void
1275 2 : Conversation::Impl::voteUnban(const std::string& contactUri,
1276 : const std::string_view type,
1277 : const OnDoneCb& cb)
1278 : {
1279 : // Check if admin
1280 2 : if (!isAdmin()) {
1281 0 : JAMI_WARN("You're not an admin of this repo. Cannot unban %s", contactUri.c_str());
1282 0 : cb(false, {});
1283 0 : return;
1284 : }
1285 :
1286 : // Vote for removal
1287 2 : std::unique_lock lk(writeMtx_);
1288 2 : auto voteCommit = repository_->voteUnban(contactUri, type);
1289 2 : if (voteCommit.empty()) {
1290 0 : JAMI_WARN("Unbanning %s failed", contactUri.c_str());
1291 0 : cb(false, "");
1292 0 : return;
1293 : }
1294 :
1295 2 : auto lastId = voteCommit;
1296 2 : std::vector<std::string> commits;
1297 2 : commits.emplace_back(voteCommit);
1298 :
1299 : // If admin, check vote
1300 4 : auto resolveCommit = repository_->resolveVote(contactUri, type, "unban");
1301 2 : if (!resolveCommit.empty()) {
1302 2 : commits.emplace_back(resolveCommit);
1303 2 : lastId = resolveCommit;
1304 2 : JAMI_WARN("Vote solved for unbanning %s.", contactUri.c_str());
1305 : }
1306 2 : announce(commits, true);
1307 2 : lk.unlock();
1308 2 : if (cb)
1309 2 : cb(!lastId.empty(), lastId);
1310 2 : }
1311 :
1312 : void
1313 14 : Conversation::removeMember(const std::string& contactUri, bool isDevice, const OnDoneCb& cb)
1314 : {
1315 42 : dht::ThreadPool::io().run([w = weak(),
1316 14 : contactUri = std::move(contactUri),
1317 14 : isDevice = std::move(isDevice),
1318 14 : cb = std::move(cb)] {
1319 14 : if (auto sthis = w.lock()) {
1320 : // Check if admin
1321 14 : if (!sthis->pimpl_->isAdmin()) {
1322 1 : JAMI_WARN("You're not an admin of this repo. Cannot ban %s", contactUri.c_str());
1323 1 : cb(false, {});
1324 2 : return;
1325 : }
1326 :
1327 : // Get current user type
1328 13 : std::string type;
1329 13 : if (isDevice) {
1330 0 : type = "devices";
1331 : } else {
1332 13 : auto members = sthis->pimpl_->repository_->members();
1333 29 : for (const auto& member : members) {
1334 29 : if (member.uri == contactUri) {
1335 13 : if (member.role == MemberRole::INVITED) {
1336 3 : type = "invited";
1337 10 : } else if (member.role == MemberRole::ADMIN) {
1338 1 : type = "admins";
1339 9 : } else if (member.role == MemberRole::MEMBER) {
1340 9 : type = "members";
1341 : }
1342 13 : break;
1343 : }
1344 : }
1345 13 : if (type.empty()) {
1346 0 : cb(false, {});
1347 0 : return;
1348 : }
1349 13 : }
1350 :
1351 : // Vote for removal
1352 13 : std::unique_lock lk(sthis->pimpl_->writeMtx_);
1353 13 : auto voteCommit = sthis->pimpl_->repository_->voteKick(contactUri, type);
1354 13 : if (voteCommit.empty()) {
1355 1 : JAMI_WARN("Kicking %s failed", contactUri.c_str());
1356 1 : cb(false, "");
1357 1 : return;
1358 : }
1359 :
1360 12 : auto lastId = voteCommit;
1361 12 : std::vector<std::string> commits;
1362 12 : commits.emplace_back(voteCommit);
1363 :
1364 : // If admin, check vote
1365 24 : auto resolveCommit = sthis->pimpl_->repository_->resolveVote(contactUri, type, "ban");
1366 12 : if (!resolveCommit.empty()) {
1367 12 : commits.emplace_back(resolveCommit);
1368 12 : lastId = resolveCommit;
1369 12 : JAMI_WARN("Vote solved for %s. %s banned",
1370 : contactUri.c_str(),
1371 : isDevice ? "Device" : "Member");
1372 :
1373 12 : const auto nodes = sthis->pimpl_->swarmManager_->getRoutingTable().getAllNodes();
1374 : // Remove nodes from swarmManager
1375 12 : std::vector<NodeId> toRemove;
1376 26 : for (const auto node : nodes)
1377 14 : if (contactUri == sthis->uriFromDevice(node.toString()))
1378 9 : toRemove.emplace_back(node);
1379 12 : sthis->pimpl_->swarmManager_->deleteNode(toRemove);
1380 : // Remove git sockets with this member
1381 12 : std::vector<DeviceId> gitToRm;
1382 25 : for (const auto& [deviceId, _] : sthis->pimpl_->gitSocketList_)
1383 13 : if (contactUri == sthis->uriFromDevice(deviceId.toString()))
1384 9 : gitToRm.emplace_back(deviceId);
1385 21 : for (const auto& did : gitToRm)
1386 9 : sthis->removeGitSocket(did);
1387 12 : }
1388 :
1389 12 : sthis->pimpl_->announce(commits, true);
1390 12 : lk.unlock();
1391 12 : cb(!lastId.empty(), lastId);
1392 29 : }
1393 : });
1394 14 : }
1395 :
1396 : std::vector<std::map<std::string, std::string>>
1397 601 : Conversation::getMembers(bool includeInvited, bool includeLeft, bool includeBanned) const
1398 : {
1399 601 : return pimpl_->getMembers(includeInvited, includeLeft, includeBanned);
1400 : }
1401 :
1402 : std::set<std::string>
1403 3238 : Conversation::memberUris(std::string_view filter, const std::set<MemberRole>& filteredRoles) const
1404 : {
1405 3238 : return pimpl_->repository_->memberUris(filter, filteredRoles);
1406 : }
1407 :
1408 : std::vector<NodeId>
1409 3149 : Conversation::peersToSyncWith() const
1410 : {
1411 3149 : const auto& routingTable = pimpl_->swarmManager_->getRoutingTable();
1412 3149 : const auto& nodes = routingTable.getNodes();
1413 3149 : const auto& mobiles = routingTable.getMobileNodes();
1414 3149 : std::vector<NodeId> s;
1415 3149 : s.reserve(nodes.size() + mobiles.size());
1416 3149 : s.insert(s.end(), nodes.begin(), nodes.end());
1417 3149 : s.insert(s.end(), mobiles.begin(), mobiles.end());
1418 33459 : for (const auto& [deviceId, _] : pimpl_->gitSocketList_)
1419 30307 : if (std::find(s.cbegin(), s.cend(), deviceId) == s.cend())
1420 201 : s.emplace_back(deviceId);
1421 6298 : return s;
1422 3149 : }
1423 :
1424 : bool
1425 3149 : Conversation::isBootstraped() const
1426 : {
1427 3149 : const auto& routingTable = pimpl_->swarmManager_->getRoutingTable();
1428 3149 : return !routingTable.getNodes().empty();
1429 : }
1430 :
1431 : std::string
1432 41115 : Conversation::uriFromDevice(const std::string& deviceId) const
1433 : {
1434 41115 : return pimpl_->repository_->uriFromDevice(deviceId);
1435 : }
1436 :
1437 : void
1438 0 : Conversation::monitor()
1439 : {
1440 0 : pimpl_->swarmManager_->getRoutingTable().printRoutingTable();
1441 0 : }
1442 :
1443 : std::string
1444 185 : Conversation::join()
1445 : {
1446 185 : return pimpl_->repository_->join();
1447 : }
1448 :
1449 : bool
1450 5215 : Conversation::isMember(const std::string& uri, bool includeInvited) const
1451 : {
1452 5215 : auto repoPath = pimpl_->repoPath();
1453 5214 : auto invitedPath = repoPath / "invited";
1454 5214 : auto adminsPath = repoPath / "admins";
1455 5215 : auto membersPath = repoPath / "members";
1456 20858 : std::vector<std::filesystem::path> pathsToCheck = {adminsPath, membersPath};
1457 5215 : if (includeInvited)
1458 4247 : pathsToCheck.emplace_back(invitedPath);
1459 12824 : for (const auto& path : pathsToCheck) {
1460 59815 : for (const auto& certificate : dhtnet::fileutils::readDirectory(path)) {
1461 52193 : std::string_view crtUri = certificate;
1462 52201 : auto crtIt = crtUri.find(".crt");
1463 52195 : if (path != invitedPath && crtIt == std::string_view::npos) {
1464 0 : JAMI_WARNING("Incorrect file found: {}/{}", path, certificate);
1465 0 : continue;
1466 0 : }
1467 52247 : if (crtIt != std::string_view::npos)
1468 50341 : crtUri = crtUri.substr(0, crtIt);
1469 52222 : if (crtUri == uri)
1470 4122 : return true;
1471 11733 : }
1472 : }
1473 :
1474 1092 : if (includeInvited && mode() == ConversationMode::ONE_TO_ONE) {
1475 3 : for (const auto& member : getInitialMembers()) {
1476 2 : if (member == uri)
1477 0 : return true;
1478 1 : }
1479 : }
1480 :
1481 1092 : return false;
1482 5213 : }
1483 :
1484 : bool
1485 7253 : Conversation::isBanned(const std::string& uri) const
1486 : {
1487 7253 : return !pimpl_->bannedType(uri).empty();
1488 : }
1489 :
1490 : void
1491 0 : Conversation::sendMessage(std::string&& message,
1492 : const std::string& type,
1493 : const std::string& replyTo,
1494 : OnCommitCb&& onCommit,
1495 : OnDoneCb&& cb)
1496 : {
1497 0 : Json::Value json;
1498 0 : json["body"] = std::move(message);
1499 0 : json["type"] = type;
1500 0 : sendMessage(std::move(json), replyTo, std::move(onCommit), std::move(cb));
1501 0 : }
1502 :
1503 : void
1504 139 : Conversation::sendMessage(Json::Value&& value,
1505 : const std::string& replyTo,
1506 : OnCommitCb&& onCommit,
1507 : OnDoneCb&& cb)
1508 : {
1509 139 : if (!replyTo.empty()) {
1510 2 : auto commit = pimpl_->repository_->getCommit(replyTo);
1511 2 : if (commit == std::nullopt) {
1512 1 : JAMI_ERR("Replying to invalid commit %s", replyTo.c_str());
1513 1 : return;
1514 : }
1515 1 : value["reply-to"] = replyTo;
1516 2 : }
1517 552 : dht::ThreadPool::io().run(
1518 414 : [w = weak(), value = std::move(value), onCommit = std::move(onCommit), cb = std::move(cb)] {
1519 138 : if (auto sthis = w.lock()) {
1520 138 : auto acc = sthis->pimpl_->account_.lock();
1521 138 : if (!acc)
1522 0 : return;
1523 138 : std::unique_lock lk(sthis->pimpl_->writeMtx_);
1524 138 : auto commit = sthis->pimpl_->repository_->commitMessage(
1525 138 : Json::writeString(jsonBuilder, value));
1526 138 : lk.unlock();
1527 138 : if (onCommit)
1528 10 : onCommit(commit);
1529 138 : sthis->pimpl_->announce(commit, true);
1530 138 : if (cb)
1531 138 : cb(!commit.empty(), commit);
1532 276 : }
1533 : });
1534 : }
1535 :
1536 : void
1537 1 : Conversation::sendMessages(std::vector<Json::Value>&& messages, OnMultiDoneCb&& cb)
1538 : {
1539 1 : dht::ThreadPool::io().run([w = weak(), messages = std::move(messages), cb = std::move(cb)] {
1540 1 : if (auto sthis = w.lock()) {
1541 1 : std::vector<std::string> commits;
1542 1 : commits.reserve(messages.size());
1543 1 : std::unique_lock lk(sthis->pimpl_->writeMtx_);
1544 3 : for (const auto& message : messages) {
1545 2 : auto commit = sthis->pimpl_->repository_->commitMessage(
1546 2 : Json::writeString(jsonBuilder, message));
1547 2 : commits.emplace_back(std::move(commit));
1548 2 : }
1549 1 : lk.unlock();
1550 1 : sthis->pimpl_->announce(commits, true);
1551 1 : if (cb)
1552 1 : cb(commits);
1553 2 : }
1554 1 : });
1555 1 : }
1556 :
1557 : std::optional<std::map<std::string, std::string>>
1558 27869 : Conversation::getCommit(const std::string& commitId) const
1559 : {
1560 27869 : auto commit = pimpl_->repository_->getCommit(commitId);
1561 27871 : if (commit == std::nullopt)
1562 3258 : return std::nullopt;
1563 24612 : return pimpl_->repository_->convCommitToMap(*commit);
1564 27870 : }
1565 :
1566 : void
1567 7 : Conversation::loadMessages(OnLoadMessages cb, const LogOptions& options)
1568 : {
1569 7 : if (!cb)
1570 0 : return;
1571 7 : dht::ThreadPool::io().run([w = weak(), cb = std::move(cb), options] {
1572 7 : if (auto sthis = w.lock()) {
1573 7 : cb(sthis->pimpl_->loadMessages(options));
1574 7 : }
1575 7 : });
1576 : }
1577 :
1578 : void
1579 2 : Conversation::loadMessages2(const OnLoadMessages2& cb, const LogOptions& options)
1580 : {
1581 2 : if (!cb)
1582 0 : return;
1583 2 : dht::ThreadPool::io().run([w = weak(), cb = std::move(cb), options] {
1584 2 : if (auto sthis = w.lock()) {
1585 2 : cb(sthis->pimpl_->loadMessages2(options));
1586 2 : }
1587 2 : });
1588 : }
1589 :
1590 : void
1591 0 : Conversation::clearCache()
1592 : {
1593 0 : pimpl_->loadedHistory_.messageList.clear();
1594 0 : pimpl_->loadedHistory_.quickAccess.clear();
1595 0 : pimpl_->loadedHistory_.pendingEditions.clear();
1596 0 : pimpl_->loadedHistory_.pendingReactions.clear();
1597 0 : }
1598 :
1599 : std::string
1600 3635 : Conversation::lastCommitId() const
1601 : {
1602 3635 : LogOptions options;
1603 3634 : options.nbOfCommits = 1;
1604 3634 : options.skipMerge = true;
1605 3634 : History optHistory;
1606 : {
1607 3634 : std::lock_guard lk(pimpl_->historyMtx_);
1608 3635 : if (!pimpl_->loadedHistory_.messageList.empty())
1609 6410 : return (*pimpl_->loadedHistory_.messageList.begin())->id;
1610 3633 : }
1611 :
1612 429 : std::lock_guard lk(pimpl_->writeMtx_);
1613 429 : auto res = pimpl_->loadMessages2(options, &optHistory);
1614 429 : if (res.empty())
1615 4 : return {};
1616 850 : return (*optHistory.messageList.begin())->id;
1617 3634 : }
1618 :
1619 : std::vector<std::map<std::string, std::string>>
1620 1794 : Conversation::Impl::mergeHistory(const std::string& uri)
1621 : {
1622 1794 : if (not repository_) {
1623 0 : JAMI_WARN("Invalid repo. Abort merge");
1624 0 : return {};
1625 : }
1626 3588 : auto remoteHead = repository_->remoteHead(uri);
1627 1794 : if (remoteHead.empty()) {
1628 0 : JAMI_WARN("Could not get HEAD of %s", uri.c_str());
1629 0 : return {};
1630 : }
1631 :
1632 : // Validate commit
1633 1794 : auto [newCommits, err] = repository_->validFetch(uri);
1634 1794 : if (newCommits.empty()) {
1635 897 : if (err)
1636 18 : JAMI_ERR("Could not validate history with %s", uri.c_str());
1637 897 : repository_->removeBranchWith(uri);
1638 897 : return {};
1639 : }
1640 :
1641 : // If validated, merge
1642 897 : auto [ok, cid] = repository_->merge(remoteHead);
1643 897 : if (!ok) {
1644 0 : JAMI_ERR("Could not merge history with %s", uri.c_str());
1645 0 : repository_->removeBranchWith(uri);
1646 0 : return {};
1647 : }
1648 897 : if (!cid.empty()) {
1649 : // A merge commit was generated, should be added in new commits
1650 9 : auto commit = repository_->getCommit(cid);
1651 9 : if (commit != std::nullopt)
1652 9 : newCommits.emplace_back(*commit);
1653 9 : }
1654 :
1655 2691 : JAMI_DEBUG("Successfully merge history with {:s}", uri);
1656 897 : auto result = repository_->convCommitsToMap(newCommits);
1657 1813 : for (const auto& commit : result) {
1658 916 : auto it = commit.find("type");
1659 916 : if (it != commit.end() && it->second == "member") {
1660 768 : repository_->refreshMembers();
1661 : }
1662 : }
1663 897 : return result;
1664 1794 : }
1665 :
1666 : bool
1667 2361 : Conversation::pull(const std::string& deviceId, OnPullCb&& cb, std::string commitId)
1668 : {
1669 2361 : std::lock_guard lk(pimpl_->pullcbsMtx_);
1670 2360 : auto isInProgress = not pimpl_->pullcbs_.empty();
1671 2360 : auto itPull = std::find_if(pimpl_->pullcbs_.begin(),
1672 2360 : pimpl_->pullcbs_.end(),
1673 5865 : [&](const auto& elem) { return std::get<0>(elem) == deviceId && std::get<1>(elem) == commitId; });
1674 2360 : if (itPull != pimpl_->pullcbs_.end()) {
1675 23 : cb(false);
1676 23 : return false;
1677 : }
1678 2337 : JAMI_INFO() << "Sync " << id() << " with " << deviceId;
1679 2338 : pimpl_->pullcbs_.emplace_back(deviceId, std::move(commitId), std::move(cb));
1680 2338 : if (isInProgress)
1681 390 : return true;
1682 1948 : dht::ThreadPool::io().run([w = weak()] {
1683 1948 : if (auto sthis_ = w.lock())
1684 1948 : sthis_->pimpl_->pull();
1685 1948 : });
1686 1948 : return true;
1687 2361 : }
1688 :
1689 : void
1690 1948 : Conversation::Impl::pull()
1691 : {
1692 1948 : auto& repo = repository_;
1693 :
1694 1948 : std::string deviceId, commitId;
1695 1948 : OnPullCb cb;
1696 : while (true) {
1697 1541242 : decltype(pullcbs_)::value_type pullcb;
1698 1541183 : decltype(fetchingRemotes_.begin()) it;
1699 : {
1700 1541143 : std::lock_guard lk(pullcbsMtx_);
1701 1542535 : if (pullcbs_.empty())
1702 3896 : return;
1703 1539385 : auto& elem = pullcbs_.front();
1704 1538307 : deviceId = std::move(std::get<0>(elem));
1705 1538489 : commitId = std::move(std::get<1>(elem));
1706 1538586 : cb = std::move(std::get<2>(elem));
1707 1539234 : pullcbs_.pop_front();
1708 :
1709 : // Check if already using this remote, if so, no need to pull yet
1710 : // One pull at a time to avoid any early EOF or fetch errors.
1711 1539045 : auto itr = fetchingRemotes_.emplace(deviceId);
1712 1539054 : if (!itr.second) {
1713 : // Go to next pull
1714 1536716 : pullcbs_.emplace_back(std::move(deviceId), std::move(commitId), std::move(cb));
1715 1537332 : continue;
1716 : }
1717 2338 : it = itr.first;
1718 1541618 : }
1719 : // If recently fetched, the commit can already be there, so no need to do complex operations
1720 2338 : if (commitId != "" && repo->getCommit(commitId, false) != std::nullopt) {
1721 538 : cb(true);
1722 538 : std::lock_guard lk(pullcbsMtx_);
1723 538 : fetchingRemotes_.erase(it);
1724 538 : continue;
1725 538 : }
1726 : // Pull from remote
1727 1800 : auto fetched = repo->fetch(deviceId);
1728 : {
1729 1800 : std::lock_guard lk(pullcbsMtx_);
1730 1800 : fetchingRemotes_.erase(it);
1731 1800 : }
1732 :
1733 1800 : if (!fetched) {
1734 6 : cb(false);
1735 6 : continue;
1736 : }
1737 1794 : auto oldHead = repo->getHead();
1738 1794 : std::string newHead = oldHead;
1739 1794 : std::unique_lock lk(writeMtx_);
1740 1794 : auto commits = mergeHistory(deviceId);
1741 1794 : if (!commits.empty()) {
1742 897 : newHead = commits.rbegin()->at("id");
1743 : // Note: Because clients needs to linearize the history, they need to know all commits
1744 : // that can be updated.
1745 : // In this case, all commits until the common merge base should be announced.
1746 : // The client ill need to update it's model after this.
1747 897 : std::string mergeBase = oldHead; // If fast-forward, the merge base is the previous head
1748 897 : auto newHeadCommit = repo->getCommit(newHead);
1749 897 : if (newHeadCommit != std::nullopt && newHeadCommit->parents.size() > 1) {
1750 10 : mergeBase = repo->mergeBase(newHeadCommit->parents[0], newHeadCommit->parents[1]);
1751 10 : LogOptions options;
1752 10 : options.to = mergeBase;
1753 10 : auto updatedCommits = loadMessages(options);
1754 : // We announce commits from oldest to update to newest. This generally avoid
1755 : // to get detached commits until they are all announced.
1756 10 : std::reverse(std::begin(updatedCommits), std::end(updatedCommits));
1757 10 : announce(updatedCommits);
1758 10 : } else {
1759 887 : announce(commits);
1760 : }
1761 897 : }
1762 1794 : lk.unlock();
1763 1794 : if (cb)
1764 1794 : cb(true);
1765 : // Announce if profile changed
1766 1794 : if (oldHead != newHead) {
1767 897 : auto diffStats = repo->diffStats(newHead, oldHead);
1768 897 : auto changedFiles = repo->changedFiles(diffStats);
1769 897 : if (find(changedFiles.begin(), changedFiles.end(), "profile.vcf")
1770 1794 : != changedFiles.end()) {
1771 5 : if (auto account = account_.lock())
1772 5 : emitSignal<libjami::ConversationSignal::ConversationProfileUpdated>(
1773 10 : account->getAccountID(), repo->id(), repo->infos());
1774 : }
1775 897 : }
1776 3081885 : }
1777 1837 : }
1778 :
1779 : void
1780 2361 : Conversation::sync(const std::string& member,
1781 : const std::string& deviceId,
1782 : OnPullCb&& cb,
1783 : std::string commitId)
1784 : {
1785 2361 : if (!pull(deviceId, std::move(cb), commitId))
1786 23 : return;
1787 2338 : dht::ThreadPool::io().run([member, deviceId, a = pimpl_->account_, w = weak_from_this()] {
1788 2338 : auto sthis = w.lock();
1789 2338 : if (auto account = a.lock()) {
1790 : // For waiting request, downloadFile
1791 2338 : for (const auto& wr : sthis->dataTransfer()->waitingRequests()) {
1792 0 : auto path = fileutils::get_data_dir() / account->getAccountID()
1793 0 : / "conversation_data" / sthis->id() / wr.fileId;
1794 0 : auto start = fileutils::size(path.string());
1795 0 : if (start < 0)
1796 0 : start = 0;
1797 0 : sthis->downloadFile(wr.interactionId, wr.fileId, wr.path, member, deviceId, start);
1798 2338 : }
1799 2338 : }
1800 2337 : });
1801 : }
1802 :
1803 : std::map<std::string, std::string>
1804 525 : Conversation::generateInvitation() const
1805 : {
1806 : // Invite the new member to the conversation
1807 525 : Json::Value root;
1808 525 : auto& metadata = root[ConversationMapKeys::METADATAS];
1809 1054 : for (const auto& [k, v] : infos()) {
1810 529 : if (v.size() >= 64000) {
1811 0 : JAMI_WARNING("Cutting invite because the SIP message will be too long");
1812 0 : continue;
1813 0 : }
1814 529 : metadata[k] = v;
1815 525 : }
1816 525 : root[ConversationMapKeys::CONVERSATIONID] = id();
1817 1575 : return {{"application/invite+json", Json::writeString(jsonBuilder, root)}};
1818 525 : }
1819 :
1820 : std::string
1821 6 : Conversation::leave()
1822 : {
1823 6 : setRemovingFlag();
1824 6 : std::lock_guard lk(pimpl_->writeMtx_);
1825 12 : return pimpl_->repository_->leave();
1826 6 : }
1827 :
1828 : void
1829 11 : Conversation::setRemovingFlag()
1830 : {
1831 11 : pimpl_->isRemoving_ = true;
1832 11 : }
1833 :
1834 : bool
1835 5480 : Conversation::isRemoving()
1836 : {
1837 5480 : return pimpl_->isRemoving_;
1838 : }
1839 :
1840 : void
1841 22 : Conversation::erase()
1842 : {
1843 22 : if (pimpl_->conversationDataPath_ != "")
1844 22 : dhtnet::fileutils::removeAll(pimpl_->conversationDataPath_, true);
1845 22 : if (!pimpl_->repository_)
1846 0 : return;
1847 22 : std::lock_guard lk(pimpl_->writeMtx_);
1848 22 : pimpl_->repository_->erase();
1849 22 : }
1850 :
1851 : ConversationMode
1852 4967 : Conversation::mode() const
1853 : {
1854 4967 : return pimpl_->repository_->mode();
1855 : }
1856 :
1857 : std::vector<std::string>
1858 32 : Conversation::getInitialMembers() const
1859 : {
1860 32 : return pimpl_->repository_->getInitialMembers();
1861 : }
1862 :
1863 : bool
1864 0 : Conversation::isInitialMember(const std::string& uri) const
1865 : {
1866 0 : auto members = getInitialMembers();
1867 0 : return std::find(members.begin(), members.end(), uri) != members.end();
1868 0 : }
1869 :
1870 : void
1871 9 : Conversation::updateInfos(const std::map<std::string, std::string>& map, const OnDoneCb& cb)
1872 : {
1873 9 : dht::ThreadPool::io().run([w = weak(), map = std::move(map), cb = std::move(cb)] {
1874 9 : if (auto sthis = w.lock()) {
1875 9 : auto& repo = sthis->pimpl_->repository_;
1876 9 : std::unique_lock lk(sthis->pimpl_->writeMtx_);
1877 9 : auto commit = repo->updateInfos(map);
1878 9 : sthis->pimpl_->announce(commit, true);
1879 9 : lk.unlock();
1880 9 : if (cb)
1881 9 : cb(!commit.empty(), commit);
1882 9 : if (auto account = sthis->pimpl_->account_.lock())
1883 9 : emitSignal<libjami::ConversationSignal::ConversationProfileUpdated>(
1884 18 : account->getAccountID(), repo->id(), repo->infos());
1885 18 : }
1886 9 : });
1887 9 : }
1888 :
1889 : std::map<std::string, std::string>
1890 567 : Conversation::infos() const
1891 : {
1892 567 : return pimpl_->repository_->infos();
1893 : }
1894 :
1895 : void
1896 8 : Conversation::updatePreferences(const std::map<std::string, std::string>& map)
1897 : {
1898 8 : auto filePath = pimpl_->conversationDataPath_ / "preferences";
1899 8 : auto prefs = map;
1900 8 : auto itLast = prefs.find(LAST_MODIFIED);
1901 8 : if (itLast != prefs.end()) {
1902 3 : if (std::filesystem::is_regular_file(filePath)) {
1903 1 : auto lastModified = fileutils::lastWriteTimeInSeconds(filePath);
1904 : try {
1905 1 : if (lastModified >= std::stoul(itLast->second))
1906 0 : return;
1907 0 : } catch (...) {
1908 0 : return;
1909 0 : }
1910 : }
1911 3 : prefs.erase(itLast);
1912 : }
1913 :
1914 8 : std::ofstream file(filePath, std::ios::trunc | std::ios::binary);
1915 8 : msgpack::pack(file, prefs);
1916 16 : emitSignal<libjami::ConversationSignal::ConversationPreferencesUpdated>(pimpl_->accountId_,
1917 16 : id(),
1918 8 : std::move(prefs));
1919 8 : }
1920 :
1921 : std::map<std::string, std::string>
1922 130 : Conversation::preferences(bool includeLastModified) const
1923 : {
1924 : try {
1925 130 : std::map<std::string, std::string> preferences;
1926 130 : auto filePath = pimpl_->conversationDataPath_ / "preferences";
1927 248 : auto file = fileutils::loadFile(filePath);
1928 12 : msgpack::object_handle oh = msgpack::unpack((const char*) file.data(), file.size());
1929 12 : oh.get().convert(preferences);
1930 12 : if (includeLastModified)
1931 9 : preferences[LAST_MODIFIED] = std::to_string(fileutils::lastWriteTimeInSeconds(filePath));
1932 12 : return preferences;
1933 366 : } catch (const std::exception& e) {
1934 118 : }
1935 118 : return {};
1936 : }
1937 :
1938 : std::vector<uint8_t>
1939 0 : Conversation::vCard() const
1940 : {
1941 : try {
1942 0 : return fileutils::loadFile(pimpl_->repoPath() / "profile.vcf");
1943 0 : } catch (...) {
1944 0 : }
1945 0 : return {};
1946 : }
1947 :
1948 : std::shared_ptr<TransferManager>
1949 2432 : Conversation::dataTransfer() const
1950 : {
1951 2432 : return pimpl_->transferManager_;
1952 : }
1953 :
1954 : bool
1955 11 : Conversation::onFileChannelRequest(const std::string& member,
1956 : const std::string& fileId,
1957 : bool verifyShaSum) const
1958 : {
1959 11 : if (!isMember(member))
1960 0 : return false;
1961 :
1962 11 : auto sep = fileId.find('_');
1963 11 : if (sep == std::string::npos)
1964 0 : return false;
1965 :
1966 11 : auto interactionId = fileId.substr(0, sep);
1967 11 : auto commit = getCommit(interactionId);
1968 22 : if (commit == std::nullopt || commit->find("type") == commit->end()
1969 22 : || commit->find("tid") == commit->end() || commit->find("sha3sum") == commit->end()
1970 22 : || commit->at("type") != "application/data-transfer+json")
1971 0 : return false;
1972 :
1973 11 : auto path = dataTransfer()->path(fileId);
1974 :
1975 11 : if (!std::filesystem::is_regular_file(path)) {
1976 : // Check if dangling symlink
1977 0 : if (std::filesystem::is_symlink(path)) {
1978 0 : dhtnet::fileutils::remove(path, true);
1979 : }
1980 0 : JAMI_DEBUG("[Account {:s}] {:s} asked for non existing file {} in {:s}",
1981 : pimpl_->accountId_,
1982 : member,
1983 : fileId,
1984 : id());
1985 0 : return false;
1986 : }
1987 : // Check that our file is correct before sending
1988 11 : if (verifyShaSum && commit->at("sha3sum") != fileutils::sha3File(path)) {
1989 3 : JAMI_DEBUG(
1990 : "[Account {:s}] {:s} asked for file {:s} in {:s}, but our version is not complete",
1991 : pimpl_->accountId_,
1992 : member,
1993 : fileId,
1994 : id());
1995 1 : return false;
1996 : }
1997 10 : return true;
1998 11 : }
1999 :
2000 : bool
2001 11 : Conversation::downloadFile(const std::string& interactionId,
2002 : const std::string& fileId,
2003 : const std::string& path,
2004 : const std::string&,
2005 : const std::string& deviceId,
2006 : std::size_t start,
2007 : std::size_t end)
2008 : {
2009 11 : auto commit = getCommit(interactionId);
2010 11 : if (commit == std::nullopt || commit->at("type") != "application/data-transfer+json") {
2011 0 : JAMI_ERROR("Commit doesn't exists or is not a file transfer");
2012 0 : return false;
2013 : }
2014 11 : auto tid = commit->find("tid");
2015 11 : auto sha3sum = commit->find("sha3sum");
2016 11 : auto size_str = commit->find("totalSize");
2017 :
2018 11 : if (tid == commit->end() || sha3sum == commit->end() || size_str == commit->end()) {
2019 0 : JAMI_ERROR("Invalid file transfer commit (missing tid, size or sha3)");
2020 0 : return false;
2021 : }
2022 :
2023 11 : auto totalSize = to_int<ssize_t>(size_str->second, (ssize_t) -1);
2024 11 : if (totalSize < 0) {
2025 0 : JAMI_ERROR("Invalid file size {}", totalSize);
2026 0 : return false;
2027 : }
2028 :
2029 : // Be sure to not lock conversation
2030 22 : dht::ThreadPool().io().run([w = weak(),
2031 : deviceId,
2032 : fileId,
2033 : interactionId,
2034 11 : sha3sum = sha3sum->second,
2035 : path,
2036 : totalSize,
2037 : start,
2038 22 : end] {
2039 11 : if (auto shared = w.lock()) {
2040 11 : auto acc = shared->pimpl_->account_.lock();
2041 11 : if (!acc)
2042 0 : return;
2043 11 : shared->dataTransfer()->waitForTransfer(fileId, interactionId, sha3sum, path, totalSize);
2044 11 : acc->askForFileChannel(shared->id(), deviceId, interactionId, fileId, start, end);
2045 22 : }
2046 : });
2047 11 : return true;
2048 11 : }
2049 :
2050 : void
2051 1106 : Conversation::hasFetched(const std::string& deviceId, const std::string& commitId)
2052 : {
2053 1106 : dht::ThreadPool::io().run([w = weak(), deviceId, commitId]() {
2054 1106 : auto sthis = w.lock();
2055 1106 : if (!sthis)
2056 0 : return;
2057 1106 : auto acc = sthis->pimpl_->account_.lock();
2058 : // Update fetched for Uri
2059 1106 : auto uri = sthis->uriFromDevice(deviceId);
2060 1106 : if (uri.empty() || !acc || uri == acc->getUsername())
2061 45 : return;
2062 : // When a user fetches a commit, the message is sent for this person
2063 1061 : sthis->pimpl_->updateStatus(uri, libjami::Account::MessageStates::SENT, commitId, std::to_string(std::time(nullptr)), true);
2064 1196 : });
2065 1106 : }
2066 :
2067 :
2068 : void
2069 1103 : Conversation::Impl::updateStatus(const std::string& uri,
2070 : libjami::Account::MessageStates st,
2071 : const std::string& commitId,
2072 : const std::string& ts,
2073 : bool emit)
2074 : {
2075 : // This method can be called if peer send us a status or if another device sync. Emit will be true if a peer send us a status and will emit to other connected devices.
2076 1103 : LogOptions options;
2077 1103 : std::map<std::string, std::map<std::string, std::string>> newStatus;
2078 : {
2079 : // Update internal structures.
2080 1103 : std::lock_guard lk(messageStatusMtx_);
2081 1103 : auto& status = messagesStatus_[uri];
2082 1103 : auto& oldStatus = status[st == libjami::Account::MessageStates::SENT ? "fetched" : "read"];
2083 1103 : if (oldStatus == commitId)
2084 6 : return; // Nothing to do
2085 1097 : options.to = oldStatus;
2086 1097 : options.from = commitId;
2087 1097 : oldStatus = commitId;
2088 1097 : status[st == libjami::Account::MessageStates::SENT ? "fetched_ts" : "read_ts"] = ts;
2089 1097 : saveStatus();
2090 1097 : if (emit)
2091 1070 : newStatus[uri].insert(status.begin(), status.end());
2092 1103 : }
2093 1097 : if (emit && messageStatusCb_) {
2094 1070 : messageStatusCb_(newStatus);
2095 : }
2096 : // Update messages status for all commit between the old and new one
2097 1097 : options.logIfNotFound = false;
2098 1097 : options.fastLog = true;
2099 1097 : History optHistory;
2100 1097 : std::lock_guard lk(historyMtx_); // Avoid to announce messages while updating status.
2101 1097 : auto res = loadMessages2(options, &optHistory);
2102 1097 : if (res.size() == 0) {
2103 : // In this case, commit is not received yet, so we cache it
2104 7 : futureStatus[commitId][uri] = static_cast<int32_t>(st);
2105 : }
2106 9768 : for (const auto& [cid, _]: optHistory.quickAccess) {
2107 8671 : auto message = loadedHistory_.quickAccess.find(cid);
2108 8671 : if (message != loadedHistory_.quickAccess.end()) {
2109 : // Update message and emit to client,
2110 2283 : message->second->status[uri] = static_cast<int32_t>(st);
2111 4566 : emitSignal<libjami::ConfigurationSignal::AccountMessageStatusChanged>(
2112 2283 : accountId_,
2113 2283 : repository_->id(),
2114 : uri,
2115 : cid,
2116 : static_cast<int>(st));
2117 : } else {
2118 : // In this case, commit is not loaded by client, so we cache it
2119 : // No need to emit to client, they will get a correct status on load.
2120 6388 : futureStatus[cid][uri] = static_cast<int32_t>(st);
2121 : }
2122 : }
2123 1109 : }
2124 :
2125 : bool
2126 17 : Conversation::setMessageDisplayed(const std::string& uri, const std::string& interactionId)
2127 : {
2128 17 : std::lock_guard lk(pimpl_->messageStatusMtx_);
2129 17 : if (pimpl_->messagesStatus_[uri]["read"] == interactionId)
2130 2 : return false; // Nothing to do
2131 15 : dht::ThreadPool::io().run([w = weak(), uri, interactionId]() {
2132 15 : auto sthis = w.lock();
2133 15 : if (!sthis)
2134 0 : return;
2135 15 : sthis->pimpl_->updateStatus(uri, libjami::Account::MessageStates::DISPLAYED, interactionId, std::to_string(std::time(nullptr)), true);
2136 15 : });
2137 15 : return true;
2138 17 : }
2139 :
2140 : std::map<std::string, std::map<std::string, std::string>>
2141 112 : Conversation::messageStatus() const
2142 : {
2143 112 : std::lock_guard lk(pimpl_->messageStatusMtx_);
2144 224 : return pimpl_->messagesStatus_;
2145 112 : }
2146 :
2147 : void
2148 58 : Conversation::updateMessageStatus(const std::map<std::string, std::map<std::string, std::string>>& messageStatus)
2149 : {
2150 58 : auto acc = pimpl_->account_.lock();
2151 58 : std::unique_lock lk(pimpl_->messageStatusMtx_);
2152 58 : std::vector<std::tuple<libjami::Account::MessageStates, std::string, std::string, std::string>> stVec;
2153 130 : for (const auto& [uri, status] : messageStatus) {
2154 72 : auto& oldMs = pimpl_->messagesStatus_[uri];
2155 72 : if (status.find("fetched_ts") != status.end() && status.at("fetched") != oldMs["fetched"]) {
2156 23 : if (oldMs["fetched_ts"].empty() || std::stol(oldMs["fetched_ts"]) <= std::stol(status.at("fetched_ts"))) {
2157 23 : stVec.emplace_back(libjami::Account::MessageStates::SENT, uri, status.at("fetched"), status.at("fetched_ts"));
2158 : }
2159 : }
2160 72 : if (status.find("read_ts") != status.end() && status.at("read") != oldMs["read"]) {
2161 4 : if (oldMs["read_ts"].empty() || std::stol(oldMs["read_ts"]) <= std::stol(status.at("read_ts"))) {
2162 4 : stVec.emplace_back(libjami::Account::MessageStates::DISPLAYED, uri, status.at("read"), status.at("read_ts"));
2163 : }
2164 : }
2165 : }
2166 58 : lk.unlock();
2167 :
2168 85 : for (const auto& [status, uri, commitId, ts] : stVec) {
2169 27 : pimpl_->updateStatus(uri, status, commitId, ts);
2170 : }
2171 58 : }
2172 :
2173 : void
2174 380 : Conversation::onMessageStatusChanged(const std::function<void(const std::map<std::string, std::map<std::string, std::string>>&)>& cb)
2175 : {
2176 380 : std::unique_lock lk(pimpl_->messageStatusMtx_);
2177 380 : pimpl_->messageStatusCb_ = cb;
2178 380 : }
2179 :
2180 : #ifdef LIBJAMI_TESTABLE
2181 : void
2182 504 : Conversation::onBootstrapStatus(const std::function<void(std::string, BootstrapStatus)>& cb)
2183 : {
2184 504 : pimpl_->bootstrapCbTest_ = cb;
2185 504 : }
2186 : #endif
2187 :
/**
 * Fallback step of the bootstrap: pick a member from @p members, look up its
 * devices and hand the unknown ones to the swarm manager.
 *
 * Used as the handler of fallbackTimer_ (see the async_wait calls below).
 *
 * @param ec       Timer result; nothing is done on operation_aborted.
 * @param members  Remaining members to try, consumed from the back.
 */
void
Conversation::checkBootstrapMember(const asio::error_code& ec,
                                   std::vector<std::map<std::string, std::string>> members)
{
    if (ec == asio::error::operation_aborted)
        return;
    auto acc = pimpl_->account_.lock();
    // Nothing to do if the routing table already has nodes or the account is gone.
    if (pimpl_->swarmManager_->getRoutingTable().getNodes().size() > 0 or not acc)
        return;
    // We bootstrap the DRT with devices who already wrote in the repository.
    // However, in a conversation, a large number of devices may just watch
    // the conversation, but never write any message.
    std::string uri;
    // Pop members until one is neither our own URI nor already checked.
    // NOTE(review): if no member qualifies, uri keeps the last popped value,
    // so the lookup below still runs for it — confirm this is intended.
    while (!members.empty()) {
        auto member = members.back();
        members.pop_back();
        uri = member.at("uri");
        if (uri != acc->getUsername()
            && pimpl_->checkedMembers_.find(uri) == pimpl_->checkedMembers_.end())
            break;
    }
    // Logs the failure and, in testable builds, reports FAILED to the test callback.
    auto fallbackFailed = [](auto sthis) {
        JAMI_WARNING("{}[SwarmManager {}] Bootstrap: Fallback failed. Wait for remote connections.",
                     sthis->pimpl_->toString(),
                     fmt::ptr(sthis->pimpl_->swarmManager_.get()));
#ifdef LIBJAMI_TESTABLE
        if (sthis->pimpl_->bootstrapCbTest_)
            sthis->pimpl_->bootstrapCbTest_(sthis->id(), BootstrapStatus::FAILED);
#endif
    };
    // If members is empty, we finished the fallback un-successfully
    // (uri is only empty here when no member was popped at all).
    if (members.empty() && uri.empty()) {
        fallbackFailed(this);
        return;
    }

    // Fallback, check devices of a member (we didn't check yet) in the conversation
    pimpl_->checkedMembers_.emplace(uri);
    // Shared between the per-device callback and the completion callback.
    auto devices = std::make_shared<std::vector<NodeId>>();
    acc->forEachDevice(
        dht::InfoHash(uri),
        [w = weak(), devices](const std::shared_ptr<dht::crypto::PublicKey>& dev) {
            // Test if already sent
            if (auto sthis = w.lock()) {
                if (!sthis->pimpl_->swarmManager_->getRoutingTable().hasKnownNode(dev->getLongId()))
                    devices->emplace_back(dev->getLongId());
            }
        },
        [w = weak(), devices, members = std::move(members), uri, fallbackFailed=std::move(fallbackFailed)](bool ok) {
            auto sthis = w.lock();
            if (!sthis)
                return;
            auto checkNext = true;
            if (ok && devices->size() != 0) {
#ifdef LIBJAMI_TESTABLE
                if (sthis->pimpl_->bootstrapCbTest_)
                    sthis->pimpl_->bootstrapCbTest_(sthis->id(), BootstrapStatus::FALLBACK);
#endif
                JAMI_WARNING("{}[SwarmManager {}] Bootstrap: Fallback with member: {}",
                             sthis->pimpl_->toString(),
                             fmt::ptr(sthis->pimpl_->swarmManager_.get()),
                             uri);
                if (sthis->pimpl_->swarmManager_->setKnownNodes(*devices))
                    checkNext = false;
            }
            if (checkNext) {
                // Check next member
                // The timer expires immediately; the remaining members are passed along.
                sthis->pimpl_->fallbackTimer_->expires_at(std::chrono::steady_clock::now());
                sthis->pimpl_->fallbackTimer_->async_wait(
                    std::bind(&Conversation::checkBootstrapMember,
                              sthis,
                              std::placeholders::_1,
                              std::move(members)));
            } else {
                // In this case, all members are checked. Fallback failed
                // NOTE(review): this branch is reached when setKnownNodes() returned
                // true, which does not obviously match the comment above — confirm.
                fallbackFailed(sthis);
            }
        });
}
2267 :
/**
 * Bootstrap the swarm manager with the known devices of the conversation.
 *
 * @param onBootstraped  Stored as bootstrapCb_ and called when the swarm
 *                       manager reports a successful connection change.
 * @param knownDevices   Devices to try in addition to those listed by the
 *                       repository.
 */
void
Conversation::bootstrap(std::function<void()> onBootstraped,
                        const std::vector<DeviceId>& knownDevices)
{
    if (!pimpl_ || !pimpl_->repository_ || !pimpl_->swarmManager_)
        return;
    // Here, we bootstrap the DRT with devices who already wrote in the conversation
    // If this doesn't work, it will try to fallback with checkBootstrapMember
    // If it works, the callback onConnectionChanged will be called with ok=true
    pimpl_->bootstrapCb_ = std::move(onBootstraped);
    std::vector<DeviceId> devices = knownDevices;
    for (const auto& m : pimpl_->repository_->devices())
        devices.insert(devices.end(), m.second.begin(), m.second.end());
    JAMI_DEBUG("{}[SwarmManager {}] Bootstrap with {} devices",
               pimpl_->toString(),
               fmt::ptr(pimpl_->swarmManager_.get()),
               devices.size());
    // set callback
    // fallback: arm fallbackTimer_ so that checkBootstrapMember runs with a
    // shuffled member list, either immediately (now == true) or after a delay.
    auto fallback = [](auto sthis, bool now = false) {
        // Fallback
        auto acc = sthis->pimpl_->account_.lock();
        if (!acc)
            return;
        auto members = sthis->getMembers(false, false);
        std::shuffle(members.begin(), members.end(), acc->rand);
        if (now) {
            sthis->pimpl_->fallbackTimer_->expires_at(std::chrono::steady_clock::now());
        } else {
            // Delay is 20s minus one second per member, capped at 8 members.
            auto timeForBootstrap = std::min(static_cast<size_t>(8), members.size());
            sthis->pimpl_->fallbackTimer_->expires_at(std::chrono::steady_clock::now() + 20s
                                                      - std::chrono::seconds(timeForBootstrap));
            JAMI_DEBUG("{}[SwarmManager {}] Fallback in {} seconds",
                       sthis->pimpl_->toString(),
                       fmt::ptr(sthis->pimpl_->swarmManager_.get()),
                       (20 - timeForBootstrap));
        }
        sthis->pimpl_->fallbackTimer_->async_wait(std::bind(&Conversation::checkBootstrapMember,
                                                            sthis,
                                                            std::placeholders::_1,
                                                            std::move(members)));
    };

    pimpl_->swarmManager_->onConnectionChanged([w = weak(), fallback](bool ok) {
        // This will call methods from accounts, so trigger on another thread.
        dht::ThreadPool::io().run([w, ok, fallback=std::move(fallback)] {
            auto sthis = w.lock();
            if (!sthis)
                return;
            if (ok) {
                // Bootstrap succeeded!
                sthis->pimpl_->checkedMembers_.clear();
                if (sthis->pimpl_->bootstrapCb_)
                    sthis->pimpl_->bootstrapCb_();
#ifdef LIBJAMI_TESTABLE
                if (sthis->pimpl_->bootstrapCbTest_)
                    sthis->pimpl_->bootstrapCbTest_(sthis->id(), BootstrapStatus::SUCCESS);
#endif
                return;
            }
            // Connection lost or failed: schedule the delayed fallback.
            fallback(sthis);
        });
    });
    pimpl_->checkedMembers_.clear();
    // If is shutdown, the conversation was re-added, causing no new nodes to be connected, but just a classic connectivity change
    if (pimpl_->swarmManager_->isShutdown()) {
        pimpl_->swarmManager_->maintainBuckets();
    } else if (!pimpl_->swarmManager_->setKnownNodes(devices)) {
        // setKnownNodes() returned false: start the fallback right away.
        fallback(this, true);
    }
}
2338 :
2339 : std::vector<std::string>
2340 17 : Conversation::commitsEndedCalls()
2341 : {
2342 17 : pimpl_->loadActiveCalls();
2343 17 : pimpl_->loadHostedCalls();
2344 17 : auto commits = pimpl_->commitsEndedCalls();
2345 17 : if (!commits.empty()) {
2346 : // Announce to client
2347 0 : dht::ThreadPool::io().run([w = weak(), commits] {
2348 0 : if (auto sthis = w.lock())
2349 0 : sthis->pimpl_->announce(commits, true);
2350 0 : });
2351 : }
2352 17 : return commits;
2353 0 : }
2354 :
2355 : void
2356 380 : Conversation::onMembersChanged(OnMembersChanged&& cb)
2357 : {
2358 380 : pimpl_->onMembersChanged_ = std::move(cb);
2359 380 : }
2360 :
2361 : void
2362 380 : Conversation::onNeedSocket(NeedSocketCb needSocket)
2363 : {
2364 760 : pimpl_->swarmManager_->needSocketCb_ = [needSocket = std::move(needSocket),
2365 : w=weak()](const std::string& deviceId, ChannelCb&& cb) {
2366 854 : if (auto sthis = w.lock())
2367 853 : needSocket(sthis->id(), deviceId, std::move(cb), "application/im-gitmessage-id");
2368 1614 : };
2369 380 : }
2370 :
2371 : void
2372 986 : Conversation::addSwarmChannel(std::shared_ptr<dhtnet::ChannelSocket> channel)
2373 : {
2374 986 : auto deviceId = channel->deviceId();
2375 : // Transmit avatar if necessary
2376 : // We do this here, because at this point we know both sides are connected and in
2377 : // the same conversation
2378 : // addSwarmChannel is a bit more complex, but it should be the best moment to do this.
2379 986 : auto cert = channel->peerCertificate();
2380 986 : if (!cert || !cert->issuer)
2381 0 : return;
2382 986 : auto member = cert->issuer->getId().toString();
2383 986 : pimpl_->swarmManager_->addChannel(std::move(channel));
2384 986 : dht::ThreadPool::io().run([member, deviceId, a = pimpl_->account_, w = weak_from_this()] {
2385 985 : auto sthis = w.lock();
2386 986 : if (auto account = a.lock()) {
2387 984 : account->sendProfile(sthis->id(), member, deviceId.toString());
2388 986 : }
2389 986 : });
2390 986 : }
2391 :
2392 : uint32_t
2393 4 : Conversation::countInteractions(const std::string& toId,
2394 : const std::string& fromId,
2395 : const std::string& authorUri) const
2396 : {
2397 4 : LogOptions options;
2398 4 : options.to = toId;
2399 4 : options.from = fromId;
2400 4 : options.authorUri = authorUri;
2401 4 : options.logIfNotFound = false;
2402 4 : options.fastLog = true;
2403 4 : History history;
2404 4 : auto res = pimpl_->loadMessages2(options, &history);
2405 8 : return res.size();
2406 4 : }
2407 :
2408 : void
2409 4 : Conversation::search(uint32_t req,
2410 : const Filter& filter,
2411 : const std::shared_ptr<std::atomic_int>& flag) const
2412 : {
2413 : // Because logging a conversation can take quite some time,
2414 : // do it asynchronously
2415 4 : dht::ThreadPool::io().run([w = weak(), req, filter, flag] {
2416 4 : if (auto sthis = w.lock()) {
2417 4 : auto acc = sthis->pimpl_->account_.lock();
2418 4 : if (!acc)
2419 0 : return;
2420 :
2421 4 : History history;
2422 4 : std::vector<std::map<std::string, std::string>> commits {};
2423 : // std::regex_constants::ECMAScript is the default flag.
2424 4 : auto re = std::regex(filter.regexSearch,
2425 4 : filter.caseSensitive ? std::regex_constants::ECMAScript
2426 4 : : std::regex_constants::icase);
2427 4 : sthis->pimpl_->repository_->log(
2428 20 : [&](const auto& id, const auto& author, auto& commit) {
2429 100 : if (!filter.author.empty()
2430 20 : && filter.author != sthis->uriFromDevice(author.email)) {
2431 : // Filter author
2432 0 : return CallbackResult::Skip;
2433 : }
2434 20 : auto commitTime = git_commit_time(commit.get());
2435 20 : if (filter.before && filter.before < commitTime) {
2436 : // Only get commits before this date
2437 0 : return CallbackResult::Skip;
2438 : }
2439 20 : if (filter.after && filter.after > commitTime) {
2440 : // Only get commits before this date
2441 0 : if (git_commit_parentcount(commit.get()) <= 1)
2442 0 : return CallbackResult::Break;
2443 : else
2444 0 : return CallbackResult::Skip; // Because we are sorting it with
2445 : // GIT_SORT_TOPOLOGICAL | GIT_SORT_TIME
2446 : }
2447 :
2448 20 : return CallbackResult::Ok; // Continue
2449 : },
2450 20 : [&](auto&& cc) {
2451 40 : if (auto optMessage = sthis->pimpl_->repository_->convCommitToMap(cc))
2452 40 : sthis->pimpl_->addToHistory({optMessage.value()}, false, false, &history);
2453 20 : },
2454 20 : [&](auto id, auto, auto) {
2455 20 : if (id == filter.lastId)
2456 0 : return true;
2457 20 : return false;
2458 : },
2459 : "",
2460 : false);
2461 : // Search on generated history
2462 24 : for (auto& message : history.messageList) {
2463 20 : auto contentType = message->type;
2464 20 : auto isSearchable = contentType == "text/plain"
2465 20 : || contentType == "application/data-transfer+json";
2466 20 : if (filter.type.empty() && !isSearchable) {
2467 : // Not searchable, at least for now
2468 8 : continue;
2469 12 : } else if (contentType == filter.type || filter.type.empty()) {
2470 12 : if (isSearchable) {
2471 : // If it's a text match the body, else the display name
2472 48 : auto body = contentType == "text/plain" ? message->body.at("body")
2473 36 : : message->body.at("displayName");
2474 12 : std::smatch body_match;
2475 12 : if (std::regex_search(body, body_match, re)) {
2476 5 : auto commit = message->body;
2477 5 : commit["id"] = message->id;
2478 5 : commit["type"] = message->type;
2479 5 : commits.emplace_back(commit);
2480 5 : }
2481 12 : } else {
2482 : // Matching type, just add it to the results
2483 0 : commits.emplace_back(message->body);
2484 : }
2485 :
2486 12 : if (filter.maxResult != 0 && commits.size() == filter.maxResult)
2487 0 : break;
2488 : }
2489 20 : }
2490 :
2491 4 : if (commits.size() > 0)
2492 9 : emitSignal<libjami::ConversationSignal::MessagesFound>(req,
2493 3 : acc->getAccountID(),
2494 6 : sthis->id(),
2495 3 : std::move(commits));
2496 : // If we're the latest thread, inform client that the search is finished
2497 4 : if ((*flag)-- == 1 /* decrement return the old value */) {
2498 4 : emitSignal<libjami::ConversationSignal::MessagesFound>(
2499 : req,
2500 4 : acc->getAccountID(),
2501 8 : std::string {},
2502 8 : std::vector<std::map<std::string, std::string>> {});
2503 : }
2504 8 : }
2505 : });
2506 4 : }
2507 :
2508 : void
2509 14 : Conversation::hostConference(Json::Value&& message, OnDoneCb&& cb)
2510 : {
2511 14 : if (!message.isMember("confId")) {
2512 0 : JAMI_ERR() << "Malformed commit";
2513 0 : return;
2514 : }
2515 :
2516 14 : auto now = std::chrono::system_clock::now();
2517 14 : auto nowSecs = std::chrono::duration_cast<std::chrono::seconds>(now.time_since_epoch()).count();
2518 : {
2519 14 : std::lock_guard lk(pimpl_->activeCallsMtx_);
2520 14 : pimpl_->hostedCalls_[message["confId"].asString()] = nowSecs;
2521 14 : pimpl_->saveHostedCalls();
2522 14 : }
2523 :
2524 14 : sendMessage(std::move(message), "", {}, std::move(cb));
2525 : }
2526 :
2527 : bool
2528 14 : Conversation::isHosting(const std::string& confId) const
2529 : {
2530 14 : auto shared = pimpl_->account_.lock();
2531 14 : if (!shared)
2532 0 : return false;
2533 14 : auto info = infos();
2534 14 : if (info["rdvDevice"] == shared->currentDeviceId() && info["rdvHost"] == shared->getUsername())
2535 0 : return true; // We are the current device Host
2536 14 : std::lock_guard lk(pimpl_->activeCallsMtx_);
2537 14 : return pimpl_->hostedCalls_.find(confId) != pimpl_->hostedCalls_.end();
2538 14 : }
2539 :
2540 : void
2541 14 : Conversation::removeActiveConference(Json::Value&& message, OnDoneCb&& cb)
2542 : {
2543 14 : if (!message.isMember("confId")) {
2544 0 : JAMI_ERR() << "Malformed commit";
2545 0 : return;
2546 : }
2547 :
2548 14 : auto erased = false;
2549 : {
2550 14 : std::lock_guard lk(pimpl_->activeCallsMtx_);
2551 14 : erased = pimpl_->hostedCalls_.erase(message["confId"].asString());
2552 14 : }
2553 14 : if (erased) {
2554 14 : pimpl_->saveHostedCalls();
2555 14 : sendMessage(std::move(message), "", {}, std::move(cb));
2556 : } else
2557 0 : cb(false, "");
2558 : }
2559 :
2560 : std::vector<std::map<std::string, std::string>>
2561 39 : Conversation::currentCalls() const
2562 : {
2563 39 : std::lock_guard lk(pimpl_->activeCallsMtx_);
2564 78 : return pimpl_->activeCalls_;
2565 39 : }
2566 : } // namespace jami
|