LCOV - code coverage report
Current view: top level - src/jamidht - channeled_transport.cpp (source / functions) Hit Total Coverage
Test: jami-coverage-filtered.info Lines: 118 129 91.5 %
Date: 2024-11-13 09:04:27 Functions: 15 16 93.8 %

          Line data    Source code
       1             : /*
       2             :  *  Copyright (C) 2004-2024 Savoir-faire Linux Inc.
       3             :  *
       4             :  *  This program is free software: you can redistribute it and/or modify
       5             :  *  it under the terms of the GNU General Public License as published by
       6             :  *  the Free Software Foundation, either version 3 of the License, or
       7             :  *  (at your option) any later version.
       8             :  *
       9             :  *  This program is distributed in the hope that it will be useful,
      10             :  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
      11             :  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
      12             :  *  GNU General Public License for more details.
      13             :  *
      14             :  *  You should have received a copy of the GNU General Public License
      15             :  *  along with this program. If not, see <https://www.gnu.org/licenses/>.
      16             :  */
      17             : 
      18             : #include "channeled_transport.h"
      19             : 
      20             : #include "logger.h"
      21             : #include <dhtnet/multiplexed_socket.h>
      22             : #include "connectivity/sip_utils.h"
      23             : 
      24             : #include <pjsip/sip_transport.h>
      25             : #include <pjsip/sip_endpoint.h>
      26             : #include <pj/compat/socket.h>
      27             : #include <pj/lock.h>
      28             : 
      29             : namespace jami {
      30             : namespace tls {
      31             : 
      32        1359 : ChanneledSIPTransport::ChanneledSIPTransport(pjsip_endpoint* endpt,
      33             :                                              const std::shared_ptr<dhtnet::ChannelSocket>& socket,
      34        1359 :                                              onShutdownCb&& cb)
      35        1359 :     : socket_(socket)
      36        1359 :     , shutdownCb_(std::move(cb))
      37        1359 :     , trData_()
      38        1359 :     , pool_ {nullptr, pj_pool_release}
      39        4077 :     , rxPool_(nullptr, pj_pool_release)
      40             : {
      41        1359 :     local_ = socket->getLocalAddress();
      42        1359 :     remote_ = socket->getRemoteAddress();
      43        1359 :     int tp_type = local_.isIpv6() ? PJSIP_TRANSPORT_TLS6 : PJSIP_TRANSPORT_TLS;
      44             : 
      45        1359 :     JAMI_DBG("ChanneledSIPTransport@%p {tr=%p}", this, &trData_.base);
      46             : 
      47             :     // Init memory
      48        1359 :     trData_.self = this; // up-link for PJSIP callbacks
      49             : 
      50        2718 :     pool_ = sip_utils::smart_alloc_pool(endpt,
      51             :                                         "channeled.pool",
      52             :                                         sip_utils::POOL_TP_INIT,
      53        1359 :                                         sip_utils::POOL_TP_INC);
      54             : 
      55        1359 :     auto& base = trData_.base;
      56        1359 :     std::memset(&base, 0, sizeof(base));
      57             : 
      58        1359 :     pj_ansi_snprintf(base.obj_name, PJ_MAX_OBJ_NAME, "chan%p", &base);
      59        1359 :     base.endpt = endpt;
      60        1359 :     base.tpmgr = pjsip_endpt_get_tpmgr(endpt);
      61        1359 :     base.pool = pool_.get();
      62             : 
      63        1359 :     if (pj_atomic_create(pool_.get(), 0, &base.ref_cnt) != PJ_SUCCESS)
      64           0 :         throw std::runtime_error("Unable to create PJSIP atomic.");
      65             : 
      66        1359 :     if (pj_lock_create_recursive_mutex(pool_.get(), "chan", &base.lock) != PJ_SUCCESS)
      67           0 :         throw std::runtime_error("Unable to create PJSIP mutex.");
      68             : 
      69        1359 :     if (not local_) {
      70           0 :         JAMI_ERR("Invalid local address");
      71           0 :         throw std::runtime_error("Invalid local address");
      72             :     }
      73        1359 :     if (not remote_) {
      74           0 :         JAMI_ERR("Invalid remote address");
      75           0 :         throw std::runtime_error("Invalid remote address");
      76             :     }
      77             : 
      78        1359 :     pj_sockaddr_cp(&base.key.rem_addr, remote_.pjPtr());
      79        1359 :     base.key.type = tp_type;
      80        1359 :     auto reg_type = static_cast<pjsip_transport_type_e>(tp_type);
      81        1359 :     base.type_name = const_cast<char*>(pjsip_transport_get_type_name(reg_type));
      82        1359 :     base.flag = pjsip_transport_get_flag_from_type(reg_type);
      83        1359 :     base.info = static_cast<char*>(pj_pool_alloc(pool_.get(), sip_utils::TRANSPORT_INFO_LENGTH));
      84             : 
      85        1359 :     auto remote_addr = remote_.toString();
      86        1359 :     pj_ansi_snprintf(base.info,
      87             :                      sip_utils::TRANSPORT_INFO_LENGTH,
      88             :                      "%s to %s",
      89             :                      base.type_name,
      90             :                      remote_addr.c_str());
      91        1359 :     base.addr_len = remote_.getLength();
      92        1359 :     base.dir = PJSIP_TP_DIR_NONE;
      93             : 
      94             :     // Set initial local address
      95        1359 :     pj_sockaddr_cp(&base.local_addr, local_.pjPtr());
      96             : 
      97        1359 :     sip_utils::sockaddr_to_host_port(pool_.get(), &base.local_name, &base.local_addr);
      98        1359 :     sip_utils::sockaddr_to_host_port(pool_.get(), &base.remote_name, remote_.pjPtr());
      99             : 
     100             :     // Init transport callbacks
     101        3586 :     base.send_msg = [](pjsip_transport* transport,
     102             :                        pjsip_tx_data* tdata,
     103             :                        const pj_sockaddr_t* rem_addr,
     104             :                        int addr_len,
     105             :                        void* token,
     106             :                        pjsip_transport_callback callback) -> pj_status_t {
     107        2227 :         auto* this_ = reinterpret_cast<ChanneledSIPTransport*>(
     108             :             reinterpret_cast<TransportData*>(transport)->self);
     109        2227 :         return this_->send(tdata, rem_addr, addr_len, token, callback);
     110        1359 :     };
     111        1361 :     base.do_shutdown = [](pjsip_transport* transport) -> pj_status_t {
     112           2 :         auto* this_ = reinterpret_cast<ChanneledSIPTransport*>(
     113             :             reinterpret_cast<TransportData*>(transport)->self);
     114           6 :         JAMI_DEBUG("ChanneledSIPTransport@{} tr={} rc={:d}: shutdown",
     115             :                  fmt::ptr(this_),
     116             :                  fmt::ptr(transport),
     117             :                  pj_atomic_get(transport->ref_cnt));
     118           2 :         if (this_->socket_)
     119           2 :             this_->socket_->shutdown();
     120           2 :         return PJ_SUCCESS;
     121        1359 :     };
     122        2718 :     base.destroy = [](pjsip_transport* transport) -> pj_status_t {
     123        1359 :         auto* this_ = reinterpret_cast<ChanneledSIPTransport*>(
     124             :             reinterpret_cast<TransportData*>(transport)->self);
     125        4077 :         JAMI_DEBUG("ChanneledSIPTransport@{}: destroying", fmt::ptr(this_));
     126        1359 :         delete this_;
     127        1359 :         return PJ_SUCCESS;
     128        1359 :     };
     129             : 
     130             :     // Init rdata_
     131        1359 :     std::memset(&rdata_, 0, sizeof(pjsip_rx_data));
     132        2718 :     rxPool_ = sip_utils::smart_alloc_pool(endpt,
     133             :                                           "channeled.rxPool",
     134             :                                           PJSIP_POOL_RDATA_LEN,
     135        1359 :                                           PJSIP_POOL_RDATA_LEN);
     136        1359 :     rdata_.tp_info.pool = rxPool_.get();
     137        1359 :     rdata_.tp_info.transport = &base;
     138        1359 :     rdata_.tp_info.tp_data = this;
     139        1359 :     rdata_.tp_info.op_key.rdata = &rdata_;
     140        1359 :     pj_ioqueue_op_key_init(&rdata_.tp_info.op_key.op_key, sizeof(pj_ioqueue_op_key_t));
     141        1359 :     rdata_.pkt_info.src_addr = base.key.rem_addr;
     142        1359 :     rdata_.pkt_info.src_addr_len = sizeof(rdata_.pkt_info.src_addr);
     143        1359 :     auto rem_addr = &base.key.rem_addr;
     144        1359 :     pj_sockaddr_print(rem_addr, rdata_.pkt_info.src_name, sizeof(rdata_.pkt_info.src_name), 0);
     145        1359 :     rdata_.pkt_info.src_port = pj_sockaddr_get_port(rem_addr);
     146             : 
     147             :     // Register callbacks
     148        1359 :     if (pjsip_transport_register(base.tpmgr, &base) != PJ_SUCCESS)
     149           0 :         throw std::runtime_error("Unable to register PJSIP transport.");
     150        1359 : }
     151             : 
     152             : void
     153        1359 : ChanneledSIPTransport::start()
     154             : {
     155             :     // Link to Channel Socket
     156        1359 :     socket_->setOnRecv([this](const uint8_t* buf, size_t len) {
     157        2210 :         pj_gettimeofday(&rdata_.pkt_info.timestamp);
     158        2205 :         size_t remaining {len};
     159        4418 :         while (remaining) {
     160             :             // Build rdata
     161        2207 :             size_t added = std::min(remaining,
     162        2207 :                                     (size_t) PJSIP_MAX_PKT_LEN - (size_t) rdata_.pkt_info.len);
     163        2210 :             std::copy_n(buf, added, rdata_.pkt_info.packet + rdata_.pkt_info.len);
     164        2211 :             rdata_.pkt_info.len += added;
     165        2211 :             buf += added;
     166        2211 :             remaining -= added;
     167             : 
     168             :             // Consume packet
     169        2211 :             auto eaten = pjsip_tpmgr_receive_packet(trData_.base.tpmgr, &rdata_);
     170        2211 :             if (eaten == rdata_.pkt_info.len) {
     171        2211 :                 rdata_.pkt_info.len = 0;
     172           0 :             } else if (eaten > 0) {
     173           0 :                 memmove(rdata_.pkt_info.packet, rdata_.pkt_info.packet + eaten, eaten);
     174           0 :                 rdata_.pkt_info.len -= eaten;
     175             :             }
     176        2211 :             pj_pool_reset(rdata_.tp_info.pool);
     177             :         }
     178        2211 :         return len;
     179             :     });
     180        1359 :     socket_->onShutdown([this] {
     181        1359 :         disconnected_ = true;
     182        1359 :         if (auto state_cb = pjsip_tpmgr_get_state_cb(trData_.base.tpmgr)) {
     183        1359 :             JAMI_WARN("[SIPS] process disconnect event");
     184             :             pjsip_transport_state_info state_info;
     185        1359 :             std::memset(&state_info, 0, sizeof(state_info));
     186        1359 :             state_info.status = PJ_SUCCESS;
     187        1359 :             (*state_cb)(&trData_.base, PJSIP_TP_STATE_DISCONNECTED, &state_info);
     188             :         }
     189        1359 :         shutdownCb_();
     190        1359 :     });
     191        1359 : }
     192             : 
     193        2718 : ChanneledSIPTransport::~ChanneledSIPTransport()
     194             : {
     195        1359 :     JAMI_DBG("~ChanneledSIPTransport@%p {tr=%p}", this, &trData_.base);
     196        1359 :     auto base = getTransportBase();
     197             : 
     198             :     // Here, we reset callbacks in ChannelSocket to avoid to call it after destruction
     199             :     // ChanneledSIPTransport is managed by pjsip, so we don't have any weak_ptr available
     200        1359 :     socket_->setOnRecv([](const uint8_t*, size_t len) { return len; });
     201        2718 :     socket_->onShutdown([]() {});
     202             :     // Stop low-level transport first
     203        1359 :     socket_->shutdown();
     204        1359 :     socket_.reset();
     205             : 
     206             :     // If delete not trigged by pjsip_transport_destroy (happen if objet not given to pjsip)
     207        1359 :     if (not base->is_shutdown and not base->is_destroying)
     208           0 :         pjsip_transport_shutdown(base);
     209             : 
     210        1359 :     pj_lock_destroy(base->lock);
     211        1359 :     pj_atomic_destroy(base->ref_cnt);
     212        1359 :     JAMI_DBG("~ChanneledSIPTransport@%p {tr=%p} bye", this, &trData_.base);
     213        2718 : }
     214             : 
     215             : pj_status_t
     216        2227 : ChanneledSIPTransport::send(pjsip_tx_data* tdata,
     217             :                             const pj_sockaddr_t* rem_addr,
     218             :                             int addr_len,
     219             :                             void*,
     220             :                             pjsip_transport_callback)
     221             : {
     222             :     // Sanity check
     223        2227 :     PJ_ASSERT_RETURN(tdata, PJ_EINVAL);
     224             : 
     225             :     // Check that there's no pending operation associated with the tdata
     226        2227 :     PJ_ASSERT_RETURN(tdata->op_key.tdata == nullptr, PJSIP_EPENDINGTX);
     227             : 
     228             :     // Check the address is supported
     229        2227 :     PJ_ASSERT_RETURN(rem_addr
     230             :                          and (addr_len == sizeof(pj_sockaddr_in)
     231             :                               or addr_len == sizeof(pj_sockaddr_in6)),
     232             :                      PJ_EINVAL);
     233             : 
     234             :     // Check in we are able to send it in synchronous way first
     235        2224 :     std::size_t size = tdata->buf.cur - tdata->buf.start;
     236        2224 :     if (socket_) {
     237        2225 :         std::error_code ec;
     238        2226 :         socket_->write(reinterpret_cast<const uint8_t*>(tdata->buf.start), size, ec);
     239        2227 :         if (!ec)
     240        2216 :             return PJ_SUCCESS;
     241             :     }
     242          10 :     return PJ_EINVAL;
     243             : }
     244             : 
     245             : } // namespace tls
     246             : } // namespace jami

Generated by: LCOV version 1.14