LCOV - code coverage report
Current view: top level - foo/src/jamidht - message_channel_handler.cpp (source / functions) Hit Total Coverage
Test: jami-coverage-filtered.info Lines: 116 123 94.3 %
Date: 2026-01-22 10:39:23 Functions: 20 22 90.9 %

          Line data    Source code
       1             : /*
       2             :  *  Copyright (C) 2004-2026 Savoir-faire Linux Inc.
       3             :  *
       4             :  *  This program is free software: you can redistribute it and/or modify
       5             :  *  it under the terms of the GNU General Public License as published by
       6             :  *  the Free Software Foundation, either version 3 of the License, or
       7             :  *  (at your option) any later version.
       8             :  *
       9             :  *  This program is distributed in the hope that it will be useful,
      10             :  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
      11             :  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
      12             :  *  GNU General Public License for more details.
      13             :  *
      14             :  *  You should have received a copy of the GNU General Public License
      15             :  *  along with this program. If not, see <https://www.gnu.org/licenses/>.
      16             :  */
      17             : #include "jamidht/message_channel_handler.h"
      18             : 
      19             : #include <dhtnet/channel_utils.h>
      20             : #include <string_view>
      21             : 
      22             : using namespace std::literals;
      23             : 
      24             : static constexpr auto MESSAGE_SCHEME = "msg:"sv;
      25             : 
      26             : namespace jami {
      27             : 
      28             : using Key = std::pair<std::string, DeviceId>;
      29             : 
      30             : struct MessageChannelHandler::Impl : public std::enable_shared_from_this<Impl>
      31             : {
      32             :     dhtnet::ConnectionManager& connectionManager_;
      33             :     OnMessage onMessage_;
      34             :     OnPeerStateChanged onPeerStateChanged_;
      35             :     std::recursive_mutex connectionsMtx_;
      36             :     std::map<std::string, std::map<DeviceId, std::vector<std::shared_ptr<dhtnet::ChannelSocket>>>> connections_;
      37             : 
      38         669 :     Impl(dhtnet::ConnectionManager& cm, OnMessage onMessage, OnPeerStateChanged onPeer)
      39        1338 :         : connectionManager_(cm)
      40         669 :         , onMessage_(std::move(onMessage))
      41        1338 :         , onPeerStateChanged_(std::move(onPeer))
      42         669 :     {}
      43             : 
      44             :     void onChannelShutdown(const std::shared_ptr<dhtnet::ChannelSocket>& socket,
      45             :                            const std::string& peerId,
      46             :                            const DeviceId& device);
      47             : };
      48             : 
      49         669 : MessageChannelHandler::MessageChannelHandler(dhtnet::ConnectionManager& cm,
      50             :                                              OnMessage onMessage,
      51         669 :                                              OnPeerStateChanged onPeer)
      52             :     : ChannelHandlerInterface()
      53         669 :     , pimpl_(std::make_shared<Impl>(cm, std::move(onMessage), std::move(onPeer)))
      54         669 : {}
      55             : 
      56        1338 : MessageChannelHandler::~MessageChannelHandler()
      57             : {
      58         669 :     std::unique_lock lk(pimpl_->connectionsMtx_);
      59        1178 :     for (const auto& [peerId, _] : pimpl_->connections_) {
      60         509 :         pimpl_->onPeerStateChanged_(peerId, false);
      61             :     }
      62         669 :     auto connections = std::move(pimpl_->connections_);
      63         669 :     pimpl_->connections_.clear();
      64         669 :     lk.unlock();
      65        1338 : }
      66             : 
      67             : void
      68        1423 : MessageChannelHandler::connect(const DeviceId& deviceId,
      69             :                                const std::string&,
      70             :                                ConnectCb&& cb,
      71             :                                const std::string& connectionType,
      72             :                                bool forceNewConnection)
      73             : {
      74        1423 :     auto channelName = concat(MESSAGE_SCHEME, deviceId.to_view());
      75        1422 :     if (pimpl_->connectionManager_.isConnecting(deviceId, channelName)) {
      76        2596 :         JAMI_LOG("Already connecting to {}", deviceId);
      77         649 :         return;
      78             :     }
      79         774 :     pimpl_->connectionManager_
      80         774 :         .connectDevice(deviceId, channelName, std::move(cb), false, forceNewConnection, connectionType);
      81        1423 : }
      82             : 
      83             : void
      84         689 : MessageChannelHandler::Impl::onChannelShutdown(const std::shared_ptr<dhtnet::ChannelSocket>& socket,
      85             :                                                const std::string& peerId,
      86             :                                                const DeviceId& device)
      87             : {
      88         689 :     std::lock_guard lk(connectionsMtx_);
      89         689 :     auto peerIt = connections_.find(peerId);
      90         689 :     if (peerIt == connections_.end()) {
      91          48 :         JAMI_WARNING("onChannelShutdown: No connections found for peer {}", peerId);
      92          12 :         return;
      93             :     }
      94         677 :     auto connectionsIt = peerIt->second.find(device);
      95         677 :     if (connectionsIt == peerIt->second.end()) {
      96           0 :         JAMI_WARNING("onChannelShutdown: No connections found for device {} of peer {}", device.toString(), peerId);
      97           0 :         return;
      98             :     }
      99         677 :     auto& connections = connectionsIt->second;
     100         677 :     auto conn = std::find(connections.begin(), connections.end(), socket);
     101         676 :     if (conn != connections.end())
     102         677 :         connections.erase(conn);
     103         677 :     if (connections.empty()) {
     104         568 :         peerIt->second.erase(connectionsIt);
     105             :     }
     106         677 :     if (peerIt->second.empty()) {
     107         565 :         connections_.erase(peerIt);
     108         565 :         onPeerStateChanged_(peerId, false);
     109             :     }
     110         689 : }
     111             : 
     112             : std::shared_ptr<dhtnet::ChannelSocket>
     113       14539 : MessageChannelHandler::getChannel(const std::string& peer, const DeviceId& deviceId) const
     114             : {
     115       14539 :     std::lock_guard lk(pimpl_->connectionsMtx_);
     116       14539 :     auto it = pimpl_->connections_.find(peer);
     117       14539 :     if (it == pimpl_->connections_.end())
     118        1468 :         return nullptr;
     119       13071 :     auto deviceIt = it->second.find(deviceId);
     120       13071 :     if (deviceIt == it->second.end())
     121           7 :         return nullptr;
     122       13064 :     if (deviceIt->second.empty())
     123           0 :         return nullptr;
     124       13064 :     return deviceIt->second.back();
     125       14539 : }
     126             : 
     127             : std::vector<std::shared_ptr<dhtnet::ChannelSocket>>
     128        3443 : MessageChannelHandler::getChannels(const std::string& peer) const
     129             : {
     130        3443 :     std::vector<std::shared_ptr<dhtnet::ChannelSocket>> sockets;
     131        3443 :     std::lock_guard lk(pimpl_->connectionsMtx_);
     132        3443 :     auto it = pimpl_->connections_.find(peer);
     133        3443 :     if (it == pimpl_->connections_.end())
     134        1885 :         return sockets;
     135        1558 :     sockets.reserve(it->second.size());
     136        3117 :     for (auto& [deviceId, channels] : it->second) {
     137        3323 :         for (auto& channel : channels) {
     138        1764 :             sockets.push_back(channel);
     139             :         }
     140             :     }
     141        1557 :     return sockets;
     142        3442 : }
     143             : 
     144             : bool
     145         668 : MessageChannelHandler::onRequest(const std::shared_ptr<dht::crypto::Certificate>& cert, const std::string& /* name */)
     146             : {
     147         668 :     if (!cert || !cert->issuer)
     148           0 :         return false;
     149         667 :     return true;
     150             : }
     151             : 
     152             : void
     153        1297 : MessageChannelHandler::onReady(const std::shared_ptr<dht::crypto::Certificate>& cert,
     154             :                                const std::string&,
     155             :                                std::shared_ptr<dhtnet::ChannelSocket> socket)
     156             : {
     157        1297 :     if (!cert || !cert->issuer)
     158           0 :         return;
     159        1297 :     auto peerId = cert->issuer->getId().toString();
     160        1297 :     auto device = cert->getLongId();
     161        1297 :     std::lock_guard lk(pimpl_->connectionsMtx_);
     162        1297 :     auto& connections = pimpl_->connections_[peerId];
     163        1297 :     bool newPeerConnection = connections.empty();
     164        1295 :     auto& deviceConnections = connections[device];
     165        1295 :     deviceConnections.push_back(socket);
     166        1295 :     if (newPeerConnection)
     167        1075 :         pimpl_->onPeerStateChanged_(peerId, true);
     168             : 
     169        1296 :     socket->setOnRecv(dhtnet::buildMsgpackReader<Message>([onMessage = pimpl_->onMessage_, cert](Message&& msg) {
     170       12798 :         onMessage(cert, msg.t, msg.c);
     171       12807 :         return std::error_code();
     172             :     }));
     173             : 
     174        2588 :     socket->onShutdown(
     175        1294 :         [w = pimpl_->weak_from_this(), peerId, device, s = std::weak_ptr(socket)](const std::error_code& /*ec*/) {
     176        1297 :             if (auto shared = w.lock())
     177        1297 :                 shared->onChannelShutdown(s.lock(), peerId, device);
     178        1297 :         });
     179        1295 : }
     180             : 
     181             : void
     182           3 : MessageChannelHandler::closeChannel(const std::string& peer,
     183             :                                     const DeviceId& device,
     184             :                                     const std::shared_ptr<dhtnet::ChannelSocket>& conn)
     185             : {
     186           3 :     if (!conn)
     187           0 :         return;
     188           3 :     std::unique_lock lk(pimpl_->connectionsMtx_);
     189           3 :     auto it = pimpl_->connections_.find(peer);
     190           3 :     if (it != pimpl_->connections_.end()) {
     191           1 :         auto deviceIt = it->second.find(device);
     192           1 :         if (deviceIt != it->second.end()) {
     193           1 :             auto& channels = deviceIt->second;
     194           1 :             channels.erase(std::remove(channels.begin(), channels.end(), conn), channels.end());
     195           1 :             if (channels.empty()) {
     196           1 :                 it->second.erase(deviceIt);
     197           1 :                 if (it->second.empty()) {
     198           1 :                     pimpl_->connections_.erase(it);
     199           1 :                     pimpl_->onPeerStateChanged_(peer, false);
     200             :                 }
     201             :             }
     202             :         }
     203             :     }
     204           3 :     lk.unlock();
     205           3 :     conn->stop();
     206           3 : }
     207             : 
     208             : bool
     209       12814 : MessageChannelHandler::sendMessage(const std::shared_ptr<dhtnet::ChannelSocket>& socket, const Message& message)
     210             : {
     211       12814 :     if (!socket)
     212           0 :         return false;
     213       12814 :     msgpack::sbuffer buffer(UINT16_MAX); // Use max
     214       12814 :     msgpack::pack(buffer, message);
     215       12811 :     std::error_code ec;
     216       12808 :     auto sent = socket->write(reinterpret_cast<const uint8_t*>(buffer.data()), buffer.size(), ec);
     217       12814 :     if (ec) {
     218          12 :         JAMI_WARNING("Error sending message: {:s}", ec.message());
     219             :     }
     220       12813 :     return !ec && sent == buffer.size();
     221       12812 : }
     222             : 
     223             : } // namespace jami

Generated by: LCOV version 1.14