Line data Source code
1 : /*
2 : * Copyright (C) 2024 Savoir-faire Linux Inc.
3 : *
4 : * Author: Fadi Shehadeh <fadi.shehadeh@savoirfairelinux.com>
5 : *
6 : * This program is free software; you can redistribute it and/or modify
7 : * it under the terms of the GNU General Public License as published by
8 : * the Free Software Foundation; either version 3 of the License, or
9 : * (at your option) any later version.
10 : *
11 : * This program is distributed in the hope that it will be useful,
12 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 : * GNU General Public License for more details.
15 : *
16 : * You should have received a copy of the GNU General Public License
17 : * along with this program; if not, write to the Free Software
18 : * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
19 : */
20 :
21 : #include "routing_table.h"
22 :
23 : #include <dhtnet/multiplexed_socket.h>
24 : #include <opendht/infohash.h>
25 :
26 : #include <math.h>
27 : #include <stdio.h>
28 : #include <iostream>
29 : #include <iterator>
30 : #include <stdlib.h>
31 : #include <time.h>
32 :
33 : constexpr const std::chrono::minutes FIND_PERIOD {10};
34 : using namespace std::placeholders;
35 :
36 : namespace jami {
37 :
38 : using namespace dht;
39 :
40 714 : Bucket::Bucket(const NodeId& id)
41 714 : : lowerLimit_(id)
42 714 : {}
43 :
44 : bool
45 1411 : Bucket::addNode(const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket)
46 : {
47 1411 : return addNode(NodeInfo(socket));
48 : }
49 :
50 : bool
51 1647 : Bucket::addNode(NodeInfo&& info)
52 : {
53 1647 : auto nodeId = info.socket->deviceId();
54 1648 : if (nodes.try_emplace(nodeId, std::move(info)).second) {
55 1646 : connecting_nodes.erase(nodeId);
56 1645 : known_nodes.erase(nodeId);
57 1646 : mobile_nodes.erase(nodeId);
58 1646 : return true;
59 : }
60 1 : return false;
61 : }
62 :
63 : bool
64 1414 : Bucket::removeNode(const NodeId& nodeId)
65 : {
66 1414 : auto node = nodes.find(nodeId);
67 1414 : auto isMobile = node->second.isMobile_;
68 1414 : if (node == nodes.end())
69 3 : return false;
70 1412 : nodes.erase(nodeId);
71 1410 : if (isMobile) {
72 108 : addMobileNode(nodeId);
73 : } else {
74 1302 : addKnownNode(nodeId);
75 : }
76 :
77 1410 : return true;
78 : }
79 :
80 : std::set<NodeId>
81 21887 : Bucket::getNodeIds() const
82 : {
83 21887 : std::set<NodeId> nodesId;
84 104802 : for (auto const& key : nodes)
85 82935 : nodesId.insert(key.first);
86 21875 : return nodesId;
87 0 : }
88 :
89 : bool
90 7880 : Bucket::hasNode(const NodeId& nodeId) const
91 : {
92 7880 : return nodes.find(nodeId) != nodes.end();
93 : }
94 :
95 : bool
96 4394 : Bucket::addKnownNode(const NodeId& nodeId)
97 : {
98 4394 : if (!hasNode(nodeId)) {
99 2952 : if (known_nodes.emplace(nodeId).second) {
100 2601 : return true;
101 : }
102 : }
103 1796 : return false;
104 : }
105 :
106 : NodeId
107 134 : Bucket::getKnownNode(unsigned index) const
108 : {
109 134 : if (index > known_nodes.size()) {
110 1 : throw std::out_of_range("End of table for get known Node Id " + std::to_string(index));
111 : }
112 133 : auto it = known_nodes.begin();
113 133 : std::advance(it, index);
114 :
115 133 : return *it;
116 : }
117 :
118 : bool
119 122 : Bucket::addMobileNode(const NodeId& nodeId)
120 : {
121 122 : if (!hasNode(nodeId)) {
122 121 : if (mobile_nodes.emplace(nodeId).second) {
123 121 : known_nodes.erase(nodeId);
124 121 : return true;
125 : }
126 : }
127 1 : return false;
128 : }
129 :
130 : bool
131 818 : Bucket::addConnectingNode(const NodeId& nodeId)
132 : {
133 818 : if (!hasNode(nodeId)) {
134 821 : if (connecting_nodes.emplace(nodeId).second) {
135 782 : known_nodes.erase(nodeId);
136 785 : mobile_nodes.erase(nodeId);
137 783 : return true;
138 : }
139 : }
140 37 : return false;
141 : }
142 :
143 : std::set<NodeId>
144 1160 : Bucket::getKnownNodesRandom(unsigned numberNodes, std::mt19937_64& rd) const
145 : {
146 1160 : std::set<NodeId> nodesToReturn;
147 :
148 1160 : if (getKnownNodesSize() <= numberNodes)
149 1056 : return getKnownNodes();
150 :
151 104 : std::uniform_int_distribution<unsigned> distrib(0, getKnownNodesSize() - 1);
152 :
153 237 : while (nodesToReturn.size() < numberNodes) {
154 133 : nodesToReturn.emplace(getKnownNode(distrib(rd)));
155 : }
156 :
157 104 : return nodesToReturn;
158 1159 : }
159 :
160 : asio::steady_timer&
161 0 : Bucket::getNodeTimer(const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket)
162 : {
163 0 : auto node = nodes.find(socket->deviceId());
164 0 : if (node == nodes.end()) {
165 0 : throw std::range_error("Can't find timer " + socket->deviceId().toString());
166 : }
167 0 : return node->second.refresh_timer;
168 : }
169 :
170 : bool
171 4 : Bucket::shutdownNode(const NodeId& nodeId)
172 : {
173 4 : auto node = nodes.find(nodeId);
174 :
175 4 : if (node != nodes.end()) {
176 2 : auto socket = node->second.socket;
177 2 : auto node = socket->deviceId();
178 2 : socket->shutdown();
179 2 : removeNode(node);
180 2 : return true;
181 2 : }
182 2 : return false;
183 : }
184 :
185 : void
186 712 : Bucket::shutdownAllNodes()
187 : {
188 1639 : while (not nodes.empty()) {
189 927 : auto it = nodes.begin();
190 927 : auto socket = it->second.socket;
191 927 : auto nodeId = socket->deviceId();
192 927 : socket->shutdown();
193 927 : removeNode(nodeId);
194 927 : }
195 712 : }
196 :
197 : void
198 0 : Bucket::printBucket(unsigned number) const
199 : {
200 0 : JAMI_ERROR("BUCKET Number: {:d}", number);
201 :
202 0 : unsigned nodeNum = 1;
203 0 : for (auto it = nodes.begin(); it != nodes.end(); ++it) {
204 0 : JAMI_DEBUG("Node {:s} Id: {:s} isMobile: {:s}", std::to_string(nodeNum), it->first.toString(), std::to_string(it->second.isMobile_));
205 0 : nodeNum++;
206 : }
207 0 : JAMI_ERROR("Mobile Nodes");
208 0 : nodeNum = 0;
209 0 : for (auto it = mobile_nodes.begin(); it != mobile_nodes.end(); ++it) {
210 0 : JAMI_DEBUG("Node {:s} Id: {:s}", std::to_string(nodeNum), (*it).toString());
211 0 : nodeNum++;
212 : }
213 :
214 0 : JAMI_ERROR("Known Nodes");
215 0 : nodeNum = 0;
216 0 : for (auto it = known_nodes.begin(); it != known_nodes.end(); ++it) {
217 0 : JAMI_DEBUG("Node {:s} Id: {:s}", std::to_string(nodeNum), (*it).toString());
218 0 : nodeNum++;
219 : }
220 0 : JAMI_ERROR("Connecting_nodes");
221 0 : nodeNum = 0;
222 0 : for (auto it = connecting_nodes.begin(); it != connecting_nodes.end(); ++it) {
223 0 : JAMI_DEBUG("Node {:s} Id: {:s}", std::to_string(nodeNum), (*it).toString());
224 0 : nodeNum++;
225 : }
226 0 : };
227 :
228 : void
229 214 : Bucket::changeMobility(const NodeId& nodeId, bool isMobile)
230 : {
231 214 : auto itn = nodes.find(nodeId);
232 214 : if (itn != nodes.end()) {
233 213 : itn->second.isMobile_ = isMobile;
234 : }
235 213 : }
236 :
237 : // For tests
238 :
239 : std::set<std::shared_ptr<dhtnet::ChannelSocketInterface>>
240 1 : Bucket::getNodeSockets() const
241 : {
242 1 : std::set<std::shared_ptr<dhtnet::ChannelSocketInterface>> sockets;
243 3 : for (auto const& info : nodes)
244 2 : sockets.insert(info.second.socket);
245 1 : return sockets;
246 0 : }
247 :
248 : // ####################################################################################################
249 :
250 404 : RoutingTable::RoutingTable()
251 : {
252 404 : buckets.emplace_back(NodeId::zero());
253 404 : }
254 :
255 : bool
256 10 : RoutingTable::addNode(const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket)
257 : {
258 10 : auto bucket = findBucket(socket->deviceId());
259 20 : return addNode(socket, bucket);
260 : }
261 :
262 : bool
263 1945 : RoutingTable::addNode(const std::shared_ptr<dhtnet::ChannelSocketInterface>& channel,
264 : std::list<Bucket>::iterator& bucket)
265 : {
266 1945 : NodeId nodeId = channel->deviceId();
267 :
268 1945 : if (bucket->hasNode(nodeId) || id_ == nodeId) {
269 565 : return false;
270 : }
271 :
272 1687 : while (bucket->isFull()) {
273 445 : if (contains(bucket, id_)) {
274 308 : split(bucket);
275 308 : bucket = findBucket(nodeId);
276 :
277 : } else {
278 137 : return bucket->addNode(std::move(channel));
279 : }
280 : }
281 1243 : return bucket->addNode(std::move(channel));
282 : }
283 :
284 : bool
285 483 : RoutingTable::removeNode(const NodeId& nodeId)
286 : {
287 483 : return findBucket(nodeId)->removeNode(nodeId);
288 : }
289 :
290 : bool
291 598 : RoutingTable::hasNode(const NodeId& nodeId)
292 : {
293 598 : return findBucket(nodeId)->hasNode(nodeId);
294 : }
295 :
296 : bool
297 3316 : RoutingTable::addKnownNode(const NodeId& nodeId)
298 : {
299 3316 : if (id_ == nodeId)
300 676 : return false;
301 :
302 2637 : auto bucket = findBucket(nodeId);
303 2644 : if (bucket == buckets.end())
304 0 : return false;
305 :
306 2639 : return bucket->addKnownNode(nodeId);
307 : }
308 :
309 : bool
310 15 : RoutingTable::addMobileNode(const NodeId& nodeId)
311 : {
312 15 : if (id_ == nodeId)
313 1 : return false;
314 :
315 14 : auto bucket = findBucket(nodeId);
316 :
317 14 : if (bucket == buckets.end())
318 0 : return 0;
319 :
320 14 : bucket->addMobileNode(nodeId);
321 14 : return 1;
322 : }
323 :
324 : void
325 2 : RoutingTable::removeMobileNode(const NodeId& nodeId)
326 : {
327 2 : return findBucket(nodeId)->removeMobileNode(nodeId);
328 : }
329 :
330 : bool
331 7 : RoutingTable::hasMobileNode(const NodeId& nodeId)
332 : {
333 7 : return findBucket(nodeId)->hasMobileNode(nodeId);
334 : };
335 :
336 : bool
337 713 : RoutingTable::addConnectingNode(const NodeId& nodeId)
338 : {
339 713 : if (id_ == nodeId)
340 1 : return false;
341 :
342 713 : auto bucket = findBucket(nodeId);
343 :
344 717 : if (bucket == buckets.end())
345 0 : return 0;
346 :
347 714 : bucket->addConnectingNode(nodeId);
348 715 : return 1;
349 : }
350 :
351 : void
352 0 : RoutingTable::removeConnectingNode(const NodeId& nodeId)
353 : {
354 0 : findBucket(nodeId)->removeConnectingNode(nodeId);
355 0 : }
356 :
357 : std::list<Bucket>::iterator
358 13253 : RoutingTable::findBucket(const NodeId& nodeId)
359 : {
360 13253 : if (buckets.empty())
361 0 : throw std::runtime_error("No bucket");
362 :
363 13253 : auto b = buckets.begin();
364 :
365 : while (true) {
366 23839 : auto next = std::next(b);
367 23835 : if (next == buckets.end())
368 13276 : return b;
369 17337 : if (std::memcmp(nodeId.data(), next->getLowerLimit().data(), nodeId.size()) < 0)
370 6766 : return b;
371 10593 : b = next;
372 10593 : }
373 : }
374 :
375 : std::vector<NodeId>
376 1286 : RoutingTable::closestNodes(const NodeId& nodeId, unsigned count)
377 : {
378 1286 : std::vector<NodeId> closestNodes;
379 1281 : auto bucket = findBucket(nodeId);
380 3713 : auto sortedBucketInsert = [&](const std::list<Bucket>::iterator& b) {
381 3713 : auto nodes = b->getNodeIds();
382 14738 : for (auto n : nodes) {
383 11036 : if (n != nodeId) {
384 9761 : auto here = std::find_if(closestNodes.begin(),
385 : closestNodes.end(),
386 52115 : [&nodeId, &n](NodeId& NodeId) {
387 52115 : return nodeId.xorCmp(n, NodeId) < 0;
388 : });
389 :
390 9765 : closestNodes.insert(here, n);
391 : }
392 : }
393 3697 : };
394 :
395 1288 : auto itn = bucket;
396 1288 : auto itp = (bucket == buckets.begin()) ? buckets.end() : std::prev(bucket);
397 4278 : while (itn != buckets.end() || itp != buckets.end()) {
398 2992 : if (itn != buckets.end()) {
399 2470 : sortedBucketInsert(itn);
400 2467 : itn = std::next(itn);
401 : }
402 2992 : if (itp != buckets.end()) {
403 1244 : sortedBucketInsert(itp);
404 1244 : itp = (itp == buckets.begin()) ? buckets.end() : std::prev(itp);
405 : }
406 : }
407 :
408 1293 : if (closestNodes.size() > count) {
409 885 : closestNodes.resize(count);
410 : }
411 :
412 2584 : return closestNodes;
413 0 : }
414 :
415 : void
416 0 : RoutingTable::printRoutingTable() const
417 : {
418 0 : int counter = 1;
419 0 : JAMI_DEBUG("SWARM: {:s} ", id_.toString());
420 0 : for (auto it = buckets.begin(); it != buckets.end(); ++it) {
421 0 : it->printBucket(counter);
422 0 : counter++;
423 : }
424 0 : JAMI_DEBUG("_____________________________________________________________________________");
425 0 : }
426 :
427 : void
428 2 : RoutingTable::shutdownNode(const NodeId& nodeId)
429 : {
430 2 : findBucket(nodeId)->shutdownNode(nodeId);
431 2 : }
432 :
433 : std::vector<NodeId>
434 5709 : RoutingTable::getNodes() const
435 : {
436 5709 : std::lock_guard lock(mutex_);
437 5710 : std::vector<NodeId> ret;
438 23884 : for (const auto& b : buckets) {
439 18174 : const auto& nodes = b.getNodeIds();
440 18173 : ret.insert(ret.end(), nodes.begin(), nodes.end());
441 18176 : }
442 11420 : return ret;
443 5710 : }
444 :
445 : std::vector<NodeId>
446 1 : RoutingTable::getKnownNodes() const
447 : {
448 1 : std::vector<NodeId> ret;
449 2 : for (const auto& b : buckets) {
450 1 : const auto& nodes = b.getKnownNodes();
451 1 : ret.insert(ret.end(), nodes.begin(), nodes.end());
452 : }
453 1 : return ret;
454 0 : }
455 :
456 : std::vector<NodeId>
457 2618 : RoutingTable::getMobileNodes() const
458 : {
459 2618 : std::vector<NodeId> ret;
460 11326 : for (const auto& b : buckets) {
461 8709 : const auto& nodes = b.getMobileNodes();
462 8709 : ret.insert(ret.end(), nodes.begin(), nodes.end());
463 : }
464 2617 : return ret;
465 0 : }
466 :
467 : std::vector<NodeId>
468 1 : RoutingTable::getConnectingNodes() const
469 : {
470 1 : std::vector<NodeId> ret;
471 2 : for (const auto& b : buckets) {
472 1 : const auto& nodes = b.getConnectingNodes();
473 1 : ret.insert(ret.end(), nodes.begin(), nodes.end());
474 : }
475 1 : return ret;
476 0 : }
477 :
478 : std::vector<NodeId>
479 0 : RoutingTable::getBucketMobileNodes() const
480 : {
481 0 : std::vector<NodeId> ret;
482 0 : auto bucket = findBucket(id_);
483 0 : const auto& nodes = bucket->getMobileNodes();
484 0 : ret.insert(ret.end(), nodes.begin(), nodes.end());
485 :
486 0 : return ret;
487 0 : }
488 :
489 : bool
490 4386 : RoutingTable::contains(const std::list<Bucket>::iterator& bucket, const NodeId& nodeId) const
491 : {
492 4386 : return NodeId::cmp(bucket->getLowerLimit(), nodeId) <= 0
493 7203 : && (std::next(bucket) == buckets.end()
494 7203 : || NodeId::cmp(nodeId, std::next(bucket)->getLowerLimit()) < 0);
495 : }
496 :
497 : std::vector<NodeId>
498 1 : RoutingTable::getAllNodes() const
499 : {
500 1 : std::vector<NodeId> ret;
501 2 : for (const auto& b : buckets) {
502 1 : const auto& nodes = b.getNodeIds();
503 1 : const auto& knownNodes = b.getKnownNodes();
504 1 : const auto& mobileNodes = b.getMobileNodes();
505 1 : const auto& connectingNodes = b.getConnectingNodes();
506 1 : ret.reserve(nodes.size() + knownNodes.size() + mobileNodes.size() + connectingNodes.size());
507 1 : ret.insert(ret.end(), nodes.begin(), nodes.end());
508 1 : ret.insert(ret.end(), knownNodes.begin(), knownNodes.end());
509 1 : ret.insert(ret.end(), mobileNodes.begin(), mobileNodes.end());
510 1 : ret.insert(ret.end(), connectingNodes.begin(), connectingNodes.end());
511 1 : }
512 1 : return ret;
513 0 : }
514 :
515 : void
516 1 : RoutingTable::deleteNode(const NodeId& nodeId)
517 : {
518 1 : auto bucket = findBucket(nodeId);
519 1 : bucket->shutdownNode(nodeId);
520 1 : bucket->removeConnectingNode(nodeId);
521 1 : bucket->removeKnownNode(nodeId);
522 1 : bucket->removeMobileNode(nodeId);
523 1 : }
524 :
525 : NodeId
526 308 : RoutingTable::middle(std::list<Bucket>::iterator& it) const
527 : {
528 308 : unsigned bit = depth(it);
529 308 : if (bit >= 8 * HASH_LEN)
530 0 : throw std::out_of_range("End of table");
531 :
532 308 : NodeId id = it->getLowerLimit();
533 308 : id.setBit(bit, true);
534 308 : return id;
535 : }
536 :
537 : unsigned
538 308 : RoutingTable::depth(std::list<Bucket>::iterator& bucket) const
539 : {
540 308 : int bit1 = bucket->getLowerLimit().lowbit();
541 308 : int bit2 = std::next(bucket) != buckets.end() ? std::next(bucket)->getLowerLimit().lowbit()
542 308 : : -1;
543 308 : return std::max(bit1, bit2) + 1;
544 : }
545 :
546 : bool
547 308 : RoutingTable::split(std::list<Bucket>::iterator& bucket)
548 : {
549 308 : NodeId id = middle(bucket);
550 308 : auto newBucketIt = buckets.emplace(std::next(bucket), id);
551 : // Re-assign nodes
552 308 : auto& nodeSwap = bucket->getNodes();
553 :
554 924 : for (auto it = nodeSwap.begin(); it != nodeSwap.end();) {
555 616 : auto& node = *it;
556 :
557 616 : auto nodeId = it->first;
558 :
559 616 : if (!contains(bucket, nodeId)) {
560 235 : newBucketIt->addNode(std::move(node.second));
561 235 : it = nodeSwap.erase(it);
562 : } else {
563 381 : ++it;
564 : }
565 : }
566 :
567 308 : auto connectingSwap = bucket->getConnectingNodes();
568 574 : for (auto it = connectingSwap.begin(); it != connectingSwap.end();) {
569 266 : auto nodeId = *it;
570 :
571 266 : if (!contains(bucket, nodeId)) {
572 104 : newBucketIt->addConnectingNode(nodeId);
573 104 : it = connectingSwap.erase(it);
574 104 : bucket->removeConnectingNode(nodeId);
575 : } else {
576 162 : ++it;
577 : }
578 : }
579 :
580 308 : auto knownSwap = bucket->getKnownNodes();
581 803 : for (auto it = knownSwap.begin(); it != knownSwap.end();) {
582 495 : auto nodeId = *it;
583 :
584 495 : if (!contains(bucket, nodeId)) {
585 227 : newBucketIt->addKnownNode(nodeId);
586 227 : it = knownSwap.erase(it);
587 227 : bucket->removeKnownNode(nodeId);
588 : } else {
589 268 : ++it;
590 : }
591 : }
592 :
593 308 : auto mobileSwap = bucket->getMobileNodes();
594 308 : for (auto it = mobileSwap.begin(); it != mobileSwap.end();) {
595 0 : auto nodeId = *it;
596 :
597 0 : if (!contains(bucket, nodeId)) {
598 0 : newBucketIt->addMobileNode(nodeId);
599 0 : it = mobileSwap.erase(it);
600 0 : bucket->removeMobileNode(nodeId);
601 : } else {
602 0 : ++it;
603 : }
604 : }
605 :
606 308 : return true;
607 308 : }
608 :
609 : } // namespace jami
|