LCOV - code coverage report
Current view: top level - foo/src/jamidht/swarm - swarm_manager.cpp (source / functions) Hit Total Coverage
Test: jami-coverage-filtered.info Lines: 207 226 91.6 %
Date: 2025-08-24 09:11:10 Functions: 33 38 86.8 %

          Line data    Source code
       1             : /*
       2             :  *  Copyright (C) 2004-2025 Savoir-faire Linux Inc.
       3             :  *
       4             :  *  This program is free software: you can redistribute it and/or modify
       5             :  *  it under the terms of the GNU General Public License as published by
       6             :  *  the Free Software Foundation, either version 3 of the License, or
       7             :  *  (at your option) any later version.
       8             :  *
       9             :  *  This program is distributed in the hope that it will be useful,
      10             :  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
      11             :  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
      12             :  *  GNU General Public License for more details.
      13             :  *
      14             :  *  You should have received a copy of the GNU General Public License
      15             :  *  along with this program. If not, see <https://www.gnu.org/licenses/>.
      16             :  */
      17             : 
      18             : #include "swarm_manager.h"
      19             : #include <dhtnet/multiplexed_socket.h>
      20             : #include <opendht/thread_pool.h>
      21             : 
      22             : namespace jami {
      23             : 
      24             : using namespace swarm_protocol;
      25             : 
      26         488 : SwarmManager::SwarmManager(const NodeId& id, const std::mt19937_64& rand, ToConnectCb&& toConnectCb)
      27         488 :     : id_(id)
      28         488 :     , rd(rand)
      29         488 :     , toConnectCb_(toConnectCb)
      30             : {
      31         488 :     routing_table.setId(id);
      32         488 : }
      33             : 
      34         486 : SwarmManager::~SwarmManager()
      35             : {
      36         486 :     if (!isShutdown_)
      37         222 :         shutdown();
      38         486 : }
      39             : 
      40             : bool
      41        1730 : SwarmManager::setKnownNodes(const std::vector<NodeId>& known_nodes)
      42             : {
      43        1730 :     isShutdown_ = false;
      44        1730 :     std::vector<NodeId> newNodes;
      45             :     {
      46        1730 :         std::lock_guard lock(mutex);
      47        4831 :         for (const auto& nodeId : known_nodes) {
      48        3101 :             if (addKnownNode(nodeId)) {
      49         992 :                 newNodes.emplace_back(nodeId);
      50             :             }
      51             :         }
      52        1730 :     }
      53             : 
      54        1730 :     if (newNodes.empty())
      55        1196 :         return false;
      56             : 
      57         534 :     dht::ThreadPool::io().run([w = weak(), newNodes = std::move(newNodes)] {
      58         534 :         auto shared = w.lock();
      59         534 :         if (!shared)
      60           0 :             return;
      61             :         // If we detect a new node which already got a TCP link
      62             :         // we can use it to speed-up the bootstrap (because opening
      63             :         // a new channel will be easy)
      64         534 :         std::set<NodeId> toConnect;
      65        1526 :         for (const auto& nodeId : newNodes) {
      66         992 :             if (shared->toConnectCb_ && shared->toConnectCb_(nodeId))
      67         231 :                 toConnect.emplace(nodeId);
      68             :         }
      69         534 :         shared->maintainBuckets(toConnect);
      70         534 :     });
      71         534 :     return true;
      72        1730 : }
      73             : 
      74             : void
      75        1042 : SwarmManager::setMobileNodes(const std::vector<NodeId>& mobile_nodes)
      76             : {
      77             :     {
      78        1042 :         std::lock_guard lock(mutex);
      79        1058 :         for (const auto& nodeId : mobile_nodes)
      80          16 :             addMobileNodes(nodeId);
      81        1042 :     }
      82        1042 : }
      83             : 
      84             : void
      85        1517 : SwarmManager::addChannel(const std::shared_ptr<dhtnet::ChannelSocketInterface>& channel)
      86             : {
      87             :     // JAMI_WARNING("[SwarmManager {}] addChannel! with {}", fmt::ptr(this), channel->deviceId().to_view());
      88        1517 :     if (channel) {
      89        1517 :         auto emit = false;
      90             :         {
      91        1517 :             std::lock_guard lock(mutex);
      92        1517 :             emit = routing_table.findBucket(getId())->isEmpty();
      93        1517 :             auto bucket = routing_table.findBucket(channel->deviceId());
      94        1517 :             if (routing_table.addNode(channel, bucket)) {
      95        1062 :                 std::error_code ec;
      96        1062 :                 resetNodeExpiry(ec, channel, id_);
      97             :             }
      98        1517 :         }
      99        1517 :         receiveMessage(channel);
     100        1517 :         if (emit && onConnectionChanged_) {
     101             :             // If it's the first channel we add, we're now connected!
     102         795 :             JAMI_DEBUG("[SwarmManager {}] Bootstrap: Connected!", fmt::ptr(this));
     103         265 :             onConnectionChanged_(true);
     104             :         }
     105             :     }
     106        1517 : }
     107             : 
     108             : void
     109         434 : SwarmManager::removeNode(const NodeId& nodeId)
     110             : {
     111         434 :     std::unique_lock lk(mutex);
     112         434 :     if (isConnectedWith(nodeId)) {
     113         399 :         removeNodeInternal(nodeId);
     114         399 :         lk.unlock();
     115         399 :         maintainBuckets();
     116             :     }
     117         434 : }
     118             : 
     119             : void
     120         198 : SwarmManager::changeMobility(const NodeId& nodeId, bool isMobile)
     121             : {
     122         198 :     std::lock_guard lock(mutex);
     123         198 :     auto bucket = routing_table.findBucket(nodeId);
     124         198 :     bucket->changeMobility(nodeId, isMobile);
     125         198 : }
     126             : 
     127             : bool
     128         434 : SwarmManager::isConnectedWith(const NodeId& deviceId)
     129             : {
     130         434 :     return routing_table.hasNode(deviceId);
     131             : }
     132             : 
     133             : void
     134         526 : SwarmManager::shutdown()
     135             : {
     136         526 :     if (isShutdown_) {
     137          16 :         return;
     138             :     }
     139         510 :     isShutdown_ = true;
     140         510 :     std::lock_guard lock(mutex);
     141         510 :     routing_table.shutdownAllNodes();
     142         510 : }
     143             : 
     144             : void
     145          14 : SwarmManager::restart()
     146             : {
     147          14 :     isShutdown_ = false;
     148          14 : }
     149             : 
     150             : bool
     151        3101 : SwarmManager::addKnownNode(const NodeId& nodeId)
     152             : {
     153        3101 :     return routing_table.addKnownNode(nodeId);
     154             : }
     155             : 
     156             : void
     157          16 : SwarmManager::addMobileNodes(const NodeId& nodeId)
     158             : {
     159          16 :     if (id_ != nodeId) {
     160          15 :         routing_table.addMobileNode(nodeId);
     161             :     }
     162          16 : }
     163             : 
     164             : void
     165         963 : SwarmManager::maintainBuckets(const std::set<NodeId>& toConnect)
     166             : {
     167         963 :     std::set<NodeId> nodes = toConnect;
     168         963 :     std::unique_lock lock(mutex);
     169         963 :     auto& buckets = routing_table.getBuckets();
     170        3298 :     for (auto it = buckets.begin(); it != buckets.end(); ++it) {
     171        2335 :         auto& bucket = *it;
     172        2335 :         bool myBucket = routing_table.contains(it, id_);
     173        3707 :         auto connecting_nodes = myBucket ? bucket.getConnectingNodesSize()
     174        1372 :                                          : bucket.getConnectingNodesSize() + bucket.getNodesSize();
     175        2335 :         if (connecting_nodes < Bucket::BUCKET_MAX_SIZE) {
     176             :             auto nodesToTry = bucket.getKnownNodesRandom(Bucket::BUCKET_MAX_SIZE - connecting_nodes,
     177        1241 :                                                          rd);
     178        2127 :             for (auto& node : nodesToTry)
     179         886 :                 routing_table.addConnectingNode(node);
     180             : 
     181        1241 :             nodes.insert(nodesToTry.begin(), nodesToTry.end());
     182        1241 :         }
     183             :     }
     184         963 :     lock.unlock();
     185        1919 :     for (auto& node : nodes)
     186         956 :         tryConnect(node);
     187         963 : }
     188             : 
     189             : void
     190        1062 : SwarmManager::sendRequest(const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket,
     191             :                           const NodeId& nodeId,
     192             :                           Query q,
     193             :                           int numberNodes)
     194             : {
     195        1062 :     dht::ThreadPool::io().run([socket, isMobile=isMobile_, nodeId, q, numberNodes] {
     196        1062 :         msgpack::sbuffer buffer;
     197        1062 :         msgpack::packer<msgpack::sbuffer> pk(&buffer);
     198        1062 :         Message msg;
     199        1062 :         msg.is_mobile = isMobile;
     200        1062 :         msg.request = Request {q, numberNodes, nodeId};
     201        1062 :         pk.pack(msg);
     202             : 
     203        1062 :         std::error_code ec;
     204        1062 :         socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
     205        1062 :         if (ec) {
     206           3 :             JAMI_ERROR("{}", ec.message());
     207             :         }
     208        1062 :     });
     209        1062 : }
     210             : 
     211             : void
     212        1041 : SwarmManager::sendAnswer(const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket,
     213             :                          const Message& msg_)
     214             : {
     215        1041 :     std::lock_guard lock(mutex);
     216             : 
     217        1041 :     if (msg_.request->q == Query::FIND) {
     218        1041 :         auto nodes = routing_table.closestNodes(msg_.request->nodeId, msg_.request->num);
     219        1041 :         auto bucket = routing_table.findBucket(msg_.request->nodeId);
     220        1041 :         const auto& m_nodes = bucket->getMobileNodes();
     221        1041 :         Response toResponse {Query::FOUND, nodes, {m_nodes.begin(), m_nodes.end()}};
     222             : 
     223        1041 :         Message msg;
     224        1041 :         msg.is_mobile = isMobile_;
     225        1041 :         msg.response = std::move(toResponse);
     226             : 
     227        1041 :         msgpack::sbuffer buffer((size_t) 60000);
     228        1041 :         msgpack::packer<msgpack::sbuffer> pk(&buffer);
     229        1041 :         pk.pack(msg);
     230             : 
     231        1041 :         std::error_code ec;
     232        1041 :         socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
     233        1041 :         if (ec) {
     234           0 :             JAMI_ERROR("{}", ec.message());
     235           0 :             return;
     236             :         }
     237        1041 :     }
     238        1041 : }
     239             : 
     240             : void
     241        1517 : SwarmManager::receiveMessage(const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket)
     242             : {
     243             :     struct DecodingContext
     244             :     {
     245       17045 :         msgpack::unpacker pac {[](msgpack::type::object_type, std::size_t, void*) { return true; },
     246             :                                nullptr,
     247             :                                512};
     248             :     };
     249             : 
     250        1517 :     socket->setOnRecv([w = weak(),
     251             :                        wsocket = std::weak_ptr<dhtnet::ChannelSocketInterface>(socket),
     252             :                        ctx = std::make_shared<DecodingContext>()](const uint8_t* buf, size_t len) {
     253        2081 :         ctx->pac.reserve_buffer(len);
     254        2081 :         std::copy_n(buf, len, ctx->pac.buffer());
     255        2081 :         ctx->pac.buffer_consumed(len);
     256             : 
     257        2081 :         msgpack::object_handle oh;
     258        4162 :         while (ctx->pac.next(oh)) {
     259        2081 :             auto shared = w.lock();
     260        2081 :             auto socket = wsocket.lock();
     261        2081 :             if (!shared || !socket)
     262           0 :                 return size_t {0};
     263             : 
     264             :             try {
     265        2081 :                 Message msg;
     266        2081 :                 oh.get().convert(msg);
     267             : 
     268        2081 :                 if (msg.is_mobile)
     269         198 :                     shared->changeMobility(socket->deviceId(), msg.is_mobile);
     270             : 
     271        2081 :                 if (msg.request) {
     272        1041 :                     shared->sendAnswer(socket, msg);
     273             : 
     274        1040 :                 } else if (msg.response) {
     275        1040 :                     shared->setKnownNodes(msg.response->nodes);
     276        1040 :                     shared->setMobileNodes(msg.response->mobile_nodes);
     277             :                 }
     278             : 
     279        2081 :             } catch (const std::exception& e) {
     280           0 :                 JAMI_WARNING("Error DRT recv: {}", e.what());
     281           0 :                 return len;
     282           0 :             }
     283        2081 :         }
     284             : 
     285        2081 :         return len;
     286        2081 :     });
     287             : 
     288        1517 :     socket->onShutdown([w = weak(), deviceId = socket->deviceId()] {
     289        1072 :         dht::ThreadPool::io().run([w, deviceId] {
     290         736 :             auto shared = w.lock();
     291         736 :             if (shared && !shared->isShutdown_) {
     292         434 :                 shared->removeNode(deviceId);
     293             :             }
     294         736 :         });
     295        1072 :     });
     296        1517 : }
     297             : 
     298             : void
     299        1062 : SwarmManager::resetNodeExpiry(const asio::error_code& ec,
     300             :                               const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket,
     301             :                               NodeId node)
     302             : {
     303        1062 :     NodeId idToFind;
     304        1062 :     std::list<Bucket>::iterator bucket;
     305             : 
     306        1062 :     if (ec == asio::error::operation_aborted)
     307           0 :         return;
     308             : 
     309        1062 :     if (!node) {
     310           0 :         bucket = routing_table.findBucket(socket->deviceId());
     311           0 :         idToFind = bucket->randomId(rd);
     312             :     } else {
     313        1062 :         bucket = routing_table.findBucket(node);
     314        1062 :         idToFind = node;
     315             :     }
     316             : 
     317        1062 :     sendRequest(socket, idToFind, Query::FIND, Bucket::BUCKET_MAX_SIZE);
     318             : 
     319        1062 :     if (!node) {
     320           0 :         auto& nodeTimer = bucket->getNodeTimer(socket);
     321           0 :         nodeTimer.expires_after(FIND_PERIOD);
     322           0 :         nodeTimer.async_wait(std::bind(&jami::SwarmManager::resetNodeExpiry,
     323           0 :                                        shared_from_this(),
     324             :                                        std::placeholders::_1,
     325             :                                        socket,
     326           0 :                                        NodeId {}));
     327             :     }
     328             : }
     329             : 
     330             : void
     331         956 : SwarmManager::tryConnect(const NodeId& nodeId)
     332             : {
     333         956 :     if (needSocketCb_)
     334         951 :         needSocketCb_(nodeId.toString(),
     335         876 :                       [w = weak(),
     336             :                        nodeId](const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket) {
     337         876 :                           auto shared = w.lock();
     338         876 :                           if (!shared || shared->isShutdown_)
     339         220 :                               return true;
     340         656 :                           if (socket) {
     341         599 :                               shared->addChannel(socket);
     342         599 :                               return true;
     343             :                           }
     344          57 :                           std::unique_lock lk(shared->mutex);
     345          57 :                           auto bucket = shared->routing_table.findBucket(nodeId);
     346          57 :                           bucket->removeConnectingNode(nodeId);
     347          57 :                           bucket->addKnownNode(nodeId);
     348          57 :                           bucket = shared->routing_table.findBucket(shared->getId());
     349         104 :                           if (bucket->getConnectingNodesSize() == 0 && bucket->isEmpty()
     350         104 :                               && shared->onConnectionChanged_) {
     351          35 :                               lk.unlock();
     352         105 :                               JAMI_LOG("[SwarmManager {:p}] Bootstrap: all connections failed",
     353             :                                            fmt::ptr(shared.get()));
     354          35 :                               shared->onConnectionChanged_(false);
     355             :                           }
     356          57 :                           return true;
     357         876 :                       });
     358         956 : }
     359             : 
     360             : void
     361         399 : SwarmManager::removeNodeInternal(const NodeId& nodeId)
     362             : {
     363         399 :     routing_table.removeNode(nodeId);
     364         399 : }
     365             : 
     366             : std::vector<NodeId>
     367           0 : SwarmManager::getAllNodes() const
     368             : {
     369           0 :     std::lock_guard lock(mutex);
     370           0 :     return routing_table.getAllNodes();
     371           0 : }
     372             : 
     373             : void
     374          16 : SwarmManager::deleteNode(std::vector<NodeId> nodes)
     375             : {
     376             :     {
     377          16 :         std::lock_guard lock(mutex);
     378          27 :         for (const auto& node : nodes) {
     379          11 :             routing_table.deleteNode(node);
     380             :         }
     381          16 :     }
     382          16 :     maintainBuckets();
     383          16 : }
     384             : 
     385             : } // namespace jami

Generated by: LCOV version 1.14