LCOV - code coverage report
Current view: top level - foo/src/jamidht/swarm - swarm_manager.cpp (source / functions) Hit Total Coverage
Test: jami-coverage-filtered.info Lines: 211 222 95.0 %
Date: 2026-04-01 09:29:43 Functions: 35 37 94.6 %

          Line data    Source code
       1             : /*
       2             :  *  Copyright (C) 2004-2026 Savoir-faire Linux Inc.
       3             :  *
       4             :  *  This program is free software: you can redistribute it and/or modify
       5             :  *  it under the terms of the GNU General Public License as published by
       6             :  *  the Free Software Foundation, either version 3 of the License, or
       7             :  *  (at your option) any later version.
       8             :  *
       9             :  *  This program is distributed in the hope that it will be useful,
      10             :  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
      11             :  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
      12             :  *  GNU General Public License for more details.
      13             :  *
      14             :  *  You should have received a copy of the GNU General Public License
      15             :  *  along with this program. If not, see <https://www.gnu.org/licenses/>.
      16             :  */
      17             : 
      18             : #include "swarm_manager.h"
      19             : #include <dhtnet/multiplexed_socket.h>
      20             : #include <dhtnet/channel_utils.h>
      21             : #include <opendht/thread_pool.h>
      22             : 
      23             : namespace jami {
      24             : 
      25             : using namespace swarm_protocol;
      26             : 
      27         592 : SwarmManager::SwarmManager(const NodeId& id, const std::mt19937_64& rand, ToConnectCb&& toConnectCb)
      28         592 :     : id_(id)
      29         592 :     , rd(rand)
      30         592 :     , toConnectCb_(toConnectCb)
      31             : {
      32         592 :     routing_table.setId(id);
      33         592 : }
      34             : 
      35         591 : SwarmManager::~SwarmManager()
      36             : {
      37         591 :     if (!isShutdown_)
      38         247 :         shutdown();
      39         591 : }
      40             : 
      41             : bool
      42        1989 : SwarmManager::setKnownNodes(const std::vector<NodeId>& known_nodes)
      43             : {
      44        1989 :     isShutdown_ = false;
      45        1991 :     std::vector<NodeId> newNodes;
      46             :     {
      47        1985 :         std::lock_guard lock(mutex);
      48        7439 :         for (const auto& nodeId : known_nodes) {
      49        5440 :             if (addKnownNode(nodeId)) {
      50        3013 :                 newNodes.emplace_back(nodeId);
      51             :             }
      52             :         }
      53        1988 :     }
      54             : 
      55        1999 :     if (newNodes.empty())
      56        1430 :         return false;
      57             : 
      58         564 :     dht::ThreadPool::io().run([w = weak(), newNodes = std::move(newNodes)] {
      59         566 :         auto shared = w.lock();
      60         567 :         if (!shared)
      61           0 :             return;
      62             :         // If we detect a new node which already got a TCP link
      63             :         // we can use it to speed-up the bootstrap (because opening
      64             :         // a new channel will be easy)
      65         566 :         std::set<NodeId> toConnect;
      66        3581 :         for (const auto& nodeId : newNodes) {
      67        3014 :             if (shared->toConnectCb_ && shared->toConnectCb_(nodeId))
      68         269 :                 toConnect.emplace(nodeId);
      69             :         }
      70         564 :         shared->maintainBuckets(toConnect);
      71         565 :     });
      72         567 :     return true;
      73        1997 : }
      74             : 
      75             : void
      76        1158 : SwarmManager::setMobileNodes(const std::vector<NodeId>& mobile_nodes)
      77             : {
      78             :     {
      79        1158 :         std::lock_guard lock(mutex);
      80        1174 :         for (const auto& nodeId : mobile_nodes)
      81          13 :             addMobileNodes(nodeId);
      82        1155 :     }
      83        1160 : }
      84             : 
      85             : void
      86        1534 : SwarmManager::addChannel(const std::shared_ptr<dhtnet::ChannelSocketInterface>& channel)
      87             : {
      88             :     // JAMI_WARNING("[SwarmManager {}] addChannel! with {}", fmt::ptr(this), channel->deviceId().to_view());
      89        1534 :     if (channel) {
      90        1534 :         auto emit = false;
      91             :         {
      92        1534 :             std::lock_guard lock(mutex);
      93        1535 :             emit = routing_table.findBucket(getId())->isEmpty();
      94        1533 :             auto bucket = routing_table.findBucket(channel->deviceId());
      95        1534 :             if (routing_table.addNode(channel, bucket)) {
      96        1183 :                 std::error_code ec;
      97        1183 :                 resetNodeExpiry(ec, channel, id_);
      98             :             }
      99        1535 :         }
     100        1535 :         receiveMessage(channel);
     101        1535 :         if (emit && onConnectionChanged_) {
     102             :             // If it's the first channel we add, we're now connected!
     103        1324 :             JAMI_DEBUG("[SwarmManager {}] Bootstrap: Connected!", fmt::ptr(this));
     104         331 :             onConnectionChanged_(true);
     105             :         }
     106             :     }
     107        1535 : }
     108             : 
     109             : void
     110         467 : SwarmManager::removeNode(const NodeId& nodeId)
     111             : {
     112         467 :     std::unique_lock lk(mutex);
     113         468 :     if (isConnectedWith(nodeId)) {
     114         452 :         removeNodeInternal(nodeId);
     115         450 :         lk.unlock();
     116         451 :         maintainBuckets();
     117             :     }
     118         468 : }
     119             : 
     120             : void
     121         202 : SwarmManager::changeMobility(const NodeId& nodeId, bool isMobile)
     122             : {
     123         202 :     std::lock_guard lock(mutex);
     124         202 :     auto bucket = routing_table.findBucket(nodeId);
     125         202 :     bucket->changeMobility(nodeId, isMobile);
     126         202 : }
     127             : 
     128             : bool
     129         468 : SwarmManager::isConnectedWith(const NodeId& deviceId)
     130             : {
     131         468 :     return routing_table.hasNode(deviceId);
     132             : }
     133             : 
     134             : void
     135         642 : SwarmManager::shutdown()
     136             : {
     137         642 :     if (isShutdown_) {
     138          17 :         return;
     139             :     }
     140         625 :     isShutdown_ = true;
     141         625 :     std::lock_guard lock(mutex);
     142         625 :     routing_table.shutdownAllNodes();
     143         625 : }
     144             : 
     145             : void
     146          24 : SwarmManager::restart()
     147             : {
     148          24 :     isShutdown_ = false;
     149          24 : }
     150             : 
     151             : bool
     152        5440 : SwarmManager::addKnownNode(const NodeId& nodeId)
     153             : {
     154        5440 :     return routing_table.addKnownNode(nodeId);
     155             : }
     156             : 
     157             : void
     158          13 : SwarmManager::addMobileNodes(const NodeId& nodeId)
     159             : {
     160          13 :     if (id_ != nodeId) {
     161          12 :         routing_table.addMobileNode(nodeId);
     162             :     }
     163          13 : }
     164             : 
     165             : void
     166        1054 : SwarmManager::maintainBuckets(const std::set<NodeId>& toConnect)
     167             : {
     168        1054 :     std::set<NodeId> nodes = toConnect;
     169        1055 :     std::unique_lock lock(mutex);
     170        1058 :     auto& buckets = routing_table.getBuckets();
     171        3474 :     for (auto it = buckets.begin(); it != buckets.end(); ++it) {
     172        2415 :         auto& bucket = *it;
     173        2409 :         bool myBucket = routing_table.contains(it, id_);
     174        3778 :         auto connecting_nodes = myBucket ? bucket.getConnectingNodesSize()
     175        1359 :                                          : bucket.getConnectingNodesSize() + bucket.getNodesSize();
     176        2416 :         if (connecting_nodes < Bucket::BUCKET_MAX_SIZE) {
     177        1379 :             auto nodesToTry = bucket.getKnownNodesRandom(Bucket::BUCKET_MAX_SIZE - connecting_nodes, rd);
     178        2373 :             for (auto& node : nodesToTry)
     179         995 :                 routing_table.addConnectingNode(node);
     180             : 
     181        1383 :             nodes.insert(nodesToTry.begin(), nodesToTry.end());
     182        1383 :         }
     183             :     }
     184        1059 :     lock.unlock();
     185        2119 :     for (auto& node : nodes)
     186        1060 :         tryConnect(node);
     187        1058 : }
     188             : 
     189             : void
     190        1183 : SwarmManager::sendRequest(const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket,
     191             :                           const NodeId& nodeId,
     192             :                           Query q,
     193             :                           int numberNodes)
     194             : {
     195        1183 :     dht::ThreadPool::io().run([socket, isMobile = isMobile_, nodeId, q, numberNodes] {
     196        1183 :         msgpack::sbuffer buffer;
     197        1183 :         msgpack::packer<msgpack::sbuffer> pk(&buffer);
     198        1183 :         Message msg;
     199        1183 :         msg.is_mobile = isMobile;
     200        1183 :         msg.request = Request {q, numberNodes, nodeId};
     201        1180 :         pk.pack(msg);
     202             : 
     203        1180 :         std::error_code ec;
     204        1180 :         socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
     205        1183 :         if (ec) {
     206           4 :             JAMI_ERROR("{}", ec.message());
     207             :         }
     208        1183 :     });
     209        1182 : }
     210             : 
     211             : void
     212        1150 : SwarmManager::sendAnswer(const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket, const Message& msg_)
     213             : {
     214        1150 :     std::lock_guard lock(mutex);
     215             : 
     216        1155 :     if (msg_.request->q == Query::FIND) {
     217        1150 :         auto nodes = routing_table.closestNodes(msg_.request->nodeId, msg_.request->num);
     218        1157 :         auto bucket = routing_table.findBucket(msg_.request->nodeId);
     219        1158 :         const auto& m_nodes = bucket->getMobileNodes();
     220        1157 :         Response toResponse {Query::FOUND, nodes, {m_nodes.begin(), m_nodes.end()}};
     221             : 
     222        1153 :         Message msg;
     223        1153 :         msg.is_mobile = isMobile_;
     224        1153 :         msg.response = std::move(toResponse);
     225             : 
     226        1153 :         msgpack::sbuffer buffer((size_t) 60000);
     227        1160 :         msgpack::packer<msgpack::sbuffer> pk(&buffer);
     228        1158 :         pk.pack(msg);
     229             : 
     230        1156 :         std::error_code ec;
     231        1155 :         socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
     232        1160 :         if (ec) {
     233           0 :             JAMI_ERROR("{}", ec.message());
     234           0 :             return;
     235             :         }
     236        1160 :     }
     237        1160 : }
     238             : 
     239             : void
     240        1535 : SwarmManager::receiveMessage(const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket)
     241             : {
     242        1535 :     socket->setOnRecv(dhtnet::buildMsgpackReader<Message>(
     243        2304 :         [w = weak(), wsocket = std::weak_ptr<dhtnet::ChannelSocketInterface>(socket)](Message&& msg) {
     244        2304 :             auto shared = w.lock();
     245        2299 :             auto socket = wsocket.lock();
     246        2301 :             if (!shared || !socket)
     247           0 :                 return std::make_error_code(std::errc::operation_canceled);
     248             : 
     249        2298 :             if (msg.is_mobile)
     250         202 :                 shared->changeMobility(socket->deviceId(), msg.is_mobile);
     251             : 
     252        2298 :             if (msg.request) {
     253        1151 :                 shared->sendAnswer(socket, msg);
     254             : 
     255        1153 :             } else if (msg.response) {
     256        1154 :                 shared->setKnownNodes(msg.response->nodes);
     257        1157 :                 shared->setMobileNodes(msg.response->mobile_nodes);
     258             :             }
     259        2314 :             return std::error_code();
     260        2314 :         }));
     261             : 
     262        1535 :     socket->onShutdown([w = weak(), deviceId = socket->deviceId()](const std::error_code&) {
     263         834 :         dht::ThreadPool::io().run([w, deviceId] {
     264         839 :             auto shared = w.lock();
     265         835 :             if (shared && !shared->isShutdown_) {
     266         467 :                 shared->removeNode(deviceId);
     267             :             }
     268         839 :         });
     269         841 :     });
     270        1534 : }
     271             : 
     272             : void
     273        1183 : SwarmManager::resetNodeExpiry(const asio::error_code& ec,
     274             :                               const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket,
     275             :                               NodeId node)
     276             : {
     277        1183 :     NodeId idToFind;
     278        1183 :     std::list<Bucket>::iterator bucket;
     279             : 
     280        1182 :     if (ec == asio::error::operation_aborted)
     281           0 :         return;
     282             : 
     283        1183 :     if (!node) {
     284           0 :         bucket = routing_table.findBucket(socket->deviceId());
     285           0 :         idToFind = bucket->randomId(rd);
     286             :     } else {
     287        1182 :         bucket = routing_table.findBucket(node);
     288        1183 :         idToFind = node;
     289             :     }
     290             : 
     291        1183 :     sendRequest(socket, idToFind, Query::FIND, Bucket::BUCKET_MAX_SIZE);
     292             : 
     293        1182 :     if (!node) {
     294           0 :         auto& nodeTimer = bucket->getNodeTimer(socket);
     295           0 :         nodeTimer.expires_after(FIND_PERIOD);
     296           0 :         nodeTimer.async_wait(std::bind(&jami::SwarmManager::resetNodeExpiry,
     297           0 :                                        shared_from_this(),
     298             :                                        std::placeholders::_1,
     299             :                                        socket,
     300           1 :                                        NodeId {}));
     301             :     }
     302             : }
     303             : 
     304             : void
     305        1060 : SwarmManager::tryConnect(const NodeId& nodeId)
     306             : {
     307        1060 :     if (needSocketCb_)
     308        1055 :         needSocketCb_(nodeId.toString(),
     309         932 :                       [w = weak(), nodeId](const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket) {
     310         932 :                           auto shared = w.lock();
     311         934 :                           if (!shared || shared->isShutdown_)
     312         282 :                               return true;
     313         647 :                           if (socket) {
     314         583 :                               shared->addChannel(socket);
     315         583 :                               return true;
     316             :                           }
     317          64 :                           std::unique_lock lk(shared->mutex);
     318          64 :                           auto bucket = shared->routing_table.findBucket(nodeId);
     319          64 :                           bucket->removeConnectingNode(nodeId);
     320          64 :                           bucket->addKnownNode(nodeId);
     321          64 :                           bucket = shared->routing_table.findBucket(shared->getId());
     322         116 :                           if (bucket->getConnectingNodesSize() == 0 && bucket->isEmpty()
     323         116 :                               && shared->onConnectionChanged_) {
     324          31 :                               lk.unlock();
     325         124 :                               JAMI_LOG("[SwarmManager {:p}] Bootstrap: all connections failed", fmt::ptr(shared.get()));
     326          31 :                               shared->onConnectionChanged_(false);
     327             :                           }
     328          64 :                           return true;
     329         929 :                       });
     330        1059 : }
     331             : 
     332             : void
     333         452 : SwarmManager::removeNodeInternal(const NodeId& nodeId)
     334             : {
     335         452 :     routing_table.removeNode(nodeId);
     336         450 : }
     337             : 
     338             : std::vector<NodeId>
     339          17 : SwarmManager::getAllNodes() const
     340             : {
     341          17 :     std::lock_guard lock(mutex);
     342          34 :     return routing_table.getAllNodes();
     343          17 : }
     344             : 
     345             : std::vector<NodeId>
     346        1784 : SwarmManager::getConnectedNodes() const
     347             : {
     348        1784 :     std::lock_guard lock(mutex);
     349        3568 :     return routing_table.getConnectedNodes();
     350        1784 : }
     351             : 
     352             : bool
     353        2367 : SwarmManager::isConnected() const
     354             : {
     355        2367 :     std::lock_guard lock(mutex);
     356        4734 :     return !routing_table.isEmpty();
     357        2367 : }
     358             : 
     359             : void
     360          17 : SwarmManager::deleteNode(const std::vector<NodeId>& nodes)
     361             : {
     362             :     {
     363          17 :         std::lock_guard lock(mutex);
     364          29 :         for (const auto& node : nodes) {
     365          12 :             routing_table.deleteNode(node);
     366             :         }
     367          17 :     }
     368          17 :     maintainBuckets();
     369          17 : }
     370             : 
     371             : } // namespace jami

Generated by: LCOV version 1.14