Line data Source code
1 : /*
2 : * Copyright (C) 2004-2024 Savoir-faire Linux Inc.
3 : *
4 : * This program is free software: you can redistribute it and/or modify
5 : * it under the terms of the GNU General Public License as published by
6 : * the Free Software Foundation, either version 3 of the License, or
7 : * (at your option) any later version.
8 : *
9 : * This program is distributed in the hope that it will be useful,
10 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 : * GNU General Public License for more details.
13 : *
14 : * You should have received a copy of the GNU General Public License
15 : * along with this program. If not, see <https://www.gnu.org/licenses/>.
16 : */
17 :
18 : #include "swarm_manager.h"
19 : #include <dhtnet/multiplexed_socket.h>
20 : #include <opendht/thread_pool.h>
21 :
22 : constexpr const std::chrono::minutes FIND_PERIOD {10};
23 :
24 : namespace jami {
25 :
26 : using namespace swarm_protocol;
27 :
28 585 : SwarmManager::SwarmManager(const NodeId& id, const std::mt19937_64& rand, ToConnectCb&& toConnectCb)
29 585 : : id_(id)
30 585 : , rd(rand)
31 585 : , toConnectCb_(toConnectCb)
32 : {
33 585 : routing_table.setId(id);
34 585 : }
35 :
36 585 : SwarmManager::~SwarmManager()
37 : {
38 585 : if (!isShutdown_)
39 245 : shutdown();
40 585 : }
41 :
42 : bool
43 2246 : SwarmManager::setKnownNodes(const std::vector<NodeId>& known_nodes)
44 : {
45 2246 : isShutdown_ = false;
46 2254 : std::vector<NodeId> newNodes;
47 : {
48 2249 : std::lock_guard lock(mutex);
49 8243 : for (const auto& nodeId : known_nodes) {
50 5989 : if (addKnownNode(nodeId)) {
51 2991 : newNodes.emplace_back(nodeId);
52 : }
53 : }
54 2246 : }
55 :
56 2255 : if (newNodes.empty())
57 1726 : return false;
58 :
59 528 : dht::ThreadPool::io().run([w=weak(), newNodes=std::move(newNodes)] {
60 528 : auto shared = w.lock();
61 527 : if (!shared)
62 0 : return;
63 : // If we detect a new node which already got a TCP link
64 : // we can use it to speed-up the bootstrap (because opening
65 : // a new channel will be easy)
66 527 : std::set<NodeId> toConnect;
67 3518 : for (const auto& nodeId: newNodes) {
68 2988 : if (shared->toConnectCb_ && shared->toConnectCb_(nodeId))
69 515 : toConnect.emplace(nodeId);
70 : }
71 527 : shared->maintainBuckets(toConnect);
72 529 : });
73 529 : return true;
74 2255 : }
75 :
76 : void
77 1419 : SwarmManager::setMobileNodes(const std::vector<NodeId>& mobile_nodes)
78 : {
79 : {
80 1419 : std::lock_guard lock(mutex);
81 1436 : for (const auto& nodeId : mobile_nodes)
82 15 : addMobileNodes(nodeId);
83 1419 : }
84 1421 : }
85 :
86 : void
87 2114 : SwarmManager::addChannel(const std::shared_ptr<dhtnet::ChannelSocketInterface>& channel)
88 : {
89 : // JAMI_WARNING("[SwarmManager {}] addChannel! with {}", fmt::ptr(this), channel->deviceId().to_view());
90 2114 : if (channel) {
91 2114 : auto emit = false;
92 : {
93 2114 : std::lock_guard lock(mutex);
94 2114 : emit = routing_table.findBucket(getId())->isEmpty();
95 2114 : auto bucket = routing_table.findBucket(channel->deviceId());
96 2114 : if (routing_table.addNode(channel, bucket)) {
97 1449 : std::error_code ec;
98 1449 : resetNodeExpiry(ec, channel, id_);
99 : }
100 2114 : }
101 2114 : receiveMessage(channel);
102 2112 : if (emit && onConnectionChanged_) {
103 : // If it's the first channel we add, we're now connected!
104 1395 : JAMI_DEBUG("[SwarmManager {}] Bootstrap: Connected!", fmt::ptr(this));
105 465 : onConnectionChanged_(true);
106 : }
107 : }
108 2112 : }
109 :
110 : void
111 584 : SwarmManager::removeNode(const NodeId& nodeId)
112 : {
113 584 : std::unique_lock lk(mutex);
114 584 : if (isConnectedWith(nodeId)) {
115 551 : removeNodeInternal(nodeId);
116 553 : lk.unlock();
117 553 : maintainBuckets();
118 : }
119 584 : }
120 :
121 : void
122 199 : SwarmManager::changeMobility(const NodeId& nodeId, bool isMobile)
123 : {
124 199 : std::lock_guard lock(mutex);
125 200 : auto bucket = routing_table.findBucket(nodeId);
126 200 : bucket->changeMobility(nodeId, isMobile);
127 199 : }
128 :
129 : bool
130 584 : SwarmManager::isConnectedWith(const NodeId& deviceId)
131 : {
132 584 : return routing_table.hasNode(deviceId);
133 : }
134 :
135 : void
136 648 : SwarmManager::shutdown()
137 : {
138 648 : if (isShutdown_) {
139 36 : return;
140 : }
141 612 : isShutdown_ = true;
142 612 : std::lock_guard lock(mutex);
143 612 : routing_table.shutdownAllNodes();
144 612 : }
145 :
146 : void
147 19 : SwarmManager::restart()
148 : {
149 19 : isShutdown_ = false;
150 19 : }
151 :
152 : bool
153 5990 : SwarmManager::addKnownNode(const NodeId& nodeId)
154 : {
155 5990 : return routing_table.addKnownNode(nodeId);
156 : }
157 :
158 : void
159 15 : SwarmManager::addMobileNodes(const NodeId& nodeId)
160 : {
161 15 : if (id_ != nodeId) {
162 14 : routing_table.addMobileNode(nodeId);
163 : }
164 15 : }
165 :
166 : void
167 1115 : SwarmManager::maintainBuckets(const std::set<NodeId>& toConnect)
168 : {
169 1115 : std::set<NodeId> nodes = toConnect;
170 1116 : std::unique_lock lock(mutex);
171 1118 : auto& buckets = routing_table.getBuckets();
172 4127 : for (auto it = buckets.begin(); it != buckets.end(); ++it) {
173 3006 : auto& bucket = *it;
174 3000 : bool myBucket = routing_table.contains(it, id_);
175 4904 : auto connecting_nodes = myBucket ? bucket.getConnectingNodesSize()
176 1894 : : bucket.getConnectingNodesSize() + bucket.getNodesSize();
177 3010 : if (connecting_nodes < Bucket::BUCKET_MAX_SIZE) {
178 : auto nodesToTry = bucket.getKnownNodesRandom(Bucket::BUCKET_MAX_SIZE - connecting_nodes,
179 1628 : rd);
180 2518 : for (auto& node : nodesToTry)
181 886 : routing_table.addConnectingNode(node);
182 :
183 1629 : nodes.insert(nodesToTry.begin(), nodesToTry.end());
184 1628 : }
185 : }
186 1117 : lock.unlock();
187 2296 : for (auto& node : nodes)
188 1177 : tryConnect(node);
189 1117 : }
190 :
191 : void
192 1451 : SwarmManager::sendRequest(const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket,
193 : NodeId& nodeId,
194 : Query q,
195 : int numberNodes)
196 : {
197 1451 : msgpack::sbuffer buffer;
198 1450 : msgpack::packer<msgpack::sbuffer> pk(&buffer);
199 1451 : std::error_code ec;
200 :
201 1451 : Request toRequest {q, numberNodes, nodeId};
202 1451 : Message msg;
203 1451 : msg.is_mobile = isMobile_;
204 1451 : msg.request = std::move(toRequest);
205 :
206 1451 : pk.pack(msg);
207 :
208 1446 : socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
209 :
210 1451 : if (ec) {
211 9 : JAMI_ERROR("{}", ec.message());
212 3 : return;
213 : }
214 1454 : }
215 :
216 : void
217 1416 : SwarmManager::sendAnswer(const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket, const Message& msg_)
218 : {
219 1416 : std::lock_guard lock(mutex);
220 :
221 1415 : if (msg_.request->q == Query::FIND) {
222 1414 : auto nodes = routing_table.closestNodes(msg_.request->nodeId, msg_.request->num);
223 1423 : auto bucket = routing_table.findBucket(msg_.request->nodeId);
224 1423 : const auto& m_nodes = bucket->getMobileNodes();
225 1422 : Response toResponse {Query::FOUND, nodes, {m_nodes.begin(), m_nodes.end()}};
226 :
227 1419 : Message msg;
228 1419 : msg.is_mobile = isMobile_;
229 1419 : msg.response = std::move(toResponse);
230 :
231 1421 : msgpack::sbuffer buffer((size_t) 60000);
232 1422 : msgpack::packer<msgpack::sbuffer> pk(&buffer);
233 1421 : pk.pack(msg);
234 :
235 1423 : std::error_code ec;
236 :
237 1423 : socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
238 1423 : if (ec) {
239 0 : JAMI_ERROR("{}", ec.message());
240 0 : return;
241 : }
242 1422 : }
243 :
244 : else {
245 : }
246 1421 : }
247 :
248 : void
249 2114 : SwarmManager::receiveMessage(const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket)
250 : {
251 : struct DecodingContext
252 : {
253 23008 : msgpack::unpacker pac {[](msgpack::type::object_type, std::size_t, void*) { return true; },
254 : nullptr,
255 : 512};
256 : };
257 :
258 2114 : socket->setOnRecv([w = weak(),
259 : wsocket = std::weak_ptr<dhtnet::ChannelSocketInterface>(socket),
260 : ctx = std::make_shared<DecodingContext>()](const uint8_t* buf, size_t len) {
261 2837 : ctx->pac.reserve_buffer(len);
262 2836 : std::copy_n(buf, len, ctx->pac.buffer());
263 2841 : ctx->pac.buffer_consumed(len);
264 :
265 2840 : msgpack::object_handle oh;
266 5673 : while (ctx->pac.next(oh)) {
267 2819 : auto shared = w.lock();
268 2820 : auto socket = wsocket.lock();
269 2814 : if (!shared || !socket)
270 0 : return size_t {0};
271 :
272 : try {
273 2811 : Message msg;
274 2811 : oh.get().convert(msg);
275 :
276 2827 : if (msg.is_mobile)
277 199 : shared->changeMobility(socket->deviceId(), msg.is_mobile);
278 :
279 2828 : if (msg.request) {
280 1416 : shared->sendAnswer(socket, msg);
281 :
282 1415 : } else if (msg.response) {
283 1415 : shared->setKnownNodes(msg.response->nodes);
284 1419 : shared->setMobileNodes(msg.response->mobile_nodes);
285 : }
286 :
287 2838 : } catch (const std::exception& e) {
288 0 : JAMI_WARNING("Error DRT recv: {}", e.what());
289 0 : return len;
290 0 : }
291 2841 : }
292 :
293 2837 : return len;
294 2837 : });
295 :
296 2113 : socket->onShutdown([w = weak(), deviceId = socket->deviceId()] {
297 1485 : dht::ThreadPool::io().run([w, deviceId] {
298 1147 : auto shared = w.lock();
299 1147 : if (shared && !shared->isShutdown_) {
300 584 : shared->removeNode(deviceId);
301 : }
302 1148 : });
303 1485 : });
304 2112 : }
305 :
306 : void
307 1451 : SwarmManager::resetNodeExpiry(const asio::error_code& ec,
308 : const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket,
309 : NodeId node)
310 : {
311 1451 : NodeId idToFind;
312 1450 : std::list<Bucket>::iterator bucket;
313 :
314 1449 : if (ec == asio::error::operation_aborted)
315 0 : return;
316 :
317 1450 : if (!node) {
318 0 : bucket = routing_table.findBucket(socket->deviceId());
319 0 : idToFind = bucket->randomId(rd);
320 : } else {
321 1450 : bucket = routing_table.findBucket(node);
322 1451 : idToFind = node;
323 : }
324 :
325 1451 : sendRequest(socket, idToFind, Query::FIND, Bucket::BUCKET_MAX_SIZE);
326 :
327 1451 : if (!node) {
328 0 : auto& nodeTimer = bucket->getNodeTimer(socket);
329 0 : nodeTimer.expires_after(FIND_PERIOD);
330 0 : nodeTimer.async_wait(std::bind(&jami::SwarmManager::resetNodeExpiry,
331 0 : shared_from_this(),
332 : std::placeholders::_1,
333 : socket,
334 0 : NodeId {}));
335 : }
336 : }
337 :
338 : void
339 1177 : SwarmManager::tryConnect(const NodeId& nodeId)
340 : {
341 1177 : if (needSocketCb_)
342 1172 : needSocketCb_(nodeId.toString(),
343 1091 : [w = weak(), nodeId](const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket) {
344 1091 : auto shared = w.lock();
345 1094 : if (!shared || shared->isShutdown_)
346 218 : return true;
347 873 : if (socket) {
348 799 : shared->addChannel(socket);
349 798 : return true;
350 : }
351 74 : std::unique_lock lk(shared->mutex);
352 74 : auto bucket = shared->routing_table.findBucket(nodeId);
353 74 : bucket->removeConnectingNode(nodeId);
354 74 : bucket->addKnownNode(nodeId);
355 74 : bucket = shared->routing_table.findBucket(shared->getId());
356 74 : if (bucket->getConnectingNodesSize() == 0
357 74 : && bucket->isEmpty() && shared->onConnectionChanged_) {
358 31 : lk.unlock();
359 93 : JAMI_WARNING("[SwarmManager {:p}] Bootstrap: all connections failed",
360 : fmt::ptr(shared.get()));
361 31 : shared->onConnectionChanged_(false);
362 : }
363 74 : return true;
364 1090 : });
365 1178 : }
366 :
367 : void
368 552 : SwarmManager::removeNodeInternal(const NodeId& nodeId)
369 : {
370 552 : routing_table.removeNode(nodeId);
371 553 : }
372 :
373 : std::vector<NodeId>
374 0 : SwarmManager::getAllNodes() const
375 : {
376 0 : std::lock_guard lock(mutex);
377 0 : return routing_table.getAllNodes();
378 0 : }
379 :
380 : void
381 17 : SwarmManager::deleteNode(std::vector<NodeId> nodes)
382 : {
383 : {
384 17 : std::lock_guard lock(mutex);
385 29 : for (const auto& node : nodes) {
386 12 : routing_table.deleteNode(node);
387 : }
388 17 : }
389 17 : maintainBuckets();
390 17 : }
391 :
392 : } // namespace jami
|