LCOV - code coverage report
Current view: top level - src/jamidht/swarm - swarm_manager.cpp (source / functions) Hit Total Coverage
Test: jami-coverage-filtered.info Lines: 208 226 92.0 %
Date: 2024-11-15 09:04:49 Functions: 32 37 86.5 %

          Line data    Source code
       1             : /*
       2             :  *  Copyright (C) 2004-2024 Savoir-faire Linux Inc.
       3             :  *
       4             :  *  This program is free software: you can redistribute it and/or modify
       5             :  *  it under the terms of the GNU General Public License as published by
       6             :  *  the Free Software Foundation, either version 3 of the License, or
       7             :  *  (at your option) any later version.
       8             :  *
       9             :  *  This program is distributed in the hope that it will be useful,
      10             :  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
      11             :  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
      12             :  *  GNU General Public License for more details.
      13             :  *
      14             :  *  You should have received a copy of the GNU General Public License
      15             :  *  along with this program. If not, see <https://www.gnu.org/licenses/>.
      16             :  */
      17             : 
      18             : #include "swarm_manager.h"
      19             : #include <dhtnet/multiplexed_socket.h>
      20             : #include <opendht/thread_pool.h>
      21             : 
      22             : constexpr const std::chrono::minutes FIND_PERIOD {10};
      23             : 
      24             : namespace jami {
      25             : 
      26             : using namespace swarm_protocol;
      27             : 
      28         586 : SwarmManager::SwarmManager(const NodeId& id, const std::mt19937_64& rand, ToConnectCb&& toConnectCb)
      29         586 :     : id_(id)
      30         586 :     , rd(rand)
      31         586 :     , toConnectCb_(toConnectCb)
      32             : {
      33         586 :     routing_table.setId(id);
      34         586 : }
      35             : 
      36         586 : SwarmManager::~SwarmManager()
      37             : {
      38         586 :     if (!isShutdown_)
      39         246 :         shutdown();
      40         586 : }
      41             : 
      42             : bool
      43        2248 : SwarmManager::setKnownNodes(const std::vector<NodeId>& known_nodes)
      44             : {
      45        2248 :     isShutdown_ = false;
      46        2251 :     std::vector<NodeId> newNodes;
      47             :     {
      48        2245 :         std::lock_guard lock(mutex);
      49        8217 :         for (const auto& nodeId : known_nodes) {
      50        5958 :             if (addKnownNode(nodeId)) {
      51        2992 :                 newNodes.emplace_back(nodeId);
      52             :             }
      53             :         }
      54        2246 :     }
      55             : 
      56        2251 :     if (newNodes.empty())
      57        1716 :         return false;
      58             : 
      59         532 :     dht::ThreadPool::io().run([w=weak(), newNodes=std::move(newNodes)] {
      60         535 :         auto shared = w.lock();
      61         533 :         if (!shared)
      62           0 :             return;
      63             :         // If we detect a new node which already got a TCP link
      64             :         // we can use it to speed-up the bootstrap (because opening
      65             :         // a new channel will be easy)
      66         533 :         std::set<NodeId> toConnect;
      67        3525 :         for (const auto& nodeId: newNodes) {
      68        2995 :             if (shared->toConnectCb_ && shared->toConnectCb_(nodeId))
      69         512 :                 toConnect.emplace(nodeId);
      70             :         }
      71         531 :         shared->maintainBuckets(toConnect);
      72         535 :     });
      73         535 :     return true;
      74        2251 : }
      75             : 
      76             : void
      77        1413 : SwarmManager::setMobileNodes(const std::vector<NodeId>& mobile_nodes)
      78             : {
      79             :     {
      80        1413 :         std::lock_guard lock(mutex);
      81        1426 :         for (const auto& nodeId : mobile_nodes)
      82          13 :             addMobileNodes(nodeId);
      83        1408 :     }
      84        1410 : }
      85             : 
      86             : void
      87        2088 : SwarmManager::addChannel(const std::shared_ptr<dhtnet::ChannelSocketInterface>& channel)
      88             : {
      89             :     // JAMI_WARNING("[SwarmManager {}] addChannel! with {}", fmt::ptr(this), channel->deviceId().to_view());
      90        2088 :     if (channel) {
      91        2089 :         auto emit = false;
      92             :         {
      93        2089 :             std::lock_guard lock(mutex);
      94        2088 :             emit = routing_table.findBucket(getId())->isEmpty();
      95        2089 :             auto bucket = routing_table.findBucket(channel->deviceId());
      96        2087 :             if (routing_table.addNode(channel, bucket)) {
      97        1445 :                 std::error_code ec;
      98        1444 :                 resetNodeExpiry(ec, channel, id_);
      99             :             }
     100        2089 :         }
     101        2088 :         receiveMessage(channel);
     102        2089 :         if (emit && onConnectionChanged_) {
     103             :             // If it's the first channel we add, we're now connected!
     104        1349 :             JAMI_DEBUG("[SwarmManager {}] Bootstrap: Connected!", fmt::ptr(this));
     105         451 :             onConnectionChanged_(true);
     106             :         }
     107             :     }
     108        2089 : }
     109             : 
     110             : void
     111         574 : SwarmManager::removeNode(const NodeId& nodeId)
     112             : {
     113         574 :     std::unique_lock lk(mutex);
     114         573 :     if (isConnectedWith(nodeId)) {
     115         543 :         removeNodeInternal(nodeId);
     116         548 :         lk.unlock();
     117         549 :         maintainBuckets();
     118             :     }
     119         574 : }
     120             : 
     121             : void
     122         191 : SwarmManager::changeMobility(const NodeId& nodeId, bool isMobile)
     123             : {
     124         191 :     std::lock_guard lock(mutex);
     125         192 :     auto bucket = routing_table.findBucket(nodeId);
     126         191 :     bucket->changeMobility(nodeId, isMobile);
     127         191 : }
     128             : 
     129             : bool
     130         573 : SwarmManager::isConnectedWith(const NodeId& deviceId)
     131             : {
     132         573 :     return routing_table.hasNode(deviceId);
     133             : }
     134             : 
     135             : void
     136         651 : SwarmManager::shutdown()
     137             : {
     138         651 :     if (isShutdown_) {
     139          35 :         return;
     140             :     }
     141         616 :     isShutdown_ = true;
     142         616 :     std::lock_guard lock(mutex);
     143         616 :     routing_table.shutdownAllNodes();
     144         616 : }
     145             : 
     146             : void
     147          22 : SwarmManager::restart()
     148             : {
     149          22 :     isShutdown_ = false;
     150          22 : }
     151             : 
     152             : bool
     153        5960 : SwarmManager::addKnownNode(const NodeId& nodeId)
     154             : {
     155        5960 :     return routing_table.addKnownNode(nodeId);
     156             : }
     157             : 
     158             : void
     159          13 : SwarmManager::addMobileNodes(const NodeId& nodeId)
     160             : {
     161          13 :     if (id_ != nodeId) {
     162          12 :         routing_table.addMobileNode(nodeId);
     163             :     }
     164          13 : }
     165             : 
     166             : void
     167        1122 : SwarmManager::maintainBuckets(const std::set<NodeId>& toConnect)
     168             : {
     169        1122 :     std::set<NodeId> nodes = toConnect;
     170        1120 :     std::unique_lock lock(mutex);
     171        1121 :     auto& buckets = routing_table.getBuckets();
     172        3999 :     for (auto it = buckets.begin(); it != buckets.end(); ++it) {
     173        2877 :         auto& bucket = *it;
     174        2878 :         bool myBucket = routing_table.contains(it, id_);
     175        4632 :         auto connecting_nodes = myBucket ? bucket.getConnectingNodesSize()
     176        1755 :                                          : bucket.getConnectingNodesSize() + bucket.getNodesSize();
     177        2876 :         if (connecting_nodes < Bucket::BUCKET_MAX_SIZE) {
     178             :             auto nodesToTry = bucket.getKnownNodesRandom(Bucket::BUCKET_MAX_SIZE - connecting_nodes,
     179        1419 :                                                          rd);
     180        2311 :             for (auto& node : nodesToTry)
     181         895 :                 routing_table.addConnectingNode(node);
     182             : 
     183        1410 :             nodes.insert(nodesToTry.begin(), nodesToTry.end());
     184        1416 :         }
     185             :     }
     186        1123 :     lock.unlock();
     187        2306 :     for (auto& node : nodes)
     188        1180 :         tryConnect(node);
     189        1122 : }
     190             : 
     191             : void
     192        1444 : SwarmManager::sendRequest(const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket,
     193             :                           NodeId& nodeId,
     194             :                           Query q,
     195             :                           int numberNodes)
     196             : {
     197        1444 :     msgpack::sbuffer buffer;
     198        1445 :     msgpack::packer<msgpack::sbuffer> pk(&buffer);
     199        1445 :     std::error_code ec;
     200             : 
     201        1445 :     Request toRequest {q, numberNodes, nodeId};
     202        1445 :     Message msg;
     203        1445 :     msg.is_mobile = isMobile_;
     204        1445 :     msg.request = std::move(toRequest);
     205             : 
     206        1444 :     pk.pack(msg);
     207             : 
     208        1443 :     socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
     209             : 
     210        1445 :     if (ec) {
     211           8 :         JAMI_ERROR("{}", ec.message());
     212           3 :         return;
     213             :     }
     214        1448 : }
     215             : 
     216             : void
     217        1410 : SwarmManager::sendAnswer(const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket, const Message& msg_)
     218             : {
     219        1410 :     std::lock_guard lock(mutex);
     220             : 
     221        1412 :     if (msg_.request->q == Query::FIND) {
     222        1409 :         auto nodes = routing_table.closestNodes(msg_.request->nodeId, msg_.request->num);
     223        1415 :         auto bucket = routing_table.findBucket(msg_.request->nodeId);
     224        1412 :         const auto& m_nodes = bucket->getMobileNodes();
     225        1413 :         Response toResponse {Query::FOUND, nodes, {m_nodes.begin(), m_nodes.end()}};
     226             : 
     227        1411 :         Message msg;
     228        1411 :         msg.is_mobile = isMobile_;
     229        1411 :         msg.response = std::move(toResponse);
     230             : 
     231        1415 :         msgpack::sbuffer buffer((size_t) 60000);
     232        1416 :         msgpack::packer<msgpack::sbuffer> pk(&buffer);
     233        1415 :         pk.pack(msg);
     234             : 
     235        1415 :         std::error_code ec;
     236             : 
     237        1415 :         socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
     238        1416 :         if (ec) {
     239           0 :             JAMI_ERROR("{}", ec.message());
     240           0 :             return;
     241             :         }
     242        1416 :     }
     243             : 
     244             :     else {
     245             :     }
     246        1415 : }
     247             : 
     248             : void
     249        2089 : SwarmManager::receiveMessage(const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket)
     250             : {
     251             :     struct DecodingContext
     252             :     {
     253       22819 :         msgpack::unpacker pac {[](msgpack::type::object_type, std::size_t, void*) { return true; },
     254             :                                nullptr,
     255             :                                512};
     256             :     };
     257             : 
     258        2089 :     socket->setOnRecv([w = weak(),
     259             :                        wsocket = std::weak_ptr<dhtnet::ChannelSocketInterface>(socket),
     260             :                        ctx = std::make_shared<DecodingContext>()](const uint8_t* buf, size_t len) {
     261        2814 :         ctx->pac.reserve_buffer(len);
     262        2815 :         std::copy_n(buf, len, ctx->pac.buffer());
     263        2829 :         ctx->pac.buffer_consumed(len);
     264             : 
     265        2828 :         msgpack::object_handle oh;
     266        5646 :         while (ctx->pac.next(oh)) {
     267        2807 :             auto shared = w.lock();
     268        2809 :             auto socket = wsocket.lock();
     269        2812 :             if (!shared || !socket)
     270           1 :                 return size_t {0};
     271             : 
     272             :             try {
     273        2810 :                 Message msg;
     274        2810 :                 oh.get().convert(msg);
     275             : 
     276        2803 :                 if (msg.is_mobile)
     277         190 :                     shared->changeMobility(socket->deviceId(), msg.is_mobile);
     278             : 
     279        2805 :                 if (msg.request) {
     280        1409 :                     shared->sendAnswer(socket, msg);
     281             : 
     282        1403 :                 } else if (msg.response) {
     283        1404 :                     shared->setKnownNodes(msg.response->nodes);
     284        1410 :                     shared->setMobileNodes(msg.response->mobile_nodes);
     285             :                 }
     286             : 
     287        2821 :             } catch (const std::exception& e) {
     288           0 :                 JAMI_WARNING("Error DRT recv: {}", e.what());
     289           0 :                 return len;
     290           0 :             }
     291        2825 :         }
     292             : 
     293        2822 :         return len;
     294        2824 :     });
     295             : 
     296        2087 :     socket->onShutdown([w = weak(), deviceId = socket->deviceId()] {
     297        1466 :         dht::ThreadPool::io().run([w, deviceId] {
     298        1133 :             auto shared = w.lock();
     299        1132 :             if (shared && !shared->isShutdown_) {
     300         574 :                 shared->removeNode(deviceId);
     301             :             }
     302        1134 :         });
     303        1467 :     });
     304        2087 : }
     305             : 
     306             : void
     307        1444 : SwarmManager::resetNodeExpiry(const asio::error_code& ec,
     308             :                               const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket,
     309             :                               NodeId node)
     310             : {
     311        1444 :     NodeId idToFind;
     312        1443 :     std::list<Bucket>::iterator bucket;
     313             : 
     314        1444 :     if (ec == asio::error::operation_aborted)
     315           0 :         return;
     316             : 
     317        1443 :     if (!node) {
     318           0 :         bucket = routing_table.findBucket(socket->deviceId());
     319           0 :         idToFind = bucket->randomId(rd);
     320             :     } else {
     321        1444 :         bucket = routing_table.findBucket(node);
     322        1444 :         idToFind = node;
     323             :     }
     324             : 
     325        1444 :     sendRequest(socket, idToFind, Query::FIND, Bucket::BUCKET_MAX_SIZE);
     326             : 
     327        1445 :     if (!node) {
     328           0 :         auto& nodeTimer = bucket->getNodeTimer(socket);
     329           0 :         nodeTimer.expires_after(FIND_PERIOD);
     330           0 :         nodeTimer.async_wait(std::bind(&jami::SwarmManager::resetNodeExpiry,
     331           0 :                                        shared_from_this(),
     332             :                                        std::placeholders::_1,
     333             :                                        socket,
     334           0 :                                        NodeId {}));
     335             :     }
     336             : }
     337             : 
     338             : void
     339        1180 : SwarmManager::tryConnect(const NodeId& nodeId)
     340             : {
     341        1180 :     if (needSocketCb_)
     342        1175 :         needSocketCb_(nodeId.toString(),
     343        1095 :                       [w = weak(), nodeId](const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket) {
     344        1095 :                           auto shared = w.lock();
     345        1095 :                           if (!shared || shared->isShutdown_)
     346         222 :                               return true;
     347         872 :                           if (socket) {
     348         788 :                               shared->addChannel(socket);
     349         788 :                               return true;
     350             :                           }
     351          84 :                           std::unique_lock lk(shared->mutex);
     352          84 :                           auto bucket = shared->routing_table.findBucket(nodeId);
     353          84 :                           bucket->removeConnectingNode(nodeId);
     354          84 :                           bucket->addKnownNode(nodeId);
     355          84 :                           bucket = shared->routing_table.findBucket(shared->getId());
     356          84 :                           if (bucket->getConnectingNodesSize() == 0
     357          84 :                               && bucket->isEmpty() && shared->onConnectionChanged_) {
     358          38 :                               lk.unlock();
     359         114 :                               JAMI_WARNING("[SwarmManager {:p}] Bootstrap: all connections failed",
     360             :                                            fmt::ptr(shared.get()));
     361          38 :                               shared->onConnectionChanged_(false);
     362             :                           }
     363          84 :                           return true;
     364        1094 :                       });
     365        1182 : }
     366             : 
     367             : void
     368         543 : SwarmManager::removeNodeInternal(const NodeId& nodeId)
     369             : {
     370         543 :     routing_table.removeNode(nodeId);
     371         548 : }
     372             : 
     373             : std::vector<NodeId>
     374           0 : SwarmManager::getAllNodes() const
     375             : {
     376           0 :     std::lock_guard lock(mutex);
     377           0 :     return routing_table.getAllNodes();
     378           0 : }
     379             : 
     380             : void
     381          18 : SwarmManager::deleteNode(std::vector<NodeId> nodes)
     382             : {
     383             :     {
     384          18 :         std::lock_guard lock(mutex);
     385          30 :         for (const auto& node : nodes) {
     386          12 :             routing_table.deleteNode(node);
     387             :         }
     388          18 :     }
     389          18 :     maintainBuckets();
     390          18 : }
     391             : 
     392             : } // namespace jami

Generated by: LCOV version 1.14