LCOV - code coverage report
Current view: top level - foo/src/jamidht/swarm - swarm_manager.cpp (source / functions) Hit Total Coverage
Test: jami-coverage-filtered.info Lines: 210 222 94.6 %
Date: 2026-02-28 10:41:24 Functions: 35 37 94.6 %

          Line data    Source code
       1             : /*
       2             :  *  Copyright (C) 2004-2026 Savoir-faire Linux Inc.
       3             :  *
       4             :  *  This program is free software: you can redistribute it and/or modify
       5             :  *  it under the terms of the GNU General Public License as published by
       6             :  *  the Free Software Foundation, either version 3 of the License, or
       7             :  *  (at your option) any later version.
       8             :  *
       9             :  *  This program is distributed in the hope that it will be useful,
      10             :  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
      11             :  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
      12             :  *  GNU General Public License for more details.
      13             :  *
      14             :  *  You should have received a copy of the GNU General Public License
      15             :  *  along with this program. If not, see <https://www.gnu.org/licenses/>.
      16             :  */
      17             : 
      18             : #include "swarm_manager.h"
      19             : #include <dhtnet/multiplexed_socket.h>
      20             : #include <dhtnet/channel_utils.h>
      21             : #include <opendht/thread_pool.h>
      22             : 
      23             : namespace jami {
      24             : 
      25             : using namespace swarm_protocol;
      26             : 
      27         591 : SwarmManager::SwarmManager(const NodeId& id, const std::mt19937_64& rand, ToConnectCb&& toConnectCb)
      28         591 :     : id_(id)
      29         591 :     , rd(rand)
      30         591 :     , toConnectCb_(toConnectCb)
      31             : {
      32         591 :     routing_table.setId(id);
      33         591 : }
      34             : 
      35         590 : SwarmManager::~SwarmManager()
      36             : {
      37         590 :     if (!isShutdown_)
      38         246 :         shutdown();
      39         590 : }
      40             : 
      41             : bool
      42        1996 : SwarmManager::setKnownNodes(const std::vector<NodeId>& known_nodes)
      43             : {
      44        1996 :     isShutdown_ = false;
      45        1992 :     std::vector<NodeId> newNodes;
      46             :     {
      47        1988 :         std::lock_guard lock(mutex);
      48        7457 :         for (const auto& nodeId : known_nodes) {
      49        5460 :             if (addKnownNode(nodeId)) {
      50        3047 :                 newNodes.emplace_back(nodeId);
      51             :             }
      52             :         }
      53        1995 :     }
      54             : 
      55        1999 :     if (newNodes.empty())
      56        1414 :         return false;
      57             : 
      58         581 :     dht::ThreadPool::io().run([w = weak(), newNodes = std::move(newNodes)] {
      59         583 :         auto shared = w.lock();
      60         581 :         if (!shared)
      61           0 :             return;
      62             :         // If we detect a new node which already got a TCP link
      63             :         // we can use it to speed-up the bootstrap (because opening
      64             :         // a new channel will be easy)
      65         581 :         std::set<NodeId> toConnect;
      66        3632 :         for (const auto& nodeId : newNodes) {
      67        3050 :             if (shared->toConnectCb_ && shared->toConnectCb_(nodeId))
      68         270 :                 toConnect.emplace(nodeId);
      69             :         }
      70         582 :         shared->maintainBuckets(toConnect);
      71         583 :     });
      72         583 :     return true;
      73        1997 : }
      74             : 
      75             : void
      76        1161 : SwarmManager::setMobileNodes(const std::vector<NodeId>& mobile_nodes)
      77             : {
      78             :     {
      79        1161 :         std::lock_guard lock(mutex);
      80        1178 :         for (const auto& nodeId : mobile_nodes)
      81          16 :             addMobileNodes(nodeId);
      82        1158 :     }
      83        1160 : }
      84             : 
      85             : void
      86        1524 : SwarmManager::addChannel(const std::shared_ptr<dhtnet::ChannelSocketInterface>& channel)
      87             : {
      88             :     // JAMI_WARNING("[SwarmManager {}] addChannel! with {}", fmt::ptr(this), channel->deviceId().to_view());
      89        1524 :     if (channel) {
      90        1524 :         auto emit = false;
      91             :         {
      92        1524 :             std::lock_guard lock(mutex);
      93        1524 :             emit = routing_table.findBucket(getId())->isEmpty();
      94        1524 :             auto bucket = routing_table.findBucket(channel->deviceId());
      95        1524 :             if (routing_table.addNode(channel, bucket)) {
      96        1183 :                 std::error_code ec;
      97        1183 :                 resetNodeExpiry(ec, channel, id_);
      98             :             }
      99        1524 :         }
     100        1524 :         receiveMessage(channel);
     101        1524 :         if (emit && onConnectionChanged_) {
     102             :             // If it's the first channel we add, we're now connected!
     103        1484 :             JAMI_DEBUG("[SwarmManager {}] Bootstrap: Connected!", fmt::ptr(this));
     104         371 :             onConnectionChanged_(true);
     105             :         }
     106             :     }
     107        1524 : }
     108             : 
     109             : void
     110         435 : SwarmManager::removeNode(const NodeId& nodeId)
     111             : {
     112         435 :     std::unique_lock lk(mutex);
     113         436 :     if (isConnectedWith(nodeId)) {
     114         419 :         removeNodeInternal(nodeId);
     115         418 :         lk.unlock();
     116         419 :         maintainBuckets();
     117             :     }
     118         436 : }
     119             : 
     120             : void
     121         221 : SwarmManager::changeMobility(const NodeId& nodeId, bool isMobile)
     122             : {
     123         221 :     std::lock_guard lock(mutex);
     124         222 :     auto bucket = routing_table.findBucket(nodeId);
     125         220 :     bucket->changeMobility(nodeId, isMobile);
     126         222 : }
     127             : 
     128             : bool
     129         436 : SwarmManager::isConnectedWith(const NodeId& deviceId)
     130             : {
     131         436 :     return routing_table.hasNode(deviceId);
     132             : }
     133             : 
     134             : void
     135         640 : SwarmManager::shutdown()
     136             : {
     137         640 :     if (isShutdown_) {
     138          18 :         return;
     139             :     }
     140         622 :     isShutdown_ = true;
     141         622 :     std::lock_guard lock(mutex);
     142         622 :     routing_table.shutdownAllNodes();
     143         622 : }
     144             : 
     145             : void
     146          23 : SwarmManager::restart()
     147             : {
     148          23 :     isShutdown_ = false;
     149          23 : }
     150             : 
     151             : bool
     152        5462 : SwarmManager::addKnownNode(const NodeId& nodeId)
     153             : {
     154        5462 :     return routing_table.addKnownNode(nodeId);
     155             : }
     156             : 
     157             : void
     158          16 : SwarmManager::addMobileNodes(const NodeId& nodeId)
     159             : {
     160          16 :     if (id_ != nodeId) {
     161          15 :         routing_table.addMobileNode(nodeId);
     162             :     }
     163          16 : }
     164             : 
     165             : void
     166        1040 : SwarmManager::maintainBuckets(const std::set<NodeId>& toConnect)
     167             : {
     168        1040 :     std::set<NodeId> nodes = toConnect;
     169        1041 :     std::unique_lock lock(mutex);
     170        1041 :     auto& buckets = routing_table.getBuckets();
     171        3342 :     for (auto it = buckets.begin(); it != buckets.end(); ++it) {
     172        2303 :         auto& bucket = *it;
     173        2297 :         bool myBucket = routing_table.contains(it, id_);
     174        3571 :         auto connecting_nodes = myBucket ? bucket.getConnectingNodesSize()
     175        1268 :                                          : bucket.getConnectingNodesSize() + bucket.getNodesSize();
     176        2304 :         if (connecting_nodes < Bucket::BUCKET_MAX_SIZE) {
     177        1303 :             auto nodesToTry = bucket.getKnownNodesRandom(Bucket::BUCKET_MAX_SIZE - connecting_nodes, rd);
     178        2271 :             for (auto& node : nodesToTry)
     179         971 :                 routing_table.addConnectingNode(node);
     180             : 
     181        1298 :             nodes.insert(nodesToTry.begin(), nodesToTry.end());
     182        1299 :         }
     183             :     }
     184        1042 :     lock.unlock();
     185        2084 :     for (auto& node : nodes)
     186        1040 :         tryConnect(node);
     187        1043 : }
     188             : 
     189             : void
     190        1183 : SwarmManager::sendRequest(const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket,
     191             :                           const NodeId& nodeId,
     192             :                           Query q,
     193             :                           int numberNodes)
     194             : {
     195        1183 :     dht::ThreadPool::io().run([socket, isMobile = isMobile_, nodeId, q, numberNodes] {
     196        1183 :         msgpack::sbuffer buffer;
     197        1183 :         msgpack::packer<msgpack::sbuffer> pk(&buffer);
     198        1183 :         Message msg;
     199        1183 :         msg.is_mobile = isMobile;
     200        1183 :         msg.request = Request {q, numberNodes, nodeId};
     201        1182 :         pk.pack(msg);
     202             : 
     203        1181 :         std::error_code ec;
     204        1181 :         socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
     205        1183 :         if (ec) {
     206           4 :             JAMI_ERROR("{}", ec.message());
     207             :         }
     208        1183 :     });
     209        1183 : }
     210             : 
     211             : void
     212        1153 : SwarmManager::sendAnswer(const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket, const Message& msg_)
     213             : {
     214        1153 :     std::lock_guard lock(mutex);
     215             : 
     216        1156 :     if (msg_.request->q == Query::FIND) {
     217        1148 :         auto nodes = routing_table.closestNodes(msg_.request->nodeId, msg_.request->num);
     218        1158 :         auto bucket = routing_table.findBucket(msg_.request->nodeId);
     219        1156 :         const auto& m_nodes = bucket->getMobileNodes();
     220        1155 :         Response toResponse {Query::FOUND, nodes, {m_nodes.begin(), m_nodes.end()}};
     221             : 
     222        1159 :         Message msg;
     223        1159 :         msg.is_mobile = isMobile_;
     224        1159 :         msg.response = std::move(toResponse);
     225             : 
     226        1157 :         msgpack::sbuffer buffer((size_t) 60000);
     227        1162 :         msgpack::packer<msgpack::sbuffer> pk(&buffer);
     228        1161 :         pk.pack(msg);
     229             : 
     230        1159 :         std::error_code ec;
     231        1154 :         socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
     232        1162 :         if (ec) {
     233           0 :             JAMI_ERROR("{}", ec.message());
     234           0 :             return;
     235             :         }
     236        1162 :     }
     237        1162 : }
     238             : 
     239             : void
     240        1524 : SwarmManager::receiveMessage(const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket)
     241             : {
     242        1524 :     socket->setOnRecv(dhtnet::buildMsgpackReader<Message>(
     243        2301 :         [w = weak(), wsocket = std::weak_ptr<dhtnet::ChannelSocketInterface>(socket)](Message&& msg) {
     244        2301 :             auto shared = w.lock();
     245        2303 :             auto socket = wsocket.lock();
     246        2302 :             if (!shared || !socket)
     247           0 :                 return std::make_error_code(std::errc::operation_canceled);
     248             : 
     249        2297 :             if (msg.is_mobile)
     250         219 :                 shared->changeMobility(socket->deviceId(), msg.is_mobile);
     251             : 
     252        2299 :             if (msg.request) {
     253        1154 :                 shared->sendAnswer(socket, msg);
     254             : 
     255        1155 :             } else if (msg.response) {
     256        1157 :                 shared->setKnownNodes(msg.response->nodes);
     257        1158 :                 shared->setMobileNodes(msg.response->mobile_nodes);
     258             :             }
     259        2319 :             return std::error_code();
     260        2319 :         }));
     261             : 
     262        1524 :     socket->onShutdown([w = weak(), deviceId = socket->deviceId()](const std::error_code&) {
     263         838 :         dht::ThreadPool::io().run([w, deviceId] {
     264         837 :             auto shared = w.lock();
     265         837 :             if (shared && !shared->isShutdown_) {
     266         436 :                 shared->removeNode(deviceId);
     267             :             }
     268         838 :         });
     269         839 :     });
     270        1523 : }
     271             : 
     272             : void
     273        1183 : SwarmManager::resetNodeExpiry(const asio::error_code& ec,
     274             :                               const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket,
     275             :                               NodeId node)
     276             : {
     277        1183 :     NodeId idToFind;
     278        1183 :     std::list<Bucket>::iterator bucket;
     279             : 
     280        1183 :     if (ec == asio::error::operation_aborted)
     281           0 :         return;
     282             : 
     283        1183 :     if (!node) {
     284           0 :         bucket = routing_table.findBucket(socket->deviceId());
     285           0 :         idToFind = bucket->randomId(rd);
     286             :     } else {
     287        1183 :         bucket = routing_table.findBucket(node);
     288        1183 :         idToFind = node;
     289             :     }
     290             : 
     291        1183 :     sendRequest(socket, idToFind, Query::FIND, Bucket::BUCKET_MAX_SIZE);
     292             : 
     293        1183 :     if (!node) {
     294           0 :         auto& nodeTimer = bucket->getNodeTimer(socket);
     295           0 :         nodeTimer.expires_after(FIND_PERIOD);
     296           0 :         nodeTimer.async_wait(std::bind(&jami::SwarmManager::resetNodeExpiry,
     297           0 :                                        shared_from_this(),
     298             :                                        std::placeholders::_1,
     299             :                                        socket,
     300           0 :                                        NodeId {}));
     301             :     }
     302             : }
     303             : 
     304             : void
     305        1039 : SwarmManager::tryConnect(const NodeId& nodeId)
     306             : {
     307        1039 :     if (needSocketCb_)
     308        1035 :         needSocketCb_(nodeId.toString(),
     309         890 :                       [w = weak(), nodeId](const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket) {
     310         890 :                           auto shared = w.lock();
     311         890 :                           if (!shared || shared->isShutdown_)
     312         246 :                               return true;
     313         643 :                           if (socket) {
     314         585 :                               shared->addChannel(socket);
     315         585 :                               return true;
     316             :                           }
     317          58 :                           std::unique_lock lk(shared->mutex);
     318          58 :                           auto bucket = shared->routing_table.findBucket(nodeId);
     319          58 :                           bucket->removeConnectingNode(nodeId);
     320          58 :                           bucket->addKnownNode(nodeId);
     321          58 :                           bucket = shared->routing_table.findBucket(shared->getId());
     322         104 :                           if (bucket->getConnectingNodesSize() == 0 && bucket->isEmpty()
     323         104 :                               && shared->onConnectionChanged_) {
     324          34 :                               lk.unlock();
     325         136 :                               JAMI_LOG("[SwarmManager {:p}] Bootstrap: all connections failed", fmt::ptr(shared.get()));
     326          34 :                               shared->onConnectionChanged_(false);
     327             :                           }
     328          58 :                           return true;
     329         889 :                       });
     330        1041 : }
     331             : 
     332             : void
     333         419 : SwarmManager::removeNodeInternal(const NodeId& nodeId)
     334             : {
     335         419 :     routing_table.removeNode(nodeId);
     336         419 : }
     337             : 
     338             : std::vector<NodeId>
     339          16 : SwarmManager::getAllNodes() const
     340             : {
     341          16 :     std::lock_guard lock(mutex);
     342          32 :     return routing_table.getAllNodes();
     343          16 : }
     344             : 
     345             : std::vector<NodeId>
     346        1796 : SwarmManager::getConnectedNodes() const
     347             : {
     348        1796 :     std::lock_guard lock(mutex);
     349        3593 :     return routing_table.getConnectedNodes();
     350        1797 : }
     351             : 
     352             : bool
     353        2387 : SwarmManager::isConnected() const
     354             : {
     355        2387 :     std::lock_guard lock(mutex);
     356        4774 :     return !routing_table.isEmpty();
     357        2387 : }
     358             : 
     359             : void
     360          16 : SwarmManager::deleteNode(std::vector<NodeId> nodes)
     361             : {
     362             :     {
     363          16 :         std::lock_guard lock(mutex);
     364          27 :         for (const auto& node : nodes) {
     365          11 :             routing_table.deleteNode(node);
     366             :         }
     367          16 :     }
     368          16 :     maintainBuckets();
     369          16 : }
     370             : 
     371             : } // namespace jami

Generated by: LCOV version 1.14