Line data Source code
1 : /*
2 : * Copyright (C) 2021-2024 Savoir-faire Linux Inc.
3 : *
4 : * Author: Sébastien Blin <sebastien.blin@savoirfairelinux.com>
5 : *
6 : * This program is free software; you can redistribute it and/or modify
7 : * it under the terms of the GNU General Public License as published by
8 : * the Free Software Foundation; either version 3 of the License, or
9 : * (at your option) any later version.
10 : *
11 : * This program is distributed in the hope that it will be useful,
12 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 : * GNU General Public License for more details.
15 : *
16 : * You should have received a copy of the GNU General Public License
17 : * along with this program; if not, write to the Free Software
18 : * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
19 : */
20 :
21 : #include "sync_module.h"
22 :
23 : #include "jamidht/conversation_module.h"
24 : #include "jamidht/archive_account_manager.h"
25 : #include <dhtnet/multiplexed_socket.h>
26 :
27 : namespace jami {
28 :
29 : class SyncModule::Impl : public std::enable_shared_from_this<Impl>
30 : {
31 : public:
32 : Impl(std::weak_ptr<JamiAccount>&& account);
33 :
34 : std::weak_ptr<JamiAccount> account_;
35 :
36 : // Sync connections
37 : std::recursive_mutex syncConnectionsMtx_;
38 : std::map<DeviceId /* deviceId */, std::vector<std::shared_ptr<dhtnet::ChannelSocket>>> syncConnections_;
39 :
40 168 : std::weak_ptr<Impl> weak() { return std::static_pointer_cast<Impl>(shared_from_this()); }
41 :
42 : /**
43 : * Build SyncMsg and send it on socket
44 : * @param socket
45 : */
46 : void syncInfos(const std::shared_ptr<dhtnet::ChannelSocket>& socket,
47 : const std::shared_ptr<SyncMsg>& syncMsg);
48 : };
49 :
50 289 : SyncModule::Impl::Impl(std::weak_ptr<JamiAccount>&& account)
51 289 : : account_(account)
52 289 : {}
53 :
54 : void
55 240 : SyncModule::Impl::syncInfos(const std::shared_ptr<dhtnet::ChannelSocket>& socket,
56 : const std::shared_ptr<SyncMsg>& syncMsg)
57 : {
58 240 : auto acc = account_.lock();
59 239 : if (!acc)
60 0 : return;
61 239 : Json::Value syncValue;
62 240 : msgpack::sbuffer buffer(UINT16_MAX); // Use max pkt size
63 240 : std::error_code ec;
64 240 : if (!syncMsg) {
65 : // Send contacts infos
66 : // This message can be big. TODO rewrite to only take UINT16_MAX bytes max or split it multiple
67 : // messages. For now, write 3 messages (UINT16_MAX*3 should be enough for all informations).
68 222 : if (auto info = acc->accountManager()->getInfo()) {
69 221 : if (info->contacts) {
70 222 : SyncMsg msg;
71 221 : msg.ds = info->contacts->getSyncData();
72 221 : msgpack::pack(buffer, msg);
73 221 : socket->write(reinterpret_cast<const unsigned char*>(buffer.data()),
74 : buffer.size(),
75 : ec);
76 222 : if (ec) {
77 0 : JAMI_ERROR("{:s}", ec.message());
78 0 : return;
79 : }
80 222 : }
81 : }
82 222 : buffer.clear();
83 : // Sync conversations
84 221 : auto c = ConversationModule::convInfos(acc->getAccountID());
85 222 : if (!c.empty()) {
86 86 : SyncMsg msg;
87 86 : msg.c = std::move(c);
88 86 : msgpack::pack(buffer, msg);
89 86 : socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
90 86 : if (ec) {
91 6 : JAMI_ERROR("{:s}", ec.message());
92 2 : return;
93 : }
94 86 : }
95 220 : buffer.clear();
96 : // Sync requests
97 220 : auto cr = ConversationModule::convRequests(acc->getAccountID());
98 220 : if (!cr.empty()) {
99 20 : SyncMsg msg;
100 20 : msg.cr = std::move(cr);
101 20 : msgpack::pack(buffer, msg);
102 20 : socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
103 20 : if (ec) {
104 0 : JAMI_ERROR("{:s}", ec.message());
105 0 : return;
106 : }
107 20 : }
108 :
109 220 : auto convModule = acc->convModule(true);
110 220 : if (!convModule)
111 0 : return;
112 : // Sync conversation's preferences
113 220 : auto p = convModule->convPreferences();
114 220 : if (!p.empty()) {
115 0 : SyncMsg msg;
116 0 : msg.p = std::move(p);
117 0 : msgpack::pack(buffer, msg);
118 0 : socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
119 0 : if (ec) {
120 0 : JAMI_ERROR("{:s}", ec.message());
121 0 : return;
122 : }
123 0 : }
124 220 : buffer.clear();
125 : // Sync read's status
126 220 : auto ms = convModule->convMessageStatus();
127 220 : if (!ms.empty()) {
128 54 : SyncMsg msg;
129 54 : msg.ms = std::move(ms);
130 54 : msgpack::pack(buffer, msg);
131 54 : socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
132 54 : if (ec) {
133 0 : JAMI_ERROR("{:s}", ec.message());
134 0 : return;
135 : }
136 54 : }
137 220 : buffer.clear();
138 :
139 222 : } else {
140 18 : msgpack::pack(buffer, *syncMsg);
141 18 : socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
142 18 : if (ec)
143 0 : JAMI_ERROR("{:s}", ec.message());
144 : }
145 244 : }
146 :
147 : ////////////////////////////////////////////////////////////////
148 :
149 289 : SyncModule::SyncModule(std::weak_ptr<JamiAccount>&& account)
150 289 : : pimpl_ {std::make_shared<Impl>(std::move(account))}
151 289 : {}
152 :
153 : void
154 111 : SyncModule::cacheSyncConnection(std::shared_ptr<dhtnet::ChannelSocket>&& socket,
155 : const std::string& peerId,
156 : const DeviceId& device)
157 : {
158 111 : std::lock_guard lk(pimpl_->syncConnectionsMtx_);
159 112 : pimpl_->syncConnections_[device].emplace_back(socket);
160 :
161 112 : socket->onShutdown([w = pimpl_->weak(), peerId, device, socket]() {
162 56 : auto shared = w.lock();
163 56 : if (!shared)
164 0 : return;
165 56 : std::lock_guard lk(shared->syncConnectionsMtx_);
166 56 : auto& connections = shared->syncConnections_[device];
167 56 : auto conn = connections.begin();
168 166 : while (conn != connections.end()) {
169 110 : if (*conn == socket)
170 56 : conn = connections.erase(conn);
171 : else
172 54 : conn++;
173 : }
174 56 : });
175 :
176 :
177 : struct DecodingContext
178 : {
179 7071 : msgpack::unpacker pac {[](msgpack::type::object_type, std::size_t, void*) { return true; },
180 : nullptr,
181 : 512};
182 : };
183 :
184 112 : socket->setOnRecv([acc = pimpl_->account_.lock(), device, peerId,
185 : ctx = std::make_shared<DecodingContext>()
186 : ](const uint8_t* buf, size_t len) {
187 384 : if (!buf || !acc)
188 0 : return len;
189 :
190 384 : ctx->pac.reserve_buffer(len);
191 384 : std::copy_n(buf, len, ctx->pac.buffer());
192 384 : ctx->pac.buffer_consumed(len);
193 :
194 384 : msgpack::object_handle oh;
195 384 : SyncMsg msg;
196 :
197 : try {
198 769 : while (ctx->pac.next(oh)) {
199 384 : oh.get().convert(msg);
200 384 : if (auto manager = acc->accountManager())
201 382 : manager->onSyncData(std::move(msg.ds), false);
202 :
203 384 : if (!msg.c.empty() || !msg.cr.empty() || !msg.p.empty() || !msg.ld.empty() || !msg.ms.empty())
204 162 : if (auto cm = acc->convModule(true))
205 162 : cm->onSyncData(msg, peerId, device.toString());
206 : }
207 0 : } catch (const std::exception& e) {
208 0 : JAMI_WARNING("[convInfo] error on sync: {:s}", e.what());
209 0 : }
210 :
211 384 : return len;
212 384 : });
213 :
214 112 : pimpl_->syncInfos(socket, nullptr);
215 111 : }
216 :
217 : void
218 56 : SyncModule::syncWith(const DeviceId& deviceId,
219 : const std::shared_ptr<dhtnet::ChannelSocket>& socket,
220 : const std::shared_ptr<SyncMsg>& syncMsg)
221 : {
222 56 : if (!socket)
223 0 : return;
224 : {
225 56 : std::lock_guard lk(pimpl_->syncConnectionsMtx_);
226 56 : socket->onShutdown([w = pimpl_->weak(), socket, deviceId]() {
227 : // When sock is shutdown update syncConnections_ to be able to resync asap
228 56 : auto shared = w.lock();
229 56 : if (!shared)
230 0 : return;
231 56 : std::lock_guard lk(shared->syncConnectionsMtx_);
232 56 : auto& connections = shared->syncConnections_[deviceId];
233 56 : auto conn = connections.begin();
234 195 : while (conn != connections.end()) {
235 139 : if (*conn == socket)
236 112 : conn = connections.erase(conn);
237 : else
238 27 : conn++;
239 : }
240 56 : if (connections.empty()) {
241 29 : shared->syncConnections_.erase(deviceId);
242 : }
243 56 : });
244 56 : pimpl_->syncConnections_[deviceId].emplace_back(socket);
245 56 : }
246 56 : pimpl_->syncInfos(socket, syncMsg);
247 : }
248 :
249 : void
250 1326 : SyncModule::syncWithConnected(const std::shared_ptr<SyncMsg>& syncMsg, const DeviceId& deviceId)
251 : {
252 1326 : std::lock_guard lk(pimpl_->syncConnectionsMtx_);
253 1399 : for (auto& [did, sockets] : pimpl_->syncConnections_) {
254 73 : if (not sockets.empty()) {
255 72 : if (!deviceId || deviceId == did) {
256 72 : pimpl_->syncInfos(sockets[0], syncMsg);
257 : }
258 : }
259 : }
260 1326 : }
261 : } // namespace jami
|