LCOV - code coverage report
Current view: top level - src/jamidht - message_channel_handler.cpp (source / functions) Hit Total Coverage
Test: jami-coverage-filtered.info Lines: 84 94 89.4 %
Date: 2024-12-21 08:56:24 Functions: 14 18 77.8 %

          Line data    Source code
       1             : /*
       2             :  *  Copyright (C) 2004-2024 Savoir-faire Linux Inc.
       3             :  *
       4             :  *  This program is free software: you can redistribute it and/or modify
       5             :  *  it under the terms of the GNU General Public License as published by
       6             :  *  the Free Software Foundation, either version 3 of the License, or
       7             :  *  (at your option) any later version.
       8             :  *
       9             :  *  This program is distributed in the hope that it will be useful,
      10             :  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
      11             :  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
      12             :  *  GNU General Public License for more details.
      13             :  *
      14             :  *  You should have received a copy of the GNU General Public License
      15             :  *  along with this program. If not, see <https://www.gnu.org/licenses/>.
      16             :  */
      17             : #include "jamidht/message_channel_handler.h"
      18             : 
      19             : static constexpr const char MESSAGE_SCHEME[] {"msg:"};
      20             : 
      21             : namespace jami {
      22             : 
      23             : using Key = std::pair<std::string, DeviceId>;
      24             : 
      25             : struct MessageChannelHandler::Impl : public std::enable_shared_from_this<Impl>
      26             : {
      27             :     std::weak_ptr<JamiAccount> account_;
      28             :     dhtnet::ConnectionManager& connectionManager_;
      29             :     std::recursive_mutex connectionsMtx_;
      30             :     std::map<Key, std::vector<std::shared_ptr<dhtnet::ChannelSocket>>> connections_;
      31             : 
      32         678 :     Impl(const std::shared_ptr<JamiAccount>& acc, dhtnet::ConnectionManager& cm)
      33         678 :         : account_(acc)
      34        1356 :         , connectionManager_(cm)
      35         678 :     {}
      36             : 
      37             :     void onChannelShutdown(const std::shared_ptr<dhtnet::ChannelSocket>& socket,
      38             :                            const std::string& peerId,
      39             :                            const DeviceId& device);
      40             : };
      41             : 
      42         678 : MessageChannelHandler::MessageChannelHandler(const std::shared_ptr<JamiAccount>& acc,
      43         678 :                                              dhtnet::ConnectionManager& cm)
      44             :     : ChannelHandlerInterface()
      45         678 :     , pimpl_(std::make_shared<Impl>(acc, cm))
      46         678 : {}
      47             : 
      48        1356 : MessageChannelHandler::~MessageChannelHandler() {}
      49             : 
      50             : void
      51        1225 : MessageChannelHandler::connect(const DeviceId& deviceId,
      52             :                                const std::string&,
      53             :                                ConnectCb&& cb,
      54             :                                const std::string& connectionType,
      55             :                                bool forceNewConnection)
      56             : {
      57        1225 :     auto channelName = MESSAGE_SCHEME + deviceId.toString();
      58        1224 :     if (pimpl_->connectionManager_.isConnecting(deviceId, channelName)) {
      59         474 :         JAMI_INFO("Already connecting to %s", deviceId.to_c_str());
      60         474 :         return;
      61             :     }
      62        1501 :     pimpl_->connectionManager_.connectDevice(deviceId,
      63             :                                              channelName,
      64         751 :                                              std::move(cb),
      65             :                                              false,
      66             :                                              forceNewConnection,
      67             :                                              connectionType);
      68        1225 : }
      69             : 
      70             : void
      71         711 : MessageChannelHandler::Impl::onChannelShutdown(const std::shared_ptr<dhtnet::ChannelSocket>& socket,
      72             :                                                const std::string& peerId,
      73             :                                                const DeviceId& device)
      74             : {
      75         711 :     std::lock_guard lk(connectionsMtx_);
      76         711 :     auto connectionsIt = connections_.find({peerId, device});
      77         711 :     if (connectionsIt == connections_.end())
      78           0 :         return;
      79         711 :     auto& connections = connectionsIt->second;
      80         711 :     auto conn = std::find(connections.begin(), connections.end(), socket);
      81         711 :     if (conn != connections.end())
      82         711 :         connections.erase(conn);
      83         711 :     if (connections.empty())
      84         661 :         connections_.erase(connectionsIt);
      85         711 : }
      86             : 
      87             : std::shared_ptr<dhtnet::ChannelSocket>
      88       16340 : MessageChannelHandler::getChannel(const std::string& peer, const DeviceId& deviceId) const
      89             : {
      90       16340 :     std::lock_guard lk(pimpl_->connectionsMtx_);
      91       16340 :     auto it = pimpl_->connections_.find({peer, deviceId});
      92       16340 :     if (it == pimpl_->connections_.end())
      93        1273 :         return nullptr;
      94       15067 :     if (it->second.empty())
      95           0 :         return nullptr;
      96       15067 :     return it->second.front();
      97       16339 : }
      98             : 
      99             : std::vector<std::shared_ptr<dhtnet::ChannelSocket>>
     100        3103 : MessageChannelHandler::getChannels(const std::string& peer) const
     101             : {
     102        3103 :     std::vector<std::shared_ptr<dhtnet::ChannelSocket>> sockets;
     103        3104 :     std::lock_guard lk(pimpl_->connectionsMtx_);
     104        3104 :     auto lower = pimpl_->connections_.lower_bound({peer, DeviceId()});
     105        4229 :     for (auto it = lower; it != pimpl_->connections_.end() && it->first.first == peer; ++it)
     106        1125 :         sockets.insert(sockets.end(), it->second.begin(), it->second.end());
     107        6208 :     return sockets;
     108        3104 : }
     109             : 
     110             : bool
     111         701 : MessageChannelHandler::onRequest(const std::shared_ptr<dht::crypto::Certificate>& cert,
     112             :                                  const std::string& /* name */)
     113             : {
     114         701 :     auto acc = pimpl_->account_.lock();
     115         701 :     if (!cert || !cert->issuer || !acc)
     116           0 :         return false;
     117         701 :     return true;
     118             :     // return cert->issuer->getId().toString() == acc->getUsername();
     119         701 : }
     120             : 
     121             : void
     122        1360 : MessageChannelHandler::onReady(const std::shared_ptr<dht::crypto::Certificate>& cert,
     123             :                                const std::string&,
     124             :                                std::shared_ptr<dhtnet::ChannelSocket> socket)
     125             : {
     126        1360 :     auto acc = pimpl_->account_.lock();
     127        1360 :     if (!cert || !cert->issuer || !acc)
     128           0 :         return;
     129        1360 :     auto peerId = cert->issuer->getId().toString();
     130        1360 :     auto device = cert->getLongId();
     131        1360 :     std::lock_guard lk(pimpl_->connectionsMtx_);
     132        1360 :     pimpl_->connections_[{peerId, device}].emplace_back(socket);
     133             : 
     134        1359 :     socket->onShutdown([w = pimpl_->weak_from_this(), peerId, device, s = std::weak_ptr(socket)]() {
     135        1360 :         if (auto shared = w.lock())
     136        1360 :             shared->onChannelShutdown(s.lock(), peerId, device);
     137        1360 :     });
     138             : 
     139             :     struct DecodingContext
     140             :     {
     141       78682 :         msgpack::unpacker pac {[](msgpack::type::object_type, std::size_t, void*) { return true; },
     142             :                                nullptr,
     143             :                                1500};
     144             :     };
     145             : 
     146        1360 :     socket->setOnRecv([acc = pimpl_->account_.lock(),
     147             :                        peerId,
     148             :                        ctx = std::make_shared<DecodingContext>()](const uint8_t* buf, size_t len) {
     149       15744 :         if (!buf || !acc)
     150           0 :             return len;
     151             : 
     152       15743 :         ctx->pac.reserve_buffer(len);
     153       15745 :         std::copy_n(buf, len, ctx->pac.buffer());
     154       15745 :         ctx->pac.buffer_consumed(len);
     155             : 
     156       15745 :         msgpack::object_handle oh;
     157             :         try {
     158       31489 :             while (ctx->pac.next(oh)) {
     159       15741 :                 Message msg;
     160       15742 :                 oh.get().convert(msg);
     161       15744 :                 acc->handleMessage(peerId, {msg.t, msg.c});
     162       15745 :             }
     163           0 :         } catch (const std::exception& e) {
     164           0 :             JAMI_WARNING("[convInfo] error on sync: {:s}", e.what());
     165           0 :         }
     166             : 
     167       15743 :         return len;
     168       15743 :     });
     169        1360 : }
     170             : 
     171             : bool
     172       15755 : MessageChannelHandler::sendMessage(const std::shared_ptr<dhtnet::ChannelSocket>& socket,
     173             :                                    const Message& message)
     174             : {
     175       15755 :     if (!socket)
     176           0 :         return false;
     177       15755 :     msgpack::sbuffer buffer(UINT16_MAX); // Use max
     178       15755 :     msgpack::pack(buffer, message);
     179       15755 :     std::error_code ec;
     180       15755 :     auto sent = socket->write(reinterpret_cast<const uint8_t*>(buffer.data()), buffer.size(), ec);
     181       15755 :     if (ec) {
     182           0 :         JAMI_WARNING("Error sending message: {:s}", ec.message());
     183             :     }
     184       15755 :     return !ec && sent == buffer.size();
     185       15755 : }
     186             : 
     187             : } // namespace jami

Generated by: LCOV version 1.14