Line data Source code
1 : /*
2 : * Copyright (C) 2004-2025 Savoir-faire Linux Inc.
3 : *
4 : * This program is free software: you can redistribute it and/or modify
5 : * it under the terms of the GNU General Public License as published by
6 : * the Free Software Foundation, either version 3 of the License, or
7 : * (at your option) any later version.
8 : *
9 : * This program is distributed in the hope that it will be useful,
10 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 : * GNU General Public License for more details.
13 : *
14 : * You should have received a copy of the GNU General Public License
15 : * along with this program. If not, see <https://www.gnu.org/licenses/>.
16 : */
17 :
18 : #include "swarm_manager.h"
19 : #include <dhtnet/multiplexed_socket.h>
20 : #include <opendht/thread_pool.h>
21 :
22 : namespace jami {
23 :
24 : using namespace swarm_protocol;
25 :
26 577 : SwarmManager::SwarmManager(const NodeId& id, const std::mt19937_64& rand, ToConnectCb&& toConnectCb)
27 577 : : id_(id)
28 577 : , rd(rand)
29 577 : , toConnectCb_(toConnectCb)
30 : {
31 577 : routing_table.setId(id);
32 577 : }
33 :
34 577 : SwarmManager::~SwarmManager()
35 : {
36 577 : if (!isShutdown_)
37 248 : shutdown();
38 577 : }
39 :
40 : bool
41 1854 : SwarmManager::setKnownNodes(const std::vector<NodeId>& known_nodes)
42 : {
43 1854 : isShutdown_ = false;
44 1854 : std::vector<NodeId> newNodes;
45 : {
46 1854 : std::lock_guard lock(mutex);
47 7049 : for (const auto& nodeId : known_nodes) {
48 5195 : if (addKnownNode(nodeId)) {
49 2951 : newNodes.emplace_back(nodeId);
50 : }
51 : }
52 1854 : }
53 :
54 1854 : if (newNodes.empty())
55 1337 : return false;
56 :
57 517 : dht::ThreadPool::io().run([w = weak(), newNodes = std::move(newNodes)] {
58 517 : auto shared = w.lock();
59 517 : if (!shared)
60 0 : return;
61 : // If we detect a new node which already got a TCP link
62 : // we can use it to speed-up the bootstrap (because opening
63 : // a new channel will be easy)
64 517 : std::set<NodeId> toConnect;
65 3468 : for (const auto& nodeId : newNodes) {
66 2951 : if (shared->toConnectCb_ && shared->toConnectCb_(nodeId))
67 269 : toConnect.emplace(nodeId);
68 : }
69 517 : shared->maintainBuckets(toConnect);
70 517 : });
71 517 : return true;
72 1854 : }
73 :
74 : void
75 1032 : SwarmManager::setMobileNodes(const std::vector<NodeId>& mobile_nodes)
76 : {
77 : {
78 1032 : std::lock_guard lock(mutex);
79 1045 : for (const auto& nodeId : mobile_nodes)
80 13 : addMobileNodes(nodeId);
81 1032 : }
82 1032 : }
83 :
84 : void
85 1512 : SwarmManager::addChannel(const std::shared_ptr<dhtnet::ChannelSocketInterface>& channel)
86 : {
87 : // JAMI_WARNING("[SwarmManager {}] addChannel! with {}", fmt::ptr(this), channel->deviceId().to_view());
88 1512 : if (channel) {
89 1512 : auto emit = false;
90 : {
91 1512 : std::lock_guard lock(mutex);
92 1512 : emit = routing_table.findBucket(getId())->isEmpty();
93 1512 : auto bucket = routing_table.findBucket(channel->deviceId());
94 1512 : if (routing_table.addNode(channel, bucket)) {
95 1057 : std::error_code ec;
96 1057 : resetNodeExpiry(ec, channel, id_);
97 : }
98 1512 : }
99 1512 : receiveMessage(channel);
100 1512 : if (emit && onConnectionChanged_) {
101 : // If it's the first channel we add, we're now connected!
102 1008 : JAMI_DEBUG("[SwarmManager {}] Bootstrap: Connected!", fmt::ptr(this));
103 336 : onConnectionChanged_(true);
104 : }
105 : }
106 1512 : }
107 :
108 : void
109 393 : SwarmManager::removeNode(const NodeId& nodeId)
110 : {
111 393 : std::unique_lock lk(mutex);
112 393 : if (isConnectedWith(nodeId)) {
113 374 : removeNodeInternal(nodeId);
114 374 : lk.unlock();
115 374 : maintainBuckets();
116 : }
117 393 : }
118 :
119 : void
120 176 : SwarmManager::changeMobility(const NodeId& nodeId, bool isMobile)
121 : {
122 176 : std::lock_guard lock(mutex);
123 176 : auto bucket = routing_table.findBucket(nodeId);
124 176 : bucket->changeMobility(nodeId, isMobile);
125 176 : }
126 :
127 : bool
128 393 : SwarmManager::isConnectedWith(const NodeId& deviceId)
129 : {
130 393 : return routing_table.hasNode(deviceId);
131 : }
132 :
133 : void
134 633 : SwarmManager::shutdown()
135 : {
136 633 : if (isShutdown_) {
137 20 : return;
138 : }
139 613 : isShutdown_ = true;
140 613 : std::lock_guard lock(mutex);
141 613 : routing_table.shutdownAllNodes();
142 613 : }
143 :
144 : void
145 26 : SwarmManager::restart()
146 : {
147 26 : isShutdown_ = false;
148 26 : }
149 :
150 : bool
151 5195 : SwarmManager::addKnownNode(const NodeId& nodeId)
152 : {
153 5195 : return routing_table.addKnownNode(nodeId);
154 : }
155 :
156 : void
157 13 : SwarmManager::addMobileNodes(const NodeId& nodeId)
158 : {
159 13 : if (id_ != nodeId) {
160 12 : routing_table.addMobileNode(nodeId);
161 : }
162 13 : }
163 :
164 : void
165 933 : SwarmManager::maintainBuckets(const std::set<NodeId>& toConnect)
166 : {
167 933 : std::set<NodeId> nodes = toConnect;
168 933 : std::unique_lock lock(mutex);
169 933 : auto& buckets = routing_table.getBuckets();
170 2917 : for (auto it = buckets.begin(); it != buckets.end(); ++it) {
171 1984 : auto& bucket = *it;
172 1984 : bool myBucket = routing_table.contains(it, id_);
173 3035 : auto connecting_nodes = myBucket ? bucket.getConnectingNodesSize()
174 1051 : : bucket.getConnectingNodesSize() + bucket.getNodesSize();
175 1984 : if (connecting_nodes < Bucket::BUCKET_MAX_SIZE) {
176 : auto nodesToTry = bucket.getKnownNodesRandom(Bucket::BUCKET_MAX_SIZE - connecting_nodes,
177 1121 : rd);
178 2005 : for (auto& node : nodesToTry)
179 884 : routing_table.addConnectingNode(node);
180 :
181 1121 : nodes.insert(nodesToTry.begin(), nodesToTry.end());
182 1121 : }
183 : }
184 933 : lock.unlock();
185 1877 : for (auto& node : nodes)
186 944 : tryConnect(node);
187 933 : }
188 :
189 : void
190 1057 : SwarmManager::sendRequest(const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket,
191 : const NodeId& nodeId,
192 : Query q,
193 : int numberNodes)
194 : {
195 1057 : dht::ThreadPool::io().run([socket, isMobile=isMobile_, nodeId, q, numberNodes] {
196 1057 : msgpack::sbuffer buffer;
197 1057 : msgpack::packer<msgpack::sbuffer> pk(&buffer);
198 1057 : Message msg;
199 1057 : msg.is_mobile = isMobile;
200 1057 : msg.request = Request {q, numberNodes, nodeId};
201 1057 : pk.pack(msg);
202 :
203 1057 : std::error_code ec;
204 1057 : socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
205 1057 : if (ec) {
206 15 : JAMI_ERROR("{}", ec.message());
207 : }
208 1057 : });
209 1057 : }
210 :
211 : void
212 1031 : SwarmManager::sendAnswer(const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket,
213 : const Message& msg_)
214 : {
215 1031 : std::lock_guard lock(mutex);
216 :
217 1031 : if (msg_.request->q == Query::FIND) {
218 1031 : auto nodes = routing_table.closestNodes(msg_.request->nodeId, msg_.request->num);
219 1031 : auto bucket = routing_table.findBucket(msg_.request->nodeId);
220 1031 : const auto& m_nodes = bucket->getMobileNodes();
221 1031 : Response toResponse {Query::FOUND, nodes, {m_nodes.begin(), m_nodes.end()}};
222 :
223 1031 : Message msg;
224 1031 : msg.is_mobile = isMobile_;
225 1031 : msg.response = std::move(toResponse);
226 :
227 1031 : msgpack::sbuffer buffer((size_t) 60000);
228 1031 : msgpack::packer<msgpack::sbuffer> pk(&buffer);
229 1031 : pk.pack(msg);
230 :
231 1031 : std::error_code ec;
232 1031 : socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
233 1031 : if (ec) {
234 0 : JAMI_ERROR("{}", ec.message());
235 0 : return;
236 : }
237 1031 : }
238 1031 : }
239 :
240 : void
241 1512 : SwarmManager::receiveMessage(const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket)
242 : {
243 : struct DecodingContext
244 : {
245 16742 : msgpack::unpacker pac {[](msgpack::type::object_type, std::size_t, void*) { return true; },
246 : nullptr,
247 : 512};
248 : };
249 :
250 1512 : socket->setOnRecv([w = weak(),
251 : wsocket = std::weak_ptr<dhtnet::ChannelSocketInterface>(socket),
252 : ctx = std::make_shared<DecodingContext>()](const uint8_t* buf, size_t len) {
253 2061 : ctx->pac.reserve_buffer(len);
254 2061 : std::copy_n(buf, len, ctx->pac.buffer());
255 2061 : ctx->pac.buffer_consumed(len);
256 :
257 2061 : msgpack::object_handle oh;
258 4122 : while (ctx->pac.next(oh)) {
259 2061 : auto shared = w.lock();
260 2061 : auto socket = wsocket.lock();
261 2061 : if (!shared || !socket)
262 0 : return size_t {0};
263 :
264 : try {
265 2061 : Message msg;
266 2061 : oh.get().convert(msg);
267 :
268 2061 : if (msg.is_mobile)
269 176 : shared->changeMobility(socket->deviceId(), msg.is_mobile);
270 :
271 2061 : if (msg.request) {
272 1031 : shared->sendAnswer(socket, msg);
273 :
274 1030 : } else if (msg.response) {
275 1030 : shared->setKnownNodes(msg.response->nodes);
276 1030 : shared->setMobileNodes(msg.response->mobile_nodes);
277 : }
278 :
279 2061 : } catch (const std::exception& e) {
280 0 : JAMI_WARNING("Error DRT recv: {}", e.what());
281 0 : return len;
282 0 : }
283 2061 : }
284 :
285 2061 : return len;
286 2061 : });
287 :
288 1512 : socket->onShutdown([w = weak(), deviceId = socket->deviceId()] {
289 1062 : dht::ThreadPool::io().run([w, deviceId] {
290 752 : auto shared = w.lock();
291 752 : if (shared && !shared->isShutdown_) {
292 393 : shared->removeNode(deviceId);
293 : }
294 752 : });
295 1062 : });
296 1512 : }
297 :
298 : void
299 1057 : SwarmManager::resetNodeExpiry(const asio::error_code& ec,
300 : const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket,
301 : NodeId node)
302 : {
303 1057 : NodeId idToFind;
304 1057 : std::list<Bucket>::iterator bucket;
305 :
306 1057 : if (ec == asio::error::operation_aborted)
307 0 : return;
308 :
309 1057 : if (!node) {
310 0 : bucket = routing_table.findBucket(socket->deviceId());
311 0 : idToFind = bucket->randomId(rd);
312 : } else {
313 1057 : bucket = routing_table.findBucket(node);
314 1057 : idToFind = node;
315 : }
316 :
317 1057 : sendRequest(socket, idToFind, Query::FIND, Bucket::BUCKET_MAX_SIZE);
318 :
319 1057 : if (!node) {
320 0 : auto& nodeTimer = bucket->getNodeTimer(socket);
321 0 : nodeTimer.expires_after(FIND_PERIOD);
322 0 : nodeTimer.async_wait(std::bind(&jami::SwarmManager::resetNodeExpiry,
323 0 : shared_from_this(),
324 : std::placeholders::_1,
325 : socket,
326 0 : NodeId {}));
327 : }
328 : }
329 :
330 : void
331 944 : SwarmManager::tryConnect(const NodeId& nodeId)
332 : {
333 944 : if (needSocketCb_)
334 939 : needSocketCb_(nodeId.toString(),
335 889 : [w = weak(),
336 : nodeId](const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket) {
337 889 : auto shared = w.lock();
338 889 : if (!shared || shared->isShutdown_)
339 262 : return true;
340 627 : if (socket) {
341 572 : shared->addChannel(socket);
342 572 : return true;
343 : }
344 55 : std::unique_lock lk(shared->mutex);
345 55 : auto bucket = shared->routing_table.findBucket(nodeId);
346 55 : bucket->removeConnectingNode(nodeId);
347 55 : bucket->addKnownNode(nodeId);
348 55 : bucket = shared->routing_table.findBucket(shared->getId());
349 98 : if (bucket->getConnectingNodesSize() == 0 && bucket->isEmpty()
350 98 : && shared->onConnectionChanged_) {
351 29 : lk.unlock();
352 87 : JAMI_LOG("[SwarmManager {:p}] Bootstrap: all connections failed",
353 : fmt::ptr(shared.get()));
354 29 : shared->onConnectionChanged_(false);
355 : }
356 55 : return true;
357 889 : });
358 944 : }
359 :
360 : void
361 374 : SwarmManager::removeNodeInternal(const NodeId& nodeId)
362 : {
363 374 : routing_table.removeNode(nodeId);
364 374 : }
365 :
366 : std::vector<NodeId>
367 0 : SwarmManager::getAllNodes() const
368 : {
369 0 : std::lock_guard lock(mutex);
370 0 : return routing_table.getAllNodes();
371 0 : }
372 :
373 : void
374 16 : SwarmManager::deleteNode(std::vector<NodeId> nodes)
375 : {
376 : {
377 16 : std::lock_guard lock(mutex);
378 27 : for (const auto& node : nodes) {
379 11 : routing_table.deleteNode(node);
380 : }
381 16 : }
382 16 : maintainBuckets();
383 16 : }
384 :
385 : } // namespace jami
|