Line data Source code
1 : /*
2 : * Copyright (C) 2004-2025 Savoir-faire Linux Inc.
3 : *
4 : * This program is free software: you can redistribute it and/or modify
5 : * it under the terms of the GNU General Public License as published by
6 : * the Free Software Foundation, either version 3 of the License, or
7 : * (at your option) any later version.
8 : *
9 : * This program is distributed in the hope that it will be useful,
10 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 : * GNU General Public License for more details.
13 : *
14 : * You should have received a copy of the GNU General Public License
15 : * along with this program. If not, see <https://www.gnu.org/licenses/>.
16 : */
17 :
18 : #include "sync_module.h"
19 :
20 : #include "jamidht/conversation_module.h"
21 : #include "jamidht/archive_account_manager.h"
22 :
23 : #include <dhtnet/multiplexed_socket.h>
24 : #include <opendht/thread_pool.h>
25 :
26 : namespace jami {
27 :
28 : class SyncModule::Impl : public std::enable_shared_from_this<Impl>
29 : {
30 : public:
31 : Impl(const std::shared_ptr<JamiAccount>& account);
32 :
33 : std::weak_ptr<JamiAccount> account_;
34 : const std::string accountId_;
35 :
36 : // Sync connections
37 : std::recursive_mutex syncConnectionsMtx_;
38 : std::map<DeviceId /* deviceId */, std::vector<std::shared_ptr<dhtnet::ChannelSocket>>>
39 : syncConnections_;
40 :
41 : /**
42 : * Build SyncMsg and send it on socket
43 : * @param socket
44 : */
45 : void syncInfos(const std::shared_ptr<dhtnet::ChannelSocket>& socket,
46 : const std::shared_ptr<SyncMsg>& syncMsg);
47 : void onChannelShutdown(const std::shared_ptr<dhtnet::ChannelSocket>& socket,
48 : const DeviceId& device);
49 : };
50 :
51 559 : SyncModule::Impl::Impl(const std::shared_ptr<JamiAccount>& account)
52 559 : : account_(account)
53 1118 : , accountId_ {account->getAccountID()}
54 559 : {}
55 :
56 : void
57 200 : SyncModule::Impl::syncInfos(const std::shared_ptr<dhtnet::ChannelSocket>& socket,
58 : const std::shared_ptr<SyncMsg>& syncMsg)
59 : {
60 200 : auto acc = account_.lock();
61 200 : if (!acc)
62 0 : return;
63 200 : msgpack::sbuffer buffer(UINT16_MAX); // Use max pkt size
64 200 : std::error_code ec;
65 200 : if (!syncMsg) {
66 : // Send contacts infos
67 : // This message can be big. TODO rewrite to only take UINT16_MAX bytes max or split it multiple
68 : // messages. For now, write 3 messages (UINT16_MAX*3 should be enough for all information).
69 156 : if (auto info = acc->accountManager()->getInfo()) {
70 156 : if (info->contacts) {
71 156 : SyncMsg msg;
72 156 : msg.ds = info->contacts->getSyncData();
73 156 : msgpack::pack(buffer, msg);
74 156 : socket->write(reinterpret_cast<const unsigned char*>(buffer.data()),
75 : buffer.size(),
76 : ec);
77 156 : if (ec) {
78 0 : JAMI_ERROR("[Account {}] [device {}] {:s}",
79 : accountId_,
80 : socket->deviceId(),
81 : ec.message());
82 0 : return;
83 : }
84 156 : }
85 : }
86 156 : buffer.clear();
87 : // Sync conversations
88 156 : auto c = ConversationModule::convInfos(acc->getAccountID());
89 156 : if (!c.empty()) {
90 66 : SyncMsg msg;
91 66 : msg.c = std::move(c);
92 66 : msgpack::pack(buffer, msg);
93 66 : socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
94 66 : if (ec) {
95 0 : JAMI_ERROR("[Account {}] [device {}] {:s}",
96 : accountId_,
97 : socket->deviceId(),
98 : ec.message());
99 0 : return;
100 : }
101 66 : }
102 156 : buffer.clear();
103 : // Sync requests
104 156 : auto cr = ConversationModule::convRequests(acc->getAccountID());
105 156 : if (!cr.empty()) {
106 19 : SyncMsg msg;
107 19 : msg.cr = std::move(cr);
108 19 : msgpack::pack(buffer, msg);
109 19 : socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
110 19 : if (ec) {
111 0 : JAMI_ERROR("[Account {}] [device {}] {:s}",
112 : accountId_,
113 : socket->deviceId(),
114 : ec.message());
115 0 : return;
116 : }
117 19 : }
118 156 : buffer.clear();
119 156 : auto convModule = acc->convModule(true);
120 156 : if (!convModule)
121 0 : return;
122 : // Sync conversation's preferences
123 156 : auto p = convModule->convPreferences();
124 156 : if (!p.empty()) {
125 0 : SyncMsg msg;
126 0 : msg.p = std::move(p);
127 0 : msgpack::pack(buffer, msg);
128 0 : socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
129 0 : if (ec) {
130 0 : JAMI_ERROR("[Account {}] [device {}] {:s}",
131 : accountId_,
132 : socket->deviceId(),
133 : ec.message());
134 0 : return;
135 : }
136 0 : }
137 156 : buffer.clear();
138 : // Sync read's status
139 156 : auto ms = convModule->convMessageStatus();
140 156 : if (!ms.empty()) {
141 41 : SyncMsg msg;
142 41 : msg.ms = std::move(ms);
143 41 : msgpack::pack(buffer, msg);
144 41 : socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
145 41 : if (ec) {
146 3 : JAMI_ERROR("[Account {}] [device {}] {:s}",
147 : accountId_,
148 : socket->deviceId(),
149 : ec.message());
150 1 : return;
151 : }
152 41 : }
153 155 : buffer.clear();
154 :
155 159 : } else {
156 44 : msgpack::pack(buffer, *syncMsg);
157 44 : socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
158 44 : if (ec)
159 0 : JAMI_ERROR("[Account {}] [device {}] {:s}",
160 : accountId_,
161 : socket->deviceId(),
162 : ec.message());
163 : }
164 201 : }
165 :
166 : ////////////////////////////////////////////////////////////////
167 :
168 559 : SyncModule::SyncModule(const std::shared_ptr<JamiAccount>& account)
169 559 : : pimpl_ {std::make_shared<Impl>(account)}
170 559 : {}
171 :
172 : void
173 108 : SyncModule::Impl::onChannelShutdown(const std::shared_ptr<dhtnet::ChannelSocket>& socket,
174 : const DeviceId& device)
175 : {
176 108 : std::lock_guard lk(syncConnectionsMtx_);
177 108 : auto connectionsIt = syncConnections_.find(device);
178 108 : if (connectionsIt == syncConnections_.end()) {
179 0 : JAMI_WARNING("[Account {}] [device {}] onChannelShutdown: no connection found.",
180 : accountId_,
181 : device.to_view());
182 0 : return;
183 : }
184 108 : auto& connections = connectionsIt->second;
185 108 : auto conn = std::find(connections.begin(), connections.end(), socket);
186 108 : if (conn != connections.end())
187 108 : connections.erase(conn);
188 324 : JAMI_LOG("[Account {}] [device {}] removed connection, remaining: {:d}",
189 : accountId_,
190 : device.to_view(),
191 : connections.size());
192 108 : if (connections.empty())
193 56 : syncConnections_.erase(connectionsIt);
194 108 : }
195 :
196 : void
197 108 : SyncModule::cacheSyncConnection(std::shared_ptr<dhtnet::ChannelSocket>&& socket,
198 : const std::string& peerId,
199 : const DeviceId& device)
200 : {
201 108 : std::lock_guard lk(pimpl_->syncConnectionsMtx_);
202 108 : pimpl_->syncConnections_[device].emplace_back(socket);
203 :
204 108 : socket->onShutdown([w = pimpl_->weak_from_this(), device, s = std::weak_ptr(socket)]() {
205 108 : if (auto shared = w.lock())
206 108 : shared->onChannelShutdown(s.lock(), device);
207 108 : });
208 :
209 : struct DecodingContext
210 : {
211 5784 : msgpack::unpacker pac {[](msgpack::type::object_type, std::size_t, void*) { return true; },
212 : nullptr,
213 : 512};
214 : };
215 :
216 108 : socket->setOnRecv([acc = pimpl_->account_.lock(),
217 : device,
218 : peerId,
219 : ctx = std::make_shared<DecodingContext>()](const uint8_t* buf, size_t len) {
220 309 : if (!buf || !acc)
221 0 : return len;
222 :
223 309 : ctx->pac.reserve_buffer(len);
224 309 : std::copy_n(buf, len, ctx->pac.buffer());
225 309 : ctx->pac.buffer_consumed(len);
226 :
227 309 : msgpack::object_handle oh;
228 : try {
229 618 : while (ctx->pac.next(oh)) {
230 309 : SyncMsg msg;
231 309 : oh.get().convert(msg);
232 309 : if (auto manager = acc->accountManager())
233 309 : manager->onSyncData(std::move(msg.ds), false);
234 :
235 560 : if (!msg.c.empty() || !msg.cr.empty() || !msg.p.empty() || !msg.ld.empty()
236 560 : || !msg.ms.empty())
237 127 : if (auto cm = acc->convModule(true))
238 127 : cm->onSyncData(msg, peerId, device.toString());
239 309 : }
240 0 : } catch (const std::exception& e) {
241 0 : JAMI_WARNING("[Account {}] [device {}] [convInfo] error on sync: {:s}",
242 : acc->getAccountID(),
243 : device.to_view(),
244 : e.what());
245 0 : }
246 :
247 309 : return len;
248 309 : });
249 :
250 108 : dht::ThreadPool::io().run([w = pimpl_->weak_from_this(), socket]() {
251 108 : if (auto s = w.lock())
252 108 : s->syncInfos(socket, nullptr);
253 108 : });
254 108 : }
255 :
256 : bool
257 111 : SyncModule::isConnected(const DeviceId& deviceId) const
258 : {
259 111 : std::lock_guard lk(pimpl_->syncConnectionsMtx_);
260 111 : auto it = pimpl_->syncConnections_.find(deviceId);
261 111 : if (it == pimpl_->syncConnections_.end())
262 104 : return false;
263 7 : return !it->second.empty();
264 111 : }
265 :
266 : void
267 2610 : SyncModule::syncWithConnected(const std::shared_ptr<SyncMsg>& syncMsg, const DeviceId& deviceId)
268 : {
269 2610 : std::lock_guard lk(pimpl_->syncConnectionsMtx_);
270 2610 : size_t count = 0;
271 2702 : for (const auto& [did, sockets] : pimpl_->syncConnections_) {
272 92 : if (not sockets.empty() and (!deviceId || deviceId == did)) {
273 92 : count++;
274 92 : dht::ThreadPool::io().run([w = pimpl_->weak_from_this(), s = sockets.back(), syncMsg] {
275 92 : if (auto sthis = w.lock())
276 92 : sthis->syncInfos(s, syncMsg);
277 92 : });
278 : }
279 : }
280 2610 : if (count == 0) {
281 7554 : JAMI_WARNING("[Account {}] [device {}] no sync connection.",
282 : pimpl_->accountId_,
283 : deviceId.toString());
284 : } else {
285 276 : JAMI_DEBUG("[Account {}] [device {}] syncing with {:d} devices",
286 : pimpl_->accountId_,
287 : deviceId.to_view(),
288 : count);
289 : }
290 2610 : }
291 :
292 : } // namespace jami
|