LCOV - code coverage report
Current view: top level - src/jamidht - message_channel_handler.cpp (source / functions) Hit Total Coverage
Test: jami-coverage-filtered.info Lines: 85 94 90.4 %
Date: 2024-11-13 09:04:27 Functions: 16 18 88.9 %

          Line data    Source code
       1             : /*
       2             :  *  Copyright (C) 2004-2024 Savoir-faire Linux Inc.
       3             :  *
       4             :  *  This program is free software: you can redistribute it and/or modify
       5             :  *  it under the terms of the GNU General Public License as published by
       6             :  *  the Free Software Foundation, either version 3 of the License, or
       7             :  *  (at your option) any later version.
       8             :  *
       9             :  *  This program is distributed in the hope that it will be useful,
      10             :  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
      11             :  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
      12             :  *  GNU General Public License for more details.
      13             :  *
      14             :  *  You should have received a copy of the GNU General Public License
      15             :  *  along with this program. If not, see <https://www.gnu.org/licenses/>.
      16             :  */
      17             : #include "jamidht/message_channel_handler.h"
      18             : 
      19             : static constexpr const char MESSAGE_SCHEME[] {"msg:"};
      20             : 
      21             : namespace jami {
      22             : 
      23             : using Key = std::pair<std::string, DeviceId>;
      24             : 
      25             : struct MessageChannelHandler::Impl: public std::enable_shared_from_this<Impl>
      26             : {
      27             :     std::weak_ptr<JamiAccount> account_;
      28             :     dhtnet::ConnectionManager& connectionManager_;
      29             :     std::recursive_mutex connectionsMtx_;
      30             :     std::map<Key, std::vector<std::shared_ptr<dhtnet::ChannelSocket>>> connections_;
      31             : 
      32         680 :     Impl(const std::shared_ptr<JamiAccount>& acc, dhtnet::ConnectionManager& cm)
      33         680 :         : account_(acc)
      34        1360 :         , connectionManager_(cm)
      35         680 :     {}
      36             : 
      37             :     void onChannelShutdown(const std::shared_ptr<dhtnet::ChannelSocket>& socket, const std::string& peerId, const DeviceId& device);
      38             : };
      39             : 
      40         680 : MessageChannelHandler::MessageChannelHandler(const std::shared_ptr<JamiAccount>& acc,
      41         680 :                                        dhtnet::ConnectionManager& cm)
      42             :     : ChannelHandlerInterface()
      43         680 :     , pimpl_(std::make_shared<Impl>(acc, cm))
      44         680 : {}
      45             : 
      46        1360 : MessageChannelHandler::~MessageChannelHandler() {}
      47             : 
      48             : void
      49        1309 : MessageChannelHandler::connect(const DeviceId& deviceId, const std::string&, ConnectCb&& cb)
      50             : {
      51        1309 :     auto channelName = MESSAGE_SCHEME + deviceId.toString();
      52        1309 :     if (pimpl_->connectionManager_.isConnecting(deviceId, channelName)) {
      53         550 :         JAMI_INFO("Already connecting to %s", deviceId.to_c_str());
      54         550 :         return;
      55             :     }
      56        1518 :     pimpl_->connectionManager_.connectDevice(deviceId,
      57             :                                      channelName,
      58         759 :                                      std::move(cb));
      59        1309 : }
      60             : 
      61             : void
      62         710 : MessageChannelHandler::Impl::onChannelShutdown(const std::shared_ptr<dhtnet::ChannelSocket>& socket, const std::string& peerId, const DeviceId& device)
      63             : {
      64         710 :     std::lock_guard lk(connectionsMtx_);
      65         710 :     auto connectionsIt = connections_.find({peerId, device});
      66         710 :     if (connectionsIt == connections_.end())
      67           0 :         return;
      68         710 :     auto& connections = connectionsIt->second;
      69         710 :     auto conn = std::find(connections.begin(), connections.end(), socket);
      70         710 :     if (conn != connections.end())
      71         710 :         connections.erase(conn);
      72         710 :     if (connections.empty())
      73         659 :         connections_.erase(connectionsIt);
      74         710 : }
      75             : 
      76             : std::shared_ptr<dhtnet::ChannelSocket>
      77       16121 : MessageChannelHandler::getChannel(const std::string& peer, const DeviceId& deviceId) const
      78             : {
      79       16121 :     std::lock_guard lk(pimpl_->connectionsMtx_);
      80       16121 :     auto it = pimpl_->connections_.find({peer, deviceId});
      81       16121 :     if (it == pimpl_->connections_.end())
      82        1357 :         return nullptr;
      83       14764 :     if (it->second.empty())
      84           0 :         return nullptr;
      85       14764 :     return it->second.front();
      86       16121 : }
      87             : 
      88             : std::vector<std::shared_ptr<dhtnet::ChannelSocket>>
      89       15636 : MessageChannelHandler::getChannels(const std::string& peer) const
      90             : {
      91       15636 :     std::vector<std::shared_ptr<dhtnet::ChannelSocket>> sockets;
      92       15636 :     std::lock_guard lk(pimpl_->connectionsMtx_);
      93       15636 :     auto lower = pimpl_->connections_.lower_bound({peer, DeviceId()});
      94       29235 :     for (auto it = lower; it != pimpl_->connections_.end() && it->first.first == peer; ++it)
      95       13599 :         sockets.insert(sockets.end(), it->second.begin(), it->second.end());
      96       31272 :     return sockets;
      97       15636 : }
      98             : 
      99             : bool
     100         701 : MessageChannelHandler::onRequest(const std::shared_ptr<dht::crypto::Certificate>& cert,
     101             :                               const std::string& /* name */)
     102             : {
     103         701 :     auto acc = pimpl_->account_.lock();
     104         701 :     if (!cert || !cert->issuer || !acc)
     105           0 :         return false;
     106         701 :     return true;
     107             :     //return cert->issuer->getId().toString() == acc->getUsername();
     108         701 : }
     109             : 
     110             : void
     111        1359 : MessageChannelHandler::onReady(const std::shared_ptr<dht::crypto::Certificate>& cert,
     112             :                             const std::string&,
     113             :                             std::shared_ptr<dhtnet::ChannelSocket> socket)
     114             : {
     115        1359 :     auto acc = pimpl_->account_.lock();
     116        1358 :     if (!cert || !cert->issuer || !acc)
     117           0 :         return;
     118        1358 :     auto peerId = cert->issuer->getId().toString();
     119        1359 :     auto device = cert->getLongId();
     120        1359 :     std::lock_guard lk(pimpl_->connectionsMtx_);
     121        1359 :     pimpl_->connections_[{peerId, device}].emplace_back(socket);
     122             : 
     123        1359 :     socket->onShutdown([w = pimpl_->weak_from_this(), peerId, device, s=std::weak_ptr(socket)]() {
     124        1359 :         if (auto shared = w.lock())
     125        1359 :             shared->onChannelShutdown(s.lock(), peerId, device);
     126        1359 :     });
     127             : 
     128             :     struct DecodingContext
     129             :     {
     130      140338 :         msgpack::unpacker pac {[](msgpack::type::object_type, std::size_t, void*) { return true; },
     131             :                                nullptr,
     132             :                                1500};
     133             :     };
     134             : 
     135        1359 :     socket->setOnRecv([acc = pimpl_->account_.lock(), peerId,
     136             :                        ctx = std::make_shared<DecodingContext>()
     137             :     ](const uint8_t* buf, size_t len) {
     138       28073 :         if (!buf || !acc)
     139           0 :             return len;
     140             : 
     141       28072 :         ctx->pac.reserve_buffer(len);
     142       28072 :         std::copy_n(buf, len, ctx->pac.buffer());
     143       28075 :         ctx->pac.buffer_consumed(len);
     144             : 
     145       28074 :         msgpack::object_handle oh;
     146             :         try {
     147       56147 :             while (ctx->pac.next(oh)) {
     148       28069 :                 Message msg;
     149       28069 :                 oh.get().convert(msg);
     150       28071 :                 acc->handleMessage(peerId, {msg.t, msg.c});
     151       28075 :             }
     152           0 :         } catch (const std::exception& e) {
     153           0 :             JAMI_WARNING("[convInfo] error on sync: {:s}", e.what());
     154           0 :         }
     155             : 
     156       28073 :         return len;
     157       28073 :     });
     158        1358 : }
     159             : 
     160             : bool
     161       28106 : MessageChannelHandler::sendMessage(const std::shared_ptr<dhtnet::ChannelSocket>& socket, const Message& message)
     162             : {
     163       28106 :     if (!socket)
     164           0 :         return false;
     165       28106 :     msgpack::sbuffer buffer(UINT16_MAX); // Use max
     166       28106 :     msgpack::pack(buffer, message);
     167       28105 :     std::error_code ec;
     168       28105 :     auto sent = socket->write(reinterpret_cast<const uint8_t*>(buffer.data()), buffer.size(), ec);
     169       28106 :     if (ec) {
     170           9 :         JAMI_WARNING("Error sending message: {:s}", ec.message());
     171             :     }
     172       28106 :     return !ec && sent == buffer.size();
     173       28106 : }
     174             : 
     175             : } // namespace jami

Generated by: LCOV version 1.14