Line data Source code
1 : /*
2 : * Copyright (C) 2020-2024 Savoir-faire Linux Inc.
3 : *
4 : * Author: Sébastien Blin <sebastien.blin@savoirfairelinux.com>
5 : *
6 : * This program is free software; you can redistribute it and/or modify
7 : * it under the terms of the GNU General Public License as published by
8 : * the Free Software Foundation; either version 3 of the License, or
9 : * (at your option) any later version.
10 : *
11 : * This program is distributed in the hope that it will be useful,
12 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 : * GNU General Public License for more details.
15 : *
16 : * You should have received a copy of the GNU General Public License
17 : * along with this program; if not, write to the Free Software
18 : * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
19 : */
20 :
21 : #include "channeled_transport.h"
22 :
23 : #include "logger.h"
24 : #include <dhtnet/multiplexed_socket.h>
25 : #include "connectivity/sip_utils.h"
26 :
27 : #include <pjsip/sip_transport.h>
28 : #include <pjsip/sip_endpoint.h>
29 : #include <pj/compat/socket.h>
30 : #include <pj/lock.h>
31 :
32 : namespace jami {
33 : namespace tls {
34 :
35 1312 : ChanneledSIPTransport::ChanneledSIPTransport(pjsip_endpoint* endpt,
36 : const std::shared_ptr<dhtnet::ChannelSocket>& socket,
37 1312 : onShutdownCb&& cb)
38 1310 : : socket_(socket)
39 1311 : , shutdownCb_(std::move(cb))
40 1307 : , trData_()
41 1308 : , pool_ {nullptr, pj_pool_release}
42 3931 : , rxPool_(nullptr, pj_pool_release)
43 : {
44 1311 : local_ = socket->getLocalAddress();
45 1312 : remote_ = socket->getRemoteAddress();
46 1312 : int tp_type = local_.isIpv6() ? PJSIP_TRANSPORT_TLS6 : PJSIP_TRANSPORT_TLS;
47 :
48 1311 : JAMI_DBG("ChanneledSIPTransport@%p {tr=%p}", this, &trData_.base);
49 :
50 : // Init memory
51 1312 : trData_.self = this; // up-link for PJSIP callbacks
52 :
53 2624 : pool_ = sip_utils::smart_alloc_pool(endpt,
54 : "channeled.pool",
55 : sip_utils::POOL_TP_INIT,
56 1312 : sip_utils::POOL_TP_INC);
57 :
58 1312 : auto& base = trData_.base;
59 1312 : std::memset(&base, 0, sizeof(base));
60 :
61 1312 : pj_ansi_snprintf(base.obj_name, PJ_MAX_OBJ_NAME, "chan%p", &base);
62 1312 : base.endpt = endpt;
63 1312 : base.tpmgr = pjsip_endpt_get_tpmgr(endpt);
64 1312 : base.pool = pool_.get();
65 :
66 1312 : if (pj_atomic_create(pool_.get(), 0, &base.ref_cnt) != PJ_SUCCESS)
67 0 : throw std::runtime_error("Can't create PJSIP atomic.");
68 :
69 1312 : if (pj_lock_create_recursive_mutex(pool_.get(), "chan", &base.lock) != PJ_SUCCESS)
70 0 : throw std::runtime_error("Can't create PJSIP mutex.");
71 :
72 1312 : if (not local_) {
73 0 : JAMI_ERR("Invalid local address");
74 0 : throw std::runtime_error("Invalid local address");
75 : }
76 1312 : if (not remote_) {
77 0 : JAMI_ERR("Invalid remote address");
78 0 : throw std::runtime_error("Invalid remote address");
79 : }
80 :
81 1312 : pj_sockaddr_cp(&base.key.rem_addr, remote_.pjPtr());
82 1312 : base.key.type = tp_type;
83 1312 : auto reg_type = static_cast<pjsip_transport_type_e>(tp_type);
84 1312 : base.type_name = const_cast<char*>(pjsip_transport_get_type_name(reg_type));
85 1312 : base.flag = pjsip_transport_get_flag_from_type(reg_type);
86 1312 : base.info = static_cast<char*>(pj_pool_alloc(pool_.get(), sip_utils::TRANSPORT_INFO_LENGTH));
87 :
88 1312 : auto remote_addr = remote_.toString();
89 1312 : pj_ansi_snprintf(base.info,
90 : sip_utils::TRANSPORT_INFO_LENGTH,
91 : "%s to %s",
92 : base.type_name,
93 : remote_addr.c_str());
94 1312 : base.addr_len = remote_.getLength();
95 1312 : base.dir = PJSIP_TP_DIR_NONE;
96 :
97 : // Set initial local address
98 1312 : pj_sockaddr_cp(&base.local_addr, local_.pjPtr());
99 :
100 1312 : sip_utils::sockaddr_to_host_port(pool_.get(), &base.local_name, &base.local_addr);
101 1312 : sip_utils::sockaddr_to_host_port(pool_.get(), &base.remote_name, remote_.pjPtr());
102 :
103 : // Init transport callbacks
104 66939 : base.send_msg = [](pjsip_transport* transport,
105 : pjsip_tx_data* tdata,
106 : const pj_sockaddr_t* rem_addr,
107 : int addr_len,
108 : void* token,
109 : pjsip_transport_callback callback) -> pj_status_t {
110 65627 : auto* this_ = reinterpret_cast<ChanneledSIPTransport*>(
111 : reinterpret_cast<TransportData*>(transport)->self);
112 65627 : return this_->send(tdata, rem_addr, addr_len, token, callback);
113 1312 : };
114 1314 : base.do_shutdown = [](pjsip_transport* transport) -> pj_status_t {
115 2 : auto* this_ = reinterpret_cast<ChanneledSIPTransport*>(
116 : reinterpret_cast<TransportData*>(transport)->self);
117 6 : JAMI_DEBUG("ChanneledSIPTransport@{} tr={} rc={:d}: shutdown",
118 : fmt::ptr(this_),
119 : fmt::ptr(transport),
120 : pj_atomic_get(transport->ref_cnt));
121 2 : if (this_->socket_)
122 2 : this_->socket_->shutdown();
123 2 : return PJ_SUCCESS;
124 1312 : };
125 2624 : base.destroy = [](pjsip_transport* transport) -> pj_status_t {
126 1312 : auto* this_ = reinterpret_cast<ChanneledSIPTransport*>(
127 : reinterpret_cast<TransportData*>(transport)->self);
128 3936 : JAMI_DEBUG("ChanneledSIPTransport@{}: destroying", fmt::ptr(this_));
129 1312 : delete this_;
130 1312 : return PJ_SUCCESS;
131 1312 : };
132 :
133 : // Init rdata_
134 1312 : std::memset(&rdata_, 0, sizeof(pjsip_rx_data));
135 2624 : rxPool_ = sip_utils::smart_alloc_pool(endpt,
136 : "channeled.rxPool",
137 : PJSIP_POOL_RDATA_LEN,
138 1312 : PJSIP_POOL_RDATA_LEN);
139 1312 : rdata_.tp_info.pool = rxPool_.get();
140 1312 : rdata_.tp_info.transport = &base;
141 1312 : rdata_.tp_info.tp_data = this;
142 1312 : rdata_.tp_info.op_key.rdata = &rdata_;
143 1312 : pj_ioqueue_op_key_init(&rdata_.tp_info.op_key.op_key, sizeof(pj_ioqueue_op_key_t));
144 1312 : rdata_.pkt_info.src_addr = base.key.rem_addr;
145 1312 : rdata_.pkt_info.src_addr_len = sizeof(rdata_.pkt_info.src_addr);
146 1312 : auto rem_addr = &base.key.rem_addr;
147 1312 : pj_sockaddr_print(rem_addr, rdata_.pkt_info.src_name, sizeof(rdata_.pkt_info.src_name), 0);
148 1312 : rdata_.pkt_info.src_port = pj_sockaddr_get_port(rem_addr);
149 :
150 : // Register callbacks
151 1312 : if (pjsip_transport_register(base.tpmgr, &base) != PJ_SUCCESS)
152 0 : throw std::runtime_error("Can't register PJSIP transport.");
153 1312 : }
154 :
155 : void
156 1312 : ChanneledSIPTransport::start()
157 : {
158 : // Link to Channel Socket
159 1312 : socket_->setOnRecv([this](const uint8_t* buf, size_t len) {
160 65569 : pj_gettimeofday(&rdata_.pkt_info.timestamp);
161 65526 : size_t remaining {len};
162 131137 : while (remaining) {
163 : // Build rdata
164 65539 : size_t added = std::min(remaining,
165 65539 : (size_t) PJSIP_MAX_PKT_LEN - (size_t) rdata_.pkt_info.len);
166 65553 : std::copy_n(buf, added, rdata_.pkt_info.packet + rdata_.pkt_info.len);
167 65603 : rdata_.pkt_info.len += added;
168 65603 : buf += added;
169 65603 : remaining -= added;
170 :
171 : // Consume packet
172 65603 : auto eaten = pjsip_tpmgr_receive_packet(trData_.base.tpmgr, &rdata_);
173 65604 : if (eaten == rdata_.pkt_info.len) {
174 65605 : rdata_.pkt_info.len = 0;
175 0 : } else if (eaten > 0) {
176 0 : memmove(rdata_.pkt_info.packet, rdata_.pkt_info.packet + eaten, eaten);
177 0 : rdata_.pkt_info.len -= eaten;
178 : }
179 65604 : pj_pool_reset(rdata_.tp_info.pool);
180 : }
181 65598 : return len;
182 : });
183 1312 : socket_->onShutdown([this] {
184 1312 : disconnected_ = true;
185 1312 : if (auto state_cb = pjsip_tpmgr_get_state_cb(trData_.base.tpmgr)) {
186 1312 : JAMI_WARN("[SIPS] process disconnect event");
187 : pjsip_transport_state_info state_info;
188 1312 : std::memset(&state_info, 0, sizeof(state_info));
189 1312 : state_info.status = PJ_SUCCESS;
190 1312 : (*state_cb)(&trData_.base, PJSIP_TP_STATE_DISCONNECTED, &state_info);
191 : }
192 1312 : shutdownCb_();
193 1312 : });
194 1312 : }
195 :
196 2624 : ChanneledSIPTransport::~ChanneledSIPTransport()
197 : {
198 1312 : JAMI_DBG("~ChanneledSIPTransport@%p {tr=%p}", this, &trData_.base);
199 1312 : auto base = getTransportBase();
200 :
201 : // Here, we reset callbacks in ChannelSocket to avoid to call it after destruction
202 : // ChanneledSIPTransport is managed by pjsip, so we don't have any weak_ptr available
203 1312 : socket_->setOnRecv([](const uint8_t*, size_t len) { return len; });
204 2624 : socket_->onShutdown([]() {});
205 : // Stop low-level transport first
206 1312 : socket_->shutdown();
207 1312 : socket_.reset();
208 :
209 : // If delete not trigged by pjsip_transport_destroy (happen if objet not given to pjsip)
210 1312 : if (not base->is_shutdown and not base->is_destroying)
211 0 : pjsip_transport_shutdown(base);
212 :
213 1312 : pj_lock_destroy(base->lock);
214 1312 : pj_atomic_destroy(base->ref_cnt);
215 1312 : JAMI_DBG("~ChanneledSIPTransport@%p {tr=%p} bye", this, &trData_.base);
216 2624 : }
217 :
218 : pj_status_t
219 65625 : ChanneledSIPTransport::send(pjsip_tx_data* tdata,
220 : const pj_sockaddr_t* rem_addr,
221 : int addr_len,
222 : void*,
223 : pjsip_transport_callback)
224 : {
225 : // Sanity check
226 65625 : PJ_ASSERT_RETURN(tdata, PJ_EINVAL);
227 :
228 : // Check that there's no pending operation associated with the tdata
229 65625 : PJ_ASSERT_RETURN(tdata->op_key.tdata == nullptr, PJSIP_EPENDINGTX);
230 :
231 : // Check the address is supported
232 65625 : PJ_ASSERT_RETURN(rem_addr
233 : and (addr_len == sizeof(pj_sockaddr_in)
234 : or addr_len == sizeof(pj_sockaddr_in6)),
235 : PJ_EINVAL);
236 :
237 : // Check in we are able to send it in synchronous way first
238 65624 : std::size_t size = tdata->buf.cur - tdata->buf.start;
239 65624 : if (socket_) {
240 65609 : std::error_code ec;
241 65605 : socket_->write(reinterpret_cast<const uint8_t*>(tdata->buf.start), size, ec);
242 65624 : if (!ec)
243 65605 : return PJ_SUCCESS;
244 : }
245 11 : return PJ_EINVAL;
246 : }
247 :
248 : } // namespace tls
249 : } // namespace jami
|