LCOV - code coverage report
Current view: top level - src/jamidht/swarm - swarm_manager.cpp (source / functions) Hit Total Coverage
Test: jami-coverage-filtered.info Lines: 204 223 91.5 %
Date: 2024-05-14 08:41:19 Functions: 31 36 86.1 %

          Line data    Source code
       1             : /*
       2             :  *  Copyright (C) 2024 Savoir-faire Linux Inc.
       3             :  *
       4             :  *  Author: Fadi Shehadeh <fadi.shehadeh@savoirfairelinux.com>
       5             :  *
       6             :  *  This program is free software; you can redistribute it and/or modify
       7             :  *  it under the terms of the GNU General Public License as published by
       8             :  *  the Free Software Foundation; either version 3 of the License, or
       9             :  *  (at your option) any later version.
      10             :  *
      11             :  *  This program is distributed in the hope that it will be useful,
      12             :  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
      13             :  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
      14             :  *  GNU General Public License for more details.
      15             :  *
      16             :  *  You should have received a copy of the GNU General Public License
      17             :  *  along with this program; if not, write to the Free Software
      18             :  *  Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301 USA.
      19             :  */
      20             : 
      21             : #include "swarm_manager.h"
      22             : #include <dhtnet/multiplexed_socket.h>
      23             : #include <opendht/thread_pool.h>
      24             : 
      25             : constexpr const std::chrono::minutes FIND_PERIOD {10};
      26             : 
      27             : namespace jami {
      28             : 
      29             : using namespace swarm_protocol;
      30             : 
      31         402 : SwarmManager::SwarmManager(const NodeId& id, const std::mt19937_64& rand, ToConnectCb&& toConnectCb)
      32         402 :     : id_(id)
      33         402 :     , rd(rand)
      34         402 :     , toConnectCb_(toConnectCb)
      35             : {
      36         402 :     routing_table.setId(id);
      37         402 : }
      38             : 
      39         349 : SwarmManager::~SwarmManager()
      40             : {
      41         349 :     if (!isShutdown_)
      42         199 :         shutdown();
      43         349 : }
      44             : 
      45             : bool
      46        1761 : SwarmManager::setKnownNodes(const std::vector<NodeId>& known_nodes)
      47             : {
      48        1761 :     isShutdown_ = false;
      49        1760 :     std::vector<NodeId> newNodes;
      50             :     {
      51        1757 :         std::lock_guard lock(mutex);
      52        5040 :         for (const auto& nodeId : known_nodes) {
      53        3278 :             if (addKnownNode(nodeId)) {
      54         843 :                 newNodes.emplace_back(nodeId);
      55             :             }
      56             :         }
      57        1757 :     }
      58             : 
      59        1762 :     if (newNodes.empty())
      60        1346 :         return false;
      61             : 
      62         416 :     dht::ThreadPool::io().run([w=weak(), newNodes=std::move(newNodes)] {
      63         416 :         auto shared = w.lock();
      64         415 :         if (!shared)
      65           0 :             return;
      66             :         // If we detect a new node which already got a TCP link
      67             :         // we can use it to speed-up the bootstrap (because opening
      68             :         // a new channel will be easy)
      69         415 :         std::set<NodeId> toConnect;
      70        1261 :         for (const auto& nodeId: newNodes) {
      71         848 :             if (shared->toConnectCb_ && shared->toConnectCb_(nodeId))
      72         426 :                 toConnect.emplace(nodeId);
      73             :         }
      74         413 :         shared->maintainBuckets(toConnect);
      75         416 :     });
      76         415 :     return true;
      77        1761 : }
      78             : 
      79             : void
      80        1270 : SwarmManager::setMobileNodes(const std::vector<NodeId>& mobile_nodes)
      81             : {
      82             :     {
      83        1270 :         std::lock_guard lock(mutex);
      84        1283 :         for (const auto& nodeId : mobile_nodes)
      85          15 :             addMobileNodes(nodeId);
      86        1267 :     }
      87        1270 : }
      88             : 
      89             : void
      90        1845 : SwarmManager::addChannel(const std::shared_ptr<dhtnet::ChannelSocketInterface>& channel)
      91             : {
      92             :     // JAMI_WARNING("[SwarmManager {}] addChannel! with {}", fmt::ptr(this), channel->deviceId().to_view());
      93        1845 :     if (channel) {
      94        1845 :         auto emit = false;
      95             :         {
      96        1845 :             std::lock_guard lock(mutex);
      97        1845 :             emit = routing_table.findBucket(getId())->isEmpty();
      98        1844 :             auto bucket = routing_table.findBucket(channel->deviceId());
      99        1844 :             if (routing_table.addNode(channel, bucket)) {
     100        1290 :                 std::error_code ec;
     101        1289 :                 resetNodeExpiry(ec, channel, id_);
     102             :             }
     103        1845 :         }
     104        1845 :         receiveMessage(channel);
     105        1844 :         if (emit && onConnectionChanged_) {
     106             :             // If it's the first channel we add, we're now connected!
     107         903 :             JAMI_DEBUG("[SwarmManager {}] Bootstrap: Connected!", fmt::ptr(this));
     108         301 :             onConnectionChanged_(true);
     109             :         }
     110             :     }
     111        1844 : }
     112             : 
     113             : void
     114         476 : SwarmManager::removeNode(const NodeId& nodeId)
     115             : {
     116         476 :     std::unique_lock lk(mutex);
     117         477 :     if (isConnectedWith(nodeId)) {
     118         463 :         removeNodeInternal(nodeId);
     119         460 :         lk.unlock();
     120         463 :         maintainBuckets();
     121             :     }
     122         479 : }
     123             : 
     124             : void
     125         201 : SwarmManager::changeMobility(const NodeId& nodeId, bool isMobile)
     126             : {
     127         201 :     std::lock_guard lock(mutex);
     128         202 :     auto bucket = routing_table.findBucket(nodeId);
     129         202 :     bucket->changeMobility(nodeId, isMobile);
     130         202 : }
     131             : 
     132             : bool
     133         476 : SwarmManager::isConnectedWith(const NodeId& deviceId)
     134             : {
     135         476 :     return routing_table.hasNode(deviceId);
     136             : }
     137             : 
     138             : void
     139         430 : SwarmManager::shutdown()
     140             : {
     141         430 :     if (isShutdown_) {
     142          26 :         return;
     143             :     }
     144         404 :     isShutdown_ = true;
     145         404 :     std::lock_guard lock(mutex);
     146         404 :     routing_table.shutdownAllNodes();
     147         404 : }
     148             : 
     149             : bool
     150        3277 : SwarmManager::addKnownNode(const NodeId& nodeId)
     151             : {
     152        3277 :     return routing_table.addKnownNode(nodeId);
     153             : }
     154             : 
     155             : void
     156          15 : SwarmManager::addMobileNodes(const NodeId& nodeId)
     157             : {
     158          15 :     if (id_ != nodeId) {
     159          14 :         routing_table.addMobileNode(nodeId);
     160             :     }
     161          15 : }
     162             : 
     163             : void
     164         893 : SwarmManager::maintainBuckets(const std::set<NodeId>& toConnect)
     165             : {
     166         893 :     std::set<NodeId> nodes = toConnect;
     167         894 :     std::unique_lock lock(mutex);
     168         895 :     auto& buckets = routing_table.getBuckets();
     169        3539 :     for (auto it = buckets.begin(); it != buckets.end(); ++it) {
     170        2644 :         auto& bucket = *it;
     171        2641 :         bool myBucket = routing_table.contains(it, id_);
     172        4397 :         auto connecting_nodes = myBucket ? bucket.getConnectingNodesSize()
     173        1754 :                                          : bucket.getConnectingNodesSize() + bucket.getNodesSize();
     174        2646 :         if (connecting_nodes < Bucket::BUCKET_MAX_SIZE) {
     175             :             auto nodesToTry = bucket.getKnownNodesRandom(Bucket::BUCKET_MAX_SIZE - connecting_nodes,
     176        1241 :                                                          rd);
     177        1954 :             for (auto& node : nodesToTry)
     178         713 :                 routing_table.addConnectingNode(node);
     179             : 
     180        1240 :             nodes.insert(nodesToTry.begin(), nodesToTry.end());
     181        1241 :         }
     182             :     }
     183         895 :     lock.unlock();
     184        1898 :     for (auto& node : nodes)
     185        1003 :         tryConnect(node);
     186         894 : }
     187             : 
     188             : void
     189        1290 : SwarmManager::sendRequest(const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket,
     190             :                           NodeId& nodeId,
     191             :                           Query q,
     192             :                           int numberNodes)
     193             : {
     194        1290 :     msgpack::sbuffer buffer;
     195        1291 :     msgpack::packer<msgpack::sbuffer> pk(&buffer);
     196        1290 :     std::error_code ec;
     197             : 
     198        1289 :     Request toRequest {q, numberNodes, nodeId};
     199        1289 :     Message msg;
     200        1289 :     msg.is_mobile = isMobile_;
     201        1289 :     msg.request = std::move(toRequest);
     202             : 
     203        1291 :     pk.pack(msg);
     204             : 
     205        1289 :     socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
     206             : 
     207        1291 :     if (ec) {
     208           3 :         JAMI_ERROR("{}", ec.message());
     209           1 :         return;
     210             :     }
     211        1292 : }
     212             : 
     213             : void
     214        1264 : SwarmManager::sendAnswer(const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket, const Message& msg_)
     215             : {
     216        1264 :     std::lock_guard lock(mutex);
     217             : 
     218        1265 :     if (msg_.request->q == Query::FIND) {
     219        1259 :         auto nodes = routing_table.closestNodes(msg_.request->nodeId, msg_.request->num);
     220        1267 :         auto bucket = routing_table.findBucket(msg_.request->nodeId);
     221        1267 :         const auto& m_nodes = bucket->getMobileNodes();
     222        1264 :         Response toResponse {Query::FOUND, nodes, {m_nodes.begin(), m_nodes.end()}};
     223             : 
     224        1266 :         Message msg;
     225        1266 :         msg.is_mobile = isMobile_;
     226        1266 :         msg.response = std::move(toResponse);
     227             : 
     228        1267 :         msgpack::sbuffer buffer((size_t) 60000);
     229        1270 :         msgpack::packer<msgpack::sbuffer> pk(&buffer);
     230        1269 :         pk.pack(msg);
     231             : 
     232        1269 :         std::error_code ec;
     233             : 
     234        1268 :         socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
     235        1270 :         if (ec) {
     236           0 :             JAMI_ERROR("{}", ec.message());
     237           0 :             return;
     238             :         }
     239        1270 :     }
     240             : 
     241             :     else {
     242             :     }
     243        1270 : }
     244             : 
     245             : void
     246        1845 : SwarmManager::receiveMessage(const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket)
     247             : {
     248             :     struct DecodingContext
     249             :     {
     250       20666 :         msgpack::unpacker pac {[](msgpack::type::object_type, std::size_t, void*) { return true; },
     251             :                                nullptr,
     252             :                                512};
     253             :     };
     254             : 
     255        1845 :     socket->setOnRecv([w = weak(),
     256             :                        wsocket = std::weak_ptr<dhtnet::ChannelSocketInterface>(socket),
     257             :                        ctx = std::make_shared<DecodingContext>()](const uint8_t* buf, size_t len) {
     258        2523 :         ctx->pac.reserve_buffer(len);
     259        2526 :         std::copy_n(buf, len, ctx->pac.buffer());
     260        2538 :         ctx->pac.buffer_consumed(len);
     261             : 
     262        2536 :         msgpack::object_handle oh;
     263        5068 :         while (ctx->pac.next(oh)) {
     264        2526 :             auto shared = w.lock();
     265        2525 :             auto socket = wsocket.lock();
     266        2516 :             if (!shared || !socket)
     267           0 :                 return size_t {0};
     268             : 
     269             :             try {
     270        2510 :                 Message msg;
     271        2510 :                 oh.get().convert(msg);
     272             : 
     273        2531 :                 if (msg.is_mobile)
     274         202 :                     shared->changeMobility(socket->deviceId(), msg.is_mobile);
     275             : 
     276        2531 :                 if (msg.request) {
     277        1263 :                     shared->sendAnswer(socket, msg);
     278             : 
     279        1263 :                 } else if (msg.response) {
     280        1264 :                     shared->setKnownNodes(msg.response->nodes);
     281        1268 :                     shared->setMobileNodes(msg.response->mobile_nodes);
     282             :                 }
     283             : 
     284        2535 :             } catch (const std::exception& e) {
     285           0 :                 JAMI_WARNING("Error DRT recv: {}", e.what());
     286           0 :                 return len;
     287           0 :             }
     288        2538 :         }
     289             : 
     290        2533 :         return len;
     291        2534 :     });
     292             : 
     293        1845 :     socket->onShutdown([w = weak(), deviceId = socket->deviceId()] {
     294        1311 :         dht::ThreadPool::io().run([w, deviceId] {
     295         965 :             auto shared = w.lock();
     296         964 :             if (shared && !shared->isShutdown_) {
     297         476 :                 shared->removeNode(deviceId);
     298             :             }
     299         966 :         });
     300        1312 :     });
     301        1844 : }
     302             : 
     303             : void
     304        1289 : SwarmManager::resetNodeExpiry(const asio::error_code& ec,
     305             :                               const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket,
     306             :                               NodeId node)
     307             : {
     308        1289 :     NodeId idToFind;
     309        1288 :     std::list<Bucket>::iterator bucket;
     310             : 
     311        1288 :     if (ec == asio::error::operation_aborted)
     312           0 :         return;
     313             : 
     314        1289 :     if (!node) {
     315           0 :         bucket = routing_table.findBucket(socket->deviceId());
     316           0 :         idToFind = bucket->randomId(rd);
     317             :     } else {
     318        1291 :         bucket = routing_table.findBucket(node);
     319        1290 :         idToFind = node;
     320             :     }
     321             : 
     322        1290 :     sendRequest(socket, idToFind, Query::FIND, Bucket::BUCKET_MAX_SIZE);
     323             : 
     324        1291 :     if (!node) {
     325           0 :         auto& nodeTimer = bucket->getNodeTimer(socket);
     326           0 :         nodeTimer.expires_after(FIND_PERIOD);
     327           0 :         nodeTimer.async_wait(std::bind(&jami::SwarmManager::resetNodeExpiry,
     328           0 :                                        shared_from_this(),
     329             :                                        std::placeholders::_1,
     330             :                                        socket,
     331           0 :                                        NodeId {}));
     332             :     }
     333             : }
     334             : 
     335             : void
     336        1003 : SwarmManager::tryConnect(const NodeId& nodeId)
     337             : {
     338        1003 :     if (needSocketCb_)
     339         998 :         needSocketCb_(nodeId.toString(),
     340         940 :                       [w = weak(), nodeId](const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket) {
     341         940 :                           auto shared = w.lock();
     342         940 :                           if (!shared)
     343           4 :                               return true;
     344         936 :                           if (socket) {
     345         706 :                               shared->addChannel(socket);
     346         705 :                               return true;
     347             :                           }
     348         230 :                           std::unique_lock lk(shared->mutex);
     349         231 :                           auto bucket = shared->routing_table.findBucket(nodeId);
     350         231 :                           bucket->removeConnectingNode(nodeId);
     351         231 :                           bucket->addKnownNode(nodeId);
     352         231 :                           bucket = shared->routing_table.findBucket(shared->getId());
     353         231 :                           if (bucket->getConnectingNodesSize() == 0
     354         231 :                               && bucket->isEmpty() && shared->onConnectionChanged_) {
     355         133 :                               lk.unlock();
     356         399 :                               JAMI_WARNING("[SwarmManager {:p}] Bootstrap: all connections failed",
     357             :                                            fmt::ptr(shared.get()));
     358         133 :                               shared->onConnectionChanged_(false);
     359             :                           }
     360         231 :                           return true;
     361         940 :                       });
     362        1002 : }
     363             : 
     364             : void
     365         463 : SwarmManager::removeNodeInternal(const NodeId& nodeId)
     366             : {
     367         463 :     routing_table.removeNode(nodeId);
     368         460 : }
     369             : 
     370             : std::vector<NodeId>
     371           0 : SwarmManager::getAllNodes() const
     372             : {
     373           0 :     std::lock_guard lock(mutex);
     374           0 :     return routing_table.getAllNodes();
     375           0 : }
     376             : 
     377             : void
     378           1 : SwarmManager::deleteNode(std::vector<NodeId> nodes)
     379             : {
     380             :     {
     381           1 :         std::lock_guard lock(mutex);
     382           2 :         for (const auto& node : nodes) {
     383           1 :             routing_table.deleteNode(node);
     384             :         }
     385           1 :     }
     386           1 :     maintainBuckets();
     387           1 : }
     388             : 
     389             : } // namespace jami

Generated by: LCOV version 1.14