Line data Source code
1 : /*
2 : * Copyright (C) 2004-2025 Savoir-faire Linux Inc.
3 : *
4 : * This program is free software: you can redistribute it and/or modify
5 : * it under the terms of the GNU General Public License as published by
6 : * the Free Software Foundation, either version 3 of the License, or
7 : * (at your option) any later version.
8 : *
9 : * This program is distributed in the hope that it will be useful,
10 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 : * GNU General Public License for more details.
13 : *
14 : * You should have received a copy of the GNU General Public License
15 : * along with this program. If not, see <https://www.gnu.org/licenses/>.
16 : */
17 :
18 : #include "channeled_transport.h"
19 :
20 : #include "logger.h"
21 : #include <dhtnet/multiplexed_socket.h>
22 : #include "connectivity/sip_utils.h"
23 :
24 : #include <pjsip/sip_transport.h>
25 : #include <pjsip/sip_endpoint.h>
26 : #include <pj/compat/socket.h>
27 : #include <pj/lock.h>
28 :
29 : namespace jami {
30 : namespace tls {
31 :
32 176 : ChanneledSIPTransport::ChanneledSIPTransport(pjsip_endpoint* endpt,
33 : const std::shared_ptr<dhtnet::ChannelSocket>& socket,
34 176 : onShutdownCb&& cb)
35 176 : : socket_(socket)
36 176 : , shutdownCb_(std::move(cb))
37 176 : , trData_()
38 176 : , pool_ {nullptr}
39 528 : , rxPool_(nullptr)
40 : {
41 176 : local_ = socket->getLocalAddress();
42 176 : remote_ = socket->getRemoteAddress();
43 176 : int tp_type = local_.isIpv6() ? PJSIP_TRANSPORT_TLS6 : PJSIP_TRANSPORT_TLS;
44 :
45 704 : JAMI_LOG("ChanneledSIPTransport@{} tr={}", fmt::ptr(this), fmt::ptr(&trData_.base));
46 :
47 : // Init memory
48 176 : trData_.self = this; // up-link for PJSIP callbacks
49 :
50 176 : pool_ = sip_utils::smart_alloc_pool(endpt, "channeled.pool", sip_utils::POOL_TP_INIT, sip_utils::POOL_TP_INC);
51 :
52 176 : auto& base = trData_.base;
53 176 : std::memset(&base, 0, sizeof(base));
54 :
55 176 : pj_ansi_snprintf(base.obj_name, PJ_MAX_OBJ_NAME, "chan%p", &base);
56 176 : base.endpt = endpt;
57 176 : base.tpmgr = pjsip_endpt_get_tpmgr(endpt);
58 176 : base.pool = pool_.get();
59 :
60 176 : if (pj_atomic_create(pool_.get(), 0, &base.ref_cnt) != PJ_SUCCESS)
61 0 : throw std::runtime_error("Unable to create PJSIP atomic.");
62 :
63 176 : if (pj_lock_create_recursive_mutex(pool_.get(), "chan", &base.lock) != PJ_SUCCESS)
64 0 : throw std::runtime_error("Unable to create PJSIP mutex.");
65 :
66 176 : if (not local_) {
67 0 : JAMI_ERROR("Invalid local address");
68 0 : throw std::runtime_error("Invalid local address");
69 : }
70 176 : if (not remote_) {
71 0 : JAMI_ERROR("Invalid remote address");
72 0 : throw std::runtime_error("Invalid remote address");
73 : }
74 :
75 176 : pj_sockaddr_cp(&base.key.rem_addr, remote_.pjPtr());
76 176 : base.key.type = tp_type;
77 176 : auto reg_type = static_cast<pjsip_transport_type_e>(tp_type);
78 176 : base.type_name = const_cast<char*>(pjsip_transport_get_type_name(reg_type));
79 176 : base.flag = pjsip_transport_get_flag_from_type(reg_type);
80 176 : base.info = static_cast<char*>(pj_pool_alloc(pool_.get(), sip_utils::TRANSPORT_INFO_LENGTH));
81 :
82 176 : auto remote_addr = remote_.toString();
83 176 : pj_ansi_snprintf(base.info, sip_utils::TRANSPORT_INFO_LENGTH, "%s to %s", base.type_name, remote_addr.c_str());
84 176 : base.addr_len = remote_.getLength();
85 176 : base.dir = PJSIP_TP_DIR_NONE;
86 :
87 : // Set initial local address
88 176 : pj_sockaddr_cp(&base.local_addr, local_.pjPtr());
89 :
90 176 : sip_utils::sockaddr_to_host_port(pool_.get(), &base.local_name, &base.local_addr);
91 176 : sip_utils::sockaddr_to_host_port(pool_.get(), &base.remote_name, remote_.pjPtr());
92 :
93 : // Init transport callbacks
94 1739 : base.send_msg = [](pjsip_transport* transport,
95 : pjsip_tx_data* tdata,
96 : const pj_sockaddr_t* rem_addr,
97 : int addr_len,
98 : void* token,
99 : pjsip_transport_callback callback) -> pj_status_t {
100 1563 : auto* this_ = reinterpret_cast<ChanneledSIPTransport*>(reinterpret_cast<TransportData*>(transport)->self);
101 1563 : return this_->send(tdata, rem_addr, addr_len, token, callback);
102 176 : };
103 178 : base.do_shutdown = [](pjsip_transport* transport) -> pj_status_t {
104 2 : auto* this_ = reinterpret_cast<ChanneledSIPTransport*>(reinterpret_cast<TransportData*>(transport)->self);
105 8 : JAMI_LOG("ChanneledSIPTransport@{} tr={} rc={:d}: shutdown",
106 : fmt::ptr(this_),
107 : fmt::ptr(transport),
108 : pj_atomic_get(transport->ref_cnt));
109 2 : if (this_->socket_)
110 2 : this_->socket_->shutdown();
111 2 : return PJ_SUCCESS;
112 176 : };
113 352 : base.destroy = [](pjsip_transport* transport) -> pj_status_t {
114 176 : auto* this_ = reinterpret_cast<ChanneledSIPTransport*>(reinterpret_cast<TransportData*>(transport)->self);
115 176 : delete this_;
116 176 : return PJ_SUCCESS;
117 176 : };
118 :
119 : // Init rdata_
120 176 : std::memset(&rdata_, 0, sizeof(pjsip_rx_data));
121 176 : rxPool_ = sip_utils::smart_alloc_pool(endpt, "channeled.rxPool", PJSIP_POOL_RDATA_LEN, PJSIP_POOL_RDATA_LEN);
122 176 : rdata_.tp_info.pool = rxPool_.get();
123 176 : rdata_.tp_info.transport = &base;
124 176 : rdata_.tp_info.tp_data = this;
125 176 : rdata_.tp_info.op_key.rdata = &rdata_;
126 176 : pj_ioqueue_op_key_init(&rdata_.tp_info.op_key.op_key, sizeof(pj_ioqueue_op_key_t));
127 176 : rdata_.pkt_info.src_addr = base.key.rem_addr;
128 176 : rdata_.pkt_info.src_addr_len = sizeof(rdata_.pkt_info.src_addr);
129 176 : auto rem_addr = &base.key.rem_addr;
130 176 : pj_sockaddr_print(rem_addr, rdata_.pkt_info.src_name, sizeof(rdata_.pkt_info.src_name), 0);
131 176 : rdata_.pkt_info.src_port = pj_sockaddr_get_port(rem_addr);
132 :
133 : // Register callbacks
134 176 : if (pjsip_transport_register(base.tpmgr, &base) != PJ_SUCCESS)
135 0 : throw std::runtime_error("Unable to register PJSIP transport.");
136 176 : }
137 :
138 : void
139 176 : ChanneledSIPTransport::start()
140 : {
141 : // Link to Channel Socket
142 176 : socket_->setOnRecv([this](const uint8_t* buf, size_t len) {
143 1558 : pj_gettimeofday(&rdata_.pkt_info.timestamp);
144 1556 : size_t remaining {len};
145 3116 : while (remaining) {
146 : // Build rdata
147 1558 : size_t added = std::min(remaining, (size_t) PJSIP_MAX_PKT_LEN - (size_t) rdata_.pkt_info.len);
148 1558 : std::copy_n(buf, added, rdata_.pkt_info.packet + rdata_.pkt_info.len);
149 1558 : rdata_.pkt_info.len += added;
150 1558 : buf += added;
151 1558 : remaining -= added;
152 :
153 : // Consume packet
154 1558 : auto eaten = pjsip_tpmgr_receive_packet(trData_.base.tpmgr, &rdata_);
155 1558 : if (eaten == rdata_.pkt_info.len) {
156 1558 : rdata_.pkt_info.len = 0;
157 0 : } else if (eaten > 0) {
158 0 : memmove(rdata_.pkt_info.packet, rdata_.pkt_info.packet + eaten, eaten);
159 0 : rdata_.pkt_info.len -= eaten;
160 : }
161 1558 : pj_pool_reset(rdata_.tp_info.pool);
162 : }
163 1558 : return len;
164 : });
165 176 : socket_->onShutdown([this](const std::error_code& ec) {
166 176 : disconnected_ = true;
167 176 : if (auto state_cb = pjsip_tpmgr_get_state_cb(trData_.base.tpmgr)) {
168 : // JAMI_LOG("[SIPS] process disconnect event");
169 : pjsip_transport_state_info state_info;
170 176 : std::memset(&state_info, 0, sizeof(state_info));
171 176 : state_info.status = ec ? PJ_STATUS_FROM_OS(ec.value()) : PJ_SUCCESS;
172 176 : (*state_cb)(&trData_.base, PJSIP_TP_STATE_DISCONNECTED, &state_info);
173 : }
174 176 : shutdownCb_();
175 176 : });
176 176 : }
177 :
178 352 : ChanneledSIPTransport::~ChanneledSIPTransport()
179 : {
180 176 : auto base = getTransportBase();
181 :
182 : // Here, we reset callbacks in ChannelSocket to avoid to call it after destruction
183 : // ChanneledSIPTransport is managed by pjsip, so we don't have any weak_ptr available
184 176 : socket_->setOnRecv([](const uint8_t*, size_t len) { return len; });
185 352 : socket_->onShutdown([](const std::error_code&) {});
186 : // Stop low-level transport first
187 176 : socket_->shutdown();
188 176 : socket_.reset();
189 :
190 : // If delete not trigged by pjsip_transport_destroy (happen if objet not given to pjsip)
191 176 : if (not base->is_shutdown and not base->is_destroying)
192 0 : pjsip_transport_shutdown(base);
193 :
194 176 : pj_lock_destroy(base->lock);
195 176 : pj_atomic_destroy(base->ref_cnt);
196 704 : JAMI_LOG("~ChanneledSIPTransport@{} tr={}", fmt::ptr(this), fmt::ptr(&trData_.base));
197 352 : }
198 :
199 : pj_status_t
200 1563 : ChanneledSIPTransport::send(
201 : pjsip_tx_data* tdata, const pj_sockaddr_t* rem_addr, int addr_len, void*, pjsip_transport_callback)
202 : {
203 : // Sanity check
204 1563 : PJ_ASSERT_RETURN(tdata, PJ_EINVAL);
205 :
206 : // Check that there's no pending operation associated with the tdata
207 1563 : PJ_ASSERT_RETURN(tdata->op_key.tdata == nullptr, PJSIP_EPENDINGTX);
208 :
209 : // Check the address is supported
210 1563 : PJ_ASSERT_RETURN(rem_addr and (addr_len == sizeof(pj_sockaddr_in) or addr_len == sizeof(pj_sockaddr_in6)),
211 : PJ_EINVAL);
212 :
213 : // Check in we are able to send it in synchronous way first
214 1563 : std::size_t size = tdata->buf.cur - tdata->buf.start;
215 1563 : if (socket_) {
216 1562 : std::error_code ec;
217 1562 : socket_->write(reinterpret_cast<const uint8_t*>(tdata->buf.start), size, ec);
218 1564 : if (!ec)
219 1558 : return PJ_SUCCESS;
220 : }
221 6 : return PJ_EINVAL;
222 : }
223 :
224 : } // namespace tls
225 : } // namespace jami
|