LCOV - code coverage report
Current view: top level - foo/src/jamidht/swarm - swarm_manager.cpp (source / functions) Hit Total Coverage
Test: jami-coverage-filtered.info Lines: 210 222 94.6 %
Date: 2025-12-18 10:07:43 Functions: 35 37 94.6 %

          Line data    Source code
       1             : /*
       2             :  *  Copyright (C) 2004-2025 Savoir-faire Linux Inc.
       3             :  *
       4             :  *  This program is free software: you can redistribute it and/or modify
       5             :  *  it under the terms of the GNU General Public License as published by
       6             :  *  the Free Software Foundation, either version 3 of the License, or
       7             :  *  (at your option) any later version.
       8             :  *
       9             :  *  This program is distributed in the hope that it will be useful,
      10             :  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
      11             :  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
      12             :  *  GNU General Public License for more details.
      13             :  *
      14             :  *  You should have received a copy of the GNU General Public License
      15             :  *  along with this program. If not, see <https://www.gnu.org/licenses/>.
      16             :  */
      17             : 
      18             : #include "swarm_manager.h"
      19             : #include <dhtnet/multiplexed_socket.h>
      20             : #include <dhtnet/channel_utils.h>
      21             : #include <opendht/thread_pool.h>
      22             : 
      23             : namespace jami {
      24             : 
      25             : using namespace swarm_protocol;
      26             : 
      27         581 : SwarmManager::SwarmManager(const NodeId& id, const std::mt19937_64& rand, ToConnectCb&& toConnectCb)
      28         581 :     : id_(id)
      29         581 :     , rd(rand)
      30         581 :     , toConnectCb_(toConnectCb)
      31             : {
      32         581 :     routing_table.setId(id);
      33         581 : }
      34             : 
      35         581 : SwarmManager::~SwarmManager()
      36             : {
      37         581 :     if (!isShutdown_)
      38         258 :         shutdown();
      39         581 : }
      40             : 
      41             : bool
      42        1534 : SwarmManager::setKnownNodes(const std::vector<NodeId>& known_nodes)
      43             : {
      44        1534 :     isShutdown_ = false;
      45        1532 :     std::vector<NodeId> newNodes;
      46             :     {
      47        1535 :         std::lock_guard lock(mutex);
      48        6213 :         for (const auto& nodeId : known_nodes) {
      49        4676 :             if (addKnownNode(nodeId)) {
      50        2754 :                 newNodes.emplace_back(nodeId);
      51             :             }
      52             :         }
      53        1531 :     }
      54             : 
      55        1537 :     if (newNodes.empty())
      56        1172 :         return false;
      57             : 
      58         366 :     dht::ThreadPool::io().run([w = weak(), newNodes = std::move(newNodes)] {
      59         366 :         auto shared = w.lock();
      60         366 :         if (!shared)
      61           0 :             return;
      62             :         // If we detect a new node which already got a TCP link
      63             :         // we can use it to speed-up the bootstrap (because opening
      64             :         // a new channel will be easy)
      65         366 :         std::set<NodeId> toConnect;
      66        3121 :         for (const auto& nodeId : newNodes) {
      67        2757 :             if (shared->toConnectCb_ && shared->toConnectCb_(nodeId))
      68         266 :                 toConnect.emplace(nodeId);
      69             :         }
      70         364 :         shared->maintainBuckets(toConnect);
      71         366 :     });
      72         366 :     return true;
      73        1538 : }
      74             : 
      75             : void
      76         710 : SwarmManager::setMobileNodes(const std::vector<NodeId>& mobile_nodes)
      77             : {
      78             :     {
      79         710 :         std::lock_guard lock(mutex);
      80         719 :         for (const auto& nodeId : mobile_nodes)
      81          12 :             addMobileNodes(nodeId);
      82         709 :     }
      83         708 : }
      84             : 
      85             : void
      86        1052 : SwarmManager::addChannel(const std::shared_ptr<dhtnet::ChannelSocketInterface>& channel)
      87             : {
      88             :     // JAMI_WARNING("[SwarmManager {}] addChannel! with {}", fmt::ptr(this), channel->deviceId().to_view());
      89        1052 :     if (channel) {
      90        1052 :         auto emit = false;
      91             :         {
      92        1052 :             std::lock_guard lock(mutex);
      93        1051 :             emit = routing_table.findBucket(getId())->isEmpty();
      94        1052 :             auto bucket = routing_table.findBucket(channel->deviceId());
      95        1052 :             if (routing_table.addNode(channel, bucket)) {
      96         730 :                 std::error_code ec;
      97         730 :                 resetNodeExpiry(ec, channel, id_);
      98             :             }
      99        1052 :         }
     100        1052 :         receiveMessage(channel);
     101        1051 :         if (emit && onConnectionChanged_) {
     102             :             // If it's the first channel we add, we're now connected!
     103        1232 :             JAMI_DEBUG("[SwarmManager {}] Bootstrap: Connected!", fmt::ptr(this));
     104         308 :             onConnectionChanged_(true);
     105             :         }
     106             :     }
     107        1051 : }
     108             : 
     109             : void
     110         355 : SwarmManager::removeNode(const NodeId& nodeId)
     111             : {
     112         355 :     std::unique_lock lk(mutex);
     113         355 :     if (isConnectedWith(nodeId)) {
     114         341 :         removeNodeInternal(nodeId);
     115         340 :         lk.unlock();
     116         341 :         maintainBuckets();
     117             :     }
     118         355 : }
     119             : 
     120             : void
     121           8 : SwarmManager::changeMobility(const NodeId& nodeId, bool isMobile)
     122             : {
     123           8 :     std::lock_guard lock(mutex);
     124           8 :     auto bucket = routing_table.findBucket(nodeId);
     125           8 :     bucket->changeMobility(nodeId, isMobile);
     126           8 : }
     127             : 
     128             : bool
     129         355 : SwarmManager::isConnectedWith(const NodeId& deviceId)
     130             : {
     131         355 :     return routing_table.hasNode(deviceId);
     132             : }
     133             : 
     134             : void
     135         638 : SwarmManager::shutdown()
     136             : {
     137         638 :     if (isShutdown_) {
     138          18 :         return;
     139             :     }
     140         620 :     isShutdown_ = true;
     141         620 :     std::lock_guard lock(mutex);
     142         620 :     routing_table.shutdownAllNodes();
     143         620 : }
     144             : 
     145             : void
     146          26 : SwarmManager::restart()
     147             : {
     148          26 :     isShutdown_ = false;
     149          26 : }
     150             : 
     151             : bool
     152        4677 : SwarmManager::addKnownNode(const NodeId& nodeId)
     153             : {
     154        4677 :     return routing_table.addKnownNode(nodeId);
     155             : }
     156             : 
     157             : void
     158          12 : SwarmManager::addMobileNodes(const NodeId& nodeId)
     159             : {
     160          12 :     if (id_ != nodeId) {
     161          11 :         routing_table.addMobileNode(nodeId);
     162             :     }
     163          12 : }
     164             : 
     165             : void
     166         750 : SwarmManager::maintainBuckets(const std::set<NodeId>& toConnect)
     167             : {
     168         750 :     std::set<NodeId> nodes = toConnect;
     169         749 :     std::unique_lock lock(mutex);
     170         750 :     auto& buckets = routing_table.getBuckets();
     171        2200 :     for (auto it = buckets.begin(); it != buckets.end(); ++it) {
     172        1446 :         auto& bucket = *it;
     173        1446 :         bool myBucket = routing_table.contains(it, id_);
     174        2149 :         auto connecting_nodes = myBucket ? bucket.getConnectingNodesSize()
     175         699 :                                          : bucket.getConnectingNodesSize() + bucket.getNodesSize();
     176        1450 :         if (connecting_nodes < Bucket::BUCKET_MAX_SIZE) {
     177         819 :             auto nodesToTry = bucket.getKnownNodesRandom(Bucket::BUCKET_MAX_SIZE - connecting_nodes, rd);
     178        1571 :             for (auto& node : nodesToTry)
     179         753 :                 routing_table.addConnectingNode(node);
     180             : 
     181         818 :             nodes.insert(nodesToTry.begin(), nodesToTry.end());
     182         819 :         }
     183             :     }
     184         750 :     lock.unlock();
     185        1572 :     for (auto& node : nodes)
     186         822 :         tryConnect(node);
     187         749 : }
     188             : 
     189             : void
     190         730 : SwarmManager::sendRequest(const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket,
     191             :                           const NodeId& nodeId,
     192             :                           Query q,
     193             :                           int numberNodes)
     194             : {
     195         730 :     dht::ThreadPool::io().run([socket, isMobile = isMobile_, nodeId, q, numberNodes] {
     196         730 :         msgpack::sbuffer buffer;
     197         730 :         msgpack::packer<msgpack::sbuffer> pk(&buffer);
     198         730 :         Message msg;
     199         730 :         msg.is_mobile = isMobile;
     200         730 :         msg.request = Request {q, numberNodes, nodeId};
     201         729 :         pk.pack(msg);
     202             : 
     203         730 :         std::error_code ec;
     204         730 :         socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
     205         730 :         if (ec) {
     206           8 :             JAMI_ERROR("{}", ec.message());
     207             :         }
     208         730 :     });
     209         730 : }
     210             : 
     211             : void
     212         706 : SwarmManager::sendAnswer(const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket, const Message& msg_)
     213             : {
     214         706 :     std::lock_guard lock(mutex);
     215             : 
     216         706 :     if (msg_.request->q == Query::FIND) {
     217         708 :         auto nodes = routing_table.closestNodes(msg_.request->nodeId, msg_.request->num);
     218         708 :         auto bucket = routing_table.findBucket(msg_.request->nodeId);
     219         708 :         const auto& m_nodes = bucket->getMobileNodes();
     220         707 :         Response toResponse {Query::FOUND, nodes, {m_nodes.begin(), m_nodes.end()}};
     221             : 
     222         706 :         Message msg;
     223         706 :         msg.is_mobile = isMobile_;
     224         706 :         msg.response = std::move(toResponse);
     225             : 
     226         705 :         msgpack::sbuffer buffer((size_t) 60000);
     227         708 :         msgpack::packer<msgpack::sbuffer> pk(&buffer);
     228         708 :         pk.pack(msg);
     229             : 
     230         708 :         std::error_code ec;
     231         708 :         socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
     232         707 :         if (ec) {
     233           0 :             JAMI_ERROR("{}", ec.message());
     234           0 :             return;
     235             :         }
     236         707 :     }
     237         707 : }
     238             : 
     239             : void
     240        1052 : SwarmManager::receiveMessage(const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket)
     241             : {
     242        1055 :     socket->setOnRecv(dhtnet::buildMsgpackReader<Message>(
     243        1411 :         [w = weak(), wsocket = std::weak_ptr<dhtnet::ChannelSocketInterface>(socket)](Message&& msg) {
     244        1411 :             auto shared = w.lock();
     245        1411 :             auto socket = wsocket.lock();
     246        1413 :             if (!shared || !socket)
     247           0 :                 return std::make_error_code(std::errc::operation_canceled);
     248             : 
     249        1407 :             if (msg.is_mobile)
     250           8 :                 shared->changeMobility(socket->deviceId(), msg.is_mobile);
     251             : 
     252        1407 :             if (msg.request) {
     253         706 :                 shared->sendAnswer(socket, msg);
     254             : 
     255         703 :             } else if (msg.response) {
     256         705 :                 shared->setKnownNodes(msg.response->nodes);
     257         708 :                 shared->setMobileNodes(msg.response->mobile_nodes);
     258             :             }
     259        1414 :             return std::error_code();
     260        1414 :         }));
     261             : 
     262        1050 :     socket->onShutdown([w = weak(), deviceId = socket->deviceId()](const std::error_code&) {
     263         649 :         dht::ThreadPool::io().run([w, deviceId] {
     264         651 :             auto shared = w.lock();
     265         650 :             if (shared && !shared->isShutdown_) {
     266         355 :                 shared->removeNode(deviceId);
     267             :             }
     268         651 :         });
     269         651 :     });
     270        1051 : }
     271             : 
     272             : void
     273         730 : SwarmManager::resetNodeExpiry(const asio::error_code& ec,
     274             :                               const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket,
     275             :                               NodeId node)
     276             : {
     277         730 :     NodeId idToFind;
     278         729 :     std::list<Bucket>::iterator bucket;
     279             : 
     280         728 :     if (ec == asio::error::operation_aborted)
     281           0 :         return;
     282             : 
     283         730 :     if (!node) {
     284           0 :         bucket = routing_table.findBucket(socket->deviceId());
     285           0 :         idToFind = bucket->randomId(rd);
     286             :     } else {
     287         730 :         bucket = routing_table.findBucket(node);
     288         729 :         idToFind = node;
     289             :     }
     290             : 
     291         729 :     sendRequest(socket, idToFind, Query::FIND, Bucket::BUCKET_MAX_SIZE);
     292             : 
     293         730 :     if (!node) {
     294           0 :         auto& nodeTimer = bucket->getNodeTimer(socket);
     295           0 :         nodeTimer.expires_after(FIND_PERIOD);
     296           0 :         nodeTimer.async_wait(std::bind(&jami::SwarmManager::resetNodeExpiry,
     297           0 :                                        shared_from_this(),
     298             :                                        std::placeholders::_1,
     299             :                                        socket,
     300           0 :                                        NodeId {}));
     301             :     }
     302             : }
     303             : 
     304             : void
     305         822 : SwarmManager::tryConnect(const NodeId& nodeId)
     306             : {
     307         822 :     if (needSocketCb_)
     308         819 :         needSocketCb_(nodeId.toString(),
     309         684 :                       [w = weak(), nodeId](const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket) {
     310         684 :                           auto shared = w.lock();
     311         683 :                           if (!shared || shared->isShutdown_)
     312         260 :                               return true;
     313         423 :                           if (socket) {
     314         359 :                               shared->addChannel(socket);
     315         358 :                               return true;
     316             :                           }
     317          64 :                           std::unique_lock lk(shared->mutex);
     318          64 :                           auto bucket = shared->routing_table.findBucket(nodeId);
     319          64 :                           bucket->removeConnectingNode(nodeId);
     320          64 :                           bucket->addKnownNode(nodeId);
     321          64 :                           bucket = shared->routing_table.findBucket(shared->getId());
     322         115 :                           if (bucket->getConnectingNodesSize() == 0 && bucket->isEmpty()
     323         115 :                               && shared->onConnectionChanged_) {
     324          34 :                               lk.unlock();
     325         136 :                               JAMI_LOG("[SwarmManager {:p}] Bootstrap: all connections failed", fmt::ptr(shared.get()));
     326          34 :                               shared->onConnectionChanged_(false);
     327             :                           }
     328          64 :                           return true;
     329         683 :                       });
     330         823 : }
     331             : 
     332             : void
     333         341 : SwarmManager::removeNodeInternal(const NodeId& nodeId)
     334             : {
     335         341 :     routing_table.removeNode(nodeId);
     336         340 : }
     337             : 
     338             : std::vector<NodeId>
     339          17 : SwarmManager::getAllNodes() const
     340             : {
     341          17 :     std::lock_guard lock(mutex);
     342          34 :     return routing_table.getAllNodes();
     343          17 : }
     344             : 
     345             : std::vector<NodeId>
     346        1721 : SwarmManager::getConnectedNodes() const
     347             : {
     348        1721 :     std::lock_guard lock(mutex);
     349        3442 :     return routing_table.getConnectedNodes();
     350        1721 : }
     351             : 
     352             : bool
     353        2312 : SwarmManager::isConnected() const
     354             : {
     355        2312 :     std::lock_guard lock(mutex);
     356        4623 :     return !routing_table.isEmpty();
     357        2311 : }
     358             : 
     359             : void
     360          17 : SwarmManager::deleteNode(std::vector<NodeId> nodes)
     361             : {
     362             :     {
     363          17 :         std::lock_guard lock(mutex);
     364          28 :         for (const auto& node : nodes) {
     365          11 :             routing_table.deleteNode(node);
     366             :         }
     367          17 :     }
     368          17 :     maintainBuckets();
     369          17 : }
     370             : 
     371             : } // namespace jami

Generated by: LCOV version 1.14