Line data Source code
1 : /*
2 : * Copyright (C) 2004-2024 Savoir-faire Linux Inc.
3 : *
4 : * This program is free software: you can redistribute it and/or modify
5 : * it under the terms of the GNU General Public License as published by
6 : * the Free Software Foundation, either version 3 of the License, or
7 : * (at your option) any later version.
8 : *
9 : * This program is distributed in the hope that it will be useful,
10 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 : * GNU General Public License for more details.
13 : *
14 : * You should have received a copy of the GNU General Public License
15 : * along with this program. If not, see <https://www.gnu.org/licenses/>.
16 : */
17 :
18 : #include "sync_module.h"
19 :
20 : #include "jamidht/conversation_module.h"
21 : #include "jamidht/archive_account_manager.h"
22 : #include <dhtnet/multiplexed_socket.h>
23 :
24 : namespace jami {
25 :
26 : class SyncModule::Impl : public std::enable_shared_from_this<Impl>
27 : {
28 : public:
29 : Impl(std::weak_ptr<JamiAccount>&& account);
30 :
31 : std::weak_ptr<JamiAccount> account_;
32 :
33 : // Sync connections
34 : std::recursive_mutex syncConnectionsMtx_;
35 : std::map<DeviceId /* deviceId */, std::vector<std::shared_ptr<dhtnet::ChannelSocket>>> syncConnections_;
36 :
37 : /**
38 : * Build SyncMsg and send it on socket
39 : * @param socket
40 : */
41 : void syncInfos(const std::shared_ptr<dhtnet::ChannelSocket>& socket,
42 : const std::shared_ptr<SyncMsg>& syncMsg);
43 : void onChannelShutdown(const std::shared_ptr<dhtnet::ChannelSocket>& socket, const DeviceId& device);
44 : };
45 :
46 382 : SyncModule::Impl::Impl(std::weak_ptr<JamiAccount>&& account)
47 382 : : account_(account)
48 382 : {}
49 :
50 : void
51 233 : SyncModule::Impl::syncInfos(const std::shared_ptr<dhtnet::ChannelSocket>& socket,
52 : const std::shared_ptr<SyncMsg>& syncMsg)
53 : {
54 233 : auto acc = account_.lock();
55 233 : if (!acc)
56 0 : return;
57 233 : msgpack::sbuffer buffer(UINT16_MAX); // Use max pkt size
58 233 : std::error_code ec;
59 233 : if (!syncMsg) {
60 : // Send contacts infos
61 : // This message can be big. TODO rewrite to only take UINT16_MAX bytes max or split it multiple
62 : // messages. For now, write 3 messages (UINT16_MAX*3 should be enough for all information).
63 211 : if (auto info = acc->accountManager()->getInfo()) {
64 211 : if (info->contacts) {
65 211 : SyncMsg msg;
66 210 : msg.ds = info->contacts->getSyncData();
67 210 : msgpack::pack(buffer, msg);
68 211 : socket->write(reinterpret_cast<const unsigned char*>(buffer.data()),
69 : buffer.size(),
70 : ec);
71 211 : if (ec) {
72 3 : JAMI_ERROR("{:s}", ec.message());
73 1 : return;
74 : }
75 211 : }
76 : }
77 210 : buffer.clear();
78 : // Sync conversations
79 210 : auto c = ConversationModule::convInfos(acc->getAccountID());
80 210 : if (!c.empty()) {
81 97 : SyncMsg msg;
82 97 : msg.c = std::move(c);
83 97 : msgpack::pack(buffer, msg);
84 97 : socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
85 97 : if (ec) {
86 0 : JAMI_ERROR("{:s}", ec.message());
87 0 : return;
88 : }
89 97 : }
90 210 : buffer.clear();
91 : // Sync requests
92 210 : auto cr = ConversationModule::convRequests(acc->getAccountID());
93 210 : if (!cr.empty()) {
94 23 : SyncMsg msg;
95 23 : msg.cr = std::move(cr);
96 23 : msgpack::pack(buffer, msg);
97 23 : socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
98 23 : if (ec) {
99 0 : JAMI_ERROR("{:s}", ec.message());
100 0 : return;
101 : }
102 23 : }
103 :
104 209 : auto convModule = acc->convModule(true);
105 209 : if (!convModule)
106 0 : return;
107 : // Sync conversation's preferences
108 209 : auto p = convModule->convPreferences();
109 210 : if (!p.empty()) {
110 3 : SyncMsg msg;
111 3 : msg.p = std::move(p);
112 3 : msgpack::pack(buffer, msg);
113 3 : socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
114 3 : if (ec) {
115 0 : JAMI_ERROR("{:s}", ec.message());
116 0 : return;
117 : }
118 3 : }
119 210 : buffer.clear();
120 : // Sync read's status
121 210 : auto ms = convModule->convMessageStatus();
122 210 : if (!ms.empty()) {
123 58 : SyncMsg msg;
124 58 : msg.ms = std::move(ms);
125 58 : msgpack::pack(buffer, msg);
126 58 : socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
127 58 : if (ec) {
128 0 : JAMI_ERROR("{:s}", ec.message());
129 0 : return;
130 : }
131 58 : }
132 210 : buffer.clear();
133 :
134 210 : } else {
135 22 : msgpack::pack(buffer, *syncMsg);
136 22 : socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
137 22 : if (ec)
138 0 : JAMI_ERROR("{:s}", ec.message());
139 : }
140 233 : }
141 :
142 : ////////////////////////////////////////////////////////////////
143 :
144 382 : SyncModule::SyncModule(std::weak_ptr<JamiAccount>&& account)
145 382 : : pimpl_ {std::make_shared<Impl>(std::move(account))}
146 382 : {}
147 :
148 : void
149 144 : SyncModule::Impl::onChannelShutdown(const std::shared_ptr<dhtnet::ChannelSocket>& socket, const DeviceId& device)
150 : {
151 144 : std::lock_guard lk(syncConnectionsMtx_);
152 144 : auto connectionsIt = syncConnections_.find(device);
153 144 : if (connectionsIt == syncConnections_.end())
154 0 : return;
155 144 : auto& connections = connectionsIt->second;
156 144 : auto conn = std::find(connections.begin(), connections.end(), socket);
157 144 : if (conn != connections.end())
158 144 : connections.erase(conn);
159 144 : if (connections.empty())
160 72 : syncConnections_.erase(connectionsIt);
161 144 : }
162 :
163 : void
164 144 : SyncModule::cacheSyncConnection(std::shared_ptr<dhtnet::ChannelSocket>&& socket,
165 : const std::string& peerId,
166 : const DeviceId& device)
167 : {
168 144 : std::lock_guard lk(pimpl_->syncConnectionsMtx_);
169 144 : pimpl_->syncConnections_[device].emplace_back(socket);
170 :
171 143 : socket->onShutdown([w = pimpl_->weak_from_this(), device, s=std::weak_ptr(socket)]() {
172 144 : if (auto shared = w.lock())
173 144 : shared->onChannelShutdown(s.lock(), device);
174 144 : });
175 :
176 : struct DecodingContext
177 : {
178 7233 : msgpack::unpacker pac {[](msgpack::type::object_type, std::size_t, void*) { return true; },
179 : nullptr,
180 : 512};
181 : };
182 :
183 144 : socket->setOnRecv([acc = pimpl_->account_.lock(), device, peerId,
184 : ctx = std::make_shared<DecodingContext>()
185 : ](const uint8_t* buf, size_t len) {
186 395 : if (!buf || !acc)
187 0 : return len;
188 :
189 395 : ctx->pac.reserve_buffer(len);
190 395 : std::copy_n(buf, len, ctx->pac.buffer());
191 395 : ctx->pac.buffer_consumed(len);
192 :
193 395 : msgpack::object_handle oh;
194 : try {
195 790 : while (ctx->pac.next(oh)) {
196 395 : SyncMsg msg;
197 395 : oh.get().convert(msg);
198 394 : if (auto manager = acc->accountManager())
199 394 : manager->onSyncData(std::move(msg.ds), false);
200 :
201 395 : if (!msg.c.empty() || !msg.cr.empty() || !msg.p.empty() || !msg.ld.empty() || !msg.ms.empty())
202 186 : if (auto cm = acc->convModule(true))
203 186 : cm->onSyncData(msg, peerId, device.toString());
204 395 : }
205 0 : } catch (const std::exception& e) {
206 0 : JAMI_WARNING("[convInfo] error on sync: {:s}", e.what());
207 0 : }
208 :
209 395 : return len;
210 395 : });
211 :
212 144 : pimpl_->syncInfos(socket, nullptr);
213 144 : }
214 :
215 : bool
216 118 : SyncModule::isConnected(const DeviceId& deviceId) const
217 : {
218 118 : std::lock_guard lk(pimpl_->syncConnectionsMtx_);
219 118 : auto it = pimpl_->syncConnections_.find(deviceId);
220 118 : if (it == pimpl_->syncConnections_.end())
221 86 : return false;
222 32 : return !it->second.empty();
223 118 : }
224 :
225 : void
226 1555 : SyncModule::syncWithConnected(const std::shared_ptr<SyncMsg>& syncMsg, const DeviceId& deviceId)
227 : {
228 1555 : std::lock_guard lk(pimpl_->syncConnectionsMtx_);
229 1644 : for (auto& [did, sockets] : pimpl_->syncConnections_) {
230 89 : if (not sockets.empty()) {
231 89 : if (!deviceId || deviceId == did) {
232 89 : pimpl_->syncInfos(sockets[0], syncMsg);
233 : }
234 : }
235 : }
236 1555 : }
237 : } // namespace jami
|