LCOV - code coverage report
Current view: top level - foo/src/jamidht/swarm - swarm_manager.cpp (source / functions) Hit Total Coverage
Test: jami-coverage-filtered.info Lines: 210 222 94.6 %
Date: 2026-01-22 10:39:23 Functions: 35 37 94.6 %

          Line data    Source code
       1             : /*
       2             :  *  Copyright (C) 2004-2026 Savoir-faire Linux Inc.
       3             :  *
       4             :  *  This program is free software: you can redistribute it and/or modify
       5             :  *  it under the terms of the GNU General Public License as published by
       6             :  *  the Free Software Foundation, either version 3 of the License, or
       7             :  *  (at your option) any later version.
       8             :  *
       9             :  *  This program is distributed in the hope that it will be useful,
      10             :  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
      11             :  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
      12             :  *  GNU General Public License for more details.
      13             :  *
      14             :  *  You should have received a copy of the GNU General Public License
      15             :  *  along with this program. If not, see <https://www.gnu.org/licenses/>.
      16             :  */
      17             : 
      18             : #include "swarm_manager.h"
      19             : #include <dhtnet/multiplexed_socket.h>
      20             : #include <dhtnet/channel_utils.h>
      21             : #include <opendht/thread_pool.h>
      22             : 
      23             : namespace jami {
      24             : 
      25             : using namespace swarm_protocol;
      26             : 
      27         581 : SwarmManager::SwarmManager(const NodeId& id, const std::mt19937_64& rand, ToConnectCb&& toConnectCb)
      28         581 :     : id_(id)
      29         581 :     , rd(rand)
      30         581 :     , toConnectCb_(toConnectCb)
      31             : {
      32         581 :     routing_table.setId(id);
      33         581 : }
      34             : 
      35         581 : SwarmManager::~SwarmManager()
      36             : {
      37         581 :     if (!isShutdown_)
      38         251 :         shutdown();
      39         581 : }
      40             : 
      41             : bool
      42        1568 : SwarmManager::setKnownNodes(const std::vector<NodeId>& known_nodes)
      43             : {
      44        1568 :     isShutdown_ = false;
      45        1571 :     std::vector<NodeId> newNodes;
      46             :     {
      47        1566 :         std::lock_guard lock(mutex);
      48        6306 :         for (const auto& nodeId : known_nodes) {
      49        4739 :             if (addKnownNode(nodeId)) {
      50        2778 :                 newNodes.emplace_back(nodeId);
      51             :             }
      52             :         }
      53        1567 :     }
      54             : 
      55        1571 :     if (newNodes.empty())
      56        1189 :         return false;
      57             : 
      58         381 :     dht::ThreadPool::io().run([w = weak(), newNodes = std::move(newNodes)] {
      59         384 :         auto shared = w.lock();
      60         384 :         if (!shared)
      61           0 :             return;
      62             :         // If we detect a new node which already got a TCP link
      63             :         // we can use it to speed-up the bootstrap (because opening
      64             :         // a new channel will be easy)
      65         384 :         std::set<NodeId> toConnect;
      66        3166 :         for (const auto& nodeId : newNodes) {
      67        2782 :             if (shared->toConnectCb_ && shared->toConnectCb_(nodeId))
      68         273 :                 toConnect.emplace(nodeId);
      69             :         }
      70         383 :         shared->maintainBuckets(toConnect);
      71         383 :     });
      72         384 :     return true;
      73        1573 : }
      74             : 
      75             : void
      76         736 : SwarmManager::setMobileNodes(const std::vector<NodeId>& mobile_nodes)
      77             : {
      78             :     {
      79         736 :         std::lock_guard lock(mutex);
      80         749 :         for (const auto& nodeId : mobile_nodes)
      81          12 :             addMobileNodes(nodeId);
      82         734 :     }
      83         735 : }
      84             : 
      85             : void
      86        1095 : SwarmManager::addChannel(const std::shared_ptr<dhtnet::ChannelSocketInterface>& channel)
      87             : {
      88             :     // JAMI_WARNING("[SwarmManager {}] addChannel! with {}", fmt::ptr(this), channel->deviceId().to_view());
      89        1095 :     if (channel) {
      90        1094 :         auto emit = false;
      91             :         {
      92        1094 :             std::lock_guard lock(mutex);
      93        1094 :             emit = routing_table.findBucket(getId())->isEmpty();
      94        1095 :             auto bucket = routing_table.findBucket(channel->deviceId());
      95        1093 :             if (routing_table.addNode(channel, bucket)) {
      96         761 :                 std::error_code ec;
      97         760 :                 resetNodeExpiry(ec, channel, id_);
      98             :             }
      99        1095 :         }
     100        1095 :         receiveMessage(channel);
     101        1094 :         if (emit && onConnectionChanged_) {
     102             :             // If it's the first channel we add, we're now connected!
     103        1368 :             JAMI_DEBUG("[SwarmManager {}] Bootstrap: Connected!", fmt::ptr(this));
     104         342 :             onConnectionChanged_(true);
     105             :         }
     106             :     }
     107        1094 : }
     108             : 
     109             : void
     110         368 : SwarmManager::removeNode(const NodeId& nodeId)
     111             : {
     112         368 :     std::unique_lock lk(mutex);
     113         368 :     if (isConnectedWith(nodeId)) {
     114         354 :         removeNodeInternal(nodeId);
     115         354 :         lk.unlock();
     116         354 :         maintainBuckets();
     117             :     }
     118         368 : }
     119             : 
     120             : void
     121          16 : SwarmManager::changeMobility(const NodeId& nodeId, bool isMobile)
     122             : {
     123          16 :     std::lock_guard lock(mutex);
     124          16 :     auto bucket = routing_table.findBucket(nodeId);
     125          16 :     bucket->changeMobility(nodeId, isMobile);
     126          16 : }
     127             : 
     128             : bool
     129         368 : SwarmManager::isConnectedWith(const NodeId& deviceId)
     130             : {
     131         368 :     return routing_table.hasNode(deviceId);
     132             : }
     133             : 
     134             : void
     135         631 : SwarmManager::shutdown()
     136             : {
     137         631 :     if (isShutdown_) {
     138          19 :         return;
     139             :     }
     140         612 :     isShutdown_ = true;
     141         612 :     std::lock_guard lock(mutex);
     142         612 :     routing_table.shutdownAllNodes();
     143         612 : }
     144             : 
     145             : void
     146          22 : SwarmManager::restart()
     147             : {
     148          22 :     isShutdown_ = false;
     149          22 : }
     150             : 
     151             : bool
     152        4739 : SwarmManager::addKnownNode(const NodeId& nodeId)
     153             : {
     154        4739 :     return routing_table.addKnownNode(nodeId);
     155             : }
     156             : 
     157             : void
     158          12 : SwarmManager::addMobileNodes(const NodeId& nodeId)
     159             : {
     160          12 :     if (id_ != nodeId) {
     161          11 :         routing_table.addMobileNode(nodeId);
     162             :     }
     163          12 : }
     164             : 
     165             : void
     166         777 : SwarmManager::maintainBuckets(const std::set<NodeId>& toConnect)
     167             : {
     168         777 :     std::set<NodeId> nodes = toConnect;
     169         777 :     std::unique_lock lock(mutex);
     170         777 :     auto& buckets = routing_table.getBuckets();
     171        2455 :     for (auto it = buckets.begin(); it != buckets.end(); ++it) {
     172        1676 :         auto& bucket = *it;
     173        1676 :         bool myBucket = routing_table.contains(it, id_);
     174        2579 :         auto connecting_nodes = myBucket ? bucket.getConnectingNodesSize()
     175         901 :                                          : bucket.getConnectingNodesSize() + bucket.getNodesSize();
     176        1678 :         if (connecting_nodes < Bucket::BUCKET_MAX_SIZE) {
     177        1044 :             auto nodesToTry = bucket.getKnownNodesRandom(Bucket::BUCKET_MAX_SIZE - connecting_nodes, rd);
     178        1803 :             for (auto& node : nodesToTry)
     179         761 :                 routing_table.addConnectingNode(node);
     180             : 
     181        1044 :             nodes.insert(nodesToTry.begin(), nodesToTry.end());
     182        1043 :         }
     183             :     }
     184         776 :     lock.unlock();
     185        1613 :     for (auto& node : nodes)
     186         835 :         tryConnect(node);
     187         776 : }
     188             : 
     189             : void
     190         761 : SwarmManager::sendRequest(const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket,
     191             :                           const NodeId& nodeId,
     192             :                           Query q,
     193             :                           int numberNodes)
     194             : {
     195         761 :     dht::ThreadPool::io().run([socket, isMobile = isMobile_, nodeId, q, numberNodes] {
     196         761 :         msgpack::sbuffer buffer;
     197         761 :         msgpack::packer<msgpack::sbuffer> pk(&buffer);
     198         761 :         Message msg;
     199         761 :         msg.is_mobile = isMobile;
     200         761 :         msg.request = Request {q, numberNodes, nodeId};
     201         761 :         pk.pack(msg);
     202             : 
     203         761 :         std::error_code ec;
     204         761 :         socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
     205         761 :         if (ec) {
     206           8 :             JAMI_ERROR("{}", ec.message());
     207             :         }
     208         761 :     });
     209         761 : }
     210             : 
     211             : void
     212         735 : SwarmManager::sendAnswer(const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket, const Message& msg_)
     213             : {
     214         735 :     std::lock_guard lock(mutex);
     215             : 
     216         736 :     if (msg_.request->q == Query::FIND) {
     217         735 :         auto nodes = routing_table.closestNodes(msg_.request->nodeId, msg_.request->num);
     218         735 :         auto bucket = routing_table.findBucket(msg_.request->nodeId);
     219         735 :         const auto& m_nodes = bucket->getMobileNodes();
     220         736 :         Response toResponse {Query::FOUND, nodes, {m_nodes.begin(), m_nodes.end()}};
     221             : 
     222         735 :         Message msg;
     223         735 :         msg.is_mobile = isMobile_;
     224         735 :         msg.response = std::move(toResponse);
     225             : 
     226         734 :         msgpack::sbuffer buffer((size_t) 60000);
     227         734 :         msgpack::packer<msgpack::sbuffer> pk(&buffer);
     228         734 :         pk.pack(msg);
     229             : 
     230         734 :         std::error_code ec;
     231         734 :         socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
     232         736 :         if (ec) {
     233           0 :             JAMI_ERROR("{}", ec.message());
     234           0 :             return;
     235             :         }
     236         736 :     }
     237         736 : }
     238             : 
     239             : void
     240        1095 : SwarmManager::receiveMessage(const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket)
     241             : {
     242        1098 :     socket->setOnRecv(dhtnet::buildMsgpackReader<Message>(
     243        1467 :         [w = weak(), wsocket = std::weak_ptr<dhtnet::ChannelSocketInterface>(socket)](Message&& msg) {
     244        1467 :             auto shared = w.lock();
     245        1465 :             auto socket = wsocket.lock();
     246        1464 :             if (!shared || !socket)
     247           0 :                 return std::make_error_code(std::errc::operation_canceled);
     248             : 
     249        1461 :             if (msg.is_mobile)
     250          16 :                 shared->changeMobility(socket->deviceId(), msg.is_mobile);
     251             : 
     252        1461 :             if (msg.request) {
     253         733 :                 shared->sendAnswer(socket, msg);
     254             : 
     255         727 :             } else if (msg.response) {
     256         728 :                 shared->setKnownNodes(msg.response->nodes);
     257         733 :                 shared->setMobileNodes(msg.response->mobile_nodes);
     258             :             }
     259        1469 :             return std::error_code();
     260        1470 :         }));
     261             : 
     262        1094 :     socket->onShutdown([w = weak(), deviceId = socket->deviceId()](const std::error_code&) {
     263         673 :         dht::ThreadPool::io().run([w, deviceId] {
     264         674 :             auto shared = w.lock();
     265         674 :             if (shared && !shared->isShutdown_) {
     266         368 :                 shared->removeNode(deviceId);
     267             :             }
     268         673 :         });
     269         674 :     });
     270        1094 : }
     271             : 
     272             : void
     273         761 : SwarmManager::resetNodeExpiry(const asio::error_code& ec,
     274             :                               const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket,
     275             :                               NodeId node)
     276             : {
     277         761 :     NodeId idToFind;
     278         761 :     std::list<Bucket>::iterator bucket;
     279             : 
     280         760 :     if (ec == asio::error::operation_aborted)
     281           0 :         return;
     282             : 
     283         759 :     if (!node) {
     284           0 :         bucket = routing_table.findBucket(socket->deviceId());
     285           0 :         idToFind = bucket->randomId(rd);
     286             :     } else {
     287         761 :         bucket = routing_table.findBucket(node);
     288         761 :         idToFind = node;
     289             :     }
     290             : 
     291         761 :     sendRequest(socket, idToFind, Query::FIND, Bucket::BUCKET_MAX_SIZE);
     292             : 
     293         761 :     if (!node) {
     294           0 :         auto& nodeTimer = bucket->getNodeTimer(socket);
     295           0 :         nodeTimer.expires_after(FIND_PERIOD);
     296           0 :         nodeTimer.async_wait(std::bind(&jami::SwarmManager::resetNodeExpiry,
     297           0 :                                        shared_from_this(),
     298             :                                        std::placeholders::_1,
     299             :                                        socket,
     300           0 :                                        NodeId {}));
     301             :     }
     302             : }
     303             : 
     304             : void
     305         835 : SwarmManager::tryConnect(const NodeId& nodeId)
     306             : {
     307         835 :     if (needSocketCb_)
     308         833 :         needSocketCb_(nodeId.toString(),
     309         702 :                       [w = weak(), nodeId](const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket) {
     310         702 :                           auto shared = w.lock();
     311         701 :                           if (!shared || shared->isShutdown_)
     312         254 :                               return true;
     313         450 :                           if (socket) {
     314         376 :                               shared->addChannel(socket);
     315         375 :                               return true;
     316             :                           }
     317          73 :                           std::unique_lock lk(shared->mutex);
     318          74 :                           auto bucket = shared->routing_table.findBucket(nodeId);
     319          74 :                           bucket->removeConnectingNode(nodeId);
     320          74 :                           bucket->addKnownNode(nodeId);
     321          74 :                           bucket = shared->routing_table.findBucket(shared->getId());
     322         132 :                           if (bucket->getConnectingNodesSize() == 0 && bucket->isEmpty()
     323         131 :                               && shared->onConnectionChanged_) {
     324          37 :                               lk.unlock();
     325         148 :                               JAMI_LOG("[SwarmManager {:p}] Bootstrap: all connections failed", fmt::ptr(shared.get()));
     326          37 :                               shared->onConnectionChanged_(false);
     327             :                           }
     328          74 :                           return true;
     329         704 :                       });
     330         835 : }
     331             : 
     332             : void
     333         354 : SwarmManager::removeNodeInternal(const NodeId& nodeId)
     334             : {
     335         354 :     routing_table.removeNode(nodeId);
     336         354 : }
     337             : 
     338             : std::vector<NodeId>
     339          17 : SwarmManager::getAllNodes() const
     340             : {
     341          17 :     std::lock_guard lock(mutex);
     342          34 :     return routing_table.getAllNodes();
     343          17 : }
     344             : 
     345             : std::vector<NodeId>
     346        1748 : SwarmManager::getConnectedNodes() const
     347             : {
     348        1748 :     std::lock_guard lock(mutex);
     349        3496 :     return routing_table.getConnectedNodes();
     350        1748 : }
     351             : 
     352             : bool
     353        2343 : SwarmManager::isConnected() const
     354             : {
     355        2343 :     std::lock_guard lock(mutex);
     356        4686 :     return !routing_table.isEmpty();
     357        2343 : }
     358             : 
     359             : void
     360          17 : SwarmManager::deleteNode(std::vector<NodeId> nodes)
     361             : {
     362             :     {
     363          17 :         std::lock_guard lock(mutex);
     364          28 :         for (const auto& node : nodes) {
     365          11 :             routing_table.deleteNode(node);
     366             :         }
     367          17 :     }
     368          17 :     maintainBuckets();
     369          17 : }
     370             : 
     371             : } // namespace jami

Generated by: LCOV version 1.14