LCOV - code coverage report
Current view: top level - src/jamidht/swarm - swarm_manager.cpp (source / functions) Hit Total Coverage
Test: jami-coverage-filtered.info Lines: 207 226 91.6 %
Date: 2024-12-21 08:56:24 Functions: 32 37 86.5 %

          Line data    Source code
       1             : /*
       2             :  *  Copyright (C) 2004-2024 Savoir-faire Linux Inc.
       3             :  *
       4             :  *  This program is free software: you can redistribute it and/or modify
       5             :  *  it under the terms of the GNU General Public License as published by
       6             :  *  the Free Software Foundation, either version 3 of the License, or
       7             :  *  (at your option) any later version.
       8             :  *
       9             :  *  This program is distributed in the hope that it will be useful,
      10             :  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
      11             :  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
      12             :  *  GNU General Public License for more details.
      13             :  *
      14             :  *  You should have received a copy of the GNU General Public License
      15             :  *  along with this program. If not, see <https://www.gnu.org/licenses/>.
      16             :  */
      17             : 
      18             : #include "swarm_manager.h"
      19             : #include <dhtnet/multiplexed_socket.h>
      20             : #include <opendht/thread_pool.h>
      21             : 
      22             : constexpr const std::chrono::minutes FIND_PERIOD {10};
      23             : 
      24             : namespace jami {
      25             : 
      26             : using namespace swarm_protocol;
      27             : 
      28         585 : SwarmManager::SwarmManager(const NodeId& id, const std::mt19937_64& rand, ToConnectCb&& toConnectCb)
      29         585 :     : id_(id)
      30         585 :     , rd(rand)
      31         585 :     , toConnectCb_(toConnectCb)
      32             : {
      33         585 :     routing_table.setId(id);
      34         585 : }
      35             : 
      36         585 : SwarmManager::~SwarmManager()
      37             : {
      38         585 :     if (!isShutdown_)
      39         245 :         shutdown();
      40         585 : }
      41             : 
      42             : bool
      43        2246 : SwarmManager::setKnownNodes(const std::vector<NodeId>& known_nodes)
      44             : {
      45        2246 :     isShutdown_ = false;
      46        2254 :     std::vector<NodeId> newNodes;
      47             :     {
      48        2249 :         std::lock_guard lock(mutex);
      49        8243 :         for (const auto& nodeId : known_nodes) {
      50        5989 :             if (addKnownNode(nodeId)) {
      51        2991 :                 newNodes.emplace_back(nodeId);
      52             :             }
      53             :         }
      54        2246 :     }
      55             : 
      56        2255 :     if (newNodes.empty())
      57        1726 :         return false;
      58             : 
      59         528 :     dht::ThreadPool::io().run([w=weak(), newNodes=std::move(newNodes)] {
      60         528 :         auto shared = w.lock();
      61         527 :         if (!shared)
      62           0 :             return;
      63             :         // If we detect a new node which already got a TCP link
      64             :         // we can use it to speed-up the bootstrap (because opening
      65             :         // a new channel will be easy)
      66         527 :         std::set<NodeId> toConnect;
      67        3518 :         for (const auto& nodeId: newNodes) {
      68        2988 :             if (shared->toConnectCb_ && shared->toConnectCb_(nodeId))
      69         515 :                 toConnect.emplace(nodeId);
      70             :         }
      71         527 :         shared->maintainBuckets(toConnect);
      72         529 :     });
      73         529 :     return true;
      74        2255 : }
      75             : 
      76             : void
      77        1419 : SwarmManager::setMobileNodes(const std::vector<NodeId>& mobile_nodes)
      78             : {
      79             :     {
      80        1419 :         std::lock_guard lock(mutex);
      81        1436 :         for (const auto& nodeId : mobile_nodes)
      82          15 :             addMobileNodes(nodeId);
      83        1419 :     }
      84        1421 : }
      85             : 
      86             : void
      87        2114 : SwarmManager::addChannel(const std::shared_ptr<dhtnet::ChannelSocketInterface>& channel)
      88             : {
      89             :     // JAMI_WARNING("[SwarmManager {}] addChannel! with {}", fmt::ptr(this), channel->deviceId().to_view());
      90        2114 :     if (channel) {
      91        2114 :         auto emit = false;
      92             :         {
      93        2114 :             std::lock_guard lock(mutex);
      94        2114 :             emit = routing_table.findBucket(getId())->isEmpty();
      95        2114 :             auto bucket = routing_table.findBucket(channel->deviceId());
      96        2114 :             if (routing_table.addNode(channel, bucket)) {
      97        1449 :                 std::error_code ec;
      98        1449 :                 resetNodeExpiry(ec, channel, id_);
      99             :             }
     100        2114 :         }
     101        2114 :         receiveMessage(channel);
     102        2112 :         if (emit && onConnectionChanged_) {
     103             :             // If it's the first channel we add, we're now connected!
     104        1395 :             JAMI_DEBUG("[SwarmManager {}] Bootstrap: Connected!", fmt::ptr(this));
     105         465 :             onConnectionChanged_(true);
     106             :         }
     107             :     }
     108        2112 : }
     109             : 
     110             : void
     111         584 : SwarmManager::removeNode(const NodeId& nodeId)
     112             : {
     113         584 :     std::unique_lock lk(mutex);
     114         584 :     if (isConnectedWith(nodeId)) {
     115         551 :         removeNodeInternal(nodeId);
     116         553 :         lk.unlock();
     117         553 :         maintainBuckets();
     118             :     }
     119         584 : }
     120             : 
     121             : void
     122         199 : SwarmManager::changeMobility(const NodeId& nodeId, bool isMobile)
     123             : {
     124         199 :     std::lock_guard lock(mutex);
     125         200 :     auto bucket = routing_table.findBucket(nodeId);
     126         200 :     bucket->changeMobility(nodeId, isMobile);
     127         199 : }
     128             : 
     129             : bool
     130         584 : SwarmManager::isConnectedWith(const NodeId& deviceId)
     131             : {
     132         584 :     return routing_table.hasNode(deviceId);
     133             : }
     134             : 
     135             : void
     136         648 : SwarmManager::shutdown()
     137             : {
     138         648 :     if (isShutdown_) {
     139          36 :         return;
     140             :     }
     141         612 :     isShutdown_ = true;
     142         612 :     std::lock_guard lock(mutex);
     143         612 :     routing_table.shutdownAllNodes();
     144         612 : }
     145             : 
     146             : void
     147          19 : SwarmManager::restart()
     148             : {
     149          19 :     isShutdown_ = false;
     150          19 : }
     151             : 
     152             : bool
     153        5990 : SwarmManager::addKnownNode(const NodeId& nodeId)
     154             : {
     155        5990 :     return routing_table.addKnownNode(nodeId);
     156             : }
     157             : 
     158             : void
     159          15 : SwarmManager::addMobileNodes(const NodeId& nodeId)
     160             : {
     161          15 :     if (id_ != nodeId) {
     162          14 :         routing_table.addMobileNode(nodeId);
     163             :     }
     164          15 : }
     165             : 
     166             : void
     167        1115 : SwarmManager::maintainBuckets(const std::set<NodeId>& toConnect)
     168             : {
     169        1115 :     std::set<NodeId> nodes = toConnect;
     170        1116 :     std::unique_lock lock(mutex);
     171        1118 :     auto& buckets = routing_table.getBuckets();
     172        4127 :     for (auto it = buckets.begin(); it != buckets.end(); ++it) {
     173        3006 :         auto& bucket = *it;
     174        3000 :         bool myBucket = routing_table.contains(it, id_);
     175        4904 :         auto connecting_nodes = myBucket ? bucket.getConnectingNodesSize()
     176        1894 :                                          : bucket.getConnectingNodesSize() + bucket.getNodesSize();
     177        3010 :         if (connecting_nodes < Bucket::BUCKET_MAX_SIZE) {
     178             :             auto nodesToTry = bucket.getKnownNodesRandom(Bucket::BUCKET_MAX_SIZE - connecting_nodes,
     179        1628 :                                                          rd);
     180        2518 :             for (auto& node : nodesToTry)
     181         886 :                 routing_table.addConnectingNode(node);
     182             : 
     183        1629 :             nodes.insert(nodesToTry.begin(), nodesToTry.end());
     184        1628 :         }
     185             :     }
     186        1117 :     lock.unlock();
     187        2296 :     for (auto& node : nodes)
     188        1177 :         tryConnect(node);
     189        1117 : }
     190             : 
     191             : void
     192        1451 : SwarmManager::sendRequest(const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket,
     193             :                           NodeId& nodeId,
     194             :                           Query q,
     195             :                           int numberNodes)
     196             : {
     197        1451 :     msgpack::sbuffer buffer;
     198        1450 :     msgpack::packer<msgpack::sbuffer> pk(&buffer);
     199        1451 :     std::error_code ec;
     200             : 
     201        1451 :     Request toRequest {q, numberNodes, nodeId};
     202        1451 :     Message msg;
     203        1451 :     msg.is_mobile = isMobile_;
     204        1451 :     msg.request = std::move(toRequest);
     205             : 
     206        1451 :     pk.pack(msg);
     207             : 
     208        1446 :     socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
     209             : 
     210        1451 :     if (ec) {
     211           9 :         JAMI_ERROR("{}", ec.message());
     212           3 :         return;
     213             :     }
     214        1454 : }
     215             : 
     216             : void
     217        1416 : SwarmManager::sendAnswer(const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket, const Message& msg_)
     218             : {
     219        1416 :     std::lock_guard lock(mutex);
     220             : 
     221        1415 :     if (msg_.request->q == Query::FIND) {
     222        1414 :         auto nodes = routing_table.closestNodes(msg_.request->nodeId, msg_.request->num);
     223        1423 :         auto bucket = routing_table.findBucket(msg_.request->nodeId);
     224        1423 :         const auto& m_nodes = bucket->getMobileNodes();
     225        1422 :         Response toResponse {Query::FOUND, nodes, {m_nodes.begin(), m_nodes.end()}};
     226             : 
     227        1419 :         Message msg;
     228        1419 :         msg.is_mobile = isMobile_;
     229        1419 :         msg.response = std::move(toResponse);
     230             : 
     231        1421 :         msgpack::sbuffer buffer((size_t) 60000);
     232        1422 :         msgpack::packer<msgpack::sbuffer> pk(&buffer);
     233        1421 :         pk.pack(msg);
     234             : 
     235        1423 :         std::error_code ec;
     236             : 
     237        1423 :         socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
     238        1423 :         if (ec) {
     239           0 :             JAMI_ERROR("{}", ec.message());
     240           0 :             return;
     241             :         }
     242        1422 :     }
     243             : 
     244             :     else {
     245             :     }
     246        1421 : }
     247             : 
     248             : void
     249        2114 : SwarmManager::receiveMessage(const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket)
     250             : {
     251             :     struct DecodingContext
     252             :     {
     253       23008 :         msgpack::unpacker pac {[](msgpack::type::object_type, std::size_t, void*) { return true; },
     254             :                                nullptr,
     255             :                                512};
     256             :     };
     257             : 
     258        2114 :     socket->setOnRecv([w = weak(),
     259             :                        wsocket = std::weak_ptr<dhtnet::ChannelSocketInterface>(socket),
     260             :                        ctx = std::make_shared<DecodingContext>()](const uint8_t* buf, size_t len) {
     261        2837 :         ctx->pac.reserve_buffer(len);
     262        2836 :         std::copy_n(buf, len, ctx->pac.buffer());
     263        2841 :         ctx->pac.buffer_consumed(len);
     264             : 
     265        2840 :         msgpack::object_handle oh;
     266        5673 :         while (ctx->pac.next(oh)) {
     267        2819 :             auto shared = w.lock();
     268        2820 :             auto socket = wsocket.lock();
     269        2814 :             if (!shared || !socket)
     270           0 :                 return size_t {0};
     271             : 
     272             :             try {
     273        2811 :                 Message msg;
     274        2811 :                 oh.get().convert(msg);
     275             : 
     276        2827 :                 if (msg.is_mobile)
     277         199 :                     shared->changeMobility(socket->deviceId(), msg.is_mobile);
     278             : 
     279        2828 :                 if (msg.request) {
     280        1416 :                     shared->sendAnswer(socket, msg);
     281             : 
     282        1415 :                 } else if (msg.response) {
     283        1415 :                     shared->setKnownNodes(msg.response->nodes);
     284        1419 :                     shared->setMobileNodes(msg.response->mobile_nodes);
     285             :                 }
     286             : 
     287        2838 :             } catch (const std::exception& e) {
     288           0 :                 JAMI_WARNING("Error DRT recv: {}", e.what());
     289           0 :                 return len;
     290           0 :             }
     291        2841 :         }
     292             : 
     293        2837 :         return len;
     294        2837 :     });
     295             : 
     296        2113 :     socket->onShutdown([w = weak(), deviceId = socket->deviceId()] {
     297        1485 :         dht::ThreadPool::io().run([w, deviceId] {
     298        1147 :             auto shared = w.lock();
     299        1147 :             if (shared && !shared->isShutdown_) {
     300         584 :                 shared->removeNode(deviceId);
     301             :             }
     302        1148 :         });
     303        1485 :     });
     304        2112 : }
     305             : 
     306             : void
     307        1451 : SwarmManager::resetNodeExpiry(const asio::error_code& ec,
     308             :                               const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket,
     309             :                               NodeId node)
     310             : {
     311        1451 :     NodeId idToFind;
     312        1450 :     std::list<Bucket>::iterator bucket;
     313             : 
     314        1449 :     if (ec == asio::error::operation_aborted)
     315           0 :         return;
     316             : 
     317        1450 :     if (!node) {
     318           0 :         bucket = routing_table.findBucket(socket->deviceId());
     319           0 :         idToFind = bucket->randomId(rd);
     320             :     } else {
     321        1450 :         bucket = routing_table.findBucket(node);
     322        1451 :         idToFind = node;
     323             :     }
     324             : 
     325        1451 :     sendRequest(socket, idToFind, Query::FIND, Bucket::BUCKET_MAX_SIZE);
     326             : 
     327        1451 :     if (!node) {
     328           0 :         auto& nodeTimer = bucket->getNodeTimer(socket);
     329           0 :         nodeTimer.expires_after(FIND_PERIOD);
     330           0 :         nodeTimer.async_wait(std::bind(&jami::SwarmManager::resetNodeExpiry,
     331           0 :                                        shared_from_this(),
     332             :                                        std::placeholders::_1,
     333             :                                        socket,
     334           0 :                                        NodeId {}));
     335             :     }
     336             : }
     337             : 
     338             : void
     339        1177 : SwarmManager::tryConnect(const NodeId& nodeId)
     340             : {
     341        1177 :     if (needSocketCb_)
     342        1172 :         needSocketCb_(nodeId.toString(),
     343        1091 :                       [w = weak(), nodeId](const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket) {
     344        1091 :                           auto shared = w.lock();
     345        1094 :                           if (!shared || shared->isShutdown_)
     346         218 :                               return true;
     347         873 :                           if (socket) {
     348         799 :                               shared->addChannel(socket);
     349         798 :                               return true;
     350             :                           }
     351          74 :                           std::unique_lock lk(shared->mutex);
     352          74 :                           auto bucket = shared->routing_table.findBucket(nodeId);
     353          74 :                           bucket->removeConnectingNode(nodeId);
     354          74 :                           bucket->addKnownNode(nodeId);
     355          74 :                           bucket = shared->routing_table.findBucket(shared->getId());
     356          74 :                           if (bucket->getConnectingNodesSize() == 0
     357          74 :                               && bucket->isEmpty() && shared->onConnectionChanged_) {
     358          31 :                               lk.unlock();
     359          93 :                               JAMI_WARNING("[SwarmManager {:p}] Bootstrap: all connections failed",
     360             :                                            fmt::ptr(shared.get()));
     361          31 :                               shared->onConnectionChanged_(false);
     362             :                           }
     363          74 :                           return true;
     364        1090 :                       });
     365        1178 : }
     366             : 
     367             : void
     368         552 : SwarmManager::removeNodeInternal(const NodeId& nodeId)
     369             : {
     370         552 :     routing_table.removeNode(nodeId);
     371         553 : }
     372             : 
     373             : std::vector<NodeId>
     374           0 : SwarmManager::getAllNodes() const
     375             : {
     376           0 :     std::lock_guard lock(mutex);
     377           0 :     return routing_table.getAllNodes();
     378           0 : }
     379             : 
     380             : void
     381          17 : SwarmManager::deleteNode(std::vector<NodeId> nodes)
     382             : {
     383             :     {
     384          17 :         std::lock_guard lock(mutex);
     385          29 :         for (const auto& node : nodes) {
     386          12 :             routing_table.deleteNode(node);
     387             :         }
     388          17 :     }
     389          17 :     maintainBuckets();
     390          17 : }
     391             : 
     392             : } // namespace jami

Generated by: LCOV version 1.14