LCOV - code coverage report
Current view: top level - foo/src/jamidht/swarm - swarm_manager.cpp (source / functions) Hit Total Coverage
Test: jami-coverage-filtered.info Lines: 209 239 87.4 %
Date: 2026-04-22 10:25:21 Functions: 33 38 86.8 %

          Line data    Source code
       1             : /*
       2             :  *  Copyright (C) 2004-2026 Savoir-faire Linux Inc.
       3             :  *
       4             :  *  This program is free software: you can redistribute it and/or modify
       5             :  *  it under the terms of the GNU General Public License as published by
       6             :  *  the Free Software Foundation, either version 3 of the License, or
       7             :  *  (at your option) any later version.
       8             :  *
       9             :  *  This program is distributed in the hope that it will be useful,
      10             :  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
      11             :  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
      12             :  *  GNU General Public License for more details.
      13             :  *
      14             :  *  You should have received a copy of the GNU General Public License
      15             :  *  along with this program. If not, see <https://www.gnu.org/licenses/>.
      16             :  */
      17             : 
      18             : #include "swarm_manager.h"
      19             : #include <dhtnet/multiplexed_socket.h>
      20             : #include <dhtnet/channel_utils.h>
      21             : #include <opendht/thread_pool.h>
      22             : 
      23             : namespace jami {
      24             : 
      25             : using namespace swarm_protocol;
      26             : 
      27         593 : SwarmManager::SwarmManager(const NodeId& id, const std::mt19937_64& rand, ToConnectCb&& toConnectCb)
      28         593 :     : id_(id)
      29         593 :     , rd(rand)
      30         593 :     , toConnectCb_(toConnectCb)
      31             : {
      32         593 :     routing_table.setId(id);
      33         593 : }
      34             : 
      35         593 : SwarmManager::~SwarmManager()
      36             : {
      37         593 :     if (!isShutdown_)
      38         243 :         shutdown();
      39         593 : }
      40             : 
      41             : bool
      42        1718 : SwarmManager::setKnownNodes(const std::vector<NodeId>& known_nodes)
      43             : {
      44        1718 :     isShutdown_ = false;
      45        1720 :     std::vector<NodeId> newNodes;
      46             :     {
      47        1714 :         std::lock_guard lock(mutex);
      48        3871 :         for (const auto& nodeId : known_nodes) {
      49        2151 :             if (addKnownNode(nodeId)) {
      50         737 :                 newNodes.emplace_back(nodeId);
      51             :             }
      52             :         }
      53        1713 :     }
      54             : 
      55        1722 :     if (newNodes.empty())
      56        1152 :         return false;
      57             : 
      58         570 :     dht::ThreadPool::io().run([w = weak(), newNodes = std::move(newNodes)] {
      59         571 :         auto shared = w.lock();
      60         570 :         if (!shared)
      61           0 :             return;
      62             :         // If we detect a new node which already got a TCP link
      63             :         // we can use it to speed-up the bootstrap (because opening
      64             :         // a new channel will be easy)
      65         571 :         std::set<NodeId> toConnect;
      66        1311 :         for (const auto& nodeId : newNodes) {
      67         740 :             if (shared->toConnectCb_ && shared->toConnectCb_(nodeId))
      68         191 :                 toConnect.emplace(nodeId);
      69             :         }
      70         567 :         shared->maintainBuckets(toConnect);
      71         571 :     });
      72         571 :     return true;
      73        1723 : }
      74             : 
      75             : void
      76         942 : SwarmManager::setMobileNodes(const std::vector<NodeId>& mobile_nodes)
      77             : {
      78             :     {
      79         942 :         std::lock_guard lock(mutex);
      80         956 :         for (const auto& nodeId : mobile_nodes)
      81          13 :             addMobileNodes(nodeId);
      82         940 :     }
      83         943 : }
      84             : 
      85             : void
      86        1214 : SwarmManager::addChannel(const std::shared_ptr<dhtnet::ChannelSocketInterface>& channel)
      87             : {
      88             :     // JAMI_WARNING("[SwarmManager {}] addChannel! with {}", fmt::ptr(this), channel->deviceId().to_view());
      89        1214 :     if (channel) {
      90        1214 :         auto emit = false;
      91             :         {
      92        1214 :             std::lock_guard lock(mutex);
      93        1214 :             emit = routing_table.findBucket(getId())->isEmpty();
      94        1214 :             auto bucket = routing_table.findBucket(channel->deviceId());
      95        1214 :             if (routing_table.addNode(channel, bucket)) {
      96         963 :                 std::error_code ec;
      97         963 :                 resetNodeExpiry(ec, channel, id_);
      98             :             }
      99        1214 :         }
     100        1214 :         receiveMessage(channel);
     101        1214 :         if (emit && onConnectionChanged_) {
     102             :             // If it's the first channel we add, we're now connected!
     103         876 :             JAMI_DEBUG("[SwarmManager {}] Bootstrap: Connected!", fmt::ptr(this));
     104         219 :             onConnectionChanged_(true);
     105             :         }
     106             :     }
     107        1214 : }
     108             : 
     109             : void
     110         339 : SwarmManager::removeNode(const NodeId& nodeId)
     111             : {
     112         339 :     std::unique_lock lk(mutex);
     113         341 :     if (isConnectedWith(nodeId)) {
     114         335 :         removeNodeInternal(nodeId);
     115         334 :         lk.unlock();
     116         335 :         maintainBuckets();
     117             :     }
     118         339 : }
     119             : 
     120             : void
     121         186 : SwarmManager::changeMobility(const NodeId& nodeId, bool isMobile)
     122             : {
     123         186 :     std::lock_guard lock(mutex);
     124         186 :     auto bucket = routing_table.findBucket(nodeId);
     125         186 :     bucket->changeMobility(nodeId, isMobile);
     126         186 : }
     127             : 
     128             : bool
     129         341 : SwarmManager::isConnectedWith(const NodeId& deviceId)
     130             : {
     131         341 :     return routing_table.hasNode(deviceId);
     132             : }
     133             : 
     134             : void
     135         639 : SwarmManager::shutdown()
     136             : {
     137         639 :     if (isShutdown_) {
     138          16 :         return;
     139             :     }
     140         623 :     isShutdown_ = true;
     141         623 :     std::lock_guard lock(mutex);
     142         623 :     routing_table.shutdownAllNodes();
     143         623 : }
     144             : 
     145             : void
     146          22 : SwarmManager::restart()
     147             : {
     148          22 :     isShutdown_ = false;
     149          22 : }
     150             : 
     151             : bool
     152        2154 : SwarmManager::addKnownNode(const NodeId& nodeId)
     153             : {
     154        2154 :     return routing_table.addKnownNode(nodeId);
     155             : }
     156             : 
     157             : void
     158          13 : SwarmManager::addMobileNodes(const NodeId& nodeId)
     159             : {
     160          13 :     if (id_ != nodeId) {
     161          12 :         routing_table.addMobileNode(nodeId);
     162             :     }
     163          13 : }
     164             : 
     165             : void
     166         944 : SwarmManager::maintainBuckets(const std::set<NodeId>& toConnect)
     167             : {
     168         944 :     std::set<NodeId> nodes = toConnect;
     169         945 :     std::unique_lock lock(mutex);
     170         946 :     auto& buckets = routing_table.getBuckets();
     171        3121 :     for (auto it = buckets.begin(); it != buckets.end(); ++it) {
     172        2176 :         auto& bucket = *it;
     173        2170 :         bool myBucket = routing_table.contains(it, id_);
     174        3402 :         auto connecting_nodes = myBucket ? bucket.getConnectingNodesSize()
     175        1227 :                                          : bucket.getConnectingNodesSize() + bucket.getNodesSize();
     176        2176 :         if (connecting_nodes < Bucket::BUCKET_MAX_SIZE) {
     177        1315 :             auto nodesToTry = bucket.getKnownNodesRandom(Bucket::BUCKET_MAX_SIZE - connecting_nodes, rd);
     178        2138 :             for (auto& node : nodesToTry)
     179         821 :                 routing_table.addConnectingNode(node);
     180             : 
     181        1308 :             nodes.insert(nodesToTry.begin(), nodesToTry.end());
     182        1315 :         }
     183             :     }
     184         945 :     lock.unlock();
     185        1810 :     for (const auto& node : nodes)
     186         863 :         tryConnect(node);
     187         945 : }
     188             : 
     189             : void
     190         963 : SwarmManager::sendRequest(const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket,
     191             :                           const NodeId& nodeId,
     192             :                           Query q,
     193             :                           int numberNodes)
     194             : {
     195         963 :     dht::ThreadPool::io().run([socket, isMobile = isMobile_, nodeId, q, numberNodes] {
     196         962 :         msgpack::sbuffer buffer;
     197         963 :         msgpack::packer<msgpack::sbuffer> pk(&buffer);
     198         963 :         Message msg;
     199         963 :         msg.is_mobile = isMobile;
     200         963 :         msg.request = Request {q, numberNodes, nodeId};
     201         961 :         pk.pack(msg);
     202             : 
     203         961 :         std::error_code ec;
     204         963 :         socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
     205         963 :         if (ec) {
     206           0 :             JAMI_ERROR("{}", ec.message());
     207             :         }
     208         963 :     });
     209         963 : }
     210             : 
     211             : void
     212         939 : SwarmManager::sendAnswer(const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket, const Message& msg_)
     213             : {
     214         939 :     std::lock_guard lock(mutex);
     215             : 
     216         941 :     if (msg_.request->q == Query::FIND) {
     217         935 :         auto nodes = routing_table.closestNodes(msg_.request->nodeId, msg_.request->num);
     218         940 :         auto bucket = routing_table.findBucket(msg_.request->nodeId);
     219         942 :         const auto& m_nodes = bucket->getMobileNodes();
     220         939 :         Response toResponse {Query::FOUND, nodes, {m_nodes.begin(), m_nodes.end()}};
     221             : 
     222         938 :         Message msg;
     223         938 :         msg.is_mobile = isMobile_;
     224         938 :         msg.response = std::move(toResponse);
     225             : 
     226         941 :         msgpack::sbuffer buffer((size_t) 60000);
     227         943 :         msgpack::packer<msgpack::sbuffer> pk(&buffer);
     228         943 :         pk.pack(msg);
     229             : 
     230         940 :         std::error_code ec;
     231         941 :         socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
     232         943 :         if (ec) {
     233           0 :             JAMI_ERROR("{}", ec.message());
     234           0 :             return;
     235             :         }
     236         943 :     }
     237         942 : }
     238             : 
     239             : void
     240        1214 : SwarmManager::receiveMessage(const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket)
     241             : {
     242        1214 :     socket->setOnRecv(dhtnet::buildMsgpackReader<Message>(
     243        1874 :         [w = weak(), wsocket = std::weak_ptr<dhtnet::ChannelSocketInterface>(socket)](Message&& msg) {
     244        1874 :             auto shared = w.lock();
     245        1872 :             auto socket = wsocket.lock();
     246        1872 :             if (!shared || !socket)
     247           0 :                 return std::make_error_code(std::errc::operation_canceled);
     248             : 
     249        1868 :             if (msg.is_mobile)
     250         186 :                 shared->changeMobility(socket->deviceId(), msg.is_mobile);
     251             : 
     252        1868 :             if (msg.request) {
     253         937 :                 shared->sendAnswer(socket, msg);
     254             : 
     255         932 :             } else if (msg.response) {
     256         933 :                 shared->setKnownNodes(msg.response->nodes);
     257         940 :                 shared->setMobileNodes(msg.response->mobile_nodes);
     258             :             }
     259        1883 :             return std::error_code();
     260        1881 :         }));
     261             : 
     262        1214 :     socket->onShutdown([w = weak(), deviceId = socket->deviceId()](const std::error_code&) {
     263         628 :         dht::ThreadPool::io().run([w, deviceId] {
     264         628 :             auto shared = w.lock();
     265         626 :             if (shared && !shared->isShutdown_) {
     266         340 :                 shared->removeNode(deviceId);
     267             :             }
     268         628 :         });
     269         629 :     });
     270        1214 : }
     271             : 
     272             : void
     273         963 : SwarmManager::resetNodeExpiry(const asio::error_code& ec,
     274             :                               const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket,
     275             :                               NodeId node)
     276             : {
     277         963 :     NodeId idToFind;
     278         963 :     std::list<Bucket>::iterator bucket;
     279             : 
     280         962 :     if (ec == asio::error::operation_aborted)
     281           0 :         return;
     282             : 
     283         962 :     if (!node) {
     284           0 :         bucket = routing_table.findBucket(socket->deviceId());
     285           0 :         idToFind = bucket->randomId(rd);
     286             :     } else {
     287         963 :         bucket = routing_table.findBucket(node);
     288         963 :         idToFind = node;
     289             :     }
     290             : 
     291         963 :     sendRequest(socket, idToFind, Query::FIND, Bucket::BUCKET_MAX_SIZE);
     292             : 
     293         963 :     if (!node) {
     294           0 :         auto& nodeTimer = bucket->getNodeTimer(socket);
     295           0 :         nodeTimer.expires_after(FIND_PERIOD);
     296           0 :         nodeTimer.async_wait(std::bind(&jami::SwarmManager::resetNodeExpiry,
     297           0 :                                        shared_from_this(),
     298             :                                        std::placeholders::_1,
     299             :                                        socket,
     300           0 :                                        NodeId {}));
     301             :     }
     302             : }
     303             : 
     304             : void
     305         863 : SwarmManager::tryConnect(const NodeId& nodeId)
     306             : {
     307         863 :     if (needSocketCb_)
     308         857 :         needSocketCb_(nodeId.toString(),
     309         757 :                       [w = weak(), nodeId](const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket) {
     310         757 :                           auto shared = w.lock();
     311         757 :                           if (!shared || shared->isShutdown_)
     312         206 :                               return true;
     313         551 :                           if (socket) {
     314         475 :                               shared->addChannel(socket);
     315         475 :                               return true;
     316             :                           }
     317          76 :                           std::unique_lock lk(shared->mutex);
     318          76 :                           auto bucket = shared->routing_table.findBucket(nodeId);
     319          76 :                           bucket->removeConnectingNode(nodeId);
     320          76 :                           bucket->addKnownNode(nodeId);
     321          76 :                           bucket = shared->routing_table.findBucket(shared->getId());
     322         137 :                           if (bucket->getConnectingNodesSize() == 0 && bucket->isEmpty()
     323         137 :                               && shared->onConnectionChanged_) {
     324          47 :                               lk.unlock();
     325         188 :                               JAMI_LOG("[SwarmManager {:p}] Bootstrap: all connections failed", fmt::ptr(shared.get()));
     326          47 :                               shared->onConnectionChanged_(false);
     327             :                           }
     328          76 :                           return true;
     329         757 :                       });
     330         866 : }
     331             : 
     332             : void
     333         335 : SwarmManager::removeNodeInternal(const NodeId& nodeId)
     334             : {
     335         335 :     routing_table.removeNode(nodeId);
     336         334 : }
     337             : 
     338             : std::vector<NodeId>
     339          18 : SwarmManager::getAllNodes() const
     340             : {
     341          18 :     std::lock_guard lock(mutex);
     342          36 :     return routing_table.getAllNodes();
     343          18 : }
     344             : 
     345             : std::vector<NodeId>
     346        1692 : SwarmManager::getConnectedNodes() const
     347             : {
     348        1692 :     std::lock_guard lock(mutex);
     349        3383 :     return routing_table.getConnectedNodes();
     350        1692 : }
     351             : 
     352             : std::vector<std::map<std::string, std::string>>
     353           0 : SwarmManager::getRoutingTableInfo() const
     354             : {
     355           0 :     std::lock_guard lock(mutex);
     356           0 :     auto stats = routing_table.getRoutingTableStats();
     357           0 :     std::vector<std::map<std::string, std::string>> result;
     358           0 :     result.reserve(stats.size());
     359           0 :     for (const auto& stat : stats) {
     360           0 :         result.push_back({
     361           0 :             {"id",            stat.id                         },
     362           0 :             {"device",        stat.id                         },
     363           0 :             {"status",        stat.status                     },
     364           0 :             {"remoteAddress", stat.remoteAddress              },
     365           0 :             {"mobile",        stat.isMobile ? "true" : "false"}
     366             :         });
     367           0 :         if (stat.connectionTime != std::chrono::system_clock::time_point::min()) {
     368           0 :             auto tt = std::chrono::system_clock::to_time_t(stat.connectionTime);
     369           0 :             result.back().emplace("connectionTime", std::to_string(tt));
     370             :         }
     371             :     }
     372           0 :     return result;
     373           0 : }
     374             : 
     375             : bool
     376        2472 : SwarmManager::isConnected() const
     377             : {
     378        2472 :     std::lock_guard lock(mutex);
     379        4944 :     return !routing_table.isEmpty();
     380        2472 : }
     381             : 
     382             : void
     383          18 : SwarmManager::deleteNode(const std::vector<NodeId>& nodes)
     384             : {
     385             :     {
     386          18 :         std::lock_guard lock(mutex);
     387          20 :         for (const auto& node : nodes) {
     388           2 :             routing_table.deleteNode(node);
     389             :         }
     390          18 :     }
     391          18 :     maintainBuckets();
     392          18 : }
     393             : 
     394             : } // namespace jami

Generated by: LCOV version 1.14