LCOV - code coverage report
Current view: top level - src/jamidht/swarm - swarm_manager.cpp (source / functions) Coverage Total Hit
Test: jami-coverage-filtered.info Lines: 89.2 % 249 222
Test Date: 2026-06-13 09:18:46 Functions: 97.4 % 39 38

            Line data    Source code
       1              : /*
       2              :  *  Copyright (C) 2004-2026 Savoir-faire Linux Inc.
       3              :  *
       4              :  *  This program is free software: you can redistribute it and/or modify
       5              :  *  it under the terms of the GNU General Public License as published by
       6              :  *  the Free Software Foundation, either version 3 of the License, or
       7              :  *  (at your option) any later version.
       8              :  *
       9              :  *  This program is distributed in the hope that it will be useful,
      10              :  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
      11              :  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
      12              :  *  GNU General Public License for more details.
      13              :  *
      14              :  *  You should have received a copy of the GNU General Public License
      15              :  *  along with this program. If not, see <https://www.gnu.org/licenses/>.
      16              :  */
      17              : 
      18              : #include "swarm_manager.h"
      19              : #include <dhtnet/multiplexed_socket.h>
      20              : #include <dhtnet/channel_utils.h>
      21              : #include <opendht/thread_pool.h>
      22              : 
      23              : namespace jami {
      24              : 
      25              : using namespace swarm_protocol;
      26              : 
      27          592 : SwarmManager::SwarmManager(const NodeId& id, const std::mt19937_64& rand, ToConnectCb&& toConnectCb)
      28          592 :     : id_(id)
      29          592 :     , rd(rand)
      30          592 :     , toConnectCb_(toConnectCb)
      31              : {
      32          592 :     routing_table.setId(id);
      33          592 : }
      34              : 
      35          592 : SwarmManager::~SwarmManager()
      36              : {
      37          592 :     if (!isShutdown_)
      38          248 :         shutdown();
      39          592 : }
      40              : 
      41              : bool
      42         2028 : SwarmManager::setKnownNodes(const std::vector<NodeId>& known_nodes)
      43              : {
      44         2028 :     isShutdown_ = false;
      45         2026 :     std::vector<NodeId> newNodes;
      46              :     {
      47         2026 :         std::lock_guard lock(mutex);
      48         4757 :         for (const auto& nodeId : known_nodes) {
      49         2733 :             if (addKnownNode(nodeId)) {
      50          673 :                 newNodes.emplace_back(nodeId);
      51              :             }
      52              :         }
      53         2018 :     }
      54              : 
      55         2029 :     if (newNodes.empty())
      56         1509 :         return false;
      57              : 
      58          518 :     dht::ThreadPool::io().run([w = weak(), newNodes = std::move(newNodes)] {
      59          518 :         auto shared = w.lock();
      60          517 :         if (!shared)
      61            0 :             return;
      62              :         // If we detect a new node which already got a TCP link
      63              :         // we can use it to speed-up the bootstrap (because opening
      64              :         // a new channel will be easy)
      65          516 :         std::set<NodeId> toConnect;
      66         1192 :         for (const auto& nodeId : newNodes) {
      67          673 :             if (shared->toConnectCb_ && shared->toConnectCb_(nodeId))
      68          143 :                 toConnect.emplace(nodeId);
      69              :         }
      70          518 :         shared->maintainBuckets(toConnect);
      71          518 :     });
      72          519 :     return true;
      73         2028 : }
      74              : 
      75              : void
      76         1281 : SwarmManager::setMobileNodes(const std::vector<NodeId>& mobile_nodes)
      77              : {
      78              :     {
      79         1281 :         std::lock_guard lock(mutex);
      80         1294 :         for (const auto& nodeId : mobile_nodes)
      81           13 :             addMobileNodes(nodeId);
      82         1281 :     }
      83         1281 : }
      84              : 
      85              : void
      86         2180 : SwarmManager::addChannel(const std::shared_ptr<dhtnet::ChannelSocketInterface>& channel)
      87              : {
      88              :     // JAMI_WARNING("[SwarmManager {}] addChannel! with {}", fmt::ptr(this), channel->deviceId().to_view());
      89         2180 :     if (channel) {
      90         2180 :         auto emit = false;
      91              :         {
      92         2180 :             std::lock_guard lock(mutex);
      93         2178 :             emit = routing_table.findBucket(getId())->isEmpty();
      94         2179 :             auto bucket = routing_table.findBucket(channel->deviceId());
      95         2180 :             if (routing_table.addNode(channel, bucket)) {
      96         1308 :                 std::error_code ec;
      97         1308 :                 resetNodeExpiry(ec, channel, id_);
      98              :             }
      99         2179 :         }
     100         2179 :         receiveMessage(channel);
     101         2180 :         if (emit && onConnectionChanged_) {
     102              :             // If it's the first channel we add, we're now connected!
     103         1256 :             JAMI_DEBUG("[SwarmManager {}] Bootstrap: Connected!", fmt::ptr(this));
     104          314 :             onConnectionChanged_(true);
     105              :         }
     106              :     }
     107         2180 : }
     108              : 
     109              : void
     110          585 : SwarmManager::removeNode(const NodeId& nodeId)
     111              : {
     112          585 :     std::unique_lock lk(mutex);
     113          585 :     if (isConnectedWith(nodeId)) {
     114          508 :         removeNodeInternal(nodeId);
     115          508 :         lk.unlock();
     116          507 :         maintainBuckets();
     117              :     }
     118          586 : }
     119              : 
     120              : void
     121          195 : SwarmManager::changeMobility(const NodeId& nodeId, bool isMobile)
     122              : {
     123          195 :     std::lock_guard lock(mutex);
     124          196 :     auto bucket = routing_table.findBucket(nodeId);
     125          196 :     bucket->changeMobility(nodeId, isMobile);
     126          196 : }
     127              : 
     128              : bool
     129         1142 : SwarmManager::isConnectedWith(const NodeId& deviceId)
     130              : {
     131         1142 :     return routing_table.hasNode(deviceId);
     132              : }
     133              : 
     134              : void
     135          642 : SwarmManager::shutdown()
     136              : {
     137          642 :     if (isShutdown_) {
     138           16 :         return;
     139              :     }
     140          626 :     isShutdown_ = true;
     141          626 :     std::lock_guard lock(mutex);
     142          626 :     routing_table.shutdownAllNodes();
     143          626 : }
     144              : 
     145              : void
     146           22 : SwarmManager::restart()
     147              : {
     148           22 :     isShutdown_ = false;
     149           22 : }
     150              : 
     151              : bool
     152         3287 : SwarmManager::addKnownNode(const NodeId& nodeId)
     153              : {
     154         3287 :     return routing_table.addKnownNode(nodeId);
     155              : }
     156              : 
     157              : void
     158           13 : SwarmManager::addMobileNodes(const NodeId& nodeId)
     159              : {
     160           13 :     if (id_ != nodeId) {
     161           12 :         routing_table.addMobileNode(nodeId);
     162              :     }
     163           13 : }
     164              : 
     165              : void
     166         1057 : SwarmManager::maintainBuckets(const std::set<NodeId>& toConnect)
     167              : {
     168         1057 :     std::set<NodeId> nodes = toConnect;
     169         1057 :     std::unique_lock lock(mutex);
     170         1059 :     auto& buckets = routing_table.getBuckets();
     171         4063 :     for (auto it = buckets.begin(); it != buckets.end(); ++it) {
     172         3010 :         auto& bucket = *it;
     173         3007 :         bool myBucket = routing_table.contains(it, id_);
     174         4961 :         auto connecting_nodes = myBucket ? bucket.getConnectingNodesSize()
     175         1955 :                                          : bucket.getConnectingNodesSize() + bucket.getNodesSize();
     176         3005 :         if (connecting_nodes < Bucket::BUCKET_MAX_SIZE) {
     177         1651 :             auto nodesToTry = bucket.getKnownNodesRandom(Bucket::BUCKET_MAX_SIZE - connecting_nodes, rd);
     178         2439 :             for (auto& node : nodesToTry)
     179          785 :                 routing_table.addConnectingNode(node);
     180              : 
     181         1650 :             nodes.insert(nodesToTry.begin(), nodesToTry.end());
     182         1654 :         }
     183              :     }
     184         1059 :     lock.unlock();
     185         1864 :     for (const auto& node : nodes)
     186          805 :         tryConnect(node);
     187         1056 : }
     188              : 
     189              : void
     190         1308 : SwarmManager::sendRequest(const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket,
     191              :                           const NodeId& nodeId,
     192              :                           Query q,
     193              :                           int numberNodes)
     194              : {
     195         1308 :     dht::ThreadPool::io().run([socket, isMobile = isMobile_, nodeId, q, numberNodes] {
     196         1308 :         msgpack::sbuffer buffer;
     197         1308 :         msgpack::packer<msgpack::sbuffer> pk(&buffer);
     198         1308 :         Message msg;
     199         1308 :         msg.is_mobile = isMobile;
     200         1308 :         msg.request = Request {q, numberNodes, nodeId};
     201         1308 :         pk.pack(msg);
     202              : 
     203         1308 :         std::error_code ec;
     204         1308 :         socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
     205         1308 :         if (ec) {
     206            4 :             JAMI_ERROR("{}", ec.message());
     207              :         }
     208         1308 :     });
     209         1308 : }
     210              : 
     211              : void
     212         1276 : SwarmManager::sendAnswer(const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket, const Message& msg_)
     213              : {
     214         1276 :     std::lock_guard lock(mutex);
     215              : 
     216         1278 :     if (msg_.request->q == Query::FIND) {
     217         1274 :         auto nodes = routing_table.closestNodes(msg_.request->nodeId, msg_.request->num);
     218         1283 :         auto bucket = routing_table.findBucket(msg_.request->nodeId);
     219         1281 :         const auto& m_nodes = bucket->getMobileNodes();
     220         2561 :         Response toResponse {Query::FOUND, nodes, {m_nodes.begin(), m_nodes.end()}};
     221              : 
     222         1278 :         Message msg;
     223         1278 :         msg.is_mobile = isMobile_;
     224         1278 :         msg.response = std::move(toResponse);
     225              : 
     226         1280 :         msgpack::sbuffer buffer((size_t) 60000);
     227         1284 :         msgpack::packer<msgpack::sbuffer> pk(&buffer);
     228         1283 :         pk.pack(msg);
     229              : 
     230         1280 :         std::error_code ec;
     231         1279 :         socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
     232         1284 :         if (ec) {
     233            8 :             JAMI_ERROR("{}", ec.message());
     234            2 :             return;
     235              :         }
     236         1290 :     }
     237         1284 : }
     238              : 
     239              : void
     240         2179 : SwarmManager::receiveMessage(const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket)
     241              : {
     242         4359 :     socket->setOnRecv(dhtnet::buildMsgpackReader<Message>(
     243         4359 :         [w = weak(), wsocket = std::weak_ptr<dhtnet::ChannelSocketInterface>(socket)](Message&& msg) {
     244         2551 :             auto shared = w.lock();
     245         2548 :             auto socket = wsocket.lock();
     246         2555 :             if (!shared || !socket)
     247            0 :                 return std::make_error_code(std::errc::operation_canceled);
     248              : 
     249         2548 :             if (msg.is_mobile)
     250          195 :                 shared->changeMobility(socket->deviceId(), msg.is_mobile);
     251              : 
     252         2549 :             if (msg.request) {
     253         1276 :                 shared->sendAnswer(socket, msg);
     254              : 
     255         1274 :             } else if (msg.response) {
     256         1275 :                 shared->setKnownNodes(msg.response->nodes);
     257         1280 :                 shared->setMobileNodes(msg.response->mobile_nodes);
     258              :             }
     259         2562 :             return std::error_code();
     260         2561 :         }));
     261              : 
     262         2180 :     socket->onShutdown([w = weak(), deviceId = socket->deviceId()](const std::error_code&) {
     263         1111 :         dht::ThreadPool::io().run([w, deviceId] {
     264         1114 :             auto shared = w.lock();
     265         1114 :             if (shared && !shared->isShutdown_) {
     266          586 :                 shared->removeNode(deviceId);
     267              :             }
     268         1116 :         });
     269         1117 :     });
     270         2179 : }
     271              : 
     272              : void
     273         1308 : SwarmManager::resetNodeExpiry(const asio::error_code& ec,
     274              :                               const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket,
     275              :                               NodeId node)
     276              : {
     277         1308 :     NodeId idToFind;
     278         1308 :     std::list<Bucket>::iterator bucket;
     279              : 
     280         1308 :     if (ec == asio::error::operation_aborted)
     281            0 :         return;
     282              : 
     283         1308 :     if (!node) {
     284            0 :         bucket = routing_table.findBucket(socket->deviceId());
     285            0 :         idToFind = bucket->randomId(rd);
     286              :     } else {
     287         1308 :         bucket = routing_table.findBucket(node);
     288         1308 :         idToFind = node;
     289              :     }
     290              : 
     291         1308 :     sendRequest(socket, idToFind, Query::FIND, Bucket::BUCKET_MAX_SIZE);
     292              : 
     293         1308 :     if (!node) {
     294            0 :         auto& nodeTimer = bucket->getNodeTimer(socket);
     295            0 :         nodeTimer.expires_after(FIND_PERIOD);
     296            0 :         nodeTimer.async_wait(std::bind(&jami::SwarmManager::resetNodeExpiry,
     297            0 :                                        shared_from_this(),
     298              :                                        std::placeholders::_1,
     299              :                                        socket,
     300            0 :                                        NodeId {}));
     301              :     }
     302              : }
     303              : 
     304              : void
     305         1359 : SwarmManager::tryConnect(const NodeId& nodeId, bool noNewSocket)
     306              : {
     307         1359 :     if (needSocketCb_)
     308         1351 :         needSocketCb_(
     309         2706 :             nodeId.toString(),
     310         2706 :             [w = weak(), nodeId](const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket) {
     311         1267 :                 auto shared = w.lock();
     312         1267 :                 if (!shared || shared->isShutdown_)
     313          192 :                     return true;
     314         1075 :                 if (socket) {
     315          990 :                     shared->addChannel(socket);
     316          990 :                     return true;
     317              :                 }
     318           85 :                 std::unique_lock lk(shared->mutex);
     319           85 :                 auto bucket = shared->routing_table.findBucket(nodeId);
     320           85 :                 bucket->removeConnectingNode(nodeId);
     321           85 :                 bucket->addKnownNode(nodeId);
     322           85 :                 bucket = shared->routing_table.findBucket(shared->getId());
     323           85 :                 if (bucket->getConnectingNodesSize() == 0 && bucket->isEmpty() && shared->onConnectionChanged_) {
     324           41 :                     lk.unlock();
     325          164 :                     JAMI_LOG("[SwarmManager {:p}] Bootstrap: all connections failed", fmt::ptr(shared.get()));
     326           41 :                     shared->onConnectionChanged_(false);
     327              :                 }
     328           85 :                 return true;
     329         1267 :             },
     330              :             noNewSocket);
     331         1359 : }
     332              : 
     333              : void
     334          508 : SwarmManager::removeNodeInternal(const NodeId& nodeId)
     335              : {
     336          508 :     routing_table.removeNode(nodeId);
     337          506 : }
     338              : 
     339              : void
     340          559 : SwarmManager::connectNode(const NodeId& nodeId)
     341              : {
     342              :     {
     343          559 :         std::lock_guard lock(mutex);
     344          559 :         if (isShutdown_)
     345            1 :             return;
     346          558 :         if (isConnectedWith(nodeId))
     347            4 :             return;
     348          554 :         addKnownNode(nodeId);
     349          554 :         if (!routing_table.addConnectingNode(nodeId))
     350            0 :             return;
     351          559 :     }
     352          554 :     tryConnect(nodeId, true);
     353              : }
     354              : 
     355              : std::vector<NodeId>
     356           16 : SwarmManager::getAllNodes() const
     357              : {
     358           16 :     std::lock_guard lock(mutex);
     359           32 :     return routing_table.getAllNodes();
     360           16 : }
     361              : 
     362              : std::vector<NodeId>
     363         1765 : SwarmManager::getConnectedNodes() const
     364              : {
     365         1765 :     std::lock_guard lock(mutex);
     366         3530 :     return routing_table.getConnectedNodes();
     367         1765 : }
     368              : 
     369              : std::vector<std::map<std::string, std::string>>
     370            0 : SwarmManager::getRoutingTableInfo() const
     371              : {
     372            0 :     std::lock_guard lock(mutex);
     373            0 :     auto stats = routing_table.getRoutingTableStats();
     374            0 :     std::vector<std::map<std::string, std::string>> result;
     375            0 :     result.reserve(stats.size());
     376            0 :     for (const auto& stat : stats) {
     377            0 :         result.push_back({{"id", stat.id},
     378            0 :                           {"device", stat.id},
     379            0 :                           {"status", stat.status},
     380            0 :                           {"remoteAddress", stat.remoteAddress},
     381            0 :                           {"mobile", stat.isMobile ? "true" : "false"}});
     382            0 :         if (stat.connectionTime != std::chrono::system_clock::time_point::min()) {
     383            0 :             auto tt = std::chrono::system_clock::to_time_t(stat.connectionTime);
     384            0 :             result.back().emplace("connectionTime", std::to_string(tt));
     385              :         }
     386              :     }
     387            0 :     return result;
     388            0 : }
     389              : 
     390              : bool
     391         2632 : SwarmManager::isConnected() const
     392              : {
     393         2632 :     std::lock_guard lock(mutex);
     394         5264 :     return !routing_table.isEmpty();
     395         2632 : }
     396              : 
     397              : void
     398           11 : SwarmManager::deleteNode(const std::vector<NodeId>& nodes)
     399              : {
     400              :     {
     401           11 :         std::lock_guard lock(mutex);
     402           22 :         for (const auto& node : nodes) {
     403           11 :             routing_table.deleteNode(node);
     404              :         }
     405           11 :     }
     406           11 :     maintainBuckets();
     407           11 : }
     408              : 
     409              : } // namespace jami
        

Generated by: LCOV version 2.0-1