Line data Source code
1 : /*
2 : * Copyright (C) 2004-2024 Savoir-faire Linux Inc.
3 : *
4 : * This program is free software: you can redistribute it and/or modify
5 : * it under the terms of the GNU General Public License as published by
6 : * the Free Software Foundation, either version 3 of the License, or
7 : * (at your option) any later version.
8 : *
9 : * This program is distributed in the hope that it will be useful,
10 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 : * GNU General Public License for more details.
13 : *
14 : * You should have received a copy of the GNU General Public License
15 : * along with this program. If not, see <https://www.gnu.org/licenses/>.
16 : */
17 :
18 : #include "sync_module.h"
19 :
20 : #include "jamidht/conversation_module.h"
21 : #include "jamidht/archive_account_manager.h"
22 : #include <dhtnet/multiplexed_socket.h>
23 :
24 : namespace jami {
25 :
26 : class SyncModule::Impl : public std::enable_shared_from_this<Impl>
27 : {
28 : public:
29 : Impl(std::weak_ptr<JamiAccount>&& account);
30 :
31 : std::weak_ptr<JamiAccount> account_;
32 :
33 : // Sync connections
34 : std::recursive_mutex syncConnectionsMtx_;
35 : std::map<DeviceId /* deviceId */, std::vector<std::shared_ptr<dhtnet::ChannelSocket>>> syncConnections_;
36 :
37 : /**
38 : * Build SyncMsg and send it on socket
39 : * @param socket
40 : */
41 : void syncInfos(const std::shared_ptr<dhtnet::ChannelSocket>& socket,
42 : const std::shared_ptr<SyncMsg>& syncMsg);
43 : void onChannelShutdown(const std::shared_ptr<dhtnet::ChannelSocket>& socket, const DeviceId& device);
44 : };
45 :
46 383 : SyncModule::Impl::Impl(std::weak_ptr<JamiAccount>&& account)
47 383 : : account_(account)
48 383 : {}
49 :
50 : void
51 223 : SyncModule::Impl::syncInfos(const std::shared_ptr<dhtnet::ChannelSocket>& socket,
52 : const std::shared_ptr<SyncMsg>& syncMsg)
53 : {
54 223 : auto acc = account_.lock();
55 223 : if (!acc)
56 0 : return;
57 223 : msgpack::sbuffer buffer(UINT16_MAX); // Use max pkt size
58 223 : std::error_code ec;
59 223 : if (!syncMsg) {
60 : // Send contacts infos
61 : // This message can be big. TODO rewrite to only take UINT16_MAX bytes max or split it multiple
62 : // messages. For now, write 3 messages (UINT16_MAX*3 should be enough for all information).
63 201 : if (auto info = acc->accountManager()->getInfo()) {
64 201 : if (info->contacts) {
65 201 : SyncMsg msg;
66 202 : msg.ds = info->contacts->getSyncData();
67 202 : msgpack::pack(buffer, msg);
68 202 : socket->write(reinterpret_cast<const unsigned char*>(buffer.data()),
69 : buffer.size(),
70 : ec);
71 202 : if (ec) {
72 3 : JAMI_ERROR("{:s}", ec.message());
73 1 : return;
74 : }
75 202 : }
76 : }
77 201 : buffer.clear();
78 : // Sync conversations
79 201 : auto c = ConversationModule::convInfos(acc->getAccountID());
80 201 : if (!c.empty()) {
81 92 : SyncMsg msg;
82 92 : msg.c = std::move(c);
83 92 : msgpack::pack(buffer, msg);
84 92 : socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
85 92 : if (ec) {
86 0 : JAMI_ERROR("{:s}", ec.message());
87 0 : return;
88 : }
89 92 : }
90 200 : buffer.clear();
91 : // Sync requests
92 200 : auto cr = ConversationModule::convRequests(acc->getAccountID());
93 201 : if (!cr.empty()) {
94 20 : SyncMsg msg;
95 20 : msg.cr = std::move(cr);
96 20 : msgpack::pack(buffer, msg);
97 20 : socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
98 20 : if (ec) {
99 0 : JAMI_ERROR("{:s}", ec.message());
100 0 : return;
101 : }
102 20 : }
103 :
104 201 : auto convModule = acc->convModule(true);
105 201 : if (!convModule)
106 0 : return;
107 : // Sync conversation's preferences
108 201 : auto p = convModule->convPreferences();
109 201 : if (!p.empty()) {
110 3 : SyncMsg msg;
111 3 : msg.p = std::move(p);
112 3 : msgpack::pack(buffer, msg);
113 3 : socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
114 3 : if (ec) {
115 0 : JAMI_ERROR("{:s}", ec.message());
116 0 : return;
117 : }
118 3 : }
119 201 : buffer.clear();
120 : // Sync read's status
121 201 : auto ms = convModule->convMessageStatus();
122 201 : if (!ms.empty()) {
123 57 : SyncMsg msg;
124 57 : msg.ms = std::move(ms);
125 57 : msgpack::pack(buffer, msg);
126 57 : socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
127 57 : if (ec) {
128 0 : JAMI_ERROR("{:s}", ec.message());
129 0 : return;
130 : }
131 57 : }
132 201 : buffer.clear();
133 :
134 201 : } else {
135 22 : msgpack::pack(buffer, *syncMsg);
136 22 : socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
137 22 : if (ec)
138 0 : JAMI_ERROR("{:s}", ec.message());
139 : }
140 225 : }
141 :
142 : ////////////////////////////////////////////////////////////////
143 :
144 383 : SyncModule::SyncModule(std::weak_ptr<JamiAccount>&& account)
145 383 : : pimpl_ {std::make_shared<Impl>(std::move(account))}
146 383 : {}
147 :
148 : void
149 138 : SyncModule::Impl::onChannelShutdown(const std::shared_ptr<dhtnet::ChannelSocket>& socket, const DeviceId& device)
150 : {
151 138 : std::lock_guard lk(syncConnectionsMtx_);
152 138 : auto connectionsIt = syncConnections_.find(device);
153 138 : if (connectionsIt == syncConnections_.end())
154 0 : return;
155 138 : auto& connections = connectionsIt->second;
156 138 : auto conn = std::find(connections.begin(), connections.end(), socket);
157 138 : if (conn != connections.end())
158 138 : connections.erase(conn);
159 138 : if (connections.empty())
160 72 : syncConnections_.erase(connectionsIt);
161 138 : }
162 :
163 : void
164 138 : SyncModule::cacheSyncConnection(std::shared_ptr<dhtnet::ChannelSocket>&& socket,
165 : const std::string& peerId,
166 : const DeviceId& device)
167 : {
168 138 : std::lock_guard lk(pimpl_->syncConnectionsMtx_);
169 138 : pimpl_->syncConnections_[device].emplace_back(socket);
170 :
171 138 : socket->onShutdown([w = pimpl_->weak_from_this(), device, s=std::weak_ptr(socket)]() {
172 138 : if (auto shared = w.lock())
173 138 : shared->onChannelShutdown(s.lock(), device);
174 138 : });
175 :
176 : struct DecodingContext
177 : {
178 6841 : msgpack::unpacker pac {[](msgpack::type::object_type, std::size_t, void*) { return true; },
179 : nullptr,
180 : 512};
181 : };
182 :
183 138 : socket->setOnRecv([acc = pimpl_->account_.lock(), device, peerId,
184 : ctx = std::make_shared<DecodingContext>()
185 : ](const uint8_t* buf, size_t len) {
186 375 : if (!buf || !acc)
187 0 : return len;
188 :
189 375 : ctx->pac.reserve_buffer(len);
190 375 : std::copy_n(buf, len, ctx->pac.buffer());
191 375 : ctx->pac.buffer_consumed(len);
192 :
193 375 : msgpack::object_handle oh;
194 : try {
195 750 : while (ctx->pac.next(oh)) {
196 375 : SyncMsg msg;
197 375 : oh.get().convert(msg);
198 375 : if (auto manager = acc->accountManager())
199 375 : manager->onSyncData(std::move(msg.ds), false);
200 :
201 375 : if (!msg.c.empty() || !msg.cr.empty() || !msg.p.empty() || !msg.ld.empty() || !msg.ms.empty())
202 177 : if (auto cm = acc->convModule(true))
203 177 : cm->onSyncData(msg, peerId, device.toString());
204 375 : }
205 0 : } catch (const std::exception& e) {
206 0 : JAMI_WARNING("[convInfo] error on sync: {:s}", e.what());
207 0 : }
208 :
209 375 : return len;
210 375 : });
211 :
212 137 : pimpl_->syncInfos(socket, nullptr);
213 138 : }
214 :
215 : bool
216 112 : SyncModule::isConnected(const DeviceId& deviceId) const
217 : {
218 112 : std::lock_guard lk(pimpl_->syncConnectionsMtx_);
219 112 : auto it = pimpl_->syncConnections_.find(deviceId);
220 112 : if (it == pimpl_->syncConnections_.end())
221 83 : return false;
222 29 : return !it->second.empty();
223 112 : }
224 :
225 : void
226 1552 : SyncModule::syncWithConnected(const std::shared_ptr<SyncMsg>& syncMsg, const DeviceId& deviceId)
227 : {
228 1552 : std::lock_guard lk(pimpl_->syncConnectionsMtx_);
229 1638 : for (auto& [did, sockets] : pimpl_->syncConnections_) {
230 86 : if (not sockets.empty()) {
231 86 : if (!deviceId || deviceId == did) {
232 86 : pimpl_->syncInfos(sockets[0], syncMsg);
233 : }
234 : }
235 : }
236 1552 : }
237 : } // namespace jami
|