LCOV - code coverage report
Current view: top level - src/jamidht - channeled_transport.cpp (source / functions) Hit Total Coverage
Test: jami-coverage-filtered.info Lines: 118 129 91.5 %
Date: 2024-04-24 08:04:06 Functions: 15 16 93.8 %

          Line data    Source code
       1             : /*
       2             :  *  Copyright (C) 2020-2024 Savoir-faire Linux Inc.
       3             :  *
       4             :  *  Author: Sébastien Blin <sebastien.blin@savoirfairelinux.com>
       5             :  *
       6             :  *  This program is free software; you can redistribute it and/or modify
       7             :  *  it under the terms of the GNU General Public License as published by
       8             :  *  the Free Software Foundation; either version 3 of the License, or
       9             :  *  (at your option) any later version.
      10             :  *
      11             :  *  This program is distributed in the hope that it will be useful,
      12             :  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
      13             :  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
      14             :  *  GNU General Public License for more details.
      15             :  *
      16             :  *  You should have received a copy of the GNU General Public License
      17             :  *  along with this program; if not, write to the Free Software
      18             :  *  Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301 USA.
      19             :  */
      20             : 
      21             : #include "channeled_transport.h"
      22             : 
      23             : #include "logger.h"
      24             : #include <dhtnet/multiplexed_socket.h>
      25             : #include "connectivity/sip_utils.h"
      26             : 
      27             : #include <pjsip/sip_transport.h>
      28             : #include <pjsip/sip_endpoint.h>
      29             : #include <pj/compat/socket.h>
      30             : #include <pj/lock.h>
      31             : 
      32             : namespace jami {
      33             : namespace tls {
      34             : 
      35        1312 : ChanneledSIPTransport::ChanneledSIPTransport(pjsip_endpoint* endpt,
      36             :                                              const std::shared_ptr<dhtnet::ChannelSocket>& socket,
      37        1312 :                                              onShutdownCb&& cb)
      38        1310 :     : socket_(socket)
      39        1311 :     , shutdownCb_(std::move(cb))
      40        1307 :     , trData_()
      41        1308 :     , pool_ {nullptr, pj_pool_release}
      42        3931 :     , rxPool_(nullptr, pj_pool_release)
      43             : {
      44        1311 :     local_ = socket->getLocalAddress();
      45        1312 :     remote_ = socket->getRemoteAddress();
      46        1312 :     int tp_type = local_.isIpv6() ? PJSIP_TRANSPORT_TLS6 : PJSIP_TRANSPORT_TLS;
      47             : 
      48        1311 :     JAMI_DBG("ChanneledSIPTransport@%p {tr=%p}", this, &trData_.base);
      49             : 
      50             :     // Init memory
      51        1312 :     trData_.self = this; // up-link for PJSIP callbacks
      52             : 
      53        2624 :     pool_ = sip_utils::smart_alloc_pool(endpt,
      54             :                                         "channeled.pool",
      55             :                                         sip_utils::POOL_TP_INIT,
      56        1312 :                                         sip_utils::POOL_TP_INC);
      57             : 
      58        1312 :     auto& base = trData_.base;
      59        1312 :     std::memset(&base, 0, sizeof(base));
      60             : 
      61        1312 :     pj_ansi_snprintf(base.obj_name, PJ_MAX_OBJ_NAME, "chan%p", &base);
      62        1312 :     base.endpt = endpt;
      63        1312 :     base.tpmgr = pjsip_endpt_get_tpmgr(endpt);
      64        1312 :     base.pool = pool_.get();
      65             : 
      66        1312 :     if (pj_atomic_create(pool_.get(), 0, &base.ref_cnt) != PJ_SUCCESS)
      67           0 :         throw std::runtime_error("Can't create PJSIP atomic.");
      68             : 
      69        1312 :     if (pj_lock_create_recursive_mutex(pool_.get(), "chan", &base.lock) != PJ_SUCCESS)
      70           0 :         throw std::runtime_error("Can't create PJSIP mutex.");
      71             : 
      72        1312 :     if (not local_) {
      73           0 :         JAMI_ERR("Invalid local address");
      74           0 :         throw std::runtime_error("Invalid local address");
      75             :     }
      76        1312 :     if (not remote_) {
      77           0 :         JAMI_ERR("Invalid remote address");
      78           0 :         throw std::runtime_error("Invalid remote address");
      79             :     }
      80             : 
      81        1312 :     pj_sockaddr_cp(&base.key.rem_addr, remote_.pjPtr());
      82        1312 :     base.key.type = tp_type;
      83        1312 :     auto reg_type = static_cast<pjsip_transport_type_e>(tp_type);
      84        1312 :     base.type_name = const_cast<char*>(pjsip_transport_get_type_name(reg_type));
      85        1312 :     base.flag = pjsip_transport_get_flag_from_type(reg_type);
      86        1312 :     base.info = static_cast<char*>(pj_pool_alloc(pool_.get(), sip_utils::TRANSPORT_INFO_LENGTH));
      87             : 
      88        1312 :     auto remote_addr = remote_.toString();
      89        1312 :     pj_ansi_snprintf(base.info,
      90             :                      sip_utils::TRANSPORT_INFO_LENGTH,
      91             :                      "%s to %s",
      92             :                      base.type_name,
      93             :                      remote_addr.c_str());
      94        1312 :     base.addr_len = remote_.getLength();
      95        1312 :     base.dir = PJSIP_TP_DIR_NONE;
      96             : 
      97             :     // Set initial local address
      98        1312 :     pj_sockaddr_cp(&base.local_addr, local_.pjPtr());
      99             : 
     100        1312 :     sip_utils::sockaddr_to_host_port(pool_.get(), &base.local_name, &base.local_addr);
     101        1312 :     sip_utils::sockaddr_to_host_port(pool_.get(), &base.remote_name, remote_.pjPtr());
     102             : 
     103             :     // Init transport callbacks
     104       66939 :     base.send_msg = [](pjsip_transport* transport,
     105             :                        pjsip_tx_data* tdata,
     106             :                        const pj_sockaddr_t* rem_addr,
     107             :                        int addr_len,
     108             :                        void* token,
     109             :                        pjsip_transport_callback callback) -> pj_status_t {
     110       65627 :         auto* this_ = reinterpret_cast<ChanneledSIPTransport*>(
     111             :             reinterpret_cast<TransportData*>(transport)->self);
     112       65627 :         return this_->send(tdata, rem_addr, addr_len, token, callback);
     113        1312 :     };
     114        1314 :     base.do_shutdown = [](pjsip_transport* transport) -> pj_status_t {
     115           2 :         auto* this_ = reinterpret_cast<ChanneledSIPTransport*>(
     116             :             reinterpret_cast<TransportData*>(transport)->self);
     117           6 :         JAMI_DEBUG("ChanneledSIPTransport@{} tr={} rc={:d}: shutdown",
     118             :                  fmt::ptr(this_),
     119             :                  fmt::ptr(transport),
     120             :                  pj_atomic_get(transport->ref_cnt));
     121           2 :         if (this_->socket_)
     122           2 :             this_->socket_->shutdown();
     123           2 :         return PJ_SUCCESS;
     124        1312 :     };
     125        2624 :     base.destroy = [](pjsip_transport* transport) -> pj_status_t {
     126        1312 :         auto* this_ = reinterpret_cast<ChanneledSIPTransport*>(
     127             :             reinterpret_cast<TransportData*>(transport)->self);
     128        3936 :         JAMI_DEBUG("ChanneledSIPTransport@{}: destroying", fmt::ptr(this_));
     129        1312 :         delete this_;
     130        1312 :         return PJ_SUCCESS;
     131        1312 :     };
     132             : 
     133             :     // Init rdata_
     134        1312 :     std::memset(&rdata_, 0, sizeof(pjsip_rx_data));
     135        2624 :     rxPool_ = sip_utils::smart_alloc_pool(endpt,
     136             :                                           "channeled.rxPool",
     137             :                                           PJSIP_POOL_RDATA_LEN,
     138        1312 :                                           PJSIP_POOL_RDATA_LEN);
     139        1312 :     rdata_.tp_info.pool = rxPool_.get();
     140        1312 :     rdata_.tp_info.transport = &base;
     141        1312 :     rdata_.tp_info.tp_data = this;
     142        1312 :     rdata_.tp_info.op_key.rdata = &rdata_;
     143        1312 :     pj_ioqueue_op_key_init(&rdata_.tp_info.op_key.op_key, sizeof(pj_ioqueue_op_key_t));
     144        1312 :     rdata_.pkt_info.src_addr = base.key.rem_addr;
     145        1312 :     rdata_.pkt_info.src_addr_len = sizeof(rdata_.pkt_info.src_addr);
     146        1312 :     auto rem_addr = &base.key.rem_addr;
     147        1312 :     pj_sockaddr_print(rem_addr, rdata_.pkt_info.src_name, sizeof(rdata_.pkt_info.src_name), 0);
     148        1312 :     rdata_.pkt_info.src_port = pj_sockaddr_get_port(rem_addr);
     149             : 
     150             :     // Register callbacks
     151        1312 :     if (pjsip_transport_register(base.tpmgr, &base) != PJ_SUCCESS)
     152           0 :         throw std::runtime_error("Can't register PJSIP transport.");
     153        1312 : }
     154             : 
     155             : void
     156        1312 : ChanneledSIPTransport::start()
     157             : {
     158             :     // Link to Channel Socket
     159        1312 :     socket_->setOnRecv([this](const uint8_t* buf, size_t len) {
     160       65569 :         pj_gettimeofday(&rdata_.pkt_info.timestamp);
     161       65526 :         size_t remaining {len};
     162      131137 :         while (remaining) {
     163             :             // Build rdata
     164       65539 :             size_t added = std::min(remaining,
     165       65539 :                                     (size_t) PJSIP_MAX_PKT_LEN - (size_t) rdata_.pkt_info.len);
     166       65553 :             std::copy_n(buf, added, rdata_.pkt_info.packet + rdata_.pkt_info.len);
     167       65603 :             rdata_.pkt_info.len += added;
     168       65603 :             buf += added;
     169       65603 :             remaining -= added;
     170             : 
     171             :             // Consume packet
     172       65603 :             auto eaten = pjsip_tpmgr_receive_packet(trData_.base.tpmgr, &rdata_);
     173       65604 :             if (eaten == rdata_.pkt_info.len) {
     174       65605 :                 rdata_.pkt_info.len = 0;
     175           0 :             } else if (eaten > 0) {
     176           0 :                 memmove(rdata_.pkt_info.packet, rdata_.pkt_info.packet + eaten, eaten);
     177           0 :                 rdata_.pkt_info.len -= eaten;
     178             :             }
     179       65604 :             pj_pool_reset(rdata_.tp_info.pool);
     180             :         }
     181       65598 :         return len;
     182             :     });
     183        1312 :     socket_->onShutdown([this] {
     184        1312 :         disconnected_ = true;
     185        1312 :         if (auto state_cb = pjsip_tpmgr_get_state_cb(trData_.base.tpmgr)) {
     186        1312 :             JAMI_WARN("[SIPS] process disconnect event");
     187             :             pjsip_transport_state_info state_info;
     188        1312 :             std::memset(&state_info, 0, sizeof(state_info));
     189        1312 :             state_info.status = PJ_SUCCESS;
     190        1312 :             (*state_cb)(&trData_.base, PJSIP_TP_STATE_DISCONNECTED, &state_info);
     191             :         }
     192        1312 :         shutdownCb_();
     193        1312 :     });
     194        1312 : }
     195             : 
     196        2624 : ChanneledSIPTransport::~ChanneledSIPTransport()
     197             : {
     198        1312 :     JAMI_DBG("~ChanneledSIPTransport@%p {tr=%p}", this, &trData_.base);
     199        1312 :     auto base = getTransportBase();
     200             : 
     201             :     // Here, we reset callbacks in ChannelSocket to avoid to call it after destruction
     202             :     // ChanneledSIPTransport is managed by pjsip, so we don't have any weak_ptr available
     203        1312 :     socket_->setOnRecv([](const uint8_t*, size_t len) { return len; });
     204        2624 :     socket_->onShutdown([]() {});
     205             :     // Stop low-level transport first
     206        1312 :     socket_->shutdown();
     207        1312 :     socket_.reset();
     208             : 
     209             :     // If delete not trigged by pjsip_transport_destroy (happen if objet not given to pjsip)
     210        1312 :     if (not base->is_shutdown and not base->is_destroying)
     211           0 :         pjsip_transport_shutdown(base);
     212             : 
     213        1312 :     pj_lock_destroy(base->lock);
     214        1312 :     pj_atomic_destroy(base->ref_cnt);
     215        1312 :     JAMI_DBG("~ChanneledSIPTransport@%p {tr=%p} bye", this, &trData_.base);
     216        2624 : }
     217             : 
     218             : pj_status_t
     219       65625 : ChanneledSIPTransport::send(pjsip_tx_data* tdata,
     220             :                             const pj_sockaddr_t* rem_addr,
     221             :                             int addr_len,
     222             :                             void*,
     223             :                             pjsip_transport_callback)
     224             : {
     225             :     // Sanity check
     226       65625 :     PJ_ASSERT_RETURN(tdata, PJ_EINVAL);
     227             : 
     228             :     // Check that there's no pending operation associated with the tdata
     229       65625 :     PJ_ASSERT_RETURN(tdata->op_key.tdata == nullptr, PJSIP_EPENDINGTX);
     230             : 
     231             :     // Check the address is supported
     232       65625 :     PJ_ASSERT_RETURN(rem_addr
     233             :                          and (addr_len == sizeof(pj_sockaddr_in)
     234             :                               or addr_len == sizeof(pj_sockaddr_in6)),
     235             :                      PJ_EINVAL);
     236             : 
     237             :     // Check in we are able to send it in synchronous way first
     238       65624 :     std::size_t size = tdata->buf.cur - tdata->buf.start;
     239       65624 :     if (socket_) {
     240       65609 :         std::error_code ec;
     241       65605 :         socket_->write(reinterpret_cast<const uint8_t*>(tdata->buf.start), size, ec);
     242       65624 :         if (!ec)
     243       65605 :             return PJ_SUCCESS;
     244             :     }
     245          11 :     return PJ_EINVAL;
     246             : }
     247             : 
     248             : } // namespace tls
     249             : } // namespace jami

Generated by: LCOV version 1.14