Line data Source code
1 : /*
2 : * Copyright (C) 2004-2024 Savoir-faire Linux Inc.
3 : *
4 : * This program is free software: you can redistribute it and/or modify
5 : * it under the terms of the GNU General Public License as published by
6 : * the Free Software Foundation, either version 3 of the License, or
7 : * (at your option) any later version.
8 : *
9 : * This program is distributed in the hope that it will be useful,
10 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 : * GNU General Public License for more details.
13 : *
14 : * You should have received a copy of the GNU General Public License
15 : * along with this program. If not, see <https://www.gnu.org/licenses/>.
16 : */
17 :
18 : #include "swarm_manager.h"
19 : #include <dhtnet/multiplexed_socket.h>
20 : #include <opendht/thread_pool.h>
21 :
22 : constexpr const std::chrono::minutes FIND_PERIOD {10};
23 :
24 : namespace jami {
25 :
26 : using namespace swarm_protocol;
27 :
28 586 : SwarmManager::SwarmManager(const NodeId& id, const std::mt19937_64& rand, ToConnectCb&& toConnectCb)
29 586 : : id_(id)
30 586 : , rd(rand)
31 586 : , toConnectCb_(toConnectCb)
32 : {
33 586 : routing_table.setId(id);
34 586 : }
35 :
36 586 : SwarmManager::~SwarmManager()
37 : {
38 586 : if (!isShutdown_)
39 246 : shutdown();
40 586 : }
41 :
42 : bool
43 2248 : SwarmManager::setKnownNodes(const std::vector<NodeId>& known_nodes)
44 : {
45 2248 : isShutdown_ = false;
46 2251 : std::vector<NodeId> newNodes;
47 : {
48 2245 : std::lock_guard lock(mutex);
49 8217 : for (const auto& nodeId : known_nodes) {
50 5958 : if (addKnownNode(nodeId)) {
51 2992 : newNodes.emplace_back(nodeId);
52 : }
53 : }
54 2246 : }
55 :
56 2251 : if (newNodes.empty())
57 1716 : return false;
58 :
59 532 : dht::ThreadPool::io().run([w=weak(), newNodes=std::move(newNodes)] {
60 535 : auto shared = w.lock();
61 533 : if (!shared)
62 0 : return;
63 : // If we detect a new node which already got a TCP link
64 : // we can use it to speed-up the bootstrap (because opening
65 : // a new channel will be easy)
66 533 : std::set<NodeId> toConnect;
67 3525 : for (const auto& nodeId: newNodes) {
68 2995 : if (shared->toConnectCb_ && shared->toConnectCb_(nodeId))
69 512 : toConnect.emplace(nodeId);
70 : }
71 531 : shared->maintainBuckets(toConnect);
72 535 : });
73 535 : return true;
74 2251 : }
75 :
76 : void
77 1413 : SwarmManager::setMobileNodes(const std::vector<NodeId>& mobile_nodes)
78 : {
79 : {
80 1413 : std::lock_guard lock(mutex);
81 1426 : for (const auto& nodeId : mobile_nodes)
82 13 : addMobileNodes(nodeId);
83 1408 : }
84 1410 : }
85 :
86 : void
87 2088 : SwarmManager::addChannel(const std::shared_ptr<dhtnet::ChannelSocketInterface>& channel)
88 : {
89 : // JAMI_WARNING("[SwarmManager {}] addChannel! with {}", fmt::ptr(this), channel->deviceId().to_view());
90 2088 : if (channel) {
91 2089 : auto emit = false;
92 : {
93 2089 : std::lock_guard lock(mutex);
94 2088 : emit = routing_table.findBucket(getId())->isEmpty();
95 2089 : auto bucket = routing_table.findBucket(channel->deviceId());
96 2087 : if (routing_table.addNode(channel, bucket)) {
97 1445 : std::error_code ec;
98 1444 : resetNodeExpiry(ec, channel, id_);
99 : }
100 2089 : }
101 2088 : receiveMessage(channel);
102 2089 : if (emit && onConnectionChanged_) {
103 : // If it's the first channel we add, we're now connected!
104 1349 : JAMI_DEBUG("[SwarmManager {}] Bootstrap: Connected!", fmt::ptr(this));
105 451 : onConnectionChanged_(true);
106 : }
107 : }
108 2089 : }
109 :
110 : void
111 574 : SwarmManager::removeNode(const NodeId& nodeId)
112 : {
113 574 : std::unique_lock lk(mutex);
114 573 : if (isConnectedWith(nodeId)) {
115 543 : removeNodeInternal(nodeId);
116 548 : lk.unlock();
117 549 : maintainBuckets();
118 : }
119 574 : }
120 :
121 : void
122 191 : SwarmManager::changeMobility(const NodeId& nodeId, bool isMobile)
123 : {
124 191 : std::lock_guard lock(mutex);
125 192 : auto bucket = routing_table.findBucket(nodeId);
126 191 : bucket->changeMobility(nodeId, isMobile);
127 191 : }
128 :
129 : bool
130 573 : SwarmManager::isConnectedWith(const NodeId& deviceId)
131 : {
132 573 : return routing_table.hasNode(deviceId);
133 : }
134 :
135 : void
136 651 : SwarmManager::shutdown()
137 : {
138 651 : if (isShutdown_) {
139 35 : return;
140 : }
141 616 : isShutdown_ = true;
142 616 : std::lock_guard lock(mutex);
143 616 : routing_table.shutdownAllNodes();
144 616 : }
145 :
146 : void
147 22 : SwarmManager::restart()
148 : {
149 22 : isShutdown_ = false;
150 22 : }
151 :
152 : bool
153 5960 : SwarmManager::addKnownNode(const NodeId& nodeId)
154 : {
155 5960 : return routing_table.addKnownNode(nodeId);
156 : }
157 :
158 : void
159 13 : SwarmManager::addMobileNodes(const NodeId& nodeId)
160 : {
161 13 : if (id_ != nodeId) {
162 12 : routing_table.addMobileNode(nodeId);
163 : }
164 13 : }
165 :
166 : void
167 1122 : SwarmManager::maintainBuckets(const std::set<NodeId>& toConnect)
168 : {
169 1122 : std::set<NodeId> nodes = toConnect;
170 1120 : std::unique_lock lock(mutex);
171 1121 : auto& buckets = routing_table.getBuckets();
172 3999 : for (auto it = buckets.begin(); it != buckets.end(); ++it) {
173 2877 : auto& bucket = *it;
174 2878 : bool myBucket = routing_table.contains(it, id_);
175 4632 : auto connecting_nodes = myBucket ? bucket.getConnectingNodesSize()
176 1755 : : bucket.getConnectingNodesSize() + bucket.getNodesSize();
177 2876 : if (connecting_nodes < Bucket::BUCKET_MAX_SIZE) {
178 : auto nodesToTry = bucket.getKnownNodesRandom(Bucket::BUCKET_MAX_SIZE - connecting_nodes,
179 1419 : rd);
180 2311 : for (auto& node : nodesToTry)
181 895 : routing_table.addConnectingNode(node);
182 :
183 1410 : nodes.insert(nodesToTry.begin(), nodesToTry.end());
184 1416 : }
185 : }
186 1123 : lock.unlock();
187 2306 : for (auto& node : nodes)
188 1180 : tryConnect(node);
189 1122 : }
190 :
191 : void
192 1444 : SwarmManager::sendRequest(const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket,
193 : NodeId& nodeId,
194 : Query q,
195 : int numberNodes)
196 : {
197 1444 : msgpack::sbuffer buffer;
198 1445 : msgpack::packer<msgpack::sbuffer> pk(&buffer);
199 1445 : std::error_code ec;
200 :
201 1445 : Request toRequest {q, numberNodes, nodeId};
202 1445 : Message msg;
203 1445 : msg.is_mobile = isMobile_;
204 1445 : msg.request = std::move(toRequest);
205 :
206 1444 : pk.pack(msg);
207 :
208 1443 : socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
209 :
210 1445 : if (ec) {
211 8 : JAMI_ERROR("{}", ec.message());
212 3 : return;
213 : }
214 1448 : }
215 :
216 : void
217 1410 : SwarmManager::sendAnswer(const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket, const Message& msg_)
218 : {
219 1410 : std::lock_guard lock(mutex);
220 :
221 1412 : if (msg_.request->q == Query::FIND) {
222 1409 : auto nodes = routing_table.closestNodes(msg_.request->nodeId, msg_.request->num);
223 1415 : auto bucket = routing_table.findBucket(msg_.request->nodeId);
224 1412 : const auto& m_nodes = bucket->getMobileNodes();
225 1413 : Response toResponse {Query::FOUND, nodes, {m_nodes.begin(), m_nodes.end()}};
226 :
227 1411 : Message msg;
228 1411 : msg.is_mobile = isMobile_;
229 1411 : msg.response = std::move(toResponse);
230 :
231 1415 : msgpack::sbuffer buffer((size_t) 60000);
232 1416 : msgpack::packer<msgpack::sbuffer> pk(&buffer);
233 1415 : pk.pack(msg);
234 :
235 1415 : std::error_code ec;
236 :
237 1415 : socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
238 1416 : if (ec) {
239 0 : JAMI_ERROR("{}", ec.message());
240 0 : return;
241 : }
242 1416 : }
243 :
244 : else {
245 : }
246 1415 : }
247 :
248 : void
249 2089 : SwarmManager::receiveMessage(const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket)
250 : {
251 : struct DecodingContext
252 : {
253 22819 : msgpack::unpacker pac {[](msgpack::type::object_type, std::size_t, void*) { return true; },
254 : nullptr,
255 : 512};
256 : };
257 :
258 2089 : socket->setOnRecv([w = weak(),
259 : wsocket = std::weak_ptr<dhtnet::ChannelSocketInterface>(socket),
260 : ctx = std::make_shared<DecodingContext>()](const uint8_t* buf, size_t len) {
261 2814 : ctx->pac.reserve_buffer(len);
262 2815 : std::copy_n(buf, len, ctx->pac.buffer());
263 2829 : ctx->pac.buffer_consumed(len);
264 :
265 2828 : msgpack::object_handle oh;
266 5646 : while (ctx->pac.next(oh)) {
267 2807 : auto shared = w.lock();
268 2809 : auto socket = wsocket.lock();
269 2812 : if (!shared || !socket)
270 1 : return size_t {0};
271 :
272 : try {
273 2810 : Message msg;
274 2810 : oh.get().convert(msg);
275 :
276 2803 : if (msg.is_mobile)
277 190 : shared->changeMobility(socket->deviceId(), msg.is_mobile);
278 :
279 2805 : if (msg.request) {
280 1409 : shared->sendAnswer(socket, msg);
281 :
282 1403 : } else if (msg.response) {
283 1404 : shared->setKnownNodes(msg.response->nodes);
284 1410 : shared->setMobileNodes(msg.response->mobile_nodes);
285 : }
286 :
287 2821 : } catch (const std::exception& e) {
288 0 : JAMI_WARNING("Error DRT recv: {}", e.what());
289 0 : return len;
290 0 : }
291 2825 : }
292 :
293 2822 : return len;
294 2824 : });
295 :
296 2087 : socket->onShutdown([w = weak(), deviceId = socket->deviceId()] {
297 1466 : dht::ThreadPool::io().run([w, deviceId] {
298 1133 : auto shared = w.lock();
299 1132 : if (shared && !shared->isShutdown_) {
300 574 : shared->removeNode(deviceId);
301 : }
302 1134 : });
303 1467 : });
304 2087 : }
305 :
306 : void
307 1444 : SwarmManager::resetNodeExpiry(const asio::error_code& ec,
308 : const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket,
309 : NodeId node)
310 : {
311 1444 : NodeId idToFind;
312 1443 : std::list<Bucket>::iterator bucket;
313 :
314 1444 : if (ec == asio::error::operation_aborted)
315 0 : return;
316 :
317 1443 : if (!node) {
318 0 : bucket = routing_table.findBucket(socket->deviceId());
319 0 : idToFind = bucket->randomId(rd);
320 : } else {
321 1444 : bucket = routing_table.findBucket(node);
322 1444 : idToFind = node;
323 : }
324 :
325 1444 : sendRequest(socket, idToFind, Query::FIND, Bucket::BUCKET_MAX_SIZE);
326 :
327 1445 : if (!node) {
328 0 : auto& nodeTimer = bucket->getNodeTimer(socket);
329 0 : nodeTimer.expires_after(FIND_PERIOD);
330 0 : nodeTimer.async_wait(std::bind(&jami::SwarmManager::resetNodeExpiry,
331 0 : shared_from_this(),
332 : std::placeholders::_1,
333 : socket,
334 0 : NodeId {}));
335 : }
336 : }
337 :
338 : void
339 1180 : SwarmManager::tryConnect(const NodeId& nodeId)
340 : {
341 1180 : if (needSocketCb_)
342 1175 : needSocketCb_(nodeId.toString(),
343 1095 : [w = weak(), nodeId](const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket) {
344 1095 : auto shared = w.lock();
345 1095 : if (!shared || shared->isShutdown_)
346 222 : return true;
347 872 : if (socket) {
348 788 : shared->addChannel(socket);
349 788 : return true;
350 : }
351 84 : std::unique_lock lk(shared->mutex);
352 84 : auto bucket = shared->routing_table.findBucket(nodeId);
353 84 : bucket->removeConnectingNode(nodeId);
354 84 : bucket->addKnownNode(nodeId);
355 84 : bucket = shared->routing_table.findBucket(shared->getId());
356 84 : if (bucket->getConnectingNodesSize() == 0
357 84 : && bucket->isEmpty() && shared->onConnectionChanged_) {
358 38 : lk.unlock();
359 114 : JAMI_WARNING("[SwarmManager {:p}] Bootstrap: all connections failed",
360 : fmt::ptr(shared.get()));
361 38 : shared->onConnectionChanged_(false);
362 : }
363 84 : return true;
364 1094 : });
365 1182 : }
366 :
367 : void
368 543 : SwarmManager::removeNodeInternal(const NodeId& nodeId)
369 : {
370 543 : routing_table.removeNode(nodeId);
371 548 : }
372 :
373 : std::vector<NodeId>
374 0 : SwarmManager::getAllNodes() const
375 : {
376 0 : std::lock_guard lock(mutex);
377 0 : return routing_table.getAllNodes();
378 0 : }
379 :
380 : void
381 18 : SwarmManager::deleteNode(std::vector<NodeId> nodes)
382 : {
383 : {
384 18 : std::lock_guard lock(mutex);
385 30 : for (const auto& node : nodes) {
386 12 : routing_table.deleteNode(node);
387 : }
388 18 : }
389 18 : maintainBuckets();
390 18 : }
391 :
392 : } // namespace jami
|