Line data Source code
1 : /*
2 : * Copyright (C) 2004-2024 Savoir-faire Linux Inc.
3 : *
4 : * This program is free software: you can redistribute it and/or modify
5 : * it under the terms of the GNU General Public License as published by
6 : * the Free Software Foundation, either version 3 of the License, or
7 : * (at your option) any later version.
8 : *
9 : * This program is distributed in the hope that it will be useful,
10 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 : * GNU General Public License for more details.
13 : *
14 : * You should have received a copy of the GNU General Public License
15 : * along with this program. If not, see <https://www.gnu.org/licenses/>.
16 : */
17 :
18 : #include "channeled_transport.h"
19 :
20 : #include "logger.h"
21 : #include <dhtnet/multiplexed_socket.h>
22 : #include "connectivity/sip_utils.h"
23 :
24 : #include <pjsip/sip_transport.h>
25 : #include <pjsip/sip_endpoint.h>
26 : #include <pj/compat/socket.h>
27 : #include <pj/lock.h>
28 :
29 : namespace jami {
30 : namespace tls {
31 :
32 1359 : ChanneledSIPTransport::ChanneledSIPTransport(pjsip_endpoint* endpt,
33 : const std::shared_ptr<dhtnet::ChannelSocket>& socket,
34 1359 : onShutdownCb&& cb)
35 1359 : : socket_(socket)
36 1359 : , shutdownCb_(std::move(cb))
37 1359 : , trData_()
38 1359 : , pool_ {nullptr, pj_pool_release}
39 4077 : , rxPool_(nullptr, pj_pool_release)
40 : {
41 1359 : local_ = socket->getLocalAddress();
42 1359 : remote_ = socket->getRemoteAddress();
43 1359 : int tp_type = local_.isIpv6() ? PJSIP_TRANSPORT_TLS6 : PJSIP_TRANSPORT_TLS;
44 :
45 1359 : JAMI_DBG("ChanneledSIPTransport@%p {tr=%p}", this, &trData_.base);
46 :
47 : // Init memory
48 1359 : trData_.self = this; // up-link for PJSIP callbacks
49 :
50 2718 : pool_ = sip_utils::smart_alloc_pool(endpt,
51 : "channeled.pool",
52 : sip_utils::POOL_TP_INIT,
53 1359 : sip_utils::POOL_TP_INC);
54 :
55 1359 : auto& base = trData_.base;
56 1359 : std::memset(&base, 0, sizeof(base));
57 :
58 1359 : pj_ansi_snprintf(base.obj_name, PJ_MAX_OBJ_NAME, "chan%p", &base);
59 1359 : base.endpt = endpt;
60 1359 : base.tpmgr = pjsip_endpt_get_tpmgr(endpt);
61 1359 : base.pool = pool_.get();
62 :
63 1359 : if (pj_atomic_create(pool_.get(), 0, &base.ref_cnt) != PJ_SUCCESS)
64 0 : throw std::runtime_error("Unable to create PJSIP atomic.");
65 :
66 1359 : if (pj_lock_create_recursive_mutex(pool_.get(), "chan", &base.lock) != PJ_SUCCESS)
67 0 : throw std::runtime_error("Unable to create PJSIP mutex.");
68 :
69 1359 : if (not local_) {
70 0 : JAMI_ERR("Invalid local address");
71 0 : throw std::runtime_error("Invalid local address");
72 : }
73 1359 : if (not remote_) {
74 0 : JAMI_ERR("Invalid remote address");
75 0 : throw std::runtime_error("Invalid remote address");
76 : }
77 :
78 1359 : pj_sockaddr_cp(&base.key.rem_addr, remote_.pjPtr());
79 1359 : base.key.type = tp_type;
80 1359 : auto reg_type = static_cast<pjsip_transport_type_e>(tp_type);
81 1359 : base.type_name = const_cast<char*>(pjsip_transport_get_type_name(reg_type));
82 1359 : base.flag = pjsip_transport_get_flag_from_type(reg_type);
83 1359 : base.info = static_cast<char*>(pj_pool_alloc(pool_.get(), sip_utils::TRANSPORT_INFO_LENGTH));
84 :
85 1359 : auto remote_addr = remote_.toString();
86 1359 : pj_ansi_snprintf(base.info,
87 : sip_utils::TRANSPORT_INFO_LENGTH,
88 : "%s to %s",
89 : base.type_name,
90 : remote_addr.c_str());
91 1359 : base.addr_len = remote_.getLength();
92 1359 : base.dir = PJSIP_TP_DIR_NONE;
93 :
94 : // Set initial local address
95 1359 : pj_sockaddr_cp(&base.local_addr, local_.pjPtr());
96 :
97 1359 : sip_utils::sockaddr_to_host_port(pool_.get(), &base.local_name, &base.local_addr);
98 1359 : sip_utils::sockaddr_to_host_port(pool_.get(), &base.remote_name, remote_.pjPtr());
99 :
100 : // Init transport callbacks
101 3586 : base.send_msg = [](pjsip_transport* transport,
102 : pjsip_tx_data* tdata,
103 : const pj_sockaddr_t* rem_addr,
104 : int addr_len,
105 : void* token,
106 : pjsip_transport_callback callback) -> pj_status_t {
107 2227 : auto* this_ = reinterpret_cast<ChanneledSIPTransport*>(
108 : reinterpret_cast<TransportData*>(transport)->self);
109 2227 : return this_->send(tdata, rem_addr, addr_len, token, callback);
110 1359 : };
111 1361 : base.do_shutdown = [](pjsip_transport* transport) -> pj_status_t {
112 2 : auto* this_ = reinterpret_cast<ChanneledSIPTransport*>(
113 : reinterpret_cast<TransportData*>(transport)->self);
114 6 : JAMI_DEBUG("ChanneledSIPTransport@{} tr={} rc={:d}: shutdown",
115 : fmt::ptr(this_),
116 : fmt::ptr(transport),
117 : pj_atomic_get(transport->ref_cnt));
118 2 : if (this_->socket_)
119 2 : this_->socket_->shutdown();
120 2 : return PJ_SUCCESS;
121 1359 : };
122 2718 : base.destroy = [](pjsip_transport* transport) -> pj_status_t {
123 1359 : auto* this_ = reinterpret_cast<ChanneledSIPTransport*>(
124 : reinterpret_cast<TransportData*>(transport)->self);
125 4077 : JAMI_DEBUG("ChanneledSIPTransport@{}: destroying", fmt::ptr(this_));
126 1359 : delete this_;
127 1359 : return PJ_SUCCESS;
128 1359 : };
129 :
130 : // Init rdata_
131 1359 : std::memset(&rdata_, 0, sizeof(pjsip_rx_data));
132 2718 : rxPool_ = sip_utils::smart_alloc_pool(endpt,
133 : "channeled.rxPool",
134 : PJSIP_POOL_RDATA_LEN,
135 1359 : PJSIP_POOL_RDATA_LEN);
136 1359 : rdata_.tp_info.pool = rxPool_.get();
137 1359 : rdata_.tp_info.transport = &base;
138 1359 : rdata_.tp_info.tp_data = this;
139 1359 : rdata_.tp_info.op_key.rdata = &rdata_;
140 1359 : pj_ioqueue_op_key_init(&rdata_.tp_info.op_key.op_key, sizeof(pj_ioqueue_op_key_t));
141 1359 : rdata_.pkt_info.src_addr = base.key.rem_addr;
142 1359 : rdata_.pkt_info.src_addr_len = sizeof(rdata_.pkt_info.src_addr);
143 1359 : auto rem_addr = &base.key.rem_addr;
144 1359 : pj_sockaddr_print(rem_addr, rdata_.pkt_info.src_name, sizeof(rdata_.pkt_info.src_name), 0);
145 1359 : rdata_.pkt_info.src_port = pj_sockaddr_get_port(rem_addr);
146 :
147 : // Register callbacks
148 1359 : if (pjsip_transport_register(base.tpmgr, &base) != PJ_SUCCESS)
149 0 : throw std::runtime_error("Unable to register PJSIP transport.");
150 1359 : }
151 :
152 : void
153 1359 : ChanneledSIPTransport::start()
154 : {
155 : // Link to Channel Socket
156 1359 : socket_->setOnRecv([this](const uint8_t* buf, size_t len) {
157 2210 : pj_gettimeofday(&rdata_.pkt_info.timestamp);
158 2205 : size_t remaining {len};
159 4418 : while (remaining) {
160 : // Build rdata
161 2207 : size_t added = std::min(remaining,
162 2207 : (size_t) PJSIP_MAX_PKT_LEN - (size_t) rdata_.pkt_info.len);
163 2210 : std::copy_n(buf, added, rdata_.pkt_info.packet + rdata_.pkt_info.len);
164 2211 : rdata_.pkt_info.len += added;
165 2211 : buf += added;
166 2211 : remaining -= added;
167 :
168 : // Consume packet
169 2211 : auto eaten = pjsip_tpmgr_receive_packet(trData_.base.tpmgr, &rdata_);
170 2211 : if (eaten == rdata_.pkt_info.len) {
171 2211 : rdata_.pkt_info.len = 0;
172 0 : } else if (eaten > 0) {
173 0 : memmove(rdata_.pkt_info.packet, rdata_.pkt_info.packet + eaten, eaten);
174 0 : rdata_.pkt_info.len -= eaten;
175 : }
176 2211 : pj_pool_reset(rdata_.tp_info.pool);
177 : }
178 2211 : return len;
179 : });
180 1359 : socket_->onShutdown([this] {
181 1359 : disconnected_ = true;
182 1359 : if (auto state_cb = pjsip_tpmgr_get_state_cb(trData_.base.tpmgr)) {
183 1359 : JAMI_WARN("[SIPS] process disconnect event");
184 : pjsip_transport_state_info state_info;
185 1359 : std::memset(&state_info, 0, sizeof(state_info));
186 1359 : state_info.status = PJ_SUCCESS;
187 1359 : (*state_cb)(&trData_.base, PJSIP_TP_STATE_DISCONNECTED, &state_info);
188 : }
189 1359 : shutdownCb_();
190 1359 : });
191 1359 : }
192 :
193 2718 : ChanneledSIPTransport::~ChanneledSIPTransport()
194 : {
195 1359 : JAMI_DBG("~ChanneledSIPTransport@%p {tr=%p}", this, &trData_.base);
196 1359 : auto base = getTransportBase();
197 :
198 : // Here, we reset callbacks in ChannelSocket to avoid to call it after destruction
199 : // ChanneledSIPTransport is managed by pjsip, so we don't have any weak_ptr available
200 1359 : socket_->setOnRecv([](const uint8_t*, size_t len) { return len; });
201 2718 : socket_->onShutdown([]() {});
202 : // Stop low-level transport first
203 1359 : socket_->shutdown();
204 1359 : socket_.reset();
205 :
206 : // If delete not trigged by pjsip_transport_destroy (happen if objet not given to pjsip)
207 1359 : if (not base->is_shutdown and not base->is_destroying)
208 0 : pjsip_transport_shutdown(base);
209 :
210 1359 : pj_lock_destroy(base->lock);
211 1359 : pj_atomic_destroy(base->ref_cnt);
212 1359 : JAMI_DBG("~ChanneledSIPTransport@%p {tr=%p} bye", this, &trData_.base);
213 2718 : }
214 :
215 : pj_status_t
216 2227 : ChanneledSIPTransport::send(pjsip_tx_data* tdata,
217 : const pj_sockaddr_t* rem_addr,
218 : int addr_len,
219 : void*,
220 : pjsip_transport_callback)
221 : {
222 : // Sanity check
223 2227 : PJ_ASSERT_RETURN(tdata, PJ_EINVAL);
224 :
225 : // Check that there's no pending operation associated with the tdata
226 2227 : PJ_ASSERT_RETURN(tdata->op_key.tdata == nullptr, PJSIP_EPENDINGTX);
227 :
228 : // Check the address is supported
229 2227 : PJ_ASSERT_RETURN(rem_addr
230 : and (addr_len == sizeof(pj_sockaddr_in)
231 : or addr_len == sizeof(pj_sockaddr_in6)),
232 : PJ_EINVAL);
233 :
234 : // Check in we are able to send it in synchronous way first
235 2224 : std::size_t size = tdata->buf.cur - tdata->buf.start;
236 2224 : if (socket_) {
237 2225 : std::error_code ec;
238 2226 : socket_->write(reinterpret_cast<const uint8_t*>(tdata->buf.start), size, ec);
239 2227 : if (!ec)
240 2216 : return PJ_SUCCESS;
241 : }
242 10 : return PJ_EINVAL;
243 : }
244 :
245 : } // namespace tls
246 : } // namespace jami
|