LCOV - code coverage report
Current view: top level - foo/src/jamidht - channeled_transport.cpp (source / functions) Hit Total Coverage
Test: jami-coverage-filtered.info Lines: 115 126 91.3 %
Date: 2025-08-24 09:11:10 Functions: 17 22 77.3 %

          Line data    Source code
       1             : /*
       2             :  *  Copyright (C) 2004-2025 Savoir-faire Linux Inc.
       3             :  *
       4             :  *  This program is free software: you can redistribute it and/or modify
       5             :  *  it under the terms of the GNU General Public License as published by
       6             :  *  the Free Software Foundation, either version 3 of the License, or
       7             :  *  (at your option) any later version.
       8             :  *
       9             :  *  This program is distributed in the hope that it will be useful,
      10             :  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
      11             :  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
      12             :  *  GNU General Public License for more details.
      13             :  *
      14             :  *  You should have received a copy of the GNU General Public License
      15             :  *  along with this program. If not, see <https://www.gnu.org/licenses/>.
      16             :  */
      17             : 
      18             : #include "channeled_transport.h"
      19             : 
      20             : #include "logger.h"
      21             : #include <dhtnet/multiplexed_socket.h>
      22             : #include "connectivity/sip_utils.h"
      23             : 
      24             : #include <pjsip/sip_transport.h>
      25             : #include <pjsip/sip_endpoint.h>
      26             : #include <pj/compat/socket.h>
      27             : #include <pj/lock.h>
      28             : 
      29             : namespace jami {
      30             : namespace tls {
      31             : // Builds a PJSIP transport on top of a dhtnet ChannelSocket: fills trData_.base, installs the send/do_shutdown/destroy callbacks, prepares rdata_ and registers the transport. Throws std::runtime_error on failure.
      32         188 : ChanneledSIPTransport::ChanneledSIPTransport(pjsip_endpoint* endpt,
      33             :                                              const std::shared_ptr<dhtnet::ChannelSocket>& socket,
      34         188 :                                              onShutdownCb&& cb)
      35         188 :     : socket_(socket)
      36         188 :     , shutdownCb_(std::move(cb))
      37         188 :     , trData_()
      38         188 :     , pool_ {nullptr, pj_pool_release}
      39         564 :     , rxPool_(nullptr, pj_pool_release)
      40             : {
      41         188 :     local_ = socket->getLocalAddress();
      42         188 :     remote_ = socket->getRemoteAddress();
      43         188 :     int tp_type = local_.isIpv6() ? PJSIP_TRANSPORT_TLS6 : PJSIP_TRANSPORT_TLS;
      44             :     // NOTE(review): tp_type is derived from local_ here, while local_ is only checked for validity further below.
      45         564 :     JAMI_LOG("ChanneledSIPTransport@{} tr={}", fmt::ptr(this), fmt::ptr(&trData_.base));
      46             : 
      47             :     // Init memory
      48         188 :     trData_.self = this; // up-link for PJSIP callbacks
      49             : 
      50         376 :     pool_ = sip_utils::smart_alloc_pool(endpt,
      51             :                                         "channeled.pool",
      52             :                                         sip_utils::POOL_TP_INIT,
      53         188 :                                         sip_utils::POOL_TP_INC);
      54             :     // Only trData_.base is zeroed below; trData_.self assigned above is left untouched.
      55         188 :     auto& base = trData_.base;
      56         188 :     std::memset(&base, 0, sizeof(base));
      57             : 
      58         188 :     pj_ansi_snprintf(base.obj_name, PJ_MAX_OBJ_NAME, "chan%p", &base);
      59         188 :     base.endpt = endpt;
      60         188 :     base.tpmgr = pjsip_endpt_get_tpmgr(endpt);
      61         188 :     base.pool = pool_.get();
      62             :     // ref_cnt and lock are created from pool_; the destructor destroys both explicitly.
      63         188 :     if (pj_atomic_create(pool_.get(), 0, &base.ref_cnt) != PJ_SUCCESS)
      64           0 :         throw std::runtime_error("Unable to create PJSIP atomic.");
      65             : 
      66         188 :     if (pj_lock_create_recursive_mutex(pool_.get(), "chan", &base.lock) != PJ_SUCCESS)
      67           0 :         throw std::runtime_error("Unable to create PJSIP mutex.");
      68             :     // NOTE(review): on the throw paths from here on the destructor does not run, so base.lock / base.ref_cnt are not explicitly destroyed - confirm this is acceptable.
      69         188 :     if (not local_) {
      70           0 :         JAMI_ERROR("Invalid local address");
      71           0 :         throw std::runtime_error("Invalid local address");
      72             :     }
      73         188 :     if (not remote_) {
      74           0 :         JAMI_ERROR("Invalid remote address");
      75           0 :         throw std::runtime_error("Invalid remote address");
      76             :     }
      77             :     // Transport key: remote address plus the TLS/TLS6 type chosen from the local address family.
      78         188 :     pj_sockaddr_cp(&base.key.rem_addr, remote_.pjPtr());
      79         188 :     base.key.type = tp_type;
      80         188 :     auto reg_type = static_cast<pjsip_transport_type_e>(tp_type);
      81         188 :     base.type_name = const_cast<char*>(pjsip_transport_get_type_name(reg_type));
      82         188 :     base.flag = pjsip_transport_get_flag_from_type(reg_type);
      83         188 :     base.info = static_cast<char*>(pj_pool_alloc(pool_.get(), sip_utils::TRANSPORT_INFO_LENGTH));
      84             :     // base.info holds "<type name> to <remote address>", truncated to TRANSPORT_INFO_LENGTH.
      85         188 :     auto remote_addr = remote_.toString();
      86         188 :     pj_ansi_snprintf(base.info,
      87             :                      sip_utils::TRANSPORT_INFO_LENGTH,
      88             :                      "%s to %s",
      89             :                      base.type_name,
      90             :                      remote_addr.c_str());
      91         188 :     base.addr_len = remote_.getLength();
      92         188 :     base.dir = PJSIP_TP_DIR_NONE;
      93             : 
      94             :     // Set initial local address
      95         188 :     pj_sockaddr_cp(&base.local_addr, local_.pjPtr());
      96             :     // Derive base.local_name / base.remote_name from the addresses (the helper takes pool_, presumably for string storage).
      97         188 :     sip_utils::sockaddr_to_host_port(pool_.get(), &base.local_name, &base.local_addr);
      98         188 :     sip_utils::sockaddr_to_host_port(pool_.get(), &base.remote_name, remote_.pjPtr());
      99             : 
     100             :     // Init transport callbacks: each one recovers the owning object through TransportData::self
     101        1755 :     base.send_msg = [](pjsip_transport* transport,
     102             :                        pjsip_tx_data* tdata,
     103             :                        const pj_sockaddr_t* rem_addr,
     104             :                        int addr_len,
     105             :                        void* token,
     106             :                        pjsip_transport_callback callback) -> pj_status_t {
     107        1567 :         auto* this_ = reinterpret_cast<ChanneledSIPTransport*>(
     108             :             reinterpret_cast<TransportData*>(transport)->self);
     109        1567 :         return this_->send(tdata, rem_addr, addr_len, token, callback);
     110         188 :     };
     111         190 :     base.do_shutdown = [](pjsip_transport* transport) -> pj_status_t {
     112           2 :         auto* this_ = reinterpret_cast<ChanneledSIPTransport*>(
     113             :             reinterpret_cast<TransportData*>(transport)->self);
     114           6 :         JAMI_LOG("ChanneledSIPTransport@{} tr={} rc={:d}: shutdown",
     115             :                  fmt::ptr(this_),
     116             :                  fmt::ptr(transport),
     117             :                  pj_atomic_get(transport->ref_cnt));
     118           2 :         if (this_->socket_)
     119           2 :             this_->socket_->shutdown();
     120           2 :         return PJ_SUCCESS;
     121         188 :     };
     122         376 :     base.destroy = [](pjsip_transport* transport) -> pj_status_t {
     123         188 :         auto* this_ = reinterpret_cast<ChanneledSIPTransport*>(
     124             :             reinterpret_cast<TransportData*>(transport)->self);
     125         188 :         delete this_; // the object is deleted from within the destroy callback
     126         188 :         return PJ_SUCCESS;
     127         188 :     };
     128             : 
     129             :     // Init rdata_: the receive descriptor that start() reuses for every incoming buffer
     130         188 :     std::memset(&rdata_, 0, sizeof(pjsip_rx_data));
     131         376 :     rxPool_ = sip_utils::smart_alloc_pool(endpt,
     132             :                                           "channeled.rxPool",
     133             :                                           PJSIP_POOL_RDATA_LEN,
     134         188 :                                           PJSIP_POOL_RDATA_LEN);
     135         188 :     rdata_.tp_info.pool = rxPool_.get();
     136         188 :     rdata_.tp_info.transport = &base;
     137         188 :     rdata_.tp_info.tp_data = this;
     138         188 :     rdata_.tp_info.op_key.rdata = &rdata_;
     139         188 :     pj_ioqueue_op_key_init(&rdata_.tp_info.op_key.op_key, sizeof(pj_ioqueue_op_key_t));
     140         188 :     rdata_.pkt_info.src_addr = base.key.rem_addr;
     141         188 :     rdata_.pkt_info.src_addr_len = sizeof(rdata_.pkt_info.src_addr);
     142         188 :     auto rem_addr = &base.key.rem_addr;
     143         188 :     pj_sockaddr_print(rem_addr, rdata_.pkt_info.src_name, sizeof(rdata_.pkt_info.src_name), 0);
     144         188 :     rdata_.pkt_info.src_port = pj_sockaddr_get_port(rem_addr);
     145             : 
     146             :     // Register the transport with the PJSIP transport manager
     147         188 :     if (pjsip_transport_register(base.tpmgr, &base) != PJ_SUCCESS)
     148           0 :         throw std::runtime_error("Unable to register PJSIP transport.");
     149         188 : }
     150             : // Hooks the transport to its ChannelSocket: received bytes are accumulated in rdata_ and handed to PJSIP; a socket shutdown is reported as a DISCONNECTED state event followed by shutdownCb_().
     151             : void
     152         188 : ChanneledSIPTransport::start()
     153             : {
     154             :     // Link to Channel Socket (both callbacks capture `this`; the destructor replaces them with no-ops)
     155         188 :     socket_->setOnRecv([this](const uint8_t* buf, size_t len) {
     156        1561 :         pj_gettimeofday(&rdata_.pkt_info.timestamp);
     157        1561 :         size_t remaining {len};
     158        3122 :         while (remaining) {
     159             :             // Build rdata: append as much of buf as fits after the bytes still pending in the packet buffer
     160        1561 :             size_t added = std::min(remaining,
     161        1561 :                                     (size_t) PJSIP_MAX_PKT_LEN - (size_t) rdata_.pkt_info.len);
     162        1561 :             std::copy_n(buf, added, rdata_.pkt_info.packet + rdata_.pkt_info.len);
     163        1561 :             rdata_.pkt_info.len += added;
     164        1561 :             buf += added;
     165        1561 :             remaining -= added;
     166             : 
     167             :             // Consume packet: `eaten` is the number of bytes PJSIP reports as processed
     168        1561 :             auto eaten = pjsip_tpmgr_receive_packet(trData_.base.tpmgr, &rdata_);
     169        1561 :             if (eaten == rdata_.pkt_info.len) {
     170        1561 :                 rdata_.pkt_info.len = 0;
     171           0 :             } else if (eaten > 0) { // partial consume: slide the unparsed tail (len - eaten bytes) to the front
     172           0 :                 memmove(rdata_.pkt_info.packet, rdata_.pkt_info.packet + eaten, rdata_.pkt_info.len - eaten);
     173           0 :                 rdata_.pkt_info.len -= eaten;
     174             :             }
     175        1561 :             pj_pool_reset(rdata_.tp_info.pool);
     176             :         }
     177        1561 :         return len; // the whole input buffer is always reported as taken
     178             :     });
     179         188 :     socket_->onShutdown([this] {
     180         188 :         disconnected_ = true;
     181         188 :         if (auto state_cb = pjsip_tpmgr_get_state_cb(trData_.base.tpmgr)) {
     182             :             //JAMI_LOG("[SIPS] process disconnect event");
     183             :             pjsip_transport_state_info state_info;
     184         188 :             std::memset(&state_info, 0, sizeof(state_info));
     185         188 :             state_info.status = PJ_SUCCESS;
     186         188 :             (*state_cb)(&trData_.base, PJSIP_TP_STATE_DISCONNECTED, &state_info);
     187             :         }
     188         188 :         shutdownCb_();
     189         188 :     });
     190         188 : }
     191             : // Detaches from the ChannelSocket, shuts it down, then releases the PJSIP lock and atomic created in the constructor.
     192         376 : ChanneledSIPTransport::~ChanneledSIPTransport()
     193             : {
     194         188 :     auto base = getTransportBase();
     195             : 
     196             :     // Here, we replace the callbacks in ChannelSocket with no-ops so they are not called after destruction.
     197             :     // ChanneledSIPTransport is managed by pjsip, so we don't have any weak_ptr available
     198         188 :     socket_->setOnRecv([](const uint8_t*, size_t len) { return len; });
     199         376 :     socket_->onShutdown([]() {});
     200             :     // Stop low-level transport first
     201         188 :     socket_->shutdown();
     202         188 :     socket_.reset();
     203             : 
     204             :     // If the delete was not triggered by pjsip_transport_destroy (happens if the object was not given to pjsip)
     205         188 :     if (not base->is_shutdown and not base->is_destroying)
     206           0 :         pjsip_transport_shutdown(base);
     207             :     // Release the lock and reference counter that the constructor created on base
     208         188 :     pj_lock_destroy(base->lock);
     209         188 :     pj_atomic_destroy(base->ref_cnt);
     210         564 :     JAMI_LOG("~ChanneledSIPTransport@{} tr={}", fmt::ptr(this), fmt::ptr(&trData_.base));
     211         376 : }
     212             : // Writes the bytes between tdata->buf.start and tdata->buf.cur to socket_; the token and callback arguments are unused. Returns PJ_SUCCESS when the write reports no error, PJ_EINVAL otherwise.
     213             : pj_status_t
     214        1567 : ChanneledSIPTransport::send(pjsip_tx_data* tdata,
     215             :                             const pj_sockaddr_t* rem_addr,
     216             :                             int addr_len,
     217             :                             void*,
     218             :                             pjsip_transport_callback)
     219             : {
     220             :     // Sanity check
     221        1567 :     PJ_ASSERT_RETURN(tdata, PJ_EINVAL);
     222             : 
     223             :     // Check that there's no pending operation associated with the tdata
     224        1567 :     PJ_ASSERT_RETURN(tdata->op_key.tdata == nullptr, PJSIP_EPENDINGTX);
     225             : 
     226             :     // Check the address is supported (rem_addr is only validated here; the data is always written to socket_)
     227        1567 :     PJ_ASSERT_RETURN(rem_addr
     228             :                          and (addr_len == sizeof(pj_sockaddr_in)
     229             :                               or addr_len == sizeof(pj_sockaddr_in6)),
     230             :                      PJ_EINVAL);
     231             : 
     232             :     // Check if we are able to send it in a synchronous way first
     233        1567 :     std::size_t size = tdata->buf.cur - tdata->buf.start;
     234        1567 :     if (socket_) {
     235        1567 :         std::error_code ec;
     236        1567 :         socket_->write(reinterpret_cast<const uint8_t*>(tdata->buf.start), size, ec);
     237        1567 :         if (!ec)
     238        1561 :             return PJ_SUCCESS;
     239             :     }
     240           6 :     return PJ_EINVAL; // no socket, or the write reported an error
     241             : }
     242             : 
     243             : } // namespace tls
     244             : } // namespace jami

Generated by: LCOV version 1.14