Line data Source code
1 : /*
2 : * Copyright (C) 2004-2024 Savoir-faire Linux Inc.
3 : *
4 : * This program is free software: you can redistribute it and/or modify
5 : * it under the terms of the GNU General Public License as published by
6 : * the Free Software Foundation, either version 3 of the License, or
7 : * (at your option) any later version.
8 : *
9 : * This program is distributed in the hope that it will be useful,
10 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 : * GNU General Public License for more details.
13 : *
14 : * You should have received a copy of the GNU General Public License
15 : * along with this program. If not, see <https://www.gnu.org/licenses/>.
16 : */
17 :
18 : #include <cppunit/TestAssert.h>
19 : #include <cppunit/TestFixture.h>
20 : #include <cppunit/extensions/HelperMacros.h>
21 :
22 : #include <algorithm>
23 : #include <msgpack.hpp>
24 : #include <opendht/thread_pool.h>
25 : #include <opendht/utils.h>
26 :
27 : #include <iostream>
28 : #include <fstream>
29 : #include <string>
30 :
31 : #include "../../test_runner.h"
32 : #include "jami.h"
33 : #include "../common.h"
34 : #include "jamidht/swarm/swarm_manager.h"
35 : #include <dhtnet/multiplexed_socket.h>
36 : #include "nodes.h"
37 :
38 : using namespace std::string_literals;
39 : using namespace std::chrono_literals;
40 : using namespace dht;
41 : using NodeId = dht::PkId;
42 :
43 : namespace jami {
44 : namespace test {
45 :
46 : constexpr size_t nNodes = 10;
47 :
48 : constexpr size_t BOOTSTRAP_SIZE = 2;
49 : auto time = 30s;
50 :
51 : int TOTAL_HOPS = 0;
52 : int moyenne = 0;
53 : int max = 0;
54 : int min = 10000;
55 :
56 : struct Message
57 : {
58 : int identifier_; // message identifier
59 : int hops_ = 0; // number of hops
60 90 : MSGPACK_DEFINE_MAP(identifier_, hops_);
61 : };
62 :
63 : struct Counter
64 : {
65 1 : Counter(unsigned t)
66 1 : : target(t)
67 1 : {}
68 : const unsigned target;
69 : unsigned added {0};
70 : std::mutex mutex;
71 : std::condition_variable cv;
72 :
73 10 : void count()
74 : {
75 10 : std::lock_guard lock(mutex);
76 10 : ++added;
77 10 : if (added == target)
78 1 : cv.notify_one();
79 10 : }
80 : bool wait(std::chrono::steady_clock::duration timeout)
81 : {
82 : std::unique_lock lock(mutex);
83 : return cv.wait_for(lock, timeout, [&] { return added == target; });
84 : }
85 1 : void wait()
86 : {
87 1 : std::unique_lock lock(mutex);
88 4 : return cv.wait(lock, [&] { return added == target; });
89 1 : }
90 : };
91 :
92 : class SwarmMessageSpread : public CppUnit::TestFixture
93 : {
94 : public:
95 2 : ~SwarmMessageSpread() { libjami::fini(); }
96 2 : static std::string name() { return "SwarmMessageSpread"; }
97 :
98 : void setUp();
99 : void tearDown();
100 :
101 : private:
102 : std::mt19937_64 rd {dht::crypto::getSeededRandomEngine<std::mt19937_64>()};
103 : std::mutex channelSocketsMtx_;
104 : std::condition_variable channelSocketsCv_;
105 :
106 : std::map<NodeId, std::shared_ptr<jami::SwarmManager>> swarmManagers;
107 : std::map<NodeId,
108 : std::map<NodeId,
109 : std::pair<std::shared_ptr<dhtnet::ChannelSocketTest>,
110 : std::shared_ptr<dhtnet::ChannelSocketTest>>>>
111 : channelSockets_;
112 : std::vector<NodeId> randomNodeIds;
113 : std::vector<std::shared_ptr<jami::SwarmManager>> swarmManagersShuffled;
114 : std::set<NodeId> discoveredNodes;
115 : std::map<NodeId, int> numberTimesReceived;
116 : std::map<NodeId, int> requestsReceived;
117 : std::map<NodeId, int> answersSent;
118 :
119 : int iterations = 0;
120 :
121 : void generateSwarmManagers();
122 : void needSocketCallBack(const std::shared_ptr<SwarmManager>& sm);
123 : void sendMessage(const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket, Message msg);
124 : void receiveMessage(const NodeId nodeId, const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket);
125 : void relayMessageToRoutingTable(const NodeId nodeId, const NodeId sourceId, const Message msg);
126 : void updateHops(int hops);
127 : void crossNodes(NodeId nodeId);
128 : void displayBucketDistribution(const NodeId& id);
129 : void distribution();
130 : std::shared_ptr<jami::SwarmManager> getManager(const NodeId& id);
131 :
132 : void testWriteMessage();
133 :
134 2 : CPPUNIT_TEST_SUITE(SwarmMessageSpread);
135 1 : CPPUNIT_TEST(testWriteMessage);
136 4 : CPPUNIT_TEST_SUITE_END();
137 : };
138 :
139 : CPPUNIT_TEST_SUITE_NAMED_REGISTRATION(SwarmMessageSpread, SwarmMessageSpread::name());
140 :
141 : void
142 1 : SwarmMessageSpread::setUp()
143 : {
144 1 : libjami::init(
145 : libjami::InitFlag(libjami::LIBJAMI_FLAG_DEBUG | libjami::LIBJAMI_FLAG_CONSOLE_LOG));
146 1 : if (not Manager::instance().initialized) {
147 1 : CPPUNIT_ASSERT(libjami::start("jami-sample.yml"));
148 : }
149 :
150 1 : generateSwarmManagers();
151 1 : }
152 :
153 : void
154 1 : SwarmMessageSpread::tearDown()
155 1 : {}
156 :
157 : void
158 1 : SwarmMessageSpread::generateSwarmManagers()
159 : {
160 11 : for (size_t i = 0; i < nNodes; i++) {
161 10 : const NodeId node = Hash<32>::getRandom();
162 60 : auto sm = std::make_shared<SwarmManager>(node, rd, std::move([](auto) {return false;}));
163 10 : swarmManagers[node] = sm;
164 10 : randomNodeIds.emplace_back(node);
165 10 : swarmManagersShuffled.emplace_back(sm);
166 10 : }
167 1 : }
168 :
169 : std::shared_ptr<jami::SwarmManager>
170 56 : SwarmMessageSpread::getManager(const NodeId& id)
171 : {
172 56 : auto it = swarmManagers.find(id);
173 111 : return it == swarmManagers.end() ? nullptr : it->second;
174 : }
175 :
176 : void
177 1 : SwarmMessageSpread::crossNodes(NodeId nodeId)
178 : {
179 1 : std::list<NodeId> pendingNodes {nodeId};
180 1 : discoveredNodes.clear();
181 :
182 56 : for (const auto& curNode : pendingNodes) {
183 55 : if (discoveredNodes.emplace(curNode).second) {
184 10 : if (auto sm = getManager(curNode))
185 64 : for (const auto& node : sm->getRoutingTable().getNodes()) {
186 54 : pendingNodes.emplace_back(node);
187 20 : }
188 : }
189 : }
190 1 : }
191 :
192 : void
193 45 : SwarmMessageSpread::sendMessage(const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket, Message msg)
194 : {
195 45 : auto buffer = std::make_shared<msgpack::sbuffer>(32);
196 45 : msgpack::packer<msgpack::sbuffer> pk(buffer.get());
197 45 : pk.pack(msg);
198 :
199 45 : dht::ThreadPool::io().run([socket, buffer = std::move(buffer)] {
200 45 : std::error_code ec;
201 : // std::this_thread::sleep_for(std::chrono::milliseconds(50));
202 :
203 45 : socket->write(reinterpret_cast<const unsigned char*>(buffer->data()), buffer->size(), ec);
204 45 : });
205 45 : }
206 :
207 : void
208 54 : SwarmMessageSpread::receiveMessage(const NodeId nodeId,
209 : const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket)
210 : {
211 : struct DecodingContext
212 : {
213 88 : msgpack::unpacker pac {[](msgpack::type::object_type, std::size_t, void*) { return true; },
214 : nullptr,
215 : 32};
216 : };
217 :
218 54 : socket->setOnRecv([this,
219 : wsocket = std::weak_ptr<dhtnet::ChannelSocketInterface>(socket),
220 : ctx = std::make_shared<DecodingContext>(),
221 207 : nodeId](const uint8_t* buf, size_t len) {
222 45 : auto socket = wsocket.lock();
223 45 : if (!socket)
224 0 : return 0lu;
225 :
226 45 : ctx->pac.reserve_buffer(len);
227 44 : std::copy_n(buf, len, ctx->pac.buffer());
228 44 : ctx->pac.buffer_consumed(len);
229 :
230 44 : msgpack::object_handle oh;
231 89 : while (ctx->pac.next(oh)) {
232 : try {
233 45 : Message msg;
234 45 : oh.get().convert(msg);
235 :
236 45 : if (msg.identifier_ == 1) {
237 45 : std::lock_guard lk(channelSocketsMtx_);
238 45 : auto var = numberTimesReceived.find(nodeId);
239 45 : iterations = iterations + 1;
240 :
241 45 : if (var != numberTimesReceived.end()) {
242 36 : var->second += 1;
243 : } else {
244 9 : Message msgToSend;
245 9 : msgToSend.identifier_ = 1;
246 9 : msgToSend.hops_ = msg.hops_ + 1;
247 9 : numberTimesReceived[nodeId] = 1;
248 9 : updateHops(msgToSend.hops_);
249 9 : relayMessageToRoutingTable(nodeId, socket->deviceId(), msgToSend);
250 : }
251 45 : channelSocketsCv_.notify_all();
252 45 : }
253 :
254 0 : } catch (const std::exception& e) {
255 0 : JAMI_WARNING("Error DRT recv: {}", e.what());
256 0 : return 0lu;
257 0 : }
258 : }
259 :
260 45 : return 0lu;
261 45 : });
262 54 : };
263 :
264 : void
265 9 : SwarmMessageSpread::updateHops(int hops)
266 : {
267 9 : if (hops > TOTAL_HOPS) {
268 4 : TOTAL_HOPS = hops;
269 : }
270 9 : }
271 :
272 : void
273 10 : SwarmMessageSpread::needSocketCallBack(const std::shared_ptr<SwarmManager>& sm)
274 : {
275 10 : sm->needSocketCb_ = [this, wsm = std::weak_ptr<SwarmManager>(sm)](const std::string& nodeId,
276 37 : auto&& onSocket) {
277 111 : dht::ThreadPool::io().run(
278 74 : [this, wsm = std::move(wsm), nodeId, onSocket = std::move(onSocket)] {
279 37 : auto sm = wsm.lock();
280 37 : if (!sm)
281 0 : return;
282 :
283 37 : NodeId node = dhtnet::DeviceId(nodeId);
284 74 : if (auto smRemote = getManager(node)) {
285 37 : auto myId = sm->getId();
286 37 : std::unique_lock lk(channelSocketsMtx_);
287 37 : auto& cstRemote = channelSockets_[node][myId];
288 37 : auto& cstMe = channelSockets_[myId][node];
289 37 : if (cstMe.second && cstMe.first)
290 10 : return;
291 27 : if (!cstMe.second) {
292 27 : cstMe.second = std::make_shared<dhtnet::ChannelSocketTest>(Manager::instance().ioContext(), node, "test1", 1);
293 27 : cstRemote.second = std::make_shared<dhtnet::ChannelSocketTest>(Manager::instance().ioContext(), myId, "test1", 1);
294 : }
295 27 : if (!cstMe.first) {
296 27 : cstRemote.first = std::make_shared<dhtnet::ChannelSocketTest>(Manager::instance().ioContext(), myId, "swarm1", 0);
297 27 : cstMe.first = std::make_shared<dhtnet::ChannelSocketTest>(Manager::instance().ioContext(), node, "swarm1", 0);
298 : }
299 27 : lk.unlock();
300 27 : dhtnet::ChannelSocketTest::link(cstMe.second, cstRemote.second);
301 27 : receiveMessage(myId, cstMe.second);
302 27 : receiveMessage(node, cstRemote.second);
303 : // std::this_thread::sleep_for(std::chrono::seconds(5));
304 27 : dhtnet::ChannelSocketTest::link(cstMe.first, cstRemote.first);
305 27 : smRemote->addChannel(cstRemote.first);
306 27 : onSocket(cstMe.first);
307 37 : }
308 37 : });
309 57 : };
310 10 : }
311 :
312 : void
313 9 : SwarmMessageSpread::relayMessageToRoutingTable(const NodeId nodeId,
314 : const NodeId sourceId,
315 : const Message msg)
316 : {
317 9 : auto swarmManager = getManager(nodeId);
318 9 : const auto& routingtable = swarmManager->getRoutingTable().getNodes();
319 60 : for (auto& node : routingtable) {
320 51 : if (node != sourceId) {
321 42 : auto channelToSend = channelSockets_[nodeId][node].second;
322 42 : sendMessage(channelToSend, msg);
323 42 : }
324 : }
325 9 : }
326 :
327 : void
328 1 : SwarmMessageSpread::distribution()
329 : {
330 2 : std::string const fileName("distrib_nodes_" + std::to_string(nNodes) + ".txt");
331 1 : std::ofstream myStream(fileName.c_str());
332 :
333 1 : std::vector<unsigned> dist(10);
334 1 : int mean = 0;
335 11 : for (const auto& sm : swarmManagers) {
336 10 : auto val = sm.second->getRoutingTable().getRoutingTableNodeCount();
337 10 : if (dist.size() <= val)
338 0 : dist.resize(val + 1);
339 :
340 10 : dist[val]++;
341 : }
342 :
343 11 : for (size_t i = 0; i < dist.size(); i++) {
344 : // std::cout << "Swarm Managers with " << i << " nodes: " << dist[i] << std::endl;
345 10 : if (myStream) {
346 10 : myStream << i << "," << dist[i] << std::endl;
347 : }
348 10 : mean += i * dist[i];
349 : }
350 1 : std::cout << "Le noeud avec le plus de noeuds dans sa routing table: " << dist.size()
351 1 : << std::endl;
352 1 : std::cout << "Moyenne de nombre de noeuds par Swarm: " << mean / (float) swarmManagers.size()
353 1 : << std::endl;
354 1 : }
355 :
356 : void
357 3 : SwarmMessageSpread::displayBucketDistribution(const NodeId& id)
358 : {
359 6 : std::string const fileName("distrib_rt_" + std::to_string(nNodes) + "_" + id.toString()
360 3 : + ".txt");
361 3 : std::ofstream myStream(fileName.c_str());
362 :
363 3 : const auto& routingtable = swarmManagers[id]->getRoutingTable().getBuckets();
364 :
365 3 : std::cout << "Bucket distribution for node " << id << std::endl;
366 :
367 13 : for (auto it = routingtable.begin(); it != routingtable.end(); ++it) {
368 10 : auto lowerLimit = it->getLowerLimit().toString();
369 :
370 10 : std::string hex_prefix = lowerLimit.substr(0, 4); // extraire les deux premiers caractères
371 10 : std::cout << "Bucket " << hex_prefix << " has " << it->getNodesSize() << " nodes"
372 10 : << std::endl;
373 :
374 10 : if (myStream) {
375 10 : myStream << hex_prefix << "," << it->getNodesSize() << std::endl;
376 : }
377 10 : }
378 3 : }
379 :
380 : void
381 1 : SwarmMessageSpread::testWriteMessage()
382 : {
383 1 : std::cout << "\ntestWriteMessage()" << std::endl;
384 11 : for (const auto& sm : swarmManagersShuffled) {
385 10 : needSocketCallBack(sm);
386 : }
387 :
388 1 : Counter counter(swarmManagers.size());
389 11 : for (const auto& sm : swarmManagers) {
390 10 : dht::ThreadPool::computation().run([&] {
391 10 : std::vector<NodeId> randIds(BOOTSTRAP_SIZE);
392 10 : std::uniform_int_distribution<size_t> distribution(0, randomNodeIds.size() - 1);
393 10 : std::generate(randIds.begin(), randIds.end(), [&] {
394 20 : auto dev = randomNodeIds[distribution(rd)];
395 20 : return dev;
396 : });
397 10 : sm.second->setKnownNodes(randIds);
398 10 : counter.count();
399 10 : });
400 : }
401 1 : counter.wait();
402 :
403 1 : std::this_thread::sleep_for(time);
404 :
405 1 : auto& firstNode = *channelSockets_.begin();
406 :
407 1 : crossNodes(swarmManagers.begin()->first);
408 1 : CPPUNIT_ASSERT_EQUAL(swarmManagers.size(), discoveredNodes.size());
409 :
410 1 : std::cout << "Sending First Message to " << firstNode.second.size() << std::endl;
411 1 : auto start = std::chrono::steady_clock::now();
412 :
413 1 : numberTimesReceived[firstNode.first] = 1;
414 :
415 4 : for (const auto& channel : firstNode.second) {
416 3 : if (channel.second.second) {
417 3 : sendMessage(channel.second.second, {1, 0});
418 : }
419 : }
420 :
421 1 : std::unique_lock lk(channelSocketsMtx_);
422 1 : bool ok = channelSocketsCv_.wait_for(lk, 1200s, [&] {
423 : std::cout << "\r"
424 9 : << "Size of Received " << numberTimesReceived.size();
425 9 : return numberTimesReceived.size() == swarmManagers.size();
426 : });
427 1 : auto now = std::chrono::steady_clock::now();
428 :
429 1 : std::cout << "#########################################################################"
430 1 : << std::endl;
431 2 : std::cout << "Time for everyone to receive the message " << dht::print_duration(now - start)
432 1 : << std::endl;
433 1 : std::cout << " IS OK " << ok << std::endl;
434 :
435 : // ##############################################################################
436 :
437 11 : for (const auto& count : numberTimesReceived) {
438 10 : moyenne = moyenne + count.second;
439 :
440 10 : if (count.second > max) {
441 3 : max = count.second;
442 : }
443 :
444 10 : if (count.second < min) {
445 1 : min = count.second;
446 : }
447 : }
448 :
449 1 : auto it = channelSockets_.begin();
450 :
451 1 : displayBucketDistribution((*it).first);
452 1 : std::advance(it, swarmManagers.size() / 2);
453 1 : displayBucketDistribution((*it).first);
454 1 : std::advance(it, swarmManagers.size() / 2 - 1);
455 1 : displayBucketDistribution((*it).first);
456 :
457 1 : std::cout << "MOYENNE DE RECEPTION PAR NOEUD [ " << moyenne / (float) numberTimesReceived.size()
458 1 : << " ] " << std::endl;
459 1 : std::cout << "MAX DE RECEPTION PAR NOEUD [ " << max << " ] " << std::endl;
460 1 : std::cout << "MIN DE RECEPTION PAR NOEUD [ " << min << " ] " << std::endl;
461 :
462 1 : std::cout << "NOMBRE DE SAUTS DIRECTS [ " << TOTAL_HOPS << " ] " << std::endl;
463 1 : std::cout << "NOMBRE D'ITERATIONS [ " << iterations << " ] " << std::endl;
464 :
465 1 : distribution();
466 1 : std::cout << "#########################################################################"
467 1 : << std::endl;
468 :
469 1 : std::cout << "Number of times received " << numberTimesReceived.size() << std::endl;
470 1 : std::cout << "Number of swarm managers " << swarmManagers.size() << std::endl;
471 :
472 1 : CPPUNIT_ASSERT(true);
473 1 : }
474 :
475 : }; // namespace test
476 : } // namespace jami
477 1 : RING_TEST_RUNNER(jami::test::SwarmMessageSpread::name())
|