Line data Source code
1 : /*
2 : * Copyright (C) 2004-2025 Savoir-faire Linux Inc.
3 : *
4 : * This program is free software: you can redistribute it and/or modify
5 : * it under the terms of the GNU General Public License as published by
6 : * the Free Software Foundation, either version 3 of the License, or
7 : * (at your option) any later version.
8 : *
9 : * This program is distributed in the hope that it will be useful,
10 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 : * GNU General Public License for more details.
13 : *
14 : * You should have received a copy of the GNU General Public License
15 : * along with this program. If not, see <https://www.gnu.org/licenses/>.
16 : */
17 :
18 : #include "sync_module.h"
19 :
20 : #include "jamidht/conversation_module.h"
21 : #include "jamidht/archive_account_manager.h"
22 :
23 : #include <dhtnet/multiplexed_socket.h>
24 : #include <opendht/thread_pool.h>
25 :
26 : namespace jami {
27 :
28 : class SyncModule::Impl : public std::enable_shared_from_this<Impl>
29 : {
30 : public:
31 : Impl(const std::shared_ptr<JamiAccount>& account);
32 :
33 : std::weak_ptr<JamiAccount> account_;
34 : const std::string accountId_;
35 :
36 : // Sync connections
37 : std::recursive_mutex syncConnectionsMtx_;
38 : std::map<DeviceId /* deviceId */, std::vector<std::shared_ptr<dhtnet::ChannelSocket>>>
39 : syncConnections_;
40 :
41 : /**
42 : * Build SyncMsg and send it on socket
43 : * @param socket
44 : */
45 : void syncInfos(const std::shared_ptr<dhtnet::ChannelSocket>& socket,
46 : const std::shared_ptr<SyncMsg>& syncMsg);
47 : void onChannelShutdown(const std::shared_ptr<dhtnet::ChannelSocket>& socket,
48 : const DeviceId& device);
49 : };
50 :
51 686 : SyncModule::Impl::Impl(const std::shared_ptr<JamiAccount>& account)
52 686 : : account_(account)
53 1372 : , accountId_ {account->getAccountID()}
54 686 : {}
55 :
56 : void
57 17905 : SyncModule::Impl::syncInfos(const std::shared_ptr<dhtnet::ChannelSocket>& socket,
58 : const std::shared_ptr<SyncMsg>& syncMsg)
59 : {
60 17905 : auto acc = account_.lock();
61 17905 : if (!acc)
62 0 : return;
63 17905 : msgpack::sbuffer buffer(UINT16_MAX); // Use max pkt size
64 17905 : std::error_code ec;
65 17905 : if (!syncMsg) {
66 : // Send contacts infos
67 : // This message can be big. TODO rewrite to only take UINT16_MAX bytes max or split it multiple
68 : // messages. For now, write 3 messages (UINT16_MAX*3 should be enough for all information).
69 195 : if (auto info = acc->accountManager()->getInfo()) {
70 195 : if (info->contacts) {
71 195 : SyncMsg msg;
72 195 : msg.ds = info->contacts->getSyncData();
73 195 : msgpack::pack(buffer, msg);
74 195 : socket->write(reinterpret_cast<const unsigned char*>(buffer.data()),
75 : buffer.size(),
76 : ec);
77 195 : if (ec) {
78 3 : JAMI_ERROR("[Account {}] [device {}] {:s}",
79 : accountId_,
80 : socket->deviceId(),
81 : ec.message());
82 1 : return;
83 : }
84 195 : }
85 : }
86 194 : buffer.clear();
87 : // Sync conversations
88 194 : auto c = ConversationModule::convInfos(acc->getAccountID());
89 194 : if (!c.empty()) {
90 88 : SyncMsg msg;
91 88 : msg.c = std::move(c);
92 88 : msgpack::pack(buffer, msg);
93 88 : socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
94 88 : if (ec) {
95 0 : JAMI_ERROR("[Account {}] [device {}] {:s}",
96 : accountId_,
97 : socket->deviceId(),
98 : ec.message());
99 0 : return;
100 : }
101 88 : }
102 194 : buffer.clear();
103 : // Sync requests
104 194 : auto cr = ConversationModule::convRequests(acc->getAccountID());
105 194 : if (!cr.empty()) {
106 22 : SyncMsg msg;
107 22 : msg.cr = std::move(cr);
108 22 : msgpack::pack(buffer, msg);
109 22 : socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
110 22 : if (ec) {
111 0 : JAMI_ERROR("[Account {}] [device {}] {:s}",
112 : accountId_,
113 : socket->deviceId(),
114 : ec.message());
115 0 : return;
116 : }
117 22 : }
118 194 : buffer.clear();
119 194 : auto convModule = acc->convModule(true);
120 194 : if (!convModule)
121 0 : return;
122 : // Sync conversation's preferences
123 194 : auto p = convModule->convPreferences();
124 194 : if (!p.empty()) {
125 3 : SyncMsg msg;
126 3 : msg.p = std::move(p);
127 3 : msgpack::pack(buffer, msg);
128 3 : socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
129 3 : if (ec) {
130 0 : JAMI_ERROR("[Account {}] [device {}] {:s}",
131 : accountId_,
132 : socket->deviceId(),
133 : ec.message());
134 0 : return;
135 : }
136 3 : }
137 194 : buffer.clear();
138 : // Sync read's status
139 194 : auto ms = convModule->convMessageStatus();
140 194 : if (!ms.empty()) {
141 50 : SyncMsg msg;
142 50 : msg.ms = std::move(ms);
143 50 : msgpack::pack(buffer, msg);
144 50 : socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
145 50 : if (ec) {
146 6 : JAMI_ERROR("[Account {}] [device {}] {:s}",
147 : accountId_,
148 : socket->deviceId(),
149 : ec.message());
150 2 : return;
151 : }
152 50 : }
153 192 : buffer.clear();
154 :
155 200 : } else {
156 17710 : msgpack::pack(buffer, *syncMsg);
157 17710 : socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
158 17710 : if (ec)
159 0 : JAMI_ERROR("[Account {}] [device {}] {:s}",
160 : accountId_,
161 : socket->deviceId(),
162 : ec.message());
163 : }
164 17908 : }
165 :
166 : ////////////////////////////////////////////////////////////////
167 :
168 686 : SyncModule::SyncModule(const std::shared_ptr<JamiAccount>& account)
169 686 : : pimpl_ {std::make_shared<Impl>(account)}
170 686 : {}
171 :
172 : void
173 132 : SyncModule::Impl::onChannelShutdown(const std::shared_ptr<dhtnet::ChannelSocket>& socket,
174 : const DeviceId& device)
175 : {
176 132 : std::lock_guard lk(syncConnectionsMtx_);
177 132 : auto connectionsIt = syncConnections_.find(device);
178 132 : if (connectionsIt == syncConnections_.end()) {
179 0 : JAMI_WARNING("[Account {}] [device {}] onChannelShutdown: no connection found.",
180 : accountId_,
181 : device.to_view());
182 0 : return;
183 : }
184 132 : auto& connections = connectionsIt->second;
185 132 : auto conn = std::find(connections.begin(), connections.end(), socket);
186 132 : if (conn != connections.end())
187 132 : connections.erase(conn);
188 396 : JAMI_LOG("[Account {}] [device {}] removed connection, remaining: {:d}",
189 : accountId_,
190 : device.to_view(),
191 : connections.size());
192 132 : if (connections.empty())
193 70 : syncConnections_.erase(connectionsIt);
194 132 : }
195 :
196 : void
197 132 : SyncModule::cacheSyncConnection(std::shared_ptr<dhtnet::ChannelSocket>&& socket,
198 : const std::string& peerId,
199 : const DeviceId& device)
200 : {
201 132 : std::lock_guard lk(pimpl_->syncConnectionsMtx_);
202 132 : pimpl_->syncConnections_[device].emplace_back(socket);
203 :
204 132 : socket->onShutdown([w = pimpl_->weak_from_this(), device, s = std::weak_ptr(socket)]() {
205 132 : if (auto shared = w.lock())
206 132 : shared->onChannelShutdown(s.lock(), device);
207 132 : });
208 :
209 : struct DecodingContext
210 : {
211 431083 : msgpack::unpacker pac {[](msgpack::type::object_type, std::size_t, void*) { return true; },
212 : nullptr,
213 : 512};
214 : };
215 :
216 132 : socket->setOnRecv([acc = pimpl_->account_.lock(),
217 : device,
218 : peerId,
219 : ctx = std::make_shared<DecodingContext>()](const uint8_t* buf, size_t len) {
220 18050 : if (!buf || !acc)
221 0 : return len;
222 :
223 18050 : ctx->pac.reserve_buffer(len);
224 18050 : std::copy_n(buf, len, ctx->pac.buffer());
225 18050 : ctx->pac.buffer_consumed(len);
226 :
227 18050 : msgpack::object_handle oh;
228 : try {
229 36100 : while (ctx->pac.next(oh)) {
230 18050 : SyncMsg msg;
231 18050 : oh.get().convert(msg);
232 18050 : if (auto manager = acc->accountManager())
233 18050 : manager->onSyncData(std::move(msg.ds), false);
234 :
235 36019 : if (!msg.c.empty() || !msg.cr.empty() || !msg.p.empty() || !msg.ld.empty()
236 36019 : || !msg.ms.empty())
237 170 : if (auto cm = acc->convModule(true))
238 170 : cm->onSyncData(msg, peerId, device.toString());
239 18050 : }
240 0 : } catch (const std::exception& e) {
241 0 : JAMI_WARNING("[Account {}] [device {}] [convInfo] error on sync: {:s}",
242 : acc->getAccountID(),
243 : device.to_view(),
244 : e.what());
245 0 : }
246 :
247 18050 : return len;
248 18050 : });
249 :
250 132 : dht::ThreadPool::io().run([w = pimpl_->weak_from_this(), socket]() {
251 132 : if (auto s = w.lock())
252 132 : s->syncInfos(socket, nullptr);
253 132 : });
254 132 : }
255 :
256 : bool
257 71 : SyncModule::isConnected(const DeviceId& deviceId) const
258 : {
259 71 : std::lock_guard lk(pimpl_->syncConnectionsMtx_);
260 71 : auto it = pimpl_->syncConnections_.find(deviceId);
261 71 : if (it == pimpl_->syncConnections_.end())
262 70 : return false;
263 1 : return !it->second.empty();
264 71 : }
265 :
266 : void
267 20073 : SyncModule::syncWithConnected(const std::shared_ptr<SyncMsg>& syncMsg, const DeviceId& deviceId)
268 : {
269 20073 : std::lock_guard lk(pimpl_->syncConnectionsMtx_);
270 20073 : size_t count = 0;
271 37846 : for (const auto& [did, sockets] : pimpl_->syncConnections_) {
272 17773 : if (not sockets.empty() and (!deviceId || deviceId == did)) {
273 17773 : count++;
274 17773 : dht::ThreadPool::io().run([w = pimpl_->weak_from_this(), s = sockets.back(), syncMsg] {
275 17773 : if (auto sthis = w.lock())
276 17773 : sthis->syncInfos(s, syncMsg);
277 17773 : });
278 : }
279 : }
280 20073 : if (count == 0) {
281 6900 : JAMI_WARNING("[Account {}] [device {}] no sync connection.",
282 : pimpl_->accountId_,
283 : deviceId.toString());
284 : } else {
285 53319 : JAMI_DEBUG("[Account {}] [device {}] syncing with {:d} devices",
286 : pimpl_->accountId_,
287 : deviceId.to_view(),
288 : count);
289 : }
290 20073 : }
291 :
292 : } // namespace jami
|