LCOV - code coverage report
Current view: top level - test/unitTest/swarm - swarm_spread.cpp (source / functions) Hit Total Coverage
Test: jami-coverage-filtered.info Lines: 228 235 97.0 %
Date: 2024-11-15 09:04:49 Functions: 35 37 94.6 %

          Line data    Source code
       1             : /*
       2             :  *  Copyright (C) 2004-2024 Savoir-faire Linux Inc.
       3             :  *
       4             :  *  This program is free software: you can redistribute it and/or modify
       5             :  *  it under the terms of the GNU General Public License as published by
       6             :  *  the Free Software Foundation, either version 3 of the License, or
       7             :  *  (at your option) any later version.
       8             :  *
       9             :  *  This program is distributed in the hope that it will be useful,
      10             :  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
      11             :  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
      12             :  *  GNU General Public License for more details.
      13             :  *
      14             :  *  You should have received a copy of the GNU General Public License
      15             :  *  along with this program. If not, see <https://www.gnu.org/licenses/>.
      16             :  */
      17             : 
      18             : #include <cppunit/TestAssert.h>
      19             : #include <cppunit/TestFixture.h>
      20             : #include <cppunit/extensions/HelperMacros.h>
      21             : 
      22             : #include <algorithm>
      23             : #include <msgpack.hpp>
      24             : #include <opendht/thread_pool.h>
      25             : #include <opendht/utils.h>
      26             : 
      27             : #include <iostream>
      28             : #include <fstream>
      29             : #include <string>
      30             : 
      31             : #include "../../test_runner.h"
      32             : #include "jami.h"
      33             : #include "../common.h"
      34             : #include "jamidht/swarm/swarm_manager.h"
      35             : #include <dhtnet/multiplexed_socket.h>
      36             : #include "nodes.h"
      37             : 
      38             : using namespace std::string_literals;
      39             : using namespace std::chrono_literals;
      40             : using namespace dht;
      41             : using NodeId = dht::PkId;
      42             : 
      43             : namespace jami {
      44             : namespace test {
      45             : 
      46             : constexpr size_t nNodes = 10;
      47             : 
      48             : constexpr size_t BOOTSTRAP_SIZE = 2;
      49             : auto time = 30s;
      50             : 
      51             : int TOTAL_HOPS = 0;
      52             : int moyenne = 0;
      53             : int max = 0;
      54             : int min = 10000;
      55             : 
      56             : struct Message
      57             : {
      58             :     int identifier_; // message identifier
      59             :     int hops_ = 0;   // number of hops
      60         106 :     MSGPACK_DEFINE_MAP(identifier_, hops_);
      61             : };
      62             : 
      63             : struct Counter
      64             : {
      65           1 :     Counter(unsigned t)
      66           1 :         : target(t)
      67           1 :     {}
      68             :     const unsigned target;
      69             :     unsigned added {0};
      70             :     std::mutex mutex;
      71             :     std::condition_variable cv;
      72             : 
      73          10 :     void count()
      74             :     {
      75          10 :         std::lock_guard lock(mutex);
      76          10 :         ++added;
      77          10 :         if (added == target)
      78           1 :             cv.notify_one();
      79          10 :     }
      80             :     bool wait(std::chrono::steady_clock::duration timeout)
      81             :     {
      82             :         std::unique_lock lock(mutex);
      83             :         return cv.wait_for(lock, timeout, [&] { return added == target; });
      84             :     }
      85           1 :     void wait()
      86             :     {
      87           1 :         std::unique_lock lock(mutex);
      88           4 :         return cv.wait(lock, [&] { return added == target; });
      89           1 :     }
      90             : };
      91             : 
      92             : class SwarmMessageSpread : public CppUnit::TestFixture
      93             : {
      94             : public:
      95           2 :     ~SwarmMessageSpread() { libjami::fini(); }
      96           2 :     static std::string name() { return "SwarmMessageSpread"; }
      97             : 
      98             :     void setUp();
      99             :     void tearDown();
     100             : 
     101             : private:
     102             :     std::mt19937_64 rd {dht::crypto::getSeededRandomEngine<std::mt19937_64>()};
     103             :     std::mutex channelSocketsMtx_;
     104             :     std::condition_variable channelSocketsCv_;
     105             : 
     106             :     std::map<NodeId, std::shared_ptr<jami::SwarmManager>> swarmManagers;
     107             :     std::map<NodeId,
     108             :              std::map<NodeId,
     109             :                       std::pair<std::shared_ptr<dhtnet::ChannelSocketTest>,
     110             :                                 std::shared_ptr<dhtnet::ChannelSocketTest>>>>
     111             :         channelSockets_;
     112             :     std::vector<NodeId> randomNodeIds;
     113             :     std::vector<std::shared_ptr<jami::SwarmManager>> swarmManagersShuffled;
     114             :     std::set<NodeId> discoveredNodes;
     115             :     std::map<NodeId, int> numberTimesReceived;
     116             :     std::map<NodeId, int> requestsReceived;
     117             :     std::map<NodeId, int> answersSent;
     118             : 
     119             :     int iterations = 0;
     120             : 
     121             :     void generateSwarmManagers();
     122             :     void needSocketCallBack(const std::shared_ptr<SwarmManager>& sm);
     123             :     void sendMessage(const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket, Message msg);
     124             :     void receiveMessage(const NodeId nodeId, const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket);
     125             :     void relayMessageToRoutingTable(const NodeId nodeId, const NodeId sourceId, const Message msg);
     126             :     void updateHops(int hops);
     127             :     void crossNodes(NodeId nodeId);
     128             :     void displayBucketDistribution(const NodeId& id);
     129             :     void distribution();
     130             :     std::shared_ptr<jami::SwarmManager> getManager(const NodeId& id);
     131             : 
     132             :     void testWriteMessage();
     133             : 
     134           2 :     CPPUNIT_TEST_SUITE(SwarmMessageSpread);
     135           1 :     CPPUNIT_TEST(testWriteMessage);
     136           4 :     CPPUNIT_TEST_SUITE_END();
     137             : };
     138             : 
     139             : CPPUNIT_TEST_SUITE_NAMED_REGISTRATION(SwarmMessageSpread, SwarmMessageSpread::name());
     140             : 
     141             : void
     142           1 : SwarmMessageSpread::setUp()
     143             : {
     144           1 :     libjami::init(
     145             :         libjami::InitFlag(libjami::LIBJAMI_FLAG_DEBUG | libjami::LIBJAMI_FLAG_CONSOLE_LOG));
     146           1 :     if (not Manager::instance().initialized) {
     147           1 :         CPPUNIT_ASSERT(libjami::start("jami-sample.yml"));
     148             :     }
     149             : 
     150           1 :     generateSwarmManagers();
     151           1 : }
     152             : 
     153             : void
     154           1 : SwarmMessageSpread::tearDown()
     155           1 : {}
     156             : 
     157             : void
     158           1 : SwarmMessageSpread::generateSwarmManagers()
     159             : {
     160          11 :     for (size_t i = 0; i < nNodes; i++) {
     161          10 :         const NodeId node = Hash<32>::getRandom();
     162          68 :         auto sm = std::make_shared<SwarmManager>(node, rd, std::move([](auto) {return false;}));
     163          10 :         swarmManagers[node] = sm;
     164          10 :         randomNodeIds.emplace_back(node);
     165          10 :         swarmManagersShuffled.emplace_back(sm);
     166          10 :     }
     167           1 : }
     168             : 
     169             : std::shared_ptr<jami::SwarmManager>
     170          60 : SwarmMessageSpread::getManager(const NodeId& id)
     171             : {
     172          60 :     auto it = swarmManagers.find(id);
     173         120 :     return it == swarmManagers.end() ? nullptr : it->second;
     174             : }
     175             : 
     176             : void
     177           1 : SwarmMessageSpread::crossNodes(NodeId nodeId)
     178             : {
     179           1 :     std::list<NodeId> pendingNodes {nodeId};
     180           1 :     discoveredNodes.clear();
     181             : 
     182          64 :     for (const auto& curNode : pendingNodes) {
     183          63 :         if (discoveredNodes.emplace(curNode).second) {
     184          10 :             if (auto sm = getManager(curNode))
     185          72 :                 for (const auto& node : sm->getRoutingTable().getNodes()) {
     186          62 :                     pendingNodes.emplace_back(node);
     187          20 :                 }
     188             :         }
     189             :     }
     190           1 : }
     191             : 
     192             : void
     193          53 : SwarmMessageSpread::sendMessage(const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket, Message msg)
     194             : {
     195          53 :     auto buffer = std::make_shared<msgpack::sbuffer>(32);
     196          53 :     msgpack::packer<msgpack::sbuffer> pk(buffer.get());
     197          53 :     pk.pack(msg);
     198             : 
     199          53 :     dht::ThreadPool::io().run([socket, buffer = std::move(buffer)] {
     200          53 :         std::error_code ec;
     201             :         // std::this_thread::sleep_for(std::chrono::milliseconds(50));
     202             : 
     203          53 :         socket->write(reinterpret_cast<const unsigned char*>(buffer->data()), buffer->size(), ec);
     204          53 :     });
     205          53 : }
     206             : 
     207             : void
     208          62 : SwarmMessageSpread::receiveMessage(const NodeId nodeId,
     209             :                                    const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket)
     210             : {
     211             :     struct DecodingContext
     212             :     {
     213         102 :         msgpack::unpacker pac {[](msgpack::type::object_type, std::size_t, void*) { return true; },
     214             :                                nullptr,
     215             :                                32};
     216             :     };
     217             : 
     218          62 :     socket->setOnRecv([this,
     219             :                        wsocket = std::weak_ptr<dhtnet::ChannelSocketInterface>(socket),
     220             :                        ctx = std::make_shared<DecodingContext>(),
     221         239 :                        nodeId](const uint8_t* buf, size_t len) {
     222          52 :         auto socket = wsocket.lock();
     223          52 :         if (!socket)
     224           0 :             return 0lu;
     225             : 
     226          53 :         ctx->pac.reserve_buffer(len);
     227          52 :         std::copy_n(buf, len, ctx->pac.buffer());
     228          52 :         ctx->pac.buffer_consumed(len);
     229             : 
     230          52 :         msgpack::object_handle oh;
     231         105 :         while (ctx->pac.next(oh)) {
     232             :             try {
     233          50 :                 Message msg;
     234          50 :                 oh.get().convert(msg);
     235             : 
     236          53 :                 if (msg.identifier_ == 1) {
     237          53 :                     std::lock_guard lk(channelSocketsMtx_);
     238          53 :                     auto var = numberTimesReceived.find(nodeId);
     239          53 :                     iterations = iterations + 1;
     240             : 
     241          53 :                     if (var != numberTimesReceived.end()) {
     242          44 :                         var->second += 1;
     243             :                     } else {
     244           9 :                         Message msgToSend;
     245           9 :                         msgToSend.identifier_ = 1;
     246           9 :                         msgToSend.hops_ = msg.hops_ + 1;
     247           9 :                         numberTimesReceived[nodeId] = 1;
     248           9 :                         updateHops(msgToSend.hops_);
     249           9 :                         relayMessageToRoutingTable(nodeId, socket->deviceId(), msgToSend);
     250             :                     }
     251          53 :                     channelSocketsCv_.notify_all();
     252          53 :                 }
     253             : 
     254           0 :             } catch (const std::exception& e) {
     255           0 :                 JAMI_WARNING("Error DRT recv: {}", e.what());
     256           0 :                 return 0lu;
     257           0 :             }
     258             :         }
     259             : 
     260          53 :         return 0lu;
     261          53 :     });
     262          62 : };
     263             : 
     264             : void
     265           9 : SwarmMessageSpread::updateHops(int hops)
     266             : {
     267           9 :     if (hops > TOTAL_HOPS) {
     268           3 :         TOTAL_HOPS = hops;
     269             :     }
     270           9 : }
     271             : 
     272             : void
     273          10 : SwarmMessageSpread::needSocketCallBack(const std::shared_ptr<SwarmManager>& sm)
     274             : {
     275          10 :     sm->needSocketCb_ = [this, wsm = std::weak_ptr<SwarmManager>(sm)](const std::string& nodeId,
     276          41 :                                                                       auto&& onSocket) {
     277         123 :         dht::ThreadPool::io().run(
     278          82 :             [this, wsm = std::move(wsm), nodeId, onSocket = std::move(onSocket)] {
     279          40 :                 auto sm = wsm.lock();
     280          40 :                 if (!sm)
     281           0 :                     return;
     282             : 
     283          40 :                 NodeId node = dhtnet::DeviceId(nodeId);
     284          82 :                 if (auto smRemote = getManager(node)) {
     285          41 :                     auto myId = sm->getId();
     286          41 :                     std::unique_lock lk(channelSocketsMtx_);
     287          41 :                     auto& cstRemote = channelSockets_[node][myId];
     288          41 :                     auto& cstMe = channelSockets_[myId][node];
     289          41 :                     if (cstMe.second && cstMe.first)
     290          10 :                         return;
     291          31 :                     if (!cstMe.second) {
     292          31 :                         cstMe.second = std::make_shared<dhtnet::ChannelSocketTest>(Manager::instance().ioContext(), node, "test1", 1);
     293          31 :                         cstRemote.second = std::make_shared<dhtnet::ChannelSocketTest>(Manager::instance().ioContext(), myId, "test1", 1);
     294             :                     }
     295          31 :                     if (!cstMe.first) {
     296          31 :                         cstRemote.first = std::make_shared<dhtnet::ChannelSocketTest>(Manager::instance().ioContext(), myId, "swarm1", 0);
     297          31 :                         cstMe.first = std::make_shared<dhtnet::ChannelSocketTest>(Manager::instance().ioContext(), node, "swarm1", 0);
     298             :                     }
     299          31 :                     lk.unlock();
     300          31 :                     dhtnet::ChannelSocketTest::link(cstMe.second, cstRemote.second);
     301          31 :                     receiveMessage(myId, cstMe.second);
     302          31 :                     receiveMessage(node, cstRemote.second);
     303             :                     // std::this_thread::sleep_for(std::chrono::seconds(5));
     304          31 :                     dhtnet::ChannelSocketTest::link(cstMe.first, cstRemote.first);
     305          31 :                     smRemote->addChannel(cstRemote.first);
     306          31 :                     onSocket(cstMe.first);
     307          41 :                 }
     308          41 :             });
     309          61 :     };
     310          10 : }
     311             : 
     312             : void
     313           9 : SwarmMessageSpread::relayMessageToRoutingTable(const NodeId nodeId,
     314             :                                                const NodeId sourceId,
     315             :                                                const Message msg)
     316             : {
     317           9 :     auto swarmManager = getManager(nodeId);
     318           9 :     const auto& routingtable = swarmManager->getRoutingTable().getNodes();
     319          65 :     for (auto& node : routingtable) {
     320          56 :         if (node != sourceId) {
     321          47 :             auto channelToSend = channelSockets_[nodeId][node].second;
     322          47 :             sendMessage(channelToSend, msg);
     323          47 :         }
     324             :     }
     325           9 : }
     326             : 
     327             : void
     328           1 : SwarmMessageSpread::distribution()
     329             : {
     330           2 :     std::string const fileName("distrib_nodes_" + std::to_string(nNodes) + ".txt");
     331           1 :     std::ofstream myStream(fileName.c_str());
     332             : 
     333           1 :     std::vector<unsigned> dist(10);
     334           1 :     int mean = 0;
     335          11 :     for (const auto& sm : swarmManagers) {
     336          10 :         auto val = sm.second->getRoutingTable().getRoutingTableNodeCount();
     337          10 :         if (dist.size() <= val)
     338           0 :             dist.resize(val + 1);
     339             : 
     340          10 :         dist[val]++;
     341             :     }
     342             : 
     343          11 :     for (size_t i = 0; i < dist.size(); i++) {
     344             :         // std::cout << "Swarm Managers with " << i << " nodes: " << dist[i] << std::endl;
     345          10 :         if (myStream) {
     346          10 :             myStream << i << "," << dist[i] << std::endl;
     347             :         }
     348          10 :         mean += i * dist[i];
     349             :     }
     350           1 :     std::cout << "Le noeud avec le plus de noeuds dans sa routing table: " << dist.size()
     351           1 :               << std::endl;
     352           1 :     std::cout << "Moyenne de nombre de noeuds par Swarm: " << mean / (float) swarmManagers.size()
     353           1 :               << std::endl;
     354           1 : }
     355             : 
     356             : void
     357           3 : SwarmMessageSpread::displayBucketDistribution(const NodeId& id)
     358             : {
     359           6 :     std::string const fileName("distrib_rt_" + std::to_string(nNodes) + "_" + id.toString()
     360           3 :                                + ".txt");
     361           3 :     std::ofstream myStream(fileName.c_str());
     362             : 
     363           3 :     const auto& routingtable = swarmManagers[id]->getRoutingTable().getBuckets();
     364             : 
     365           3 :     std::cout << "Bucket distribution for node " << id << std::endl;
     366             : 
     367          14 :     for (auto it = routingtable.begin(); it != routingtable.end(); ++it) {
     368          11 :         auto lowerLimit = it->getLowerLimit().toString();
     369             : 
     370          11 :         std::string hex_prefix = lowerLimit.substr(0, 4); // extraire les deux premiers caractères
     371          11 :         std::cout << "Bucket " << hex_prefix << " has " << it->getNodesSize() << " nodes"
     372          11 :                   << std::endl;
     373             : 
     374          11 :         if (myStream) {
     375          11 :             myStream << hex_prefix << "," << it->getNodesSize() << std::endl;
     376             :         }
     377          11 :     }
     378           3 : }
     379             : 
     380             : void
     381           1 : SwarmMessageSpread::testWriteMessage()
     382             : {
     383           1 :     std::cout << "\ntestWriteMessage()" << std::endl;
     384          11 :     for (const auto& sm : swarmManagersShuffled) {
     385          10 :         needSocketCallBack(sm);
     386             :     }
     387             : 
     388           1 :     Counter counter(swarmManagers.size());
     389          11 :     for (const auto& sm : swarmManagers) {
     390          10 :         dht::ThreadPool::computation().run([&] {
     391          10 :             std::vector<NodeId> randIds(BOOTSTRAP_SIZE);
     392          10 :             std::uniform_int_distribution<size_t> distribution(0, randomNodeIds.size() - 1);
     393          10 :             std::generate(randIds.begin(), randIds.end(), [&] {
     394          20 :                 auto dev = randomNodeIds[distribution(rd)];
     395          20 :                 return dev;
     396             :             });
     397          10 :             sm.second->setKnownNodes(randIds);
     398          10 :             counter.count();
     399          10 :         });
     400             :     }
     401           1 :     counter.wait();
     402             : 
     403           1 :     std::this_thread::sleep_for(time);
     404             : 
     405           1 :     auto& firstNode = *channelSockets_.begin();
     406             : 
     407           1 :     crossNodes(swarmManagers.begin()->first);
     408           1 :     CPPUNIT_ASSERT_EQUAL(swarmManagers.size(), discoveredNodes.size());
     409             : 
     410           1 :     std::cout << "Sending First Message to " << firstNode.second.size() << std::endl;
     411           1 :     auto start = std::chrono::steady_clock::now();
     412             : 
     413           1 :     numberTimesReceived[firstNode.first] = 1;
     414             : 
     415           7 :     for (const auto& channel : firstNode.second) {
     416           6 :         if (channel.second.second) {
     417           6 :             sendMessage(channel.second.second, {1, 0});
     418             :         }
     419             :     }
     420             : 
     421           1 :     std::unique_lock lk(channelSocketsMtx_);
     422           1 :     bool ok = channelSocketsCv_.wait_for(lk, 1200s, [&] {
     423             :         std::cout << "\r"
     424           9 :                   << "Size of Received " << numberTimesReceived.size();
     425           9 :         return numberTimesReceived.size() == swarmManagers.size();
     426             :     });
     427           1 :     auto now = std::chrono::steady_clock::now();
     428             : 
     429           1 :     std::cout << "#########################################################################"
     430           1 :               << std::endl;
     431           2 :     std::cout << "Time for everyone to receive the message " << dht::print_duration(now - start)
     432           1 :               << std::endl;
     433           1 :     std::cout << " IS OK " << ok << std::endl;
     434             : 
     435             :     // ##############################################################################
     436             : 
     437          11 :     for (const auto& count : numberTimesReceived) {
     438          10 :         moyenne = moyenne + count.second;
     439             : 
     440          10 :         if (count.second > max) {
     441           2 :             max = count.second;
     442             :         }
     443             : 
     444          10 :         if (count.second < min) {
     445           2 :             min = count.second;
     446             :         }
     447             :     }
     448             : 
     449           1 :     auto it = channelSockets_.begin();
     450             : 
     451           1 :     displayBucketDistribution((*it).first);
     452           1 :     std::advance(it, swarmManagers.size() / 2);
     453           1 :     displayBucketDistribution((*it).first);
     454           1 :     std::advance(it, swarmManagers.size() / 2 - 1);
     455           1 :     displayBucketDistribution((*it).first);
     456             : 
     457           1 :     std::cout << "MOYENNE DE RECEPTION PAR NOEUD [ " << moyenne / (float) numberTimesReceived.size()
     458           1 :               << " ] " << std::endl;
     459           1 :     std::cout << "MAX DE RECEPTION PAR NOEUD     [ " << max << " ] " << std::endl;
     460           1 :     std::cout << "MIN DE RECEPTION PAR NOEUD     [ " << min << " ] " << std::endl;
     461             : 
     462           1 :     std::cout << "NOMBRE DE SAUTS DIRECTS        [ " << TOTAL_HOPS << " ] " << std::endl;
     463           1 :     std::cout << "NOMBRE D'ITERATIONS            [ " << iterations << " ] " << std::endl;
     464             : 
     465           1 :     distribution();
     466           1 :     std::cout << "#########################################################################"
     467           1 :               << std::endl;
     468             : 
     469           1 :     std::cout << "Number of times received " << numberTimesReceived.size() << std::endl;
     470           1 :     std::cout << "Number of swarm managers " << swarmManagers.size() << std::endl;
     471             : 
     472           1 :     CPPUNIT_ASSERT(true);
     473           1 : }
     474             : 
     475             : }; // namespace test
     476             : } // namespace jami
     477           1 : RING_TEST_RUNNER(jami::test::SwarmMessageSpread::name())

Generated by: LCOV version 1.14