Line data Source code
1 : /*
2 : * Copyright (C) 2004-2026 Savoir-faire Linux Inc.
3 : *
4 : * This program is free software: you can redistribute it and/or modify
5 : * it under the terms of the GNU General Public License as published by
6 : * the Free Software Foundation, either version 3 of the License, or
7 : * (at your option) any later version.
8 : *
9 : * This program is distributed in the hope that it will be useful,
10 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 : * GNU General Public License for more details.
13 : *
14 : * You should have received a copy of the GNU General Public License
15 : * along with this program. If not, see <https://www.gnu.org/licenses/>.
16 : */
17 :
18 : #include "swarm_manager.h"
19 : #include <dhtnet/multiplexed_socket.h>
20 : #include <dhtnet/channel_utils.h>
21 : #include <opendht/thread_pool.h>
22 :
23 : namespace jami {
24 :
25 : using namespace swarm_protocol;
26 :
27 592 : SwarmManager::SwarmManager(const NodeId& id, const std::mt19937_64& rand, ToConnectCb&& toConnectCb)
28 592 : : id_(id)
29 592 : , rd(rand)
30 592 : , toConnectCb_(toConnectCb)
31 : {
32 592 : routing_table.setId(id);
33 592 : }
34 :
35 592 : SwarmManager::~SwarmManager()
36 : {
37 592 : if (!isShutdown_)
38 248 : shutdown();
39 592 : }
40 :
41 : bool
42 2028 : SwarmManager::setKnownNodes(const std::vector<NodeId>& known_nodes)
43 : {
44 2028 : isShutdown_ = false;
45 2026 : std::vector<NodeId> newNodes;
46 : {
47 2026 : std::lock_guard lock(mutex);
48 4757 : for (const auto& nodeId : known_nodes) {
49 2733 : if (addKnownNode(nodeId)) {
50 673 : newNodes.emplace_back(nodeId);
51 : }
52 : }
53 2018 : }
54 :
55 2029 : if (newNodes.empty())
56 1509 : return false;
57 :
58 518 : dht::ThreadPool::io().run([w = weak(), newNodes = std::move(newNodes)] {
59 518 : auto shared = w.lock();
60 517 : if (!shared)
61 0 : return;
62 : // If we detect a new node which already got a TCP link
63 : // we can use it to speed-up the bootstrap (because opening
64 : // a new channel will be easy)
65 516 : std::set<NodeId> toConnect;
66 1192 : for (const auto& nodeId : newNodes) {
67 673 : if (shared->toConnectCb_ && shared->toConnectCb_(nodeId))
68 143 : toConnect.emplace(nodeId);
69 : }
70 518 : shared->maintainBuckets(toConnect);
71 518 : });
72 519 : return true;
73 2028 : }
74 :
75 : void
76 1281 : SwarmManager::setMobileNodes(const std::vector<NodeId>& mobile_nodes)
77 : {
78 : {
79 1281 : std::lock_guard lock(mutex);
80 1294 : for (const auto& nodeId : mobile_nodes)
81 13 : addMobileNodes(nodeId);
82 1281 : }
83 1281 : }
84 :
85 : void
86 2180 : SwarmManager::addChannel(const std::shared_ptr<dhtnet::ChannelSocketInterface>& channel)
87 : {
88 : // JAMI_WARNING("[SwarmManager {}] addChannel! with {}", fmt::ptr(this), channel->deviceId().to_view());
89 2180 : if (channel) {
90 2180 : auto emit = false;
91 : {
92 2180 : std::lock_guard lock(mutex);
93 2178 : emit = routing_table.findBucket(getId())->isEmpty();
94 2179 : auto bucket = routing_table.findBucket(channel->deviceId());
95 2180 : if (routing_table.addNode(channel, bucket)) {
96 1308 : std::error_code ec;
97 1308 : resetNodeExpiry(ec, channel, id_);
98 : }
99 2179 : }
100 2179 : receiveMessage(channel);
101 2180 : if (emit && onConnectionChanged_) {
102 : // If it's the first channel we add, we're now connected!
103 1256 : JAMI_DEBUG("[SwarmManager {}] Bootstrap: Connected!", fmt::ptr(this));
104 314 : onConnectionChanged_(true);
105 : }
106 : }
107 2180 : }
108 :
109 : void
110 585 : SwarmManager::removeNode(const NodeId& nodeId)
111 : {
112 585 : std::unique_lock lk(mutex);
113 585 : if (isConnectedWith(nodeId)) {
114 508 : removeNodeInternal(nodeId);
115 508 : lk.unlock();
116 507 : maintainBuckets();
117 : }
118 586 : }
119 :
120 : void
121 195 : SwarmManager::changeMobility(const NodeId& nodeId, bool isMobile)
122 : {
123 195 : std::lock_guard lock(mutex);
124 196 : auto bucket = routing_table.findBucket(nodeId);
125 196 : bucket->changeMobility(nodeId, isMobile);
126 196 : }
127 :
128 : bool
129 1142 : SwarmManager::isConnectedWith(const NodeId& deviceId)
130 : {
131 1142 : return routing_table.hasNode(deviceId);
132 : }
133 :
134 : void
135 642 : SwarmManager::shutdown()
136 : {
137 642 : if (isShutdown_) {
138 16 : return;
139 : }
140 626 : isShutdown_ = true;
141 626 : std::lock_guard lock(mutex);
142 626 : routing_table.shutdownAllNodes();
143 626 : }
144 :
145 : void
146 22 : SwarmManager::restart()
147 : {
148 22 : isShutdown_ = false;
149 22 : }
150 :
151 : bool
152 3287 : SwarmManager::addKnownNode(const NodeId& nodeId)
153 : {
154 3287 : return routing_table.addKnownNode(nodeId);
155 : }
156 :
157 : void
158 13 : SwarmManager::addMobileNodes(const NodeId& nodeId)
159 : {
160 13 : if (id_ != nodeId) {
161 12 : routing_table.addMobileNode(nodeId);
162 : }
163 13 : }
164 :
165 : void
166 1057 : SwarmManager::maintainBuckets(const std::set<NodeId>& toConnect)
167 : {
168 1057 : std::set<NodeId> nodes = toConnect;
169 1057 : std::unique_lock lock(mutex);
170 1059 : auto& buckets = routing_table.getBuckets();
171 4063 : for (auto it = buckets.begin(); it != buckets.end(); ++it) {
172 3010 : auto& bucket = *it;
173 3007 : bool myBucket = routing_table.contains(it, id_);
174 4961 : auto connecting_nodes = myBucket ? bucket.getConnectingNodesSize()
175 1955 : : bucket.getConnectingNodesSize() + bucket.getNodesSize();
176 3005 : if (connecting_nodes < Bucket::BUCKET_MAX_SIZE) {
177 1651 : auto nodesToTry = bucket.getKnownNodesRandom(Bucket::BUCKET_MAX_SIZE - connecting_nodes, rd);
178 2439 : for (auto& node : nodesToTry)
179 785 : routing_table.addConnectingNode(node);
180 :
181 1650 : nodes.insert(nodesToTry.begin(), nodesToTry.end());
182 1654 : }
183 : }
184 1059 : lock.unlock();
185 1864 : for (const auto& node : nodes)
186 805 : tryConnect(node);
187 1056 : }
188 :
189 : void
190 1308 : SwarmManager::sendRequest(const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket,
191 : const NodeId& nodeId,
192 : Query q,
193 : int numberNodes)
194 : {
195 1308 : dht::ThreadPool::io().run([socket, isMobile = isMobile_, nodeId, q, numberNodes] {
196 1308 : msgpack::sbuffer buffer;
197 1308 : msgpack::packer<msgpack::sbuffer> pk(&buffer);
198 1308 : Message msg;
199 1308 : msg.is_mobile = isMobile;
200 1308 : msg.request = Request {q, numberNodes, nodeId};
201 1308 : pk.pack(msg);
202 :
203 1308 : std::error_code ec;
204 1308 : socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
205 1308 : if (ec) {
206 4 : JAMI_ERROR("{}", ec.message());
207 : }
208 1308 : });
209 1308 : }
210 :
211 : void
212 1276 : SwarmManager::sendAnswer(const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket, const Message& msg_)
213 : {
214 1276 : std::lock_guard lock(mutex);
215 :
216 1278 : if (msg_.request->q == Query::FIND) {
217 1274 : auto nodes = routing_table.closestNodes(msg_.request->nodeId, msg_.request->num);
218 1283 : auto bucket = routing_table.findBucket(msg_.request->nodeId);
219 1281 : const auto& m_nodes = bucket->getMobileNodes();
220 2561 : Response toResponse {Query::FOUND, nodes, {m_nodes.begin(), m_nodes.end()}};
221 :
222 1278 : Message msg;
223 1278 : msg.is_mobile = isMobile_;
224 1278 : msg.response = std::move(toResponse);
225 :
226 1280 : msgpack::sbuffer buffer((size_t) 60000);
227 1284 : msgpack::packer<msgpack::sbuffer> pk(&buffer);
228 1283 : pk.pack(msg);
229 :
230 1280 : std::error_code ec;
231 1279 : socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
232 1284 : if (ec) {
233 8 : JAMI_ERROR("{}", ec.message());
234 2 : return;
235 : }
236 1290 : }
237 1284 : }
238 :
239 : void
240 2179 : SwarmManager::receiveMessage(const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket)
241 : {
242 4359 : socket->setOnRecv(dhtnet::buildMsgpackReader<Message>(
243 4359 : [w = weak(), wsocket = std::weak_ptr<dhtnet::ChannelSocketInterface>(socket)](Message&& msg) {
244 2551 : auto shared = w.lock();
245 2548 : auto socket = wsocket.lock();
246 2555 : if (!shared || !socket)
247 0 : return std::make_error_code(std::errc::operation_canceled);
248 :
249 2548 : if (msg.is_mobile)
250 195 : shared->changeMobility(socket->deviceId(), msg.is_mobile);
251 :
252 2549 : if (msg.request) {
253 1276 : shared->sendAnswer(socket, msg);
254 :
255 1274 : } else if (msg.response) {
256 1275 : shared->setKnownNodes(msg.response->nodes);
257 1280 : shared->setMobileNodes(msg.response->mobile_nodes);
258 : }
259 2562 : return std::error_code();
260 2561 : }));
261 :
262 2180 : socket->onShutdown([w = weak(), deviceId = socket->deviceId()](const std::error_code&) {
263 1111 : dht::ThreadPool::io().run([w, deviceId] {
264 1114 : auto shared = w.lock();
265 1114 : if (shared && !shared->isShutdown_) {
266 586 : shared->removeNode(deviceId);
267 : }
268 1116 : });
269 1117 : });
270 2179 : }
271 :
272 : void
273 1308 : SwarmManager::resetNodeExpiry(const asio::error_code& ec,
274 : const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket,
275 : NodeId node)
276 : {
277 1308 : NodeId idToFind;
278 1308 : std::list<Bucket>::iterator bucket;
279 :
280 1308 : if (ec == asio::error::operation_aborted)
281 0 : return;
282 :
283 1308 : if (!node) {
284 0 : bucket = routing_table.findBucket(socket->deviceId());
285 0 : idToFind = bucket->randomId(rd);
286 : } else {
287 1308 : bucket = routing_table.findBucket(node);
288 1308 : idToFind = node;
289 : }
290 :
291 1308 : sendRequest(socket, idToFind, Query::FIND, Bucket::BUCKET_MAX_SIZE);
292 :
293 1308 : if (!node) {
294 0 : auto& nodeTimer = bucket->getNodeTimer(socket);
295 0 : nodeTimer.expires_after(FIND_PERIOD);
296 0 : nodeTimer.async_wait(std::bind(&jami::SwarmManager::resetNodeExpiry,
297 0 : shared_from_this(),
298 : std::placeholders::_1,
299 : socket,
300 0 : NodeId {}));
301 : }
302 : }
303 :
304 : void
305 1359 : SwarmManager::tryConnect(const NodeId& nodeId, bool noNewSocket)
306 : {
307 1359 : if (needSocketCb_)
308 1351 : needSocketCb_(
309 2706 : nodeId.toString(),
310 2706 : [w = weak(), nodeId](const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket) {
311 1267 : auto shared = w.lock();
312 1267 : if (!shared || shared->isShutdown_)
313 192 : return true;
314 1075 : if (socket) {
315 990 : shared->addChannel(socket);
316 990 : return true;
317 : }
318 85 : std::unique_lock lk(shared->mutex);
319 85 : auto bucket = shared->routing_table.findBucket(nodeId);
320 85 : bucket->removeConnectingNode(nodeId);
321 85 : bucket->addKnownNode(nodeId);
322 85 : bucket = shared->routing_table.findBucket(shared->getId());
323 85 : if (bucket->getConnectingNodesSize() == 0 && bucket->isEmpty() && shared->onConnectionChanged_) {
324 41 : lk.unlock();
325 164 : JAMI_LOG("[SwarmManager {:p}] Bootstrap: all connections failed", fmt::ptr(shared.get()));
326 41 : shared->onConnectionChanged_(false);
327 : }
328 85 : return true;
329 1267 : },
330 : noNewSocket);
331 1359 : }
332 :
333 : void
334 508 : SwarmManager::removeNodeInternal(const NodeId& nodeId)
335 : {
336 508 : routing_table.removeNode(nodeId);
337 506 : }
338 :
339 : void
340 559 : SwarmManager::connectNode(const NodeId& nodeId)
341 : {
342 : {
343 559 : std::lock_guard lock(mutex);
344 559 : if (isShutdown_)
345 1 : return;
346 558 : if (isConnectedWith(nodeId))
347 4 : return;
348 554 : addKnownNode(nodeId);
349 554 : if (!routing_table.addConnectingNode(nodeId))
350 0 : return;
351 559 : }
352 554 : tryConnect(nodeId, true);
353 : }
354 :
355 : std::vector<NodeId>
356 16 : SwarmManager::getAllNodes() const
357 : {
358 16 : std::lock_guard lock(mutex);
359 32 : return routing_table.getAllNodes();
360 16 : }
361 :
362 : std::vector<NodeId>
363 1765 : SwarmManager::getConnectedNodes() const
364 : {
365 1765 : std::lock_guard lock(mutex);
366 3530 : return routing_table.getConnectedNodes();
367 1765 : }
368 :
369 : std::vector<std::map<std::string, std::string>>
370 0 : SwarmManager::getRoutingTableInfo() const
371 : {
372 0 : std::lock_guard lock(mutex);
373 0 : auto stats = routing_table.getRoutingTableStats();
374 0 : std::vector<std::map<std::string, std::string>> result;
375 0 : result.reserve(stats.size());
376 0 : for (const auto& stat : stats) {
377 0 : result.push_back({{"id", stat.id},
378 0 : {"device", stat.id},
379 0 : {"status", stat.status},
380 0 : {"remoteAddress", stat.remoteAddress},
381 0 : {"mobile", stat.isMobile ? "true" : "false"}});
382 0 : if (stat.connectionTime != std::chrono::system_clock::time_point::min()) {
383 0 : auto tt = std::chrono::system_clock::to_time_t(stat.connectionTime);
384 0 : result.back().emplace("connectionTime", std::to_string(tt));
385 : }
386 : }
387 0 : return result;
388 0 : }
389 :
390 : bool
391 2632 : SwarmManager::isConnected() const
392 : {
393 2632 : std::lock_guard lock(mutex);
394 5264 : return !routing_table.isEmpty();
395 2632 : }
396 :
397 : void
398 11 : SwarmManager::deleteNode(const std::vector<NodeId>& nodes)
399 : {
400 : {
401 11 : std::lock_guard lock(mutex);
402 22 : for (const auto& node : nodes) {
403 11 : routing_table.deleteNode(node);
404 : }
405 11 : }
406 11 : maintainBuckets();
407 11 : }
408 :
409 : } // namespace jami
|