Line data Source code
1 : /*
2 : * Copyright (C) 2004-2024 Savoir-faire Linux Inc.
3 : *
4 : * This program is free software: you can redistribute it and/or modify
5 : * it under the terms of the GNU General Public License as published by
6 : * the Free Software Foundation, either version 3 of the License, or
7 : * (at your option) any later version.
8 : *
9 : * This program is distributed in the hope that it will be useful,
10 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 : * GNU General Public License for more details.
13 : *
14 : * You should have received a copy of the GNU General Public License
15 : * along with this program. If not, see <https://www.gnu.org/licenses/>.
16 : */
17 : #include "jamidht/message_channel_handler.h"
18 :
19 : static constexpr const char MESSAGE_SCHEME[] {"msg:"};
20 :
21 : namespace jami {
22 :
23 : using Key = std::pair<std::string, DeviceId>;
24 :
25 : struct MessageChannelHandler::Impl: public std::enable_shared_from_this<Impl>
26 : {
27 : std::weak_ptr<JamiAccount> account_;
28 : dhtnet::ConnectionManager& connectionManager_;
29 : std::recursive_mutex connectionsMtx_;
30 : std::map<Key, std::vector<std::shared_ptr<dhtnet::ChannelSocket>>> connections_;
31 :
32 680 : Impl(const std::shared_ptr<JamiAccount>& acc, dhtnet::ConnectionManager& cm)
33 680 : : account_(acc)
34 1360 : , connectionManager_(cm)
35 680 : {}
36 :
37 : void onChannelShutdown(const std::shared_ptr<dhtnet::ChannelSocket>& socket, const std::string& peerId, const DeviceId& device);
38 : };
39 :
40 680 : MessageChannelHandler::MessageChannelHandler(const std::shared_ptr<JamiAccount>& acc,
41 680 : dhtnet::ConnectionManager& cm)
42 : : ChannelHandlerInterface()
43 680 : , pimpl_(std::make_shared<Impl>(acc, cm))
44 680 : {}
45 :
46 1360 : MessageChannelHandler::~MessageChannelHandler() {}
47 :
48 : void
49 1309 : MessageChannelHandler::connect(const DeviceId& deviceId, const std::string&, ConnectCb&& cb)
50 : {
51 1309 : auto channelName = MESSAGE_SCHEME + deviceId.toString();
52 1309 : if (pimpl_->connectionManager_.isConnecting(deviceId, channelName)) {
53 550 : JAMI_INFO("Already connecting to %s", deviceId.to_c_str());
54 550 : return;
55 : }
56 1518 : pimpl_->connectionManager_.connectDevice(deviceId,
57 : channelName,
58 759 : std::move(cb));
59 1309 : }
60 :
61 : void
62 710 : MessageChannelHandler::Impl::onChannelShutdown(const std::shared_ptr<dhtnet::ChannelSocket>& socket, const std::string& peerId, const DeviceId& device)
63 : {
64 710 : std::lock_guard lk(connectionsMtx_);
65 710 : auto connectionsIt = connections_.find({peerId, device});
66 710 : if (connectionsIt == connections_.end())
67 0 : return;
68 710 : auto& connections = connectionsIt->second;
69 710 : auto conn = std::find(connections.begin(), connections.end(), socket);
70 710 : if (conn != connections.end())
71 710 : connections.erase(conn);
72 710 : if (connections.empty())
73 659 : connections_.erase(connectionsIt);
74 710 : }
75 :
76 : std::shared_ptr<dhtnet::ChannelSocket>
77 16121 : MessageChannelHandler::getChannel(const std::string& peer, const DeviceId& deviceId) const
78 : {
79 16121 : std::lock_guard lk(pimpl_->connectionsMtx_);
80 16121 : auto it = pimpl_->connections_.find({peer, deviceId});
81 16121 : if (it == pimpl_->connections_.end())
82 1357 : return nullptr;
83 14764 : if (it->second.empty())
84 0 : return nullptr;
85 14764 : return it->second.front();
86 16121 : }
87 :
88 : std::vector<std::shared_ptr<dhtnet::ChannelSocket>>
89 15636 : MessageChannelHandler::getChannels(const std::string& peer) const
90 : {
91 15636 : std::vector<std::shared_ptr<dhtnet::ChannelSocket>> sockets;
92 15636 : std::lock_guard lk(pimpl_->connectionsMtx_);
93 15636 : auto lower = pimpl_->connections_.lower_bound({peer, DeviceId()});
94 29235 : for (auto it = lower; it != pimpl_->connections_.end() && it->first.first == peer; ++it)
95 13599 : sockets.insert(sockets.end(), it->second.begin(), it->second.end());
96 31272 : return sockets;
97 15636 : }
98 :
99 : bool
100 701 : MessageChannelHandler::onRequest(const std::shared_ptr<dht::crypto::Certificate>& cert,
101 : const std::string& /* name */)
102 : {
103 701 : auto acc = pimpl_->account_.lock();
104 701 : if (!cert || !cert->issuer || !acc)
105 0 : return false;
106 701 : return true;
107 : //return cert->issuer->getId().toString() == acc->getUsername();
108 701 : }
109 :
110 : void
111 1359 : MessageChannelHandler::onReady(const std::shared_ptr<dht::crypto::Certificate>& cert,
112 : const std::string&,
113 : std::shared_ptr<dhtnet::ChannelSocket> socket)
114 : {
115 1359 : auto acc = pimpl_->account_.lock();
116 1358 : if (!cert || !cert->issuer || !acc)
117 0 : return;
118 1358 : auto peerId = cert->issuer->getId().toString();
119 1359 : auto device = cert->getLongId();
120 1359 : std::lock_guard lk(pimpl_->connectionsMtx_);
121 1359 : pimpl_->connections_[{peerId, device}].emplace_back(socket);
122 :
123 1359 : socket->onShutdown([w = pimpl_->weak_from_this(), peerId, device, s=std::weak_ptr(socket)]() {
124 1359 : if (auto shared = w.lock())
125 1359 : shared->onChannelShutdown(s.lock(), peerId, device);
126 1359 : });
127 :
128 : struct DecodingContext
129 : {
130 140338 : msgpack::unpacker pac {[](msgpack::type::object_type, std::size_t, void*) { return true; },
131 : nullptr,
132 : 1500};
133 : };
134 :
135 1359 : socket->setOnRecv([acc = pimpl_->account_.lock(), peerId,
136 : ctx = std::make_shared<DecodingContext>()
137 : ](const uint8_t* buf, size_t len) {
138 28073 : if (!buf || !acc)
139 0 : return len;
140 :
141 28072 : ctx->pac.reserve_buffer(len);
142 28072 : std::copy_n(buf, len, ctx->pac.buffer());
143 28075 : ctx->pac.buffer_consumed(len);
144 :
145 28074 : msgpack::object_handle oh;
146 : try {
147 56147 : while (ctx->pac.next(oh)) {
148 28069 : Message msg;
149 28069 : oh.get().convert(msg);
150 28071 : acc->handleMessage(peerId, {msg.t, msg.c});
151 28075 : }
152 0 : } catch (const std::exception& e) {
153 0 : JAMI_WARNING("[convInfo] error on sync: {:s}", e.what());
154 0 : }
155 :
156 28073 : return len;
157 28073 : });
158 1358 : }
159 :
160 : bool
161 28106 : MessageChannelHandler::sendMessage(const std::shared_ptr<dhtnet::ChannelSocket>& socket, const Message& message)
162 : {
163 28106 : if (!socket)
164 0 : return false;
165 28106 : msgpack::sbuffer buffer(UINT16_MAX); // Use max
166 28106 : msgpack::pack(buffer, message);
167 28105 : std::error_code ec;
168 28105 : auto sent = socket->write(reinterpret_cast<const uint8_t*>(buffer.data()), buffer.size(), ec);
169 28106 : if (ec) {
170 9 : JAMI_WARNING("Error sending message: {:s}", ec.message());
171 : }
172 28106 : return !ec && sent == buffer.size();
173 28106 : }
174 :
175 : } // namespace jami
|