Line data Source code
1 : /*
2 : * Copyright (C) 2024 Savoir-faire Linux Inc.
3 : * Author: Fadi Shehadeh <fadi.shehadeh@savoirfairelinux.com>
4 : *
5 : * This program is free software; you can redistribute it and/or modify
6 : * it under the terms of the GNU General Public License as published by
7 : * the Free Software Foundation; either version 3 of the License, or
8 : * (at your option) any later version.
9 : *
10 : * This program is distributed in the hope that it will be useful,
11 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 : * GNU General Public License for more details.
14 : *
15 : * You should have received a copy of the GNU General Public License
16 : * along with this program. If not, see <https://www.gnu.org/licenses/>.
17 : */
18 :
19 : #include <cppunit/TestAssert.h>
20 : #include <cppunit/TestFixture.h>
21 : #include <cppunit/extensions/HelperMacros.h>
22 :
23 : #include <algorithm>
24 : #include <msgpack.hpp>
25 : #include <opendht/thread_pool.h>
26 : #include <opendht/utils.h>
27 :
28 : #include <iostream>
29 : #include <fstream>
30 : #include <string>
31 :
32 : #include "../../test_runner.h"
33 : #include "jami.h"
34 : #include "../common.h"
35 : #include "jamidht/swarm/swarm_manager.h"
36 : #include <dhtnet/multiplexed_socket.h>
37 : #include "nodes.h"
38 :
39 : using namespace std::string_literals;
40 : using namespace std::chrono_literals;
41 : using namespace dht;
42 : using NodeId = dht::PkId;
43 :
44 : namespace jami {
45 : namespace test {
46 :
47 : constexpr size_t nNodes = 10;
48 :
49 : constexpr size_t BOOTSTRAP_SIZE = 2;
50 : auto time = 30s;
51 :
52 : int TOTAL_HOPS = 0;
53 : int moyenne = 0;
54 : int max = 0;
55 : int min = 10000;
56 :
57 : struct Message
58 : {
59 : int identifier_; // message identifier
60 : int hops_ = 0; // number of hops
61 103 : MSGPACK_DEFINE_MAP(identifier_, hops_);
62 : };
63 :
64 : struct Counter
65 : {
66 1 : Counter(unsigned t)
67 1 : : target(t)
68 1 : {}
69 : const unsigned target;
70 : unsigned added {0};
71 : std::mutex mutex;
72 : std::condition_variable cv;
73 :
74 10 : void count()
75 : {
76 10 : std::lock_guard lock(mutex);
77 10 : ++added;
78 10 : if (added == target)
79 1 : cv.notify_one();
80 10 : }
81 : bool wait(std::chrono::steady_clock::duration timeout)
82 : {
83 : std::unique_lock lock(mutex);
84 : return cv.wait_for(lock, timeout, [&] { return added == target; });
85 : }
86 1 : void wait()
87 : {
88 1 : std::unique_lock lock(mutex);
89 4 : return cv.wait(lock, [&] { return added == target; });
90 1 : }
91 : };
92 :
93 : class SwarmMessageSpread : public CppUnit::TestFixture
94 : {
95 : public:
96 2 : ~SwarmMessageSpread() { libjami::fini(); }
97 2 : static std::string name() { return "SwarmMessageSpread"; }
98 :
99 : void setUp();
100 : void tearDown();
101 :
102 : private:
103 : std::mt19937_64 rd {dht::crypto::getSeededRandomEngine<std::mt19937_64>()};
104 : std::mutex channelSocketsMtx_;
105 : std::condition_variable channelSocketsCv_;
106 :
107 : std::map<NodeId, std::shared_ptr<jami::SwarmManager>> swarmManagers;
108 : std::map<NodeId,
109 : std::map<NodeId,
110 : std::pair<std::shared_ptr<dhtnet::ChannelSocketTest>,
111 : std::shared_ptr<dhtnet::ChannelSocketTest>>>>
112 : channelSockets_;
113 : std::vector<NodeId> randomNodeIds;
114 : std::vector<std::shared_ptr<jami::SwarmManager>> swarmManagersShuffled;
115 : std::set<NodeId> discoveredNodes;
116 : std::map<NodeId, int> numberTimesReceived;
117 : std::map<NodeId, int> requestsReceived;
118 : std::map<NodeId, int> answersSent;
119 :
120 : int iterations = 0;
121 :
122 : void generateSwarmManagers();
123 : void needSocketCallBack(const std::shared_ptr<SwarmManager>& sm);
124 : void sendMessage(const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket, Message msg);
125 : void receiveMessage(const NodeId nodeId, const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket);
126 : void relayMessageToRoutingTable(const NodeId nodeId, const NodeId sourceId, const Message msg);
127 : void updateHops(int hops);
128 : void crossNodes(NodeId nodeId);
129 : void displayBucketDistribution(const NodeId& id);
130 : void distribution();
131 : std::shared_ptr<jami::SwarmManager> getManager(const NodeId& id);
132 :
133 : void testWriteMessage();
134 :
135 2 : CPPUNIT_TEST_SUITE(SwarmMessageSpread);
136 1 : CPPUNIT_TEST(testWriteMessage);
137 4 : CPPUNIT_TEST_SUITE_END();
138 : };
139 :
140 : CPPUNIT_TEST_SUITE_NAMED_REGISTRATION(SwarmMessageSpread, SwarmMessageSpread::name());
141 :
142 : void
143 1 : SwarmMessageSpread::setUp()
144 : {
145 1 : libjami::init(
146 : libjami::InitFlag(libjami::LIBJAMI_FLAG_DEBUG | libjami::LIBJAMI_FLAG_CONSOLE_LOG));
147 1 : if (not Manager::instance().initialized) {
148 1 : CPPUNIT_ASSERT(libjami::start("jami-sample.yml"));
149 : }
150 :
151 1 : generateSwarmManagers();
152 1 : }
153 :
154 : void
155 1 : SwarmMessageSpread::tearDown()
156 1 : {}
157 :
158 : void
159 1 : SwarmMessageSpread::generateSwarmManagers()
160 : {
161 11 : for (size_t i = 0; i < nNodes; i++) {
162 10 : const NodeId node = Hash<32>::getRandom();
163 68 : auto sm = std::make_shared<SwarmManager>(node, rd, std::move([](auto) {return false;}));
164 10 : swarmManagers[node] = sm;
165 10 : randomNodeIds.emplace_back(node);
166 10 : swarmManagersShuffled.emplace_back(sm);
167 10 : }
168 1 : }
169 :
170 : std::shared_ptr<jami::SwarmManager>
171 57 : SwarmMessageSpread::getManager(const NodeId& id)
172 : {
173 57 : auto it = swarmManagers.find(id);
174 112 : return it == swarmManagers.end() ? nullptr : it->second;
175 : }
176 :
177 : void
178 1 : SwarmMessageSpread::crossNodes(NodeId nodeId)
179 : {
180 1 : std::list<NodeId> pendingNodes {nodeId};
181 1 : discoveredNodes.clear();
182 :
183 64 : for (const auto& curNode : pendingNodes) {
184 63 : if (discoveredNodes.emplace(curNode).second) {
185 10 : if (auto sm = getManager(curNode))
186 72 : for (const auto& node : sm->getRoutingTable().getNodes()) {
187 62 : pendingNodes.emplace_back(node);
188 20 : }
189 : }
190 : }
191 1 : }
192 :
193 : void
194 53 : SwarmMessageSpread::sendMessage(const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket, Message msg)
195 : {
196 53 : auto buffer = std::make_shared<msgpack::sbuffer>(32);
197 53 : msgpack::packer<msgpack::sbuffer> pk(buffer.get());
198 53 : pk.pack(msg);
199 :
200 53 : dht::ThreadPool::io().run([socket, buffer = std::move(buffer)] {
201 53 : std::error_code ec;
202 : // std::this_thread::sleep_for(std::chrono::milliseconds(50));
203 :
204 53 : socket->write(reinterpret_cast<const unsigned char*>(buffer->data()), buffer->size(), ec);
205 53 : });
206 53 : }
207 :
208 : void
209 62 : SwarmMessageSpread::receiveMessage(const NodeId nodeId,
210 : const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket)
211 : {
212 : struct DecodingContext
213 : {
214 100 : msgpack::unpacker pac {[](msgpack::type::object_type, std::size_t, void*) { return true; },
215 : nullptr,
216 : 32};
217 : };
218 :
219 62 : socket->setOnRecv([this,
220 : wsocket = std::weak_ptr<dhtnet::ChannelSocketInterface>(socket),
221 : ctx = std::make_shared<DecodingContext>(),
222 236 : nodeId](const uint8_t* buf, size_t len) {
223 53 : auto socket = wsocket.lock();
224 52 : if (!socket)
225 0 : return 0lu;
226 :
227 52 : ctx->pac.reserve_buffer(len);
228 52 : std::copy_n(buf, len, ctx->pac.buffer());
229 52 : ctx->pac.buffer_consumed(len);
230 :
231 53 : msgpack::object_handle oh;
232 105 : while (ctx->pac.next(oh)) {
233 : try {
234 48 : Message msg;
235 48 : oh.get().convert(msg);
236 :
237 50 : if (msg.identifier_ == 1) {
238 50 : std::lock_guard lk(channelSocketsMtx_);
239 53 : auto var = numberTimesReceived.find(nodeId);
240 53 : iterations = iterations + 1;
241 :
242 53 : if (var != numberTimesReceived.end()) {
243 44 : var->second += 1;
244 : } else {
245 9 : Message msgToSend;
246 9 : msgToSend.identifier_ = 1;
247 9 : msgToSend.hops_ = msg.hops_ + 1;
248 9 : numberTimesReceived[nodeId] = 1;
249 9 : updateHops(msgToSend.hops_);
250 9 : relayMessageToRoutingTable(nodeId, socket->deviceId(), msgToSend);
251 : }
252 53 : channelSocketsCv_.notify_all();
253 53 : }
254 :
255 0 : } catch (const std::exception& e) {
256 0 : JAMI_WARNING("Error DRT recv: {}", e.what());
257 0 : return 0lu;
258 0 : }
259 : }
260 :
261 53 : return 0lu;
262 53 : });
263 62 : };
264 :
265 : void
266 9 : SwarmMessageSpread::updateHops(int hops)
267 : {
268 9 : if (hops > TOTAL_HOPS) {
269 3 : TOTAL_HOPS = hops;
270 : }
271 9 : }
272 :
273 : void
274 10 : SwarmMessageSpread::needSocketCallBack(const std::shared_ptr<SwarmManager>& sm)
275 : {
276 10 : sm->needSocketCb_ = [this, wsm = std::weak_ptr<SwarmManager>(sm)](const std::string& nodeId,
277 38 : auto&& onSocket) {
278 114 : dht::ThreadPool::io().run(
279 76 : [this, wsm = std::move(wsm), nodeId, onSocket = std::move(onSocket)] {
280 38 : auto sm = wsm.lock();
281 38 : if (!sm)
282 0 : return;
283 :
284 37 : NodeId node = dhtnet::DeviceId(nodeId);
285 76 : if (auto smRemote = getManager(node)) {
286 38 : auto myId = sm->getId();
287 37 : std::unique_lock lk(channelSocketsMtx_);
288 38 : auto& cstRemote = channelSockets_[node][myId];
289 38 : auto& cstMe = channelSockets_[myId][node];
290 38 : if (cstMe.second && cstMe.first)
291 7 : return;
292 31 : if (!cstMe.second) {
293 31 : cstMe.second = std::make_shared<dhtnet::ChannelSocketTest>(Manager::instance().ioContext(), node, "test1", 1);
294 31 : cstRemote.second = std::make_shared<dhtnet::ChannelSocketTest>(Manager::instance().ioContext(), myId, "test1", 1);
295 : }
296 31 : if (!cstMe.first) {
297 31 : cstRemote.first = std::make_shared<dhtnet::ChannelSocketTest>(Manager::instance().ioContext(), myId, "swarm1", 0);
298 31 : cstMe.first = std::make_shared<dhtnet::ChannelSocketTest>(Manager::instance().ioContext(), node, "swarm1", 0);
299 : }
300 31 : lk.unlock();
301 31 : dhtnet::ChannelSocketTest::link(cstMe.second, cstRemote.second);
302 31 : receiveMessage(myId, cstMe.second);
303 31 : receiveMessage(node, cstRemote.second);
304 : // std::this_thread::sleep_for(std::chrono::seconds(5));
305 31 : dhtnet::ChannelSocketTest::link(cstMe.first, cstRemote.first);
306 31 : smRemote->addChannel(cstRemote.first);
307 31 : onSocket(cstMe.first);
308 38 : }
309 38 : });
310 57 : };
311 10 : }
312 :
313 : void
314 9 : SwarmMessageSpread::relayMessageToRoutingTable(const NodeId nodeId,
315 : const NodeId sourceId,
316 : const Message msg)
317 : {
318 9 : auto swarmManager = getManager(nodeId);
319 9 : const auto& routingtable = swarmManager->getRoutingTable().getNodes();
320 66 : for (auto& node : routingtable) {
321 57 : if (node != sourceId) {
322 48 : auto channelToSend = channelSockets_[nodeId][node].second;
323 48 : sendMessage(channelToSend, msg);
324 48 : }
325 : }
326 9 : }
327 :
328 : void
329 1 : SwarmMessageSpread::distribution()
330 : {
331 2 : std::string const fileName("distrib_nodes_" + std::to_string(nNodes) + ".txt");
332 1 : std::ofstream myStream(fileName.c_str());
333 :
334 1 : std::vector<unsigned> dist(10);
335 1 : int mean = 0;
336 11 : for (const auto& sm : swarmManagers) {
337 10 : auto val = sm.second->getRoutingTable().getRoutingTableNodeCount();
338 10 : if (dist.size() <= val)
339 0 : dist.resize(val + 1);
340 :
341 10 : dist[val]++;
342 : }
343 :
344 11 : for (size_t i = 0; i < dist.size(); i++) {
345 : // std::cout << "Swarm Managers with " << i << " nodes: " << dist[i] << std::endl;
346 10 : if (myStream) {
347 10 : myStream << i << "," << dist[i] << std::endl;
348 : }
349 10 : mean += i * dist[i];
350 : }
351 1 : std::cout << "Le noeud avec le plus de noeuds dans sa routing table: " << dist.size()
352 1 : << std::endl;
353 1 : std::cout << "Moyenne de nombre de noeuds par Swarm: " << mean / (float) swarmManagers.size()
354 1 : << std::endl;
355 1 : }
356 :
357 : void
358 3 : SwarmMessageSpread::displayBucketDistribution(const NodeId& id)
359 : {
360 6 : std::string const fileName("distrib_rt_" + std::to_string(nNodes) + "_" + id.toString()
361 3 : + ".txt");
362 3 : std::ofstream myStream(fileName.c_str());
363 :
364 3 : const auto& routingtable = swarmManagers[id]->getRoutingTable().getBuckets();
365 :
366 3 : std::cout << "Bucket distribution for node " << id << std::endl;
367 :
368 13 : for (auto it = routingtable.begin(); it != routingtable.end(); ++it) {
369 10 : auto lowerLimit = it->getLowerLimit().toString();
370 :
371 10 : std::string hex_prefix = lowerLimit.substr(0, 4); // extraire les deux premiers caractères
372 10 : std::cout << "Bucket " << hex_prefix << " has " << it->getNodesSize() << " nodes"
373 10 : << std::endl;
374 :
375 10 : if (myStream) {
376 10 : myStream << hex_prefix << "," << it->getNodesSize() << std::endl;
377 : }
378 10 : }
379 3 : }
380 :
381 : void
382 1 : SwarmMessageSpread::testWriteMessage()
383 : {
384 1 : std::cout << "\ntestWriteMessage()" << std::endl;
385 11 : for (const auto& sm : swarmManagersShuffled) {
386 10 : needSocketCallBack(sm);
387 : }
388 :
389 1 : Counter counter(swarmManagers.size());
390 11 : for (const auto& sm : swarmManagers) {
391 10 : dht::ThreadPool::computation().run([&] {
392 10 : std::vector<NodeId> randIds(BOOTSTRAP_SIZE);
393 10 : std::uniform_int_distribution<size_t> distribution(0, randomNodeIds.size() - 1);
394 10 : std::generate(randIds.begin(), randIds.end(), [&] {
395 19 : auto dev = randomNodeIds[distribution(rd)];
396 20 : return dev;
397 : });
398 10 : sm.second->setKnownNodes(randIds);
399 10 : counter.count();
400 10 : });
401 : }
402 1 : counter.wait();
403 :
404 1 : std::this_thread::sleep_for(time);
405 :
406 1 : auto& firstNode = *channelSockets_.begin();
407 :
408 1 : crossNodes(swarmManagers.begin()->first);
409 1 : CPPUNIT_ASSERT_EQUAL(swarmManagers.size(), discoveredNodes.size());
410 :
411 1 : std::cout << "Sending First Message to " << firstNode.second.size() << std::endl;
412 1 : auto start = std::chrono::steady_clock::now();
413 :
414 1 : numberTimesReceived[firstNode.first] = 1;
415 :
416 6 : for (const auto& channel : firstNode.second) {
417 5 : if (channel.second.second) {
418 5 : sendMessage(channel.second.second, {1, 0});
419 : }
420 : }
421 :
422 1 : std::unique_lock lk(channelSocketsMtx_);
423 1 : bool ok = channelSocketsCv_.wait_for(lk, 1200s, [&] {
424 : std::cout << "\r"
425 3 : << "Size of Received " << numberTimesReceived.size();
426 3 : return numberTimesReceived.size() == swarmManagers.size();
427 : });
428 1 : auto now = std::chrono::steady_clock::now();
429 :
430 1 : std::cout << "#########################################################################"
431 1 : << std::endl;
432 2 : std::cout << "Time for everyone to receive the message " << dht::print_duration(now - start)
433 1 : << std::endl;
434 1 : std::cout << " IS OK " << ok << std::endl;
435 :
436 : // ##############################################################################
437 :
438 11 : for (const auto& count : numberTimesReceived) {
439 10 : moyenne = moyenne + count.second;
440 :
441 10 : if (count.second > max) {
442 3 : max = count.second;
443 : }
444 :
445 10 : if (count.second < min) {
446 1 : min = count.second;
447 : }
448 : }
449 :
450 1 : auto it = channelSockets_.begin();
451 :
452 1 : displayBucketDistribution((*it).first);
453 1 : std::advance(it, swarmManagers.size() / 2);
454 1 : displayBucketDistribution((*it).first);
455 1 : std::advance(it, swarmManagers.size() / 2 - 1);
456 1 : displayBucketDistribution((*it).first);
457 :
458 1 : std::cout << "MOYENNE DE RECEPTION PAR NOEUD [ " << moyenne / (float) numberTimesReceived.size()
459 1 : << " ] " << std::endl;
460 1 : std::cout << "MAX DE RECEPTION PAR NOEUD [ " << max << " ] " << std::endl;
461 1 : std::cout << "MIN DE RECEPTION PAR NOEUD [ " << min << " ] " << std::endl;
462 :
463 1 : std::cout << "NOMBRE DE SAUTS DIRECTS [ " << TOTAL_HOPS << " ] " << std::endl;
464 1 : std::cout << "NOMBRE D'ITERATIONS [ " << iterations << " ] " << std::endl;
465 :
466 1 : distribution();
467 1 : std::cout << "#########################################################################"
468 1 : << std::endl;
469 :
470 1 : std::cout << "Number of times received " << numberTimesReceived.size() << std::endl;
471 1 : std::cout << "Number of swarm managers " << swarmManagers.size() << std::endl;
472 :
473 1 : CPPUNIT_ASSERT(true);
474 1 : }
475 :
476 : }; // namespace test
477 : } // namespace jami
478 1 : RING_TEST_RUNNER(jami::test::SwarmMessageSpread::name())
|