Line data Source code
1 : /*
2 : * Copyright (C) 2024 Savoir-faire Linux Inc.
3 : *
4 : * Author: Fadi Shehadeh <fadi.shehadeh@savoirfairelinux.com>
5 : *
6 : * This program is free software; you can redistribute it and/or modify
7 : * it under the terms of the GNU General Public License as published by
8 : * the Free Software Foundation; either version 3 of the License, or
9 : * (at your option) any later version.
10 : *
11 : * This program is distributed in the hope that it will be useful,
12 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 : * GNU General Public License for more details.
15 : *
16 : * You should have received a copy of the GNU General Public License
17 : * along with this program; if not, write to the Free Software
18 : * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
19 : */
20 :
21 : #include "swarm_manager.h"
22 : #include <dhtnet/multiplexed_socket.h>
23 : #include <opendht/thread_pool.h>
24 :
25 : constexpr const std::chrono::minutes FIND_PERIOD {10};
26 :
27 : namespace jami {
28 :
29 : using namespace swarm_protocol;
30 :
31 402 : SwarmManager::SwarmManager(const NodeId& id, const std::mt19937_64& rand, ToConnectCb&& toConnectCb)
32 402 : : id_(id)
33 402 : , rd(rand)
34 402 : , toConnectCb_(toConnectCb)
35 : {
36 402 : routing_table.setId(id);
37 402 : }
38 :
39 349 : SwarmManager::~SwarmManager()
40 : {
41 349 : if (!isShutdown_)
42 199 : shutdown();
43 349 : }
44 :
45 : bool
46 1761 : SwarmManager::setKnownNodes(const std::vector<NodeId>& known_nodes)
47 : {
48 1761 : isShutdown_ = false;
49 1760 : std::vector<NodeId> newNodes;
50 : {
51 1757 : std::lock_guard lock(mutex);
52 5040 : for (const auto& nodeId : known_nodes) {
53 3278 : if (addKnownNode(nodeId)) {
54 843 : newNodes.emplace_back(nodeId);
55 : }
56 : }
57 1757 : }
58 :
59 1762 : if (newNodes.empty())
60 1346 : return false;
61 :
62 416 : dht::ThreadPool::io().run([w=weak(), newNodes=std::move(newNodes)] {
63 416 : auto shared = w.lock();
64 415 : if (!shared)
65 0 : return;
66 : // If we detect a new node which already got a TCP link
67 : // we can use it to speed-up the bootstrap (because opening
68 : // a new channel will be easy)
69 415 : std::set<NodeId> toConnect;
70 1261 : for (const auto& nodeId: newNodes) {
71 848 : if (shared->toConnectCb_ && shared->toConnectCb_(nodeId))
72 426 : toConnect.emplace(nodeId);
73 : }
74 413 : shared->maintainBuckets(toConnect);
75 416 : });
76 415 : return true;
77 1761 : }
78 :
79 : void
80 1270 : SwarmManager::setMobileNodes(const std::vector<NodeId>& mobile_nodes)
81 : {
82 : {
83 1270 : std::lock_guard lock(mutex);
84 1283 : for (const auto& nodeId : mobile_nodes)
85 15 : addMobileNodes(nodeId);
86 1267 : }
87 1270 : }
88 :
89 : void
90 1845 : SwarmManager::addChannel(const std::shared_ptr<dhtnet::ChannelSocketInterface>& channel)
91 : {
92 : // JAMI_WARNING("[SwarmManager {}] addChannel! with {}", fmt::ptr(this), channel->deviceId().to_view());
93 1845 : if (channel) {
94 1845 : auto emit = false;
95 : {
96 1845 : std::lock_guard lock(mutex);
97 1845 : emit = routing_table.findBucket(getId())->isEmpty();
98 1844 : auto bucket = routing_table.findBucket(channel->deviceId());
99 1844 : if (routing_table.addNode(channel, bucket)) {
100 1290 : std::error_code ec;
101 1289 : resetNodeExpiry(ec, channel, id_);
102 : }
103 1845 : }
104 1845 : receiveMessage(channel);
105 1844 : if (emit && onConnectionChanged_) {
106 : // If it's the first channel we add, we're now connected!
107 903 : JAMI_DEBUG("[SwarmManager {}] Bootstrap: Connected!", fmt::ptr(this));
108 301 : onConnectionChanged_(true);
109 : }
110 : }
111 1844 : }
112 :
113 : void
114 476 : SwarmManager::removeNode(const NodeId& nodeId)
115 : {
116 476 : std::unique_lock lk(mutex);
117 477 : if (isConnectedWith(nodeId)) {
118 463 : removeNodeInternal(nodeId);
119 460 : lk.unlock();
120 463 : maintainBuckets();
121 : }
122 479 : }
123 :
124 : void
125 201 : SwarmManager::changeMobility(const NodeId& nodeId, bool isMobile)
126 : {
127 201 : std::lock_guard lock(mutex);
128 202 : auto bucket = routing_table.findBucket(nodeId);
129 202 : bucket->changeMobility(nodeId, isMobile);
130 202 : }
131 :
132 : bool
133 476 : SwarmManager::isConnectedWith(const NodeId& deviceId)
134 : {
135 476 : return routing_table.hasNode(deviceId);
136 : }
137 :
138 : void
139 430 : SwarmManager::shutdown()
140 : {
141 430 : if (isShutdown_) {
142 26 : return;
143 : }
144 404 : isShutdown_ = true;
145 404 : std::lock_guard lock(mutex);
146 404 : routing_table.shutdownAllNodes();
147 404 : }
148 :
149 : bool
150 3277 : SwarmManager::addKnownNode(const NodeId& nodeId)
151 : {
152 3277 : return routing_table.addKnownNode(nodeId);
153 : }
154 :
155 : void
156 15 : SwarmManager::addMobileNodes(const NodeId& nodeId)
157 : {
158 15 : if (id_ != nodeId) {
159 14 : routing_table.addMobileNode(nodeId);
160 : }
161 15 : }
162 :
163 : void
164 893 : SwarmManager::maintainBuckets(const std::set<NodeId>& toConnect)
165 : {
166 893 : std::set<NodeId> nodes = toConnect;
167 894 : std::unique_lock lock(mutex);
168 895 : auto& buckets = routing_table.getBuckets();
169 3539 : for (auto it = buckets.begin(); it != buckets.end(); ++it) {
170 2644 : auto& bucket = *it;
171 2641 : bool myBucket = routing_table.contains(it, id_);
172 4397 : auto connecting_nodes = myBucket ? bucket.getConnectingNodesSize()
173 1754 : : bucket.getConnectingNodesSize() + bucket.getNodesSize();
174 2646 : if (connecting_nodes < Bucket::BUCKET_MAX_SIZE) {
175 : auto nodesToTry = bucket.getKnownNodesRandom(Bucket::BUCKET_MAX_SIZE - connecting_nodes,
176 1241 : rd);
177 1954 : for (auto& node : nodesToTry)
178 713 : routing_table.addConnectingNode(node);
179 :
180 1240 : nodes.insert(nodesToTry.begin(), nodesToTry.end());
181 1241 : }
182 : }
183 895 : lock.unlock();
184 1898 : for (auto& node : nodes)
185 1003 : tryConnect(node);
186 894 : }
187 :
188 : void
189 1290 : SwarmManager::sendRequest(const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket,
190 : NodeId& nodeId,
191 : Query q,
192 : int numberNodes)
193 : {
194 1290 : msgpack::sbuffer buffer;
195 1291 : msgpack::packer<msgpack::sbuffer> pk(&buffer);
196 1290 : std::error_code ec;
197 :
198 1289 : Request toRequest {q, numberNodes, nodeId};
199 1289 : Message msg;
200 1289 : msg.is_mobile = isMobile_;
201 1289 : msg.request = std::move(toRequest);
202 :
203 1291 : pk.pack(msg);
204 :
205 1289 : socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
206 :
207 1291 : if (ec) {
208 3 : JAMI_ERROR("{}", ec.message());
209 1 : return;
210 : }
211 1292 : }
212 :
213 : void
214 1264 : SwarmManager::sendAnswer(const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket, const Message& msg_)
215 : {
216 1264 : std::lock_guard lock(mutex);
217 :
218 1265 : if (msg_.request->q == Query::FIND) {
219 1259 : auto nodes = routing_table.closestNodes(msg_.request->nodeId, msg_.request->num);
220 1267 : auto bucket = routing_table.findBucket(msg_.request->nodeId);
221 1267 : const auto& m_nodes = bucket->getMobileNodes();
222 1264 : Response toResponse {Query::FOUND, nodes, {m_nodes.begin(), m_nodes.end()}};
223 :
224 1266 : Message msg;
225 1266 : msg.is_mobile = isMobile_;
226 1266 : msg.response = std::move(toResponse);
227 :
228 1267 : msgpack::sbuffer buffer((size_t) 60000);
229 1270 : msgpack::packer<msgpack::sbuffer> pk(&buffer);
230 1269 : pk.pack(msg);
231 :
232 1269 : std::error_code ec;
233 :
234 1268 : socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
235 1270 : if (ec) {
236 0 : JAMI_ERROR("{}", ec.message());
237 0 : return;
238 : }
239 1270 : }
240 :
241 : else {
242 : }
243 1270 : }
244 :
245 : void
246 1845 : SwarmManager::receiveMessage(const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket)
247 : {
248 : struct DecodingContext
249 : {
250 20666 : msgpack::unpacker pac {[](msgpack::type::object_type, std::size_t, void*) { return true; },
251 : nullptr,
252 : 512};
253 : };
254 :
255 1845 : socket->setOnRecv([w = weak(),
256 : wsocket = std::weak_ptr<dhtnet::ChannelSocketInterface>(socket),
257 : ctx = std::make_shared<DecodingContext>()](const uint8_t* buf, size_t len) {
258 2523 : ctx->pac.reserve_buffer(len);
259 2526 : std::copy_n(buf, len, ctx->pac.buffer());
260 2538 : ctx->pac.buffer_consumed(len);
261 :
262 2536 : msgpack::object_handle oh;
263 5068 : while (ctx->pac.next(oh)) {
264 2526 : auto shared = w.lock();
265 2525 : auto socket = wsocket.lock();
266 2516 : if (!shared || !socket)
267 0 : return size_t {0};
268 :
269 : try {
270 2510 : Message msg;
271 2510 : oh.get().convert(msg);
272 :
273 2531 : if (msg.is_mobile)
274 202 : shared->changeMobility(socket->deviceId(), msg.is_mobile);
275 :
276 2531 : if (msg.request) {
277 1263 : shared->sendAnswer(socket, msg);
278 :
279 1263 : } else if (msg.response) {
280 1264 : shared->setKnownNodes(msg.response->nodes);
281 1268 : shared->setMobileNodes(msg.response->mobile_nodes);
282 : }
283 :
284 2535 : } catch (const std::exception& e) {
285 0 : JAMI_WARNING("Error DRT recv: {}", e.what());
286 0 : return len;
287 0 : }
288 2538 : }
289 :
290 2533 : return len;
291 2534 : });
292 :
293 1845 : socket->onShutdown([w = weak(), deviceId = socket->deviceId()] {
294 1311 : dht::ThreadPool::io().run([w, deviceId] {
295 965 : auto shared = w.lock();
296 964 : if (shared && !shared->isShutdown_) {
297 476 : shared->removeNode(deviceId);
298 : }
299 966 : });
300 1312 : });
301 1844 : }
302 :
303 : void
304 1289 : SwarmManager::resetNodeExpiry(const asio::error_code& ec,
305 : const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket,
306 : NodeId node)
307 : {
308 1289 : NodeId idToFind;
309 1288 : std::list<Bucket>::iterator bucket;
310 :
311 1288 : if (ec == asio::error::operation_aborted)
312 0 : return;
313 :
314 1289 : if (!node) {
315 0 : bucket = routing_table.findBucket(socket->deviceId());
316 0 : idToFind = bucket->randomId(rd);
317 : } else {
318 1291 : bucket = routing_table.findBucket(node);
319 1290 : idToFind = node;
320 : }
321 :
322 1290 : sendRequest(socket, idToFind, Query::FIND, Bucket::BUCKET_MAX_SIZE);
323 :
324 1291 : if (!node) {
325 0 : auto& nodeTimer = bucket->getNodeTimer(socket);
326 0 : nodeTimer.expires_after(FIND_PERIOD);
327 0 : nodeTimer.async_wait(std::bind(&jami::SwarmManager::resetNodeExpiry,
328 0 : shared_from_this(),
329 : std::placeholders::_1,
330 : socket,
331 0 : NodeId {}));
332 : }
333 : }
334 :
335 : void
336 1003 : SwarmManager::tryConnect(const NodeId& nodeId)
337 : {
338 1003 : if (needSocketCb_)
339 998 : needSocketCb_(nodeId.toString(),
340 940 : [w = weak(), nodeId](const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket) {
341 940 : auto shared = w.lock();
342 940 : if (!shared)
343 4 : return true;
344 936 : if (socket) {
345 706 : shared->addChannel(socket);
346 705 : return true;
347 : }
348 230 : std::unique_lock lk(shared->mutex);
349 231 : auto bucket = shared->routing_table.findBucket(nodeId);
350 231 : bucket->removeConnectingNode(nodeId);
351 231 : bucket->addKnownNode(nodeId);
352 231 : bucket = shared->routing_table.findBucket(shared->getId());
353 231 : if (bucket->getConnectingNodesSize() == 0
354 231 : && bucket->isEmpty() && shared->onConnectionChanged_) {
355 133 : lk.unlock();
356 399 : JAMI_WARNING("[SwarmManager {:p}] Bootstrap: all connections failed",
357 : fmt::ptr(shared.get()));
358 133 : shared->onConnectionChanged_(false);
359 : }
360 231 : return true;
361 940 : });
362 1002 : }
363 :
364 : void
365 463 : SwarmManager::removeNodeInternal(const NodeId& nodeId)
366 : {
367 463 : routing_table.removeNode(nodeId);
368 460 : }
369 :
370 : std::vector<NodeId>
371 0 : SwarmManager::getAllNodes() const
372 : {
373 0 : std::lock_guard lock(mutex);
374 0 : return routing_table.getAllNodes();
375 0 : }
376 :
377 : void
378 1 : SwarmManager::deleteNode(std::vector<NodeId> nodes)
379 : {
380 : {
381 1 : std::lock_guard lock(mutex);
382 2 : for (const auto& node : nodes) {
383 1 : routing_table.deleteNode(node);
384 : }
385 1 : }
386 1 : maintainBuckets();
387 1 : }
388 :
389 : } // namespace jami
|