LCOV - code coverage report
Current view: top level - src/jamidht - message_channel_handler.cpp (source / functions) Coverage Total Hit
Test: jami-coverage-filtered.info Lines: 79.7 % 123 98
Test Date: 2026-06-13 09:18:46 Functions: 77.3 % 22 17

            Line data    Source code
       1              : /*
       2              :  *  Copyright (C) 2004-2026 Savoir-faire Linux Inc.
       3              :  *
       4              :  *  This program is free software: you can redistribute it and/or modify
       5              :  *  it under the terms of the GNU General Public License as published by
       6              :  *  the Free Software Foundation, either version 3 of the License, or
       7              :  *  (at your option) any later version.
       8              :  *
       9              :  *  This program is distributed in the hope that it will be useful,
      10              :  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
      11              :  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
      12              :  *  GNU General Public License for more details.
      13              :  *
      14              :  *  You should have received a copy of the GNU General Public License
      15              :  *  along with this program. If not, see <https://www.gnu.org/licenses/>.
      16              :  */
      17              : #include "jamidht/message_channel_handler.h"
      18              : 
      19              : #include <dhtnet/channel_utils.h>
      20              : #include <string_view>
      21              : 
      22              : using namespace std::literals;
      23              : 
      24              : static constexpr auto MESSAGE_SCHEME = "msg:"sv; // Prefix that connect() puts in front of the device id to build the channel name.
      25              : 
      26              : namespace jami {
      27              : 
      28              : using Key = std::pair<std::string, DeviceId>; // NOTE(review): presumably a (peer id, device id) pair; not referenced by the code shown in this listing.
      29              : 
                        // Private state of MessageChannelHandler, held through a shared_ptr.
                        // It derives from enable_shared_from_this because onReady() stores a
                        // weak_from_this() reference inside each socket's shutdown callback.
      30              : struct MessageChannelHandler::Impl : public std::enable_shared_from_this<Impl>
      31              : {
      32              :     dhtnet::ConnectionManager& connectionManager_;
                            // Invoked from the socket receive handler with (cert, msg.t, msg.c).
      33              :     OnMessage onMessage_;
                            // Invoked with (peerId, true) when a peer gets its first device entry and
                            // with (peerId, false) when its last one is removed.
      34              :     OnPeerStateChanged onPeerStateChanged_;
                            // Guards connections_; every access in this listing locks it first.
                            // NOTE(review): recursive presumably because onPeerStateChanged_ is called
                            // while the lock is held and may re-enter the handler - confirm.
      35              :     std::recursive_mutex connectionsMtx_;
                            // peer id -> device id -> channel sockets registered for that device.
      36              :     std::map<std::string, std::map<DeviceId, std::vector<std::shared_ptr<dhtnet::ChannelSocket>>>> connections_;
      37              : 
                            // Stores the connection manager reference and takes ownership of both callbacks.
      38          672 :     Impl(dhtnet::ConnectionManager& cm, OnMessage onMessage, OnPeerStateChanged onPeer)
      39         1344 :         : connectionManager_(cm)
      40          672 :         , onMessage_(std::move(onMessage))
      41         1344 :         , onPeerStateChanged_(std::move(onPeer))
      42          672 :     {}
      43              : 
                            // Unregisters `socket` from connections_[peerId][device]; defined below.
      44              :     void onChannelShutdown(const std::shared_ptr<dhtnet::ChannelSocket>& socket,
      45              :                            const std::string& peerId,
      46              :                            const DeviceId& device);
      47              : };
      48              : 
                        // Creates the shared Impl, passing it the connection manager reference
                        // and moving both callbacks into it.
      49          672 : MessageChannelHandler::MessageChannelHandler(dhtnet::ConnectionManager& cm,
      50              :                                              OnMessage onMessage,
      51          672 :                                              OnPeerStateChanged onPeer)
      52              :     : ChannelHandlerInterface()
      53          672 :     , pimpl_(std::make_shared<Impl>(cm, std::move(onMessage), std::move(onPeer)))
      54          672 : {}
      55              : 
                        // Reports every still-registered peer as disconnected, then empties the
                        // connection table.
      56         1344 : MessageChannelHandler::~MessageChannelHandler()
      57              : {
      58          672 :     std::unique_lock lk(pimpl_->connectionsMtx_);
                            // The callback runs while the mutex is held.
      59         1264 :     for (const auto& [peerId, _] : pimpl_->connections_) {
      60          592 :         pimpl_->onPeerStateChanged_(peerId, false);
      61              :     }
                            // Move the table into a local so its contents are destroyed at scope exit,
                            // i.e. after the explicit unlock() below rather than under the mutex.
      62          672 :     auto connections = std::move(pimpl_->connections_);
                            // Leaves the moved-from member in a known empty state.
      63          672 :     pimpl_->connections_.clear();
      64          672 :     lk.unlock();
      65         1344 : }
      66              : 
                        // Asks the connection manager to open a message channel to `deviceId`.
                        // The channel name is MESSAGE_SCHEME followed by the device id. The unnamed
                        // string parameter is ignored. If a connection attempt for the same device
                        // and channel name is already in progress, the call only logs and returns;
                        // `cb` is not invoked or forwarded on that path.
      67              : void
      68         2569 : MessageChannelHandler::connect(const DeviceId& deviceId,
      69              :                                const std::string&,
      70              :                                ConnectCb&& cb,
      71              :                                const std::string& connectionType,
      72              :                                bool forceNewConnection)
      73              : {
      74         2569 :     auto channelName = concat(MESSAGE_SCHEME, deviceId.to_view());
      75         2569 :     if (pimpl_->connectionManager_.isConnecting(deviceId, channelName)) {
      76         4950 :         JAMI_LOG("Already connecting to {}", deviceId);
      77         1238 :         return;
      78              :     }
                            // NOTE(review): the meaning of the literal `false` argument is defined by
                            // dhtnet::ConnectionManager::connectDevice and is not visible here.
      79         1331 :     pimpl_->connectionManager_
      80         1331 :         .connectDevice(deviceId, channelName, std::move(cb), false, forceNewConnection, connectionType);
      81         2569 : }
      82              : 
                        // Removes `socket` from the sockets registered for (peerId, device).
                        // Empty device entries and then empty peer entries are pruned; when the
                        // peer entry is removed, onPeerStateChanged_(peerId, false) is called while
                        // the mutex is still held. If the peer or the device is unknown, a warning
                        // is logged and nothing is changed.
                        // `socket` may be empty: the shutdown callback installed in onReady() passes
                        // the result of weak_ptr::lock().
      83              : void
      84         1362 : MessageChannelHandler::Impl::onChannelShutdown(const std::shared_ptr<dhtnet::ChannelSocket>& socket,
      85              :                                                const std::string& peerId,
      86              :                                                const DeviceId& device)
      87              : {
      88         1362 :     std::lock_guard lk(connectionsMtx_);
      89         1363 :     auto peerIt = connections_.find(peerId);
      90         1363 :     if (peerIt == connections_.end()) {
      91          408 :         JAMI_WARNING("onChannelShutdown: No connections found for peer {}", peerId);
      92          102 :         return;
      93              :     }
      94         1261 :     auto connectionsIt = peerIt->second.find(device);
      95         1261 :     if (connectionsIt == peerIt->second.end()) {
      96            0 :         JAMI_WARNING("onChannelShutdown: No connections found for device {} of peer {}", device.toString(), peerId);
      97            0 :         return;
      98              :     }
      99         1261 :     auto& connections = connectionsIt->second;
                            // Only the first matching entry is erased.
     100         1261 :     auto conn = std::find(connections.begin(), connections.end(), socket);
     101         1261 :     if (conn != connections.end())
     102         1261 :         connections.erase(conn);
     103         1261 :     if (connections.empty()) {
                                // `connections` refers to the erased element after this line; it is not
                                // used again below.
     104          640 :         peerIt->second.erase(connectionsIt);
     105              :     }
     106         1261 :     if (peerIt->second.empty()) {
     107          637 :         connections_.erase(peerIt);
     108          637 :         onPeerStateChanged_(peerId, false);
     109              :     }
     110         1363 : }
     111              : 
                        // Returns the last socket in the list registered for (peer, deviceId), or
                        // nullptr when the peer is unknown, the device is unknown, or the list is empty.
     112              : std::shared_ptr<dhtnet::ChannelSocket>
     113        18115 : MessageChannelHandler::getChannel(const std::string& peer, const DeviceId& deviceId) const
     114              : {
     115        18115 :     std::lock_guard lk(pimpl_->connectionsMtx_);
     116        18115 :     auto it = pimpl_->connections_.find(peer);
     117        18113 :     if (it == pimpl_->connections_.end())
     118         2564 :         return nullptr;
     119        15551 :     auto deviceIt = it->second.find(deviceId);
     120        15551 :     if (deviceIt == it->second.end())
     121           16 :         return nullptr;
     122        15535 :     if (deviceIt->second.empty())
     123            0 :         return nullptr;
                            // onReady() appends with push_back, so back() is the most recently added socket.
     124        15535 :     return deviceIt->second.back();
     125        18115 : }
     126              : 
                        // Returns a copy of every socket registered for `peer`, across all of its
                        // devices, in map iteration order. The result is empty when the peer is unknown.
     127              : std::vector<std::shared_ptr<dhtnet::ChannelSocket>>
     128         3089 : MessageChannelHandler::getChannels(const std::string& peer) const
     129              : {
     130         3089 :     std::vector<std::shared_ptr<dhtnet::ChannelSocket>> sockets;
     131         3089 :     std::lock_guard lk(pimpl_->connectionsMtx_);
     132         3089 :     auto it = pimpl_->connections_.find(peer);
     133         3089 :     if (it == pimpl_->connections_.end())
     134         1855 :         return sockets;
                            // The reservation is sized by the number of devices, not the number of
                            // sockets, so the vector may still grow when a device has several sockets.
     135         1234 :     sockets.reserve(it->second.size());
     136         2470 :     for (auto& [deviceId, channels] : it->second) {
     137         3621 :         for (auto& channel : channels) {
     138         2384 :             sockets.push_back(channel);
     139              :         }
     140              :     }
     141         1234 :     return sockets;
     142         3089 : }
     143              : 
                        // Accepts a channel request whenever the certificate and its issuer are both
                        // non-null; the channel name is not inspected.
     144              : bool
     145         1235 : MessageChannelHandler::onRequest(const std::shared_ptr<dht::crypto::Certificate>& cert, const std::string& /* name */)
     146              : {
     147         1235 :     if (!cert || !cert->issuer)
     148            0 :         return false;
     149         1236 :     return true;
     150              : }
     151              : 
                        // Registers a newly ready channel socket and installs its callbacks.
                        // The peer id is the string form of the certificate issuer's id and the
                        // device is the certificate's long id. Does nothing when the certificate or
                        // its issuer is null. The unnamed string parameter is ignored.
                        // NOTE(review): `socket` is dereferenced without a null check.
     152              : void
     153         2449 : MessageChannelHandler::onReady(const std::shared_ptr<dht::crypto::Certificate>& cert,
     154              :                                const std::string&,
     155              :                                std::shared_ptr<dhtnet::ChannelSocket> socket)
     156              : {
     157         2449 :     if (!cert || !cert->issuer)
     158            0 :         return;
     159         2449 :     auto peerId = cert->issuer->getId().toString();
     160         2448 :     auto device = cert->getLongId();
     161         2448 :     std::lock_guard lk(pimpl_->connectionsMtx_);
                            // operator[] creates the peer entry (and below, the device entry) if missing.
     162         2447 :     auto& connections = pimpl_->connections_[peerId];
                            // True when the peer had no device entries before this socket was added.
     163         2448 :     bool newPeerConnection = connections.empty();
     164         2448 :     auto& deviceConnections = connections[device];
     165         2447 :     deviceConnections.push_back(socket);
                            // The callback runs while the mutex is held.
     166         2448 :     if (newPeerConnection)
     167         1228 :         pimpl_->onPeerStateChanged_(peerId, true);
     168              : 
                            // The receive handler captures its own copy of onMessage_ and of the
                            // certificate pointer, so it does not reference the Impl object.
     169         2447 :     socket->setOnRecv(dhtnet::buildMsgpackReader<Message>([onMessage = pimpl_->onMessage_, cert](Message&& msg) {
     170        15568 :         onMessage(cert, msg.t, msg.c);
     171        15570 :         return std::error_code();
     172              :     }));
     173              : 
                            // The shutdown handler holds only weak references to the Impl and to the
                            // socket; it unregisters the socket if the Impl is still alive.
     174         4892 :     socket->onShutdown(
     175         4887 :         [w = pimpl_->weak_from_this(), peerId, device, s = std::weak_ptr(socket)](const std::error_code& /*ec*/) {
     176         2450 :             if (auto shared = w.lock())
     177         2449 :                 shared->onChannelShutdown(s.lock(), peerId, device);
     178         2450 :         });
     179         2445 : }
     180              : 
                        // Unregisters `conn` from the sockets of (peer, device) and then stops it.
                        // Returns immediately when `conn` is null. Empty device and peer entries are
                        // pruned; when the peer entry is removed, onPeerStateChanged_(peer, false) is
                        // called while the mutex is held. conn->stop() is called after the mutex is
                        // released, and it is called even when `conn` was not found in the table.
     181              : void
     182            0 : MessageChannelHandler::closeChannel(const std::string& peer,
     183              :                                     const DeviceId& device,
     184              :                                     const std::shared_ptr<dhtnet::ChannelSocket>& conn)
     185              : {
     186            0 :     if (!conn)
     187            0 :         return;
     188            0 :     std::unique_lock lk(pimpl_->connectionsMtx_);
     189            0 :     auto it = pimpl_->connections_.find(peer);
     190            0 :     if (it != pimpl_->connections_.end()) {
     191            0 :         auto deviceIt = it->second.find(device);
     192            0 :         if (deviceIt != it->second.end()) {
     193            0 :             auto& channels = deviceIt->second;
                                    // Erase-remove: drops every occurrence of `conn`, not just the first.
     194            0 :             channels.erase(std::remove(channels.begin(), channels.end(), conn), channels.end());
     195            0 :             if (channels.empty()) {
     196            0 :                 it->second.erase(deviceIt);
     197            0 :                 if (it->second.empty()) {
     198            0 :                     pimpl_->connections_.erase(it);
     199            0 :                     pimpl_->onPeerStateChanged_(peer, false);
     200              :                 }
     201              :             }
     202              :         }
     203              :     }
     204            0 :     lk.unlock();
     205            0 :     conn->stop();
     206            0 : }
     207              : 
                        // Serializes `message` with msgpack and writes it to `socket`.
                        // Returns false when `socket` is null. Otherwise returns true only if the
                        // write reported no error and the returned count equals the buffer size.
                        // A write error is logged as a warning.
     208              : bool
     209        15580 : MessageChannelHandler::sendMessage(const std::shared_ptr<dhtnet::ChannelSocket>& socket, const Message& message)
     210              : {
     211        15580 :     if (!socket)
     212            0 :         return false;
                            // NOTE(review): UINT16_MAX is presumably the buffer's initial allocation
                            // size - confirm against the msgpack::sbuffer constructor.
     213        15579 :     msgpack::sbuffer buffer(UINT16_MAX); // Use max
     214        15580 :     msgpack::pack(buffer, message);
     215        15580 :     std::error_code ec;
     216        15579 :     auto sent = socket->write(reinterpret_cast<const uint8_t*>(buffer.data()), buffer.size(), ec);
     217        15580 :     if (ec) {
     218            0 :         JAMI_WARNING("Error sending message: {:s}", ec.message());
     219              :     }
     220        15580 :     return !ec && sent == buffer.size();
     221        15580 : }
     222              : 
     223              : } // namespace jami
        

Generated by: LCOV version 2.0-1