LCOV - code coverage report
Current view: top level - test/unitTest/swarm - swarm_spread.cpp (source / functions) Hit Total Coverage
Test: jami-coverage-filtered.info Lines: 228 235 97.0 %
Date: 2024-05-10 07:56:25 Functions: 35 37 94.6 %

          Line data    Source code
       1             : /*
       2             :  *  Copyright (C) 2024 Savoir-faire Linux Inc.
       3             :  *  Author: Fadi Shehadeh <fadi.shehadeh@savoirfairelinux.com>
       4             :  *
       5             :  *  This program is free software; you can redistribute it and/or modify
       6             :  *  it under the terms of the GNU General Public License as published by
       7             :  *  the Free Software Foundation; either version 3 of the License, or
       8             :  *  (at your option) any later version.
       9             :  *
      10             :  *  This program is distributed in the hope that it will be useful,
      11             :  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
      12             :  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
      13             :  *  GNU General Public License for more details.
      14             :  *
      15             :  *  You should have received a copy of the GNU General Public License
      16             :  *  along with this program. If not, see <https://www.gnu.org/licenses/>.
      17             :  */
      18             : 
      19             : #include <cppunit/TestAssert.h>
      20             : #include <cppunit/TestFixture.h>
      21             : #include <cppunit/extensions/HelperMacros.h>
      22             : 
      23             : #include <algorithm>
      24             : #include <msgpack.hpp>
      25             : #include <opendht/thread_pool.h>
      26             : #include <opendht/utils.h>
      27             : 
      28             : #include <iostream>
      29             : #include <fstream>
      30             : #include <string>
      31             : 
      32             : #include "../../test_runner.h"
      33             : #include "jami.h"
      34             : #include "../common.h"
      35             : #include "jamidht/swarm/swarm_manager.h"
      36             : #include <dhtnet/multiplexed_socket.h>
      37             : #include "nodes.h"
      38             : 
      39             : using namespace std::string_literals;
      40             : using namespace std::chrono_literals;
      41             : using namespace dht;
      42             : using NodeId = dht::PkId;
      43             : 
      44             : namespace jami {
      45             : namespace test {
      46             : 
      47             : constexpr size_t nNodes = 10;
      48             : 
      49             : constexpr size_t BOOTSTRAP_SIZE = 2;
      50             : auto time = 30s;
      51             : 
      52             : int TOTAL_HOPS = 0;
      53             : int moyenne = 0;
      54             : int max = 0;
      55             : int min = 10000;
      56             : 
      57             : struct Message
      58             : {
      59             :     int identifier_; // message identifier
      60             :     int hops_ = 0;   // number of hops
      61         103 :     MSGPACK_DEFINE_MAP(identifier_, hops_);
      62             : };
      63             : 
      64             : struct Counter
      65             : {
      66           1 :     Counter(unsigned t)
      67           1 :         : target(t)
      68           1 :     {}
      69             :     const unsigned target;
      70             :     unsigned added {0};
      71             :     std::mutex mutex;
      72             :     std::condition_variable cv;
      73             : 
      74          10 :     void count()
      75             :     {
      76          10 :         std::lock_guard lock(mutex);
      77          10 :         ++added;
      78          10 :         if (added == target)
      79           1 :             cv.notify_one();
      80          10 :     }
      81             :     bool wait(std::chrono::steady_clock::duration timeout)
      82             :     {
      83             :         std::unique_lock lock(mutex);
      84             :         return cv.wait_for(lock, timeout, [&] { return added == target; });
      85             :     }
      86           1 :     void wait()
      87             :     {
      88           1 :         std::unique_lock lock(mutex);
      89           4 :         return cv.wait(lock, [&] { return added == target; });
      90           1 :     }
      91             : };
      92             : 
      93             : class SwarmMessageSpread : public CppUnit::TestFixture
      94             : {
      95             : public:
      96           2 :     ~SwarmMessageSpread() { libjami::fini(); }
      97           2 :     static std::string name() { return "SwarmMessageSpread"; }
      98             : 
      99             :     void setUp();
     100             :     void tearDown();
     101             : 
     102             : private:
     103             :     std::mt19937_64 rd {dht::crypto::getSeededRandomEngine<std::mt19937_64>()};
     104             :     std::mutex channelSocketsMtx_;
     105             :     std::condition_variable channelSocketsCv_;
     106             : 
     107             :     std::map<NodeId, std::shared_ptr<jami::SwarmManager>> swarmManagers;
     108             :     std::map<NodeId,
     109             :              std::map<NodeId,
     110             :                       std::pair<std::shared_ptr<dhtnet::ChannelSocketTest>,
     111             :                                 std::shared_ptr<dhtnet::ChannelSocketTest>>>>
     112             :         channelSockets_;
     113             :     std::vector<NodeId> randomNodeIds;
     114             :     std::vector<std::shared_ptr<jami::SwarmManager>> swarmManagersShuffled;
     115             :     std::set<NodeId> discoveredNodes;
     116             :     std::map<NodeId, int> numberTimesReceived;
     117             :     std::map<NodeId, int> requestsReceived;
     118             :     std::map<NodeId, int> answersSent;
     119             : 
     120             :     int iterations = 0;
     121             : 
     122             :     void generateSwarmManagers();
     123             :     void needSocketCallBack(const std::shared_ptr<SwarmManager>& sm);
     124             :     void sendMessage(const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket, Message msg);
     125             :     void receiveMessage(const NodeId nodeId, const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket);
     126             :     void relayMessageToRoutingTable(const NodeId nodeId, const NodeId sourceId, const Message msg);
     127             :     void updateHops(int hops);
     128             :     void crossNodes(NodeId nodeId);
     129             :     void displayBucketDistribution(const NodeId& id);
     130             :     void distribution();
     131             :     std::shared_ptr<jami::SwarmManager> getManager(const NodeId& id);
     132             : 
     133             :     void testWriteMessage();
     134             : 
     135           2 :     CPPUNIT_TEST_SUITE(SwarmMessageSpread);
     136           1 :     CPPUNIT_TEST(testWriteMessage);
     137           4 :     CPPUNIT_TEST_SUITE_END();
     138             : };
     139             : 
     140             : CPPUNIT_TEST_SUITE_NAMED_REGISTRATION(SwarmMessageSpread, SwarmMessageSpread::name());
     141             : 
     142             : void
     143           1 : SwarmMessageSpread::setUp()
     144             : {
     145           1 :     libjami::init(
     146             :         libjami::InitFlag(libjami::LIBJAMI_FLAG_DEBUG | libjami::LIBJAMI_FLAG_CONSOLE_LOG));
     147           1 :     if (not Manager::instance().initialized) {
     148           1 :         CPPUNIT_ASSERT(libjami::start("jami-sample.yml"));
     149             :     }
     150             : 
     151           1 :     generateSwarmManagers();
     152           1 : }
     153             : 
     154             : void
     155           1 : SwarmMessageSpread::tearDown()
     156           1 : {}
     157             : 
     158             : void
     159           1 : SwarmMessageSpread::generateSwarmManagers()
     160             : {
     161          11 :     for (size_t i = 0; i < nNodes; i++) {
     162          10 :         const NodeId node = Hash<32>::getRandom();
     163          68 :         auto sm = std::make_shared<SwarmManager>(node, rd, std::move([](auto) {return false;}));
     164          10 :         swarmManagers[node] = sm;
     165          10 :         randomNodeIds.emplace_back(node);
     166          10 :         swarmManagersShuffled.emplace_back(sm);
     167          10 :     }
     168           1 : }
     169             : 
     170             : std::shared_ptr<jami::SwarmManager>
     171          57 : SwarmMessageSpread::getManager(const NodeId& id)
     172             : {
     173          57 :     auto it = swarmManagers.find(id);
     174         112 :     return it == swarmManagers.end() ? nullptr : it->second;
     175             : }
     176             : 
     177             : void
     178           1 : SwarmMessageSpread::crossNodes(NodeId nodeId)
     179             : {
     180           1 :     std::list<NodeId> pendingNodes {nodeId};
     181           1 :     discoveredNodes.clear();
     182             : 
     183          64 :     for (const auto& curNode : pendingNodes) {
     184          63 :         if (discoveredNodes.emplace(curNode).second) {
     185          10 :             if (auto sm = getManager(curNode))
     186          72 :                 for (const auto& node : sm->getRoutingTable().getNodes()) {
     187          62 :                     pendingNodes.emplace_back(node);
     188          20 :                 }
     189             :         }
     190             :     }
     191           1 : }
     192             : 
     193             : void
     194          53 : SwarmMessageSpread::sendMessage(const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket, Message msg)
     195             : {
     196          53 :     auto buffer = std::make_shared<msgpack::sbuffer>(32);
     197          53 :     msgpack::packer<msgpack::sbuffer> pk(buffer.get());
     198          53 :     pk.pack(msg);
     199             : 
     200          53 :     dht::ThreadPool::io().run([socket, buffer = std::move(buffer)] {
     201          53 :         std::error_code ec;
     202             :         // std::this_thread::sleep_for(std::chrono::milliseconds(50));
     203             : 
     204          53 :         socket->write(reinterpret_cast<const unsigned char*>(buffer->data()), buffer->size(), ec);
     205          53 :     });
     206          53 : }
     207             : 
     208             : void
     209          62 : SwarmMessageSpread::receiveMessage(const NodeId nodeId,
     210             :                                    const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket)
     211             : {
     212             :     struct DecodingContext
     213             :     {
     214         100 :         msgpack::unpacker pac {[](msgpack::type::object_type, std::size_t, void*) { return true; },
     215             :                                nullptr,
     216             :                                32};
     217             :     };
     218             : 
     219          62 :     socket->setOnRecv([this,
     220             :                        wsocket = std::weak_ptr<dhtnet::ChannelSocketInterface>(socket),
     221             :                        ctx = std::make_shared<DecodingContext>(),
     222         236 :                        nodeId](const uint8_t* buf, size_t len) {
     223          53 :         auto socket = wsocket.lock();
     224          52 :         if (!socket)
     225           0 :             return 0lu;
     226             : 
     227          52 :         ctx->pac.reserve_buffer(len);
     228          52 :         std::copy_n(buf, len, ctx->pac.buffer());
     229          52 :         ctx->pac.buffer_consumed(len);
     230             : 
     231          53 :         msgpack::object_handle oh;
     232         105 :         while (ctx->pac.next(oh)) {
     233             :             try {
     234          48 :                 Message msg;
     235          48 :                 oh.get().convert(msg);
     236             : 
     237          50 :                 if (msg.identifier_ == 1) {
     238          50 :                     std::lock_guard lk(channelSocketsMtx_);
     239          53 :                     auto var = numberTimesReceived.find(nodeId);
     240          53 :                     iterations = iterations + 1;
     241             : 
     242          53 :                     if (var != numberTimesReceived.end()) {
     243          44 :                         var->second += 1;
     244             :                     } else {
     245           9 :                         Message msgToSend;
     246           9 :                         msgToSend.identifier_ = 1;
     247           9 :                         msgToSend.hops_ = msg.hops_ + 1;
     248           9 :                         numberTimesReceived[nodeId] = 1;
     249           9 :                         updateHops(msgToSend.hops_);
     250           9 :                         relayMessageToRoutingTable(nodeId, socket->deviceId(), msgToSend);
     251             :                     }
     252          53 :                     channelSocketsCv_.notify_all();
     253          53 :                 }
     254             : 
     255           0 :             } catch (const std::exception& e) {
     256           0 :                 JAMI_WARNING("Error DRT recv: {}", e.what());
     257           0 :                 return 0lu;
     258           0 :             }
     259             :         }
     260             : 
     261          53 :         return 0lu;
     262          53 :     });
     263          62 : };
     264             : 
     265             : void
     266           9 : SwarmMessageSpread::updateHops(int hops)
     267             : {
     268           9 :     if (hops > TOTAL_HOPS) {
     269           3 :         TOTAL_HOPS = hops;
     270             :     }
     271           9 : }
     272             : 
     273             : void
     274          10 : SwarmMessageSpread::needSocketCallBack(const std::shared_ptr<SwarmManager>& sm)
     275             : {
     276          10 :     sm->needSocketCb_ = [this, wsm = std::weak_ptr<SwarmManager>(sm)](const std::string& nodeId,
     277          38 :                                                                       auto&& onSocket) {
     278         114 :         dht::ThreadPool::io().run(
     279          76 :             [this, wsm = std::move(wsm), nodeId, onSocket = std::move(onSocket)] {
     280          38 :                 auto sm = wsm.lock();
     281          38 :                 if (!sm)
     282           0 :                     return;
     283             : 
     284          37 :                 NodeId node = dhtnet::DeviceId(nodeId);
     285          76 :                 if (auto smRemote = getManager(node)) {
     286          38 :                     auto myId = sm->getId();
     287          37 :                     std::unique_lock lk(channelSocketsMtx_);
     288          38 :                     auto& cstRemote = channelSockets_[node][myId];
     289          38 :                     auto& cstMe = channelSockets_[myId][node];
     290          38 :                     if (cstMe.second && cstMe.first)
     291           7 :                         return;
     292          31 :                     if (!cstMe.second) {
     293          31 :                         cstMe.second = std::make_shared<dhtnet::ChannelSocketTest>(Manager::instance().ioContext(), node, "test1", 1);
     294          31 :                         cstRemote.second = std::make_shared<dhtnet::ChannelSocketTest>(Manager::instance().ioContext(), myId, "test1", 1);
     295             :                     }
     296          31 :                     if (!cstMe.first) {
     297          31 :                         cstRemote.first = std::make_shared<dhtnet::ChannelSocketTest>(Manager::instance().ioContext(), myId, "swarm1", 0);
     298          31 :                         cstMe.first = std::make_shared<dhtnet::ChannelSocketTest>(Manager::instance().ioContext(), node, "swarm1", 0);
     299             :                     }
     300          31 :                     lk.unlock();
     301          31 :                     dhtnet::ChannelSocketTest::link(cstMe.second, cstRemote.second);
     302          31 :                     receiveMessage(myId, cstMe.second);
     303          31 :                     receiveMessage(node, cstRemote.second);
     304             :                     // std::this_thread::sleep_for(std::chrono::seconds(5));
     305          31 :                     dhtnet::ChannelSocketTest::link(cstMe.first, cstRemote.first);
     306          31 :                     smRemote->addChannel(cstRemote.first);
     307          31 :                     onSocket(cstMe.first);
     308          38 :                 }
     309          38 :             });
     310          57 :     };
     311          10 : }
     312             : 
     313             : void
     314           9 : SwarmMessageSpread::relayMessageToRoutingTable(const NodeId nodeId,
     315             :                                                const NodeId sourceId,
     316             :                                                const Message msg)
     317             : {
     318           9 :     auto swarmManager = getManager(nodeId);
     319           9 :     const auto& routingtable = swarmManager->getRoutingTable().getNodes();
     320          66 :     for (auto& node : routingtable) {
     321          57 :         if (node != sourceId) {
     322          48 :             auto channelToSend = channelSockets_[nodeId][node].second;
     323          48 :             sendMessage(channelToSend, msg);
     324          48 :         }
     325             :     }
     326           9 : }
     327             : 
     328             : void
     329           1 : SwarmMessageSpread::distribution()
     330             : {
     331           2 :     std::string const fileName("distrib_nodes_" + std::to_string(nNodes) + ".txt");
     332           1 :     std::ofstream myStream(fileName.c_str());
     333             : 
     334           1 :     std::vector<unsigned> dist(10);
     335           1 :     int mean = 0;
     336          11 :     for (const auto& sm : swarmManagers) {
     337          10 :         auto val = sm.second->getRoutingTable().getRoutingTableNodeCount();
     338          10 :         if (dist.size() <= val)
     339           0 :             dist.resize(val + 1);
     340             : 
     341          10 :         dist[val]++;
     342             :     }
     343             : 
     344          11 :     for (size_t i = 0; i < dist.size(); i++) {
     345             :         // std::cout << "Swarm Managers with " << i << " nodes: " << dist[i] << std::endl;
     346          10 :         if (myStream) {
     347          10 :             myStream << i << "," << dist[i] << std::endl;
     348             :         }
     349          10 :         mean += i * dist[i];
     350             :     }
     351           1 :     std::cout << "Le noeud avec le plus de noeuds dans sa routing table: " << dist.size()
     352           1 :               << std::endl;
     353           1 :     std::cout << "Moyenne de nombre de noeuds par Swarm: " << mean / (float) swarmManagers.size()
     354           1 :               << std::endl;
     355           1 : }
     356             : 
     357             : void
     358           3 : SwarmMessageSpread::displayBucketDistribution(const NodeId& id)
     359             : {
     360           6 :     std::string const fileName("distrib_rt_" + std::to_string(nNodes) + "_" + id.toString()
     361           3 :                                + ".txt");
     362           3 :     std::ofstream myStream(fileName.c_str());
     363             : 
     364           3 :     const auto& routingtable = swarmManagers[id]->getRoutingTable().getBuckets();
     365             : 
     366           3 :     std::cout << "Bucket distribution for node " << id << std::endl;
     367             : 
     368          13 :     for (auto it = routingtable.begin(); it != routingtable.end(); ++it) {
     369          10 :         auto lowerLimit = it->getLowerLimit().toString();
     370             : 
     371          10 :         std::string hex_prefix = lowerLimit.substr(0, 4); // extraire les deux premiers caractères
     372          10 :         std::cout << "Bucket " << hex_prefix << " has " << it->getNodesSize() << " nodes"
     373          10 :                   << std::endl;
     374             : 
     375          10 :         if (myStream) {
     376          10 :             myStream << hex_prefix << "," << it->getNodesSize() << std::endl;
     377             :         }
     378          10 :     }
     379           3 : }
     380             : 
     381             : void
     382           1 : SwarmMessageSpread::testWriteMessage()
     383             : {
     384           1 :     std::cout << "\ntestWriteMessage()" << std::endl;
     385          11 :     for (const auto& sm : swarmManagersShuffled) {
     386          10 :         needSocketCallBack(sm);
     387             :     }
     388             : 
     389           1 :     Counter counter(swarmManagers.size());
     390          11 :     for (const auto& sm : swarmManagers) {
     391          10 :         dht::ThreadPool::computation().run([&] {
     392          10 :             std::vector<NodeId> randIds(BOOTSTRAP_SIZE);
     393          10 :             std::uniform_int_distribution<size_t> distribution(0, randomNodeIds.size() - 1);
     394          10 :             std::generate(randIds.begin(), randIds.end(), [&] {
     395          19 :                 auto dev = randomNodeIds[distribution(rd)];
     396          20 :                 return dev;
     397             :             });
     398          10 :             sm.second->setKnownNodes(randIds);
     399          10 :             counter.count();
     400          10 :         });
     401             :     }
     402           1 :     counter.wait();
     403             : 
     404           1 :     std::this_thread::sleep_for(time);
     405             : 
     406           1 :     auto& firstNode = *channelSockets_.begin();
     407             : 
     408           1 :     crossNodes(swarmManagers.begin()->first);
     409           1 :     CPPUNIT_ASSERT_EQUAL(swarmManagers.size(), discoveredNodes.size());
     410             : 
     411           1 :     std::cout << "Sending First Message to " << firstNode.second.size() << std::endl;
     412           1 :     auto start = std::chrono::steady_clock::now();
     413             : 
     414           1 :     numberTimesReceived[firstNode.first] = 1;
     415             : 
     416           6 :     for (const auto& channel : firstNode.second) {
     417           5 :         if (channel.second.second) {
     418           5 :             sendMessage(channel.second.second, {1, 0});
     419             :         }
     420             :     }
     421             : 
     422           1 :     std::unique_lock lk(channelSocketsMtx_);
     423           1 :     bool ok = channelSocketsCv_.wait_for(lk, 1200s, [&] {
     424             :         std::cout << "\r"
     425           3 :                   << "Size of Received " << numberTimesReceived.size();
     426           3 :         return numberTimesReceived.size() == swarmManagers.size();
     427             :     });
     428           1 :     auto now = std::chrono::steady_clock::now();
     429             : 
     430           1 :     std::cout << "#########################################################################"
     431           1 :               << std::endl;
     432           2 :     std::cout << "Time for everyone to receive the message " << dht::print_duration(now - start)
     433           1 :               << std::endl;
     434           1 :     std::cout << " IS OK " << ok << std::endl;
     435             : 
     436             :     // ##############################################################################
     437             : 
     438          11 :     for (const auto& count : numberTimesReceived) {
     439          10 :         moyenne = moyenne + count.second;
     440             : 
     441          10 :         if (count.second > max) {
     442           3 :             max = count.second;
     443             :         }
     444             : 
     445          10 :         if (count.second < min) {
     446           1 :             min = count.second;
     447             :         }
     448             :     }
     449             : 
     450           1 :     auto it = channelSockets_.begin();
     451             : 
     452           1 :     displayBucketDistribution((*it).first);
     453           1 :     std::advance(it, swarmManagers.size() / 2);
     454           1 :     displayBucketDistribution((*it).first);
     455           1 :     std::advance(it, swarmManagers.size() / 2 - 1);
     456           1 :     displayBucketDistribution((*it).first);
     457             : 
     458           1 :     std::cout << "MOYENNE DE RECEPTION PAR NOEUD [ " << moyenne / (float) numberTimesReceived.size()
     459           1 :               << " ] " << std::endl;
     460           1 :     std::cout << "MAX DE RECEPTION PAR NOEUD     [ " << max << " ] " << std::endl;
     461           1 :     std::cout << "MIN DE RECEPTION PAR NOEUD     [ " << min << " ] " << std::endl;
     462             : 
     463           1 :     std::cout << "NOMBRE DE SAUTS DIRECTS        [ " << TOTAL_HOPS << " ] " << std::endl;
     464           1 :     std::cout << "NOMBRE D'ITERATIONS            [ " << iterations << " ] " << std::endl;
     465             : 
     466           1 :     distribution();
     467           1 :     std::cout << "#########################################################################"
     468           1 :               << std::endl;
     469             : 
     470           1 :     std::cout << "Number of times received " << numberTimesReceived.size() << std::endl;
     471           1 :     std::cout << "Number of swarm managers " << swarmManagers.size() << std::endl;
     472             : 
     473           1 :     CPPUNIT_ASSERT(true);
     474           1 : }
     475             : 
     476             : }; // namespace test
     477             : } // namespace jami
     478           1 : RING_TEST_RUNNER(jami::test::SwarmMessageSpread::name())

Generated by: LCOV version 1.14