Line data Source code
1 : /*
2 : * Copyright (C) 2004-2025 Savoir-faire Linux Inc.
3 : *
4 : * This program is free software: you can redistribute it and/or modify
5 : * it under the terms of the GNU General Public License as published by
6 : * the Free Software Foundation, either version 3 of the License, or
7 : * (at your option) any later version.
8 : *
9 : * This program is distributed in the hope that it will be useful,
10 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 : * GNU General Public License for more details.
13 : *
14 : * You should have received a copy of the GNU General Public License
15 : * along with this program. If not, see <https://www.gnu.org/licenses/>.
16 : */
17 :
18 : #include "sync_module.h"
19 :
20 : #include "jamidht/conversation_module.h"
21 : #include "jamidht/archive_account_manager.h"
22 :
23 : #include <dhtnet/multiplexed_socket.h>
24 : #include <dhtnet/channel_utils.h>
25 : #include <opendht/thread_pool.h>
26 :
27 : namespace jami {
28 :
29 : class SyncModule::Impl : public std::enable_shared_from_this<Impl>
30 : {
31 : public:
32 : Impl(const std::shared_ptr<JamiAccount>& account);
33 :
34 : std::weak_ptr<JamiAccount> account_;
35 : const std::string accountId_;
36 :
37 : // Sync connections
38 : std::recursive_mutex syncConnectionsMtx_;
39 : std::map<DeviceId /* deviceId */, std::vector<std::shared_ptr<dhtnet::ChannelSocket>>> syncConnections_;
40 :
41 : /**
42 : * Build SyncMsg and send it on socket
43 : * @param socket
44 : */
45 : void syncInfos(const std::shared_ptr<dhtnet::ChannelSocket>& socket, const std::shared_ptr<SyncMsg>& syncMsg);
46 : void onChannelShutdown(const std::shared_ptr<dhtnet::ChannelSocket>& socket, const DeviceId& device);
47 : };
48 :
49 657 : SyncModule::Impl::Impl(const std::shared_ptr<JamiAccount>& account)
50 657 : : account_(account)
51 1314 : , accountId_ {account->getAccountID()}
52 657 : {}
53 :
54 : void
55 231 : SyncModule::Impl::syncInfos(const std::shared_ptr<dhtnet::ChannelSocket>& socket,
56 : const std::shared_ptr<SyncMsg>& syncMsg)
57 : {
58 231 : auto acc = account_.lock();
59 231 : if (!acc)
60 0 : return;
61 231 : msgpack::sbuffer buffer(UINT16_MAX); // Use max pkt size
62 231 : std::error_code ec;
63 231 : if (!syncMsg) {
64 : // Send contacts infos
65 : // This message can be big. TODO rewrite to only take UINT16_MAX bytes max or split it multiple
66 : // messages. For now, write 3 messages (UINT16_MAX*3 should be enough for all information).
67 188 : if (auto info = acc->accountManager()->getInfo()) {
68 188 : if (info->contacts) {
69 188 : SyncMsg msg;
70 187 : msg.ds = info->contacts->getSyncData();
71 188 : msgpack::pack(buffer, msg);
72 188 : socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
73 188 : if (ec) {
74 12 : JAMI_ERROR("[Account {}] [device {}] {:s}", accountId_, socket->deviceId(), ec.message());
75 3 : return;
76 : }
77 188 : }
78 : }
79 185 : buffer.clear();
80 : // Sync conversations
81 185 : auto c = ConversationModule::convInfos(acc->getAccountID());
82 185 : if (!c.empty()) {
83 88 : SyncMsg msg;
84 88 : msg.c = std::move(c);
85 88 : msgpack::pack(buffer, msg);
86 88 : socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
87 88 : if (ec) {
88 0 : JAMI_ERROR("[Account {}] [device {}] {:s}", accountId_, socket->deviceId(), ec.message());
89 0 : return;
90 : }
91 88 : }
92 185 : buffer.clear();
93 : // Sync requests
94 185 : auto cr = ConversationModule::convRequests(acc->getAccountID());
95 185 : if (!cr.empty()) {
96 19 : SyncMsg msg;
97 19 : msg.cr = std::move(cr);
98 19 : msgpack::pack(buffer, msg);
99 19 : socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
100 19 : if (ec) {
101 0 : JAMI_ERROR("[Account {}] [device {}] {:s}", accountId_, socket->deviceId(), ec.message());
102 0 : return;
103 : }
104 19 : }
105 185 : buffer.clear();
106 185 : auto convModule = acc->convModule(true);
107 185 : if (!convModule)
108 0 : return;
109 : // Sync conversation's preferences
110 185 : auto p = convModule->convPreferences();
111 185 : if (!p.empty()) {
112 2 : SyncMsg msg;
113 2 : msg.p = std::move(p);
114 2 : msgpack::pack(buffer, msg);
115 2 : socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
116 2 : if (ec) {
117 0 : JAMI_ERROR("[Account {}] [device {}] {:s}", accountId_, socket->deviceId(), ec.message());
118 0 : return;
119 : }
120 2 : }
121 185 : buffer.clear();
122 : // Sync read's status
123 184 : auto ms = convModule->convMessageStatus();
124 184 : if (!ms.empty()) {
125 29 : SyncMsg msg;
126 29 : msg.ms = std::move(ms);
127 29 : msgpack::pack(buffer, msg);
128 29 : socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
129 29 : if (ec) {
130 8 : JAMI_ERROR("[Account {}] [device {}] {:s}", accountId_, socket->deviceId(), ec.message());
131 2 : return;
132 : }
133 29 : }
134 182 : buffer.clear();
135 :
136 191 : } else {
137 43 : msgpack::pack(buffer, *syncMsg);
138 43 : socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
139 43 : if (ec)
140 0 : JAMI_ERROR("[Account {}] [device {}] {:s}", accountId_, socket->deviceId(), ec.message());
141 : }
142 236 : }
143 :
144 : ////////////////////////////////////////////////////////////////
145 :
146 657 : SyncModule::SyncModule(const std::shared_ptr<JamiAccount>& account)
147 657 : : pimpl_ {std::make_shared<Impl>(account)}
148 657 : {}
149 :
150 : void
151 128 : SyncModule::Impl::onChannelShutdown(const std::shared_ptr<dhtnet::ChannelSocket>& socket, const DeviceId& device)
152 : {
153 128 : std::lock_guard lk(syncConnectionsMtx_);
154 128 : auto connectionsIt = syncConnections_.find(device);
155 128 : if (connectionsIt == syncConnections_.end()) {
156 0 : JAMI_WARNING("[Account {}] [device {}] onChannelShutdown: no connection found.", accountId_, device.to_view());
157 0 : return;
158 : }
159 128 : auto& connections = connectionsIt->second;
160 128 : auto conn = std::find(connections.begin(), connections.end(), socket);
161 128 : if (conn != connections.end())
162 128 : connections.erase(conn);
163 512 : JAMI_LOG("[Account {}] [device {}] removed connection, remaining: {:d}",
164 : accountId_,
165 : device.to_view(),
166 : connections.size());
167 128 : if (connections.empty())
168 70 : syncConnections_.erase(connectionsIt);
169 128 : }
170 :
171 : void
172 128 : SyncModule::cacheSyncConnection(std::shared_ptr<dhtnet::ChannelSocket>&& socket,
173 : const std::string& peerId,
174 : const DeviceId& device)
175 : {
176 128 : std::lock_guard lk(pimpl_->syncConnectionsMtx_);
177 128 : pimpl_->syncConnections_[device].emplace_back(socket);
178 :
179 128 : socket->setOnRecv(dhtnet::buildMsgpackReader<SyncMsg>([acc = pimpl_->account_, device, peerId](SyncMsg&& msg) {
180 357 : auto account = acc.lock();
181 357 : if (!account)
182 0 : return std::make_error_code(std::errc::operation_canceled);
183 :
184 : try {
185 357 : if (auto manager = account->accountManager())
186 357 : manager->onSyncData(std::move(msg.ds), false);
187 :
188 357 : if (!msg.c.empty() || !msg.cr.empty() || !msg.p.empty() || !msg.ld.empty() || !msg.ms.empty())
189 154 : if (auto cm = account->convModule(true))
190 154 : cm->onSyncData(msg, peerId, device.toString());
191 0 : } catch (const std::exception& e) {
192 0 : JAMI_WARNING("[Account {}] [device {}] [convInfo] error on sync: {:s}",
193 : account->getAccountID(),
194 : device.to_view(),
195 : e.what());
196 0 : }
197 357 : return std::error_code();
198 357 : }));
199 128 : socket->onShutdown([w = pimpl_->weak_from_this(), device, s = std::weak_ptr(socket)](const std::error_code&) {
200 128 : if (auto shared = w.lock())
201 128 : shared->onChannelShutdown(s.lock(), device);
202 128 : });
203 :
204 127 : dht::ThreadPool::io().run([w = pimpl_->weak_from_this(), socket = std::move(socket)]() {
205 128 : if (auto s = w.lock())
206 128 : s->syncInfos(socket, nullptr);
207 128 : });
208 128 : }
209 :
210 : bool
211 72 : SyncModule::isConnected(const DeviceId& deviceId) const
212 : {
213 72 : std::lock_guard lk(pimpl_->syncConnectionsMtx_);
214 72 : auto it = pimpl_->syncConnections_.find(deviceId);
215 72 : if (it == pimpl_->syncConnections_.end())
216 68 : return false;
217 4 : return !it->second.empty();
218 72 : }
219 :
220 : void
221 2368 : SyncModule::syncWithConnected(const std::shared_ptr<SyncMsg>& syncMsg, const DeviceId& deviceId)
222 : {
223 2368 : std::lock_guard lk(pimpl_->syncConnectionsMtx_);
224 2368 : size_t count = 0;
225 2471 : for (const auto& [did, sockets] : pimpl_->syncConnections_) {
226 103 : if (not sockets.empty() and (!deviceId || deviceId == did)) {
227 103 : count++;
228 103 : dht::ThreadPool::io().run([w = pimpl_->weak_from_this(), s = sockets.back(), syncMsg] {
229 103 : if (auto sthis = w.lock())
230 103 : sthis->syncInfos(s, syncMsg);
231 103 : });
232 : }
233 : }
234 2368 : if (count == 0) {
235 9060 : JAMI_WARNING("[Account {}] [device {}] no sync connection.", pimpl_->accountId_, deviceId.toString());
236 : } else {
237 412 : JAMI_DEBUG("[Account {}] [device {}] syncing with {:d} devices", pimpl_->accountId_, deviceId.to_view(), count);
238 : }
239 2368 : }
240 :
241 : } // namespace jami
|