Line data Source code
1 : /*
2 : * Copyright (C) 2004-2024 Savoir-faire Linux Inc.
3 : *
4 : * This program is free software: you can redistribute it and/or modify
5 : * it under the terms of the GNU General Public License as published by
6 : * the Free Software Foundation, either version 3 of the License, or
7 : * (at your option) any later version.
8 : *
9 : * This program is distributed in the hope that it will be useful,
10 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 : * GNU General Public License for more details.
13 : *
14 : * You should have received a copy of the GNU General Public License
15 : * along with this program. If not, see <https://www.gnu.org/licenses/>.
16 : */
17 : #include "jamidht/message_channel_handler.h"
18 :
19 : static constexpr const char MESSAGE_SCHEME[] {"msg:"};
20 :
21 : namespace jami {
22 :
23 : using Key = std::pair<std::string, DeviceId>;
24 :
25 : struct MessageChannelHandler::Impl : public std::enable_shared_from_this<Impl>
26 : {
27 : std::weak_ptr<JamiAccount> account_;
28 : dhtnet::ConnectionManager& connectionManager_;
29 : std::recursive_mutex connectionsMtx_;
30 : std::map<Key, std::vector<std::shared_ptr<dhtnet::ChannelSocket>>> connections_;
31 :
32 678 : Impl(const std::shared_ptr<JamiAccount>& acc, dhtnet::ConnectionManager& cm)
33 678 : : account_(acc)
34 1356 : , connectionManager_(cm)
35 678 : {}
36 :
37 : void onChannelShutdown(const std::shared_ptr<dhtnet::ChannelSocket>& socket,
38 : const std::string& peerId,
39 : const DeviceId& device);
40 : };
41 :
42 678 : MessageChannelHandler::MessageChannelHandler(const std::shared_ptr<JamiAccount>& acc,
43 678 : dhtnet::ConnectionManager& cm)
44 : : ChannelHandlerInterface()
45 678 : , pimpl_(std::make_shared<Impl>(acc, cm))
46 678 : {}
47 :
48 1356 : MessageChannelHandler::~MessageChannelHandler() {}
49 :
50 : void
51 1225 : MessageChannelHandler::connect(const DeviceId& deviceId,
52 : const std::string&,
53 : ConnectCb&& cb,
54 : const std::string& connectionType,
55 : bool forceNewConnection)
56 : {
57 1225 : auto channelName = MESSAGE_SCHEME + deviceId.toString();
58 1224 : if (pimpl_->connectionManager_.isConnecting(deviceId, channelName)) {
59 474 : JAMI_INFO("Already connecting to %s", deviceId.to_c_str());
60 474 : return;
61 : }
62 1501 : pimpl_->connectionManager_.connectDevice(deviceId,
63 : channelName,
64 751 : std::move(cb),
65 : false,
66 : forceNewConnection,
67 : connectionType);
68 1225 : }
69 :
70 : void
71 711 : MessageChannelHandler::Impl::onChannelShutdown(const std::shared_ptr<dhtnet::ChannelSocket>& socket,
72 : const std::string& peerId,
73 : const DeviceId& device)
74 : {
75 711 : std::lock_guard lk(connectionsMtx_);
76 711 : auto connectionsIt = connections_.find({peerId, device});
77 711 : if (connectionsIt == connections_.end())
78 0 : return;
79 711 : auto& connections = connectionsIt->second;
80 711 : auto conn = std::find(connections.begin(), connections.end(), socket);
81 711 : if (conn != connections.end())
82 711 : connections.erase(conn);
83 711 : if (connections.empty())
84 661 : connections_.erase(connectionsIt);
85 711 : }
86 :
87 : std::shared_ptr<dhtnet::ChannelSocket>
88 16340 : MessageChannelHandler::getChannel(const std::string& peer, const DeviceId& deviceId) const
89 : {
90 16340 : std::lock_guard lk(pimpl_->connectionsMtx_);
91 16340 : auto it = pimpl_->connections_.find({peer, deviceId});
92 16340 : if (it == pimpl_->connections_.end())
93 1273 : return nullptr;
94 15067 : if (it->second.empty())
95 0 : return nullptr;
96 15067 : return it->second.front();
97 16339 : }
98 :
99 : std::vector<std::shared_ptr<dhtnet::ChannelSocket>>
100 3103 : MessageChannelHandler::getChannels(const std::string& peer) const
101 : {
102 3103 : std::vector<std::shared_ptr<dhtnet::ChannelSocket>> sockets;
103 3104 : std::lock_guard lk(pimpl_->connectionsMtx_);
104 3104 : auto lower = pimpl_->connections_.lower_bound({peer, DeviceId()});
105 4229 : for (auto it = lower; it != pimpl_->connections_.end() && it->first.first == peer; ++it)
106 1125 : sockets.insert(sockets.end(), it->second.begin(), it->second.end());
107 6208 : return sockets;
108 3104 : }
109 :
110 : bool
111 701 : MessageChannelHandler::onRequest(const std::shared_ptr<dht::crypto::Certificate>& cert,
112 : const std::string& /* name */)
113 : {
114 701 : auto acc = pimpl_->account_.lock();
115 701 : if (!cert || !cert->issuer || !acc)
116 0 : return false;
117 701 : return true;
118 : // return cert->issuer->getId().toString() == acc->getUsername();
119 701 : }
120 :
121 : void
122 1360 : MessageChannelHandler::onReady(const std::shared_ptr<dht::crypto::Certificate>& cert,
123 : const std::string&,
124 : std::shared_ptr<dhtnet::ChannelSocket> socket)
125 : {
126 1360 : auto acc = pimpl_->account_.lock();
127 1360 : if (!cert || !cert->issuer || !acc)
128 0 : return;
129 1360 : auto peerId = cert->issuer->getId().toString();
130 1360 : auto device = cert->getLongId();
131 1360 : std::lock_guard lk(pimpl_->connectionsMtx_);
132 1360 : pimpl_->connections_[{peerId, device}].emplace_back(socket);
133 :
134 1359 : socket->onShutdown([w = pimpl_->weak_from_this(), peerId, device, s = std::weak_ptr(socket)]() {
135 1360 : if (auto shared = w.lock())
136 1360 : shared->onChannelShutdown(s.lock(), peerId, device);
137 1360 : });
138 :
139 : struct DecodingContext
140 : {
141 78682 : msgpack::unpacker pac {[](msgpack::type::object_type, std::size_t, void*) { return true; },
142 : nullptr,
143 : 1500};
144 : };
145 :
146 1360 : socket->setOnRecv([acc = pimpl_->account_.lock(),
147 : peerId,
148 : ctx = std::make_shared<DecodingContext>()](const uint8_t* buf, size_t len) {
149 15744 : if (!buf || !acc)
150 0 : return len;
151 :
152 15743 : ctx->pac.reserve_buffer(len);
153 15745 : std::copy_n(buf, len, ctx->pac.buffer());
154 15745 : ctx->pac.buffer_consumed(len);
155 :
156 15745 : msgpack::object_handle oh;
157 : try {
158 31489 : while (ctx->pac.next(oh)) {
159 15741 : Message msg;
160 15742 : oh.get().convert(msg);
161 15744 : acc->handleMessage(peerId, {msg.t, msg.c});
162 15745 : }
163 0 : } catch (const std::exception& e) {
164 0 : JAMI_WARNING("[convInfo] error on sync: {:s}", e.what());
165 0 : }
166 :
167 15743 : return len;
168 15743 : });
169 1360 : }
170 :
171 : bool
172 15755 : MessageChannelHandler::sendMessage(const std::shared_ptr<dhtnet::ChannelSocket>& socket,
173 : const Message& message)
174 : {
175 15755 : if (!socket)
176 0 : return false;
177 15755 : msgpack::sbuffer buffer(UINT16_MAX); // Use max
178 15755 : msgpack::pack(buffer, message);
179 15755 : std::error_code ec;
180 15755 : auto sent = socket->write(reinterpret_cast<const uint8_t*>(buffer.data()), buffer.size(), ec);
181 15755 : if (ec) {
182 0 : JAMI_WARNING("Error sending message: {:s}", ec.message());
183 : }
184 15755 : return !ec && sent == buffer.size();
185 15755 : }
186 :
187 : } // namespace jami
|