Line data Source code
1 : /*
2 : * Copyright (C) 2004-2025 Savoir-faire Linux Inc.
3 : *
4 : * This program is free software: you can redistribute it and/or modify
5 : * it under the terms of the GNU General Public License as published by
6 : * the Free Software Foundation, either version 3 of the License, or
7 : * (at your option) any later version.
8 : *
9 : * This program is distributed in the hope that it will be useful,
10 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 : * GNU General Public License for more details.
13 : *
14 : * You should have received a copy of the GNU General Public License
15 : * along with this program. If not, see <https://www.gnu.org/licenses/>.
16 : */
17 :
18 : #include "routing_table.h"
19 :
20 : #include <dhtnet/multiplexed_socket.h>
21 : #include <opendht/infohash.h>
22 : #include <opendht/thread_pool.h>
23 :
24 : #include <math.h>
25 : #include <stdio.h>
26 : #include <iostream>
27 : #include <iterator>
28 : #include <stdlib.h>
29 : #include <time.h>
30 :
31 : using namespace std::placeholders;
32 :
33 : namespace jami {
34 :
35 : using namespace dht;
36 :
37 720 : Bucket::Bucket(const NodeId& id)
38 720 : : lowerLimit_(id)
39 720 : {}
40 :
41 : bool
42 832 : Bucket::addNode(const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket)
43 : {
44 832 : return addNode(NodeInfo(socket));
45 : }
46 :
47 : bool
48 955 : Bucket::addNode(NodeInfo&& info)
49 : {
50 955 : auto nodeId = info.socket->deviceId();
51 956 : if (nodes.try_emplace(nodeId, std::move(info)).second) {
52 956 : connecting_nodes.erase(nodeId);
53 954 : known_nodes.erase(nodeId);
54 955 : mobile_nodes.erase(nodeId);
55 954 : return true;
56 : }
57 1 : return false;
58 : }
59 :
60 : bool
61 835 : Bucket::removeNode(const NodeId& nodeId)
62 : {
63 835 : auto node = nodes.find(nodeId);
64 835 : auto isMobile = node->second.isMobile_;
65 835 : if (node == nodes.end())
66 3 : return false;
67 832 : nodes.erase(nodeId);
68 832 : if (isMobile) {
69 5 : addMobileNode(nodeId);
70 : } else {
71 827 : addKnownNode(nodeId);
72 : }
73 :
74 831 : return true;
75 : }
76 :
77 : std::set<NodeId>
78 1591 : Bucket::getNodeIds() const
79 : {
80 1591 : std::set<NodeId> nodesId;
81 4821 : for (auto const& key : nodes)
82 3244 : nodesId.insert(key.first);
83 1590 : return nodesId;
84 0 : }
85 :
86 : bool
87 6904 : Bucket::hasNode(const NodeId& nodeId) const
88 : {
89 6904 : return nodes.find(nodeId) != nodes.end();
90 : }
91 :
92 : bool
93 4580 : Bucket::addKnownNode(const NodeId& nodeId)
94 : {
95 4580 : if (!hasNode(nodeId)) {
96 4125 : if (known_nodes.emplace(nodeId).second) {
97 3886 : return true;
98 : }
99 : }
100 700 : return false;
101 : }
102 :
103 : NodeId
104 163 : Bucket::getKnownNode(unsigned index) const
105 : {
106 163 : if (index > known_nodes.size()) {
107 1 : throw std::out_of_range("End of table for get known Node Id " + std::to_string(index));
108 : }
109 162 : auto it = known_nodes.begin();
110 162 : std::advance(it, index);
111 :
112 162 : return *it;
113 : }
114 :
115 : bool
116 18 : Bucket::addMobileNode(const NodeId& nodeId)
117 : {
118 18 : if (!hasNode(nodeId)) {
119 17 : if (mobile_nodes.emplace(nodeId).second) {
120 17 : known_nodes.erase(nodeId);
121 17 : return true;
122 : }
123 : }
124 1 : return false;
125 : }
126 :
127 : bool
128 808 : Bucket::addConnectingNode(const NodeId& nodeId)
129 : {
130 808 : if (!hasNode(nodeId)) {
131 809 : if (connecting_nodes.emplace(nodeId).second) {
132 765 : known_nodes.erase(nodeId);
133 766 : mobile_nodes.erase(nodeId);
134 765 : return true;
135 : }
136 : }
137 42 : return false;
138 : }
139 :
140 : std::set<NodeId>
141 820 : Bucket::getKnownNodesRandom(unsigned numberNodes, std::mt19937_64& rd) const
142 : {
143 820 : std::set<NodeId> nodesToReturn;
144 :
145 819 : if (getKnownNodesSize() <= numberNodes)
146 711 : return getKnownNodes();
147 :
148 109 : std::uniform_int_distribution<unsigned> distrib(0, getKnownNodesSize() - 1);
149 :
150 271 : while (nodesToReturn.size() < numberNodes) {
151 162 : nodesToReturn.emplace(getKnownNode(distrib(rd)));
152 : }
153 :
154 109 : return nodesToReturn;
155 819 : }
156 :
157 : asio::steady_timer&
158 0 : Bucket::getNodeTimer(const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket)
159 : {
160 0 : auto node = nodes.find(socket->deviceId());
161 0 : if (node == nodes.end()) {
162 0 : throw std::range_error("Unable to find timer " + socket->deviceId().toString());
163 : }
164 0 : return node->second.refresh_timer;
165 : }
166 :
167 : bool
168 14 : Bucket::shutdownNode(const NodeId& nodeId)
169 : {
170 14 : auto node = nodes.find(nodeId);
171 14 : if (node != nodes.end()) {
172 12 : auto& socket = node->second.socket;
173 12 : auto node = socket->deviceId();
174 24 : dht::ThreadPool::io().run([socket] { socket->shutdown(); });
175 12 : removeNode(node);
176 12 : return true;
177 : }
178 2 : return false;
179 : }
180 :
181 : void
182 755 : Bucket::shutdownAllNodes()
183 : {
184 1229 : while (not nodes.empty()) {
185 474 : auto it = nodes.begin();
186 822 : dht::ThreadPool::io().run([socket = it->second.socket] { socket->shutdown(); });
187 474 : auto nodeId = it->first;
188 474 : removeNode(nodeId);
189 : }
190 755 : }
191 :
192 : void
193 0 : Bucket::printBucket(unsigned number) const
194 : {
195 0 : JAMI_ERROR("BUCKET Number: {:d}", number);
196 :
197 0 : unsigned nodeNum = 1;
198 0 : for (auto it = nodes.begin(); it != nodes.end(); ++it) {
199 0 : JAMI_DEBUG("Node {:s} Id: {:s} isMobile: {:s}",
200 : std::to_string(nodeNum),
201 : it->first.toString(),
202 : std::to_string(it->second.isMobile_));
203 0 : nodeNum++;
204 : }
205 0 : JAMI_ERROR("Mobile Nodes");
206 0 : nodeNum = 0;
207 0 : for (auto it = mobile_nodes.begin(); it != mobile_nodes.end(); ++it) {
208 0 : JAMI_DEBUG("Node {:s} Id: {:s}", std::to_string(nodeNum), (*it).toString());
209 0 : nodeNum++;
210 : }
211 :
212 0 : JAMI_ERROR("Known Nodes");
213 0 : nodeNum = 0;
214 0 : for (auto it = known_nodes.begin(); it != known_nodes.end(); ++it) {
215 0 : JAMI_DEBUG("Node {:s} Id: {:s}", std::to_string(nodeNum), (*it).toString());
216 0 : nodeNum++;
217 : }
218 0 : JAMI_ERROR("Connecting_nodes");
219 0 : nodeNum = 0;
220 0 : for (auto it = connecting_nodes.begin(); it != connecting_nodes.end(); ++it) {
221 0 : JAMI_DEBUG("Node {:s} Id: {:s}", std::to_string(nodeNum), (*it).toString());
222 0 : nodeNum++;
223 : }
224 0 : };
225 :
226 : void
227 8 : Bucket::changeMobility(const NodeId& nodeId, bool isMobile)
228 : {
229 8 : auto itn = nodes.find(nodeId);
230 8 : if (itn != nodes.end()) {
231 8 : itn->second.isMobile_ = isMobile;
232 : }
233 8 : }
234 :
235 : // For tests
236 :
237 : std::set<std::shared_ptr<dhtnet::ChannelSocketInterface>>
238 1 : Bucket::getNodeSockets() const
239 : {
240 1 : std::set<std::shared_ptr<dhtnet::ChannelSocketInterface>> sockets;
241 3 : for (auto const& info : nodes)
242 2 : sockets.insert(info.second.socket);
243 1 : return sockets;
244 0 : }
245 :
246 : // ####################################################################################################
247 :
248 583 : RoutingTable::RoutingTable()
249 : {
250 583 : buckets.emplace_back(NodeId::zero());
251 583 : }
252 :
253 : bool
254 2311 : RoutingTable::isEmpty() const
255 : {
256 3273 : for (const auto& bucket : buckets) {
257 2357 : if (!bucket.isEmpty()) {
258 1395 : return false;
259 : }
260 : }
261 916 : return true;
262 : }
263 :
264 : bool
265 10 : RoutingTable::addNode(const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket)
266 : {
267 10 : auto bucket = findBucket(socket->deviceId());
268 20 : return addNode(socket, bucket);
269 : }
270 :
271 : bool
272 1131 : RoutingTable::addNode(const std::shared_ptr<dhtnet::ChannelSocketInterface>& channel,
273 : std::list<Bucket>::iterator& bucket)
274 : {
275 1131 : NodeId nodeId = channel->deviceId();
276 :
277 1131 : if (bucket->hasNode(nodeId) || id_ == nodeId) {
278 331 : return false;
279 : }
280 :
281 935 : while (bucket->isFull()) {
282 204 : if (contains(bucket, id_)) {
283 135 : split(bucket);
284 135 : bucket = findBucket(nodeId);
285 :
286 : } else {
287 69 : return bucket->addNode(std::move(channel));
288 : }
289 : }
290 731 : return bucket->addNode(std::move(channel));
291 : }
292 :
293 : bool
294 346 : RoutingTable::removeNode(const NodeId& nodeId)
295 : {
296 346 : return findBucket(nodeId)->removeNode(nodeId);
297 : }
298 :
299 : bool
300 363 : RoutingTable::hasNode(const NodeId& nodeId)
301 : {
302 363 : return findBucket(nodeId)->hasNode(nodeId);
303 : }
304 :
305 : bool
306 4680 : RoutingTable::addKnownNode(const NodeId& nodeId)
307 : {
308 4680 : if (id_ == nodeId)
309 1232 : return false;
310 :
311 3449 : auto bucket = findBucket(nodeId);
312 3448 : if (bucket == buckets.end())
313 0 : return false;
314 :
315 3447 : return bucket->addKnownNode(nodeId);
316 : }
317 :
318 : bool
319 14 : RoutingTable::addMobileNode(const NodeId& nodeId)
320 : {
321 14 : if (id_ == nodeId)
322 1 : return false;
323 :
324 13 : auto bucket = findBucket(nodeId);
325 :
326 13 : if (bucket == buckets.end())
327 0 : return false;
328 :
329 13 : bucket->addMobileNode(nodeId);
330 13 : return true;
331 : }
332 :
333 : void
334 2 : RoutingTable::removeMobileNode(const NodeId& nodeId)
335 : {
336 2 : return findBucket(nodeId)->removeMobileNode(nodeId);
337 : }
338 :
339 : bool
340 5 : RoutingTable::hasMobileNode(const NodeId& nodeId)
341 : {
342 5 : return findBucket(nodeId)->hasMobileNode(nodeId);
343 : };
344 :
345 : bool
346 756 : RoutingTable::addConnectingNode(const NodeId& nodeId)
347 : {
348 756 : if (id_ == nodeId)
349 1 : return false;
350 :
351 755 : auto bucket = findBucket(nodeId);
352 :
353 754 : if (bucket == buckets.end())
354 0 : return 0;
355 :
356 754 : bucket->addConnectingNode(nodeId);
357 754 : return 1;
358 : }
359 :
360 : void
361 0 : RoutingTable::removeConnectingNode(const NodeId& nodeId)
362 : {
363 0 : findBucket(nodeId)->removeConnectingNode(nodeId);
364 0 : }
365 :
366 : std::list<Bucket>::iterator
367 9859 : RoutingTable::findBucket(const NodeId& nodeId)
368 : {
369 9859 : if (buckets.empty())
370 0 : throw std::runtime_error("No bucket");
371 :
372 9860 : auto b = buckets.begin();
373 :
374 : while (true) {
375 14104 : auto next = std::next(b);
376 14096 : if (next == buckets.end())
377 9856 : return b;
378 6347 : if (std::memcmp(nodeId.data(), next->getLowerLimit().data(), nodeId.size()) < 0)
379 2108 : return b;
380 4249 : b = next;
381 4249 : }
382 : }
383 :
384 : std::vector<NodeId>
385 712 : RoutingTable::closestNodes(const NodeId& nodeId, unsigned count)
386 : {
387 712 : std::vector<NodeId> closestNodes;
388 711 : auto bucket = findBucket(nodeId);
389 1488 : auto sortedBucketInsert = [&](const std::list<Bucket>::iterator& b) {
390 1488 : auto nodes = b->getNodeIds();
391 4547 : for (auto n : nodes) {
392 3063 : if (n != nodeId) {
393 2358 : auto here = std::find_if(closestNodes.begin(), closestNodes.end(), [&nodeId, &n](NodeId& NodeId) {
394 7679 : return nodeId.xorCmp(n, NodeId) < 0;
395 : });
396 :
397 2360 : closestNodes.insert(here, n);
398 : }
399 : }
400 1470 : };
401 :
402 712 : auto itn = bucket;
403 712 : auto itp = (bucket == buckets.begin()) ? buckets.end() : std::prev(bucket);
404 1933 : while (itn != buckets.end() || itp != buckets.end()) {
405 1220 : if (itn != buckets.end()) {
406 1072 : sortedBucketInsert(itn);
407 1070 : itn = std::next(itn);
408 : }
409 1221 : if (itp != buckets.end()) {
410 420 : sortedBucketInsert(itp);
411 419 : itp = (itp == buckets.begin()) ? buckets.end() : std::prev(itp);
412 : }
413 : }
414 :
415 713 : if (closestNodes.size() > count) {
416 304 : closestNodes.resize(count);
417 : }
418 :
419 1426 : return closestNodes;
420 0 : }
421 :
422 : void
423 0 : RoutingTable::printRoutingTable() const
424 : {
425 0 : int counter = 1;
426 0 : JAMI_DEBUG("SWARM: {:s} ", id_.toString());
427 0 : for (auto it = buckets.begin(); it != buckets.end(); ++it) {
428 0 : it->printBucket(counter);
429 0 : counter++;
430 : }
431 0 : JAMI_DEBUG("_____________________________________________________________________________");
432 0 : }
433 :
434 : void
435 2 : RoutingTable::shutdownNode(const NodeId& nodeId)
436 : {
437 2 : findBucket(nodeId)->shutdownNode(nodeId);
438 2 : }
439 :
440 : std::vector<NodeId>
441 31 : RoutingTable::getNodes() const
442 : {
443 31 : std::lock_guard lock(mutex_);
444 31 : std::vector<NodeId> ret;
445 111 : for (const auto& b : buckets) {
446 80 : const auto& nodes = b.getNodeIds();
447 80 : ret.insert(ret.end(), nodes.begin(), nodes.end());
448 80 : }
449 62 : return ret;
450 31 : }
451 :
452 : std::vector<NodeId>
453 1 : RoutingTable::getKnownNodes() const
454 : {
455 1 : std::vector<NodeId> ret;
456 2 : for (const auto& b : buckets) {
457 1 : const auto& nodes = b.getKnownNodes();
458 1 : ret.insert(ret.end(), nodes.begin(), nodes.end());
459 : }
460 1 : return ret;
461 0 : }
462 :
463 : std::vector<NodeId>
464 1 : RoutingTable::getMobileNodes() const
465 : {
466 1 : std::vector<NodeId> ret;
467 2 : for (const auto& b : buckets) {
468 1 : const auto& nodes = b.getMobileNodes();
469 1 : ret.insert(ret.end(), nodes.begin(), nodes.end());
470 : }
471 1 : return ret;
472 0 : }
473 :
474 : std::vector<NodeId>
475 1 : RoutingTable::getConnectingNodes() const
476 : {
477 1 : std::vector<NodeId> ret;
478 2 : for (const auto& b : buckets) {
479 1 : const auto& nodes = b.getConnectingNodes();
480 1 : ret.insert(ret.end(), nodes.begin(), nodes.end());
481 : }
482 1 : return ret;
483 0 : }
484 :
485 : std::vector<NodeId>
486 0 : RoutingTable::getBucketMobileNodes() const
487 : {
488 0 : std::vector<NodeId> ret;
489 0 : auto bucket = findBucket(id_);
490 0 : const auto& nodes = bucket->getMobileNodes();
491 0 : ret.insert(ret.end(), nodes.begin(), nodes.end());
492 :
493 0 : return ret;
494 0 : }
495 :
496 : bool
497 2405 : RoutingTable::contains(const std::list<Bucket>::iterator& bucket, const NodeId& nodeId) const
498 : {
499 2405 : return NodeId::cmp(bucket->getLowerLimit(), nodeId) <= 0
500 2410 : && (std::next(bucket) == buckets.end() || NodeId::cmp(nodeId, std::next(bucket)->getLowerLimit()) < 0);
501 : }
502 :
503 : std::vector<NodeId>
504 17 : RoutingTable::getAllNodes() const
505 : {
506 17 : std::vector<NodeId> ret;
507 34 : for (const auto& b : buckets) {
508 17 : const auto& nodes = b.getNodeIds();
509 17 : const auto& knownNodes = b.getKnownNodes();
510 17 : const auto& mobileNodes = b.getMobileNodes();
511 17 : const auto& connectingNodes = b.getConnectingNodes();
512 17 : ret.reserve(ret.size() + nodes.size() + knownNodes.size() + mobileNodes.size() + connectingNodes.size());
513 17 : ret.insert(ret.end(), nodes.begin(), nodes.end());
514 17 : ret.insert(ret.end(), knownNodes.begin(), knownNodes.end());
515 17 : ret.insert(ret.end(), mobileNodes.begin(), mobileNodes.end());
516 17 : ret.insert(ret.end(), connectingNodes.begin(), connectingNodes.end());
517 17 : }
518 17 : return ret;
519 0 : }
520 :
521 : std::vector<NodeId>
522 1721 : RoutingTable::getConnectedNodes() const
523 : {
524 1721 : std::vector<NodeId> ret;
525 5198 : for (const auto& b : buckets) {
526 3477 : const auto& nodes = b.getNodes();
527 3477 : const auto& mobiles = b.getMobileNodes();
528 3477 : ret.reserve(ret.size() + nodes.size() + mobiles.size());
529 10362 : for (const auto& n : nodes)
530 6886 : ret.emplace_back(n.first);
531 3478 : ret.insert(ret.end(), mobiles.begin(), mobiles.end());
532 : }
533 1721 : return ret;
534 0 : }
535 :
536 : void
537 11 : RoutingTable::deleteNode(const NodeId& nodeId)
538 : {
539 11 : auto bucket = findBucket(nodeId);
540 11 : bucket->shutdownNode(nodeId);
541 11 : bucket->removeConnectingNode(nodeId);
542 11 : bucket->removeKnownNode(nodeId);
543 11 : bucket->removeMobileNode(nodeId);
544 11 : }
545 :
546 : NodeId
547 135 : RoutingTable::middle(std::list<Bucket>::iterator& it) const
548 : {
549 135 : unsigned bit = depth(it);
550 135 : if (bit >= 8 * HASH_LEN)
551 0 : throw std::out_of_range("End of table");
552 :
553 135 : NodeId id = it->getLowerLimit();
554 135 : id.setBit(bit, true);
555 135 : return id;
556 : }
557 :
558 : unsigned
559 135 : RoutingTable::depth(std::list<Bucket>::iterator& bucket) const
560 : {
561 135 : int bit1 = bucket->getLowerLimit().lowbit();
562 135 : int bit2 = std::next(bucket) != buckets.end() ? std::next(bucket)->getLowerLimit().lowbit() : -1;
563 135 : return std::max(bit1, bit2) + 1;
564 : }
565 :
566 : bool
567 135 : RoutingTable::split(std::list<Bucket>::iterator& bucket)
568 : {
569 135 : NodeId id = middle(bucket);
570 135 : auto newBucketIt = buckets.emplace(std::next(bucket), id);
571 : // Re-assign nodes
572 135 : auto& nodeSwap = bucket->getNodes();
573 :
574 405 : for (auto it = nodeSwap.begin(); it != nodeSwap.end();) {
575 270 : auto& node = *it;
576 :
577 270 : auto nodeId = it->first;
578 :
579 270 : if (!contains(bucket, nodeId)) {
580 124 : newBucketIt->addNode(std::move(node.second));
581 124 : it = nodeSwap.erase(it);
582 : } else {
583 146 : ++it;
584 : }
585 : }
586 :
587 135 : auto connectingSwap = bucket->getConnectingNodes();
588 235 : for (auto it = connectingSwap.begin(); it != connectingSwap.end();) {
589 100 : auto nodeId = *it;
590 :
591 100 : if (!contains(bucket, nodeId)) {
592 52 : newBucketIt->addConnectingNode(nodeId);
593 52 : it = connectingSwap.erase(it);
594 52 : bucket->removeConnectingNode(nodeId);
595 : } else {
596 48 : ++it;
597 : }
598 : }
599 :
600 135 : auto knownSwap = bucket->getKnownNodes();
601 522 : for (auto it = knownSwap.begin(); it != knownSwap.end();) {
602 387 : auto nodeId = *it;
603 :
604 387 : if (!contains(bucket, nodeId)) {
605 234 : newBucketIt->addKnownNode(nodeId);
606 234 : it = knownSwap.erase(it);
607 234 : bucket->removeKnownNode(nodeId);
608 : } else {
609 153 : ++it;
610 : }
611 : }
612 :
613 135 : auto mobileSwap = bucket->getMobileNodes();
614 135 : for (auto it = mobileSwap.begin(); it != mobileSwap.end();) {
615 0 : auto nodeId = *it;
616 :
617 0 : if (!contains(bucket, nodeId)) {
618 0 : newBucketIt->addMobileNode(nodeId);
619 0 : it = mobileSwap.erase(it);
620 0 : bucket->removeMobileNode(nodeId);
621 : } else {
622 0 : ++it;
623 : }
624 : }
625 :
626 135 : return true;
627 135 : }
628 :
629 : } // namespace jami
|