Line data Source code
1 : /*
2 : * Copyright (C) 2004-2026 Savoir-faire Linux Inc.
3 : * Copyright (c) 2007 The FFmpeg Project
4 : *
5 : * This program is free software: you can redistribute it and/or modify
6 : * it under the terms of the GNU General Public License as published by
7 : * the Free Software Foundation, either version 3 of the License, or
8 : * (at your option) any later version.
9 : *
10 : * This program is distributed in the hope that it will be useful,
11 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 : * GNU General Public License for more details.
14 : *
15 : * You should have received a copy of the GNU General Public License
16 : * along with this program. If not, see <https://www.gnu.org/licenses/>.
17 : */
18 :
19 : #include <dhtnet/ip_utils.h> // MUST BE INCLUDED FIRST
20 :
21 : #include "libav_deps.h" // THEN THIS ONE AFTER
22 :
23 : #include "socket_pair.h"
24 : #include "logger.h"
25 : #include "connectivity/security/memory.h"
26 :
27 : #include <dhtnet/ice_socket.h>
28 :
29 : #include <string>
30 : #include <algorithm>
31 :
32 : extern "C" {
33 : #include "srtp.h"
34 : }
35 :
36 : #include <cstring>
37 : #include <stdexcept>
38 : #include <unistd.h>
39 : #include <sys/types.h>
40 :
41 : #ifdef _WIN32
42 : #define SOCK_NONBLOCK FIONBIO
43 : #define poll WSAPoll
44 : #define close(x) closesocket(x)
45 : #endif
46 :
47 : #ifdef __ANDROID__
48 : #include <asm-generic/fcntl.h>
49 : #define SOCK_NONBLOCK O_NONBLOCK
50 : #endif
51 :
52 : #ifdef __APPLE__
53 : #include <fcntl.h>
54 : #endif
55 :
56 : // Swap 2 byte, 16 bit values:
57 : #define Swap2Bytes(val) ((((val) >> 8) & 0x00FF) | (((val) << 8) & 0xFF00))
58 :
59 : // Swap 4 byte, 32 bit values:
60 : #define Swap4Bytes(val) \
61 : ((((val) >> 24) & 0x000000FF) | (((val) >> 8) & 0x0000FF00) | (((val) << 8) & 0x00FF0000) \
62 : | (((val) << 24) & 0xFF000000))
63 :
64 : // Swap 8 byte, 64 bit values:
65 : #define Swap8Bytes(val) \
66 : ((((val) >> 56) & 0x00000000000000FF) | (((val) >> 40) & 0x000000000000FF00) \
67 : | (((val) >> 24) & 0x0000000000FF0000) | (((val) >> 8) & 0x00000000FF000000) \
68 : | (((val) << 8) & 0x000000FF00000000) | (((val) << 24) & 0x0000FF0000000000) \
69 : | (((val) << 40) & 0x00FF000000000000) | (((val) << 56) & 0xFF00000000000000))
70 :
71 : namespace jami {
72 :
73 : static constexpr int NET_POLL_TIMEOUT = 100; /* poll() timeout in ms */
74 : static constexpr int RTP_MAX_PACKET_LENGTH = 2048;
75 : static constexpr auto UDP_HEADER_SIZE = 8;
76 : static constexpr auto SRTP_OVERHEAD = 10;
77 : static constexpr uint32_t RTCP_RR_FRACTION_MASK = 0xFF000000;
78 : static constexpr unsigned MINIMUM_RTP_HEADER_SIZE = 16;
79 :
80 : enum class DataType : uint8_t { RTP = 1 << 0, RTCP = 1 << 1 };
81 :
82 : class SRTPProtoContext
83 : {
84 : public:
85 316 : SRTPProtoContext(const char* out_suite, const char* out_key, const char* in_suite, const char* in_key)
86 316 : {
87 316 : jami_secure_memzero(&srtp_out, sizeof(srtp_out));
88 316 : jami_secure_memzero(&srtp_in, sizeof(srtp_in));
89 315 : if (out_suite && out_key) {
90 : // XXX: see srtp_open from libavformat/srtpproto.c
91 315 : if (ff_srtp_set_crypto(&srtp_out, out_suite, out_key) < 0) {
92 0 : srtp_close();
93 0 : throw std::runtime_error("Unable to set crypto on output");
94 : }
95 : }
96 :
97 316 : if (in_suite && in_key) {
98 316 : if (ff_srtp_set_crypto(&srtp_in, in_suite, in_key) < 0) {
99 0 : srtp_close();
100 0 : throw std::runtime_error("Unable to set crypto on input");
101 : }
102 : }
103 316 : }
104 :
105 316 : ~SRTPProtoContext() { srtp_close(); }
106 :
107 : SRTPContext srtp_out {};
108 : SRTPContext srtp_in {};
109 : uint8_t encryptbuf[RTP_MAX_PACKET_LENGTH];
110 :
111 : private:
112 316 : void srtp_close() noexcept
113 : {
114 316 : ff_srtp_free(&srtp_out);
115 316 : ff_srtp_free(&srtp_in);
116 316 : }
117 : };
118 :
119 : static int
120 0 : ff_network_wait_fd(int fd)
121 : {
122 0 : struct pollfd p = {fd, POLLOUT, 0};
123 0 : auto ret = poll(&p, 1, NET_POLL_TIMEOUT);
124 0 : return ret < 0 ? errno : p.revents & (POLLOUT | POLLERR | POLLHUP) ? 0 : -EAGAIN;
125 : }
126 :
127 : static int
128 12 : udp_socket_create(int family, int port)
129 : {
130 12 : int udp_fd = -1;
131 :
132 : #ifdef __APPLE__
133 : udp_fd = socket(family, SOCK_DGRAM, 0);
134 : if (udp_fd >= 0 && fcntl(udp_fd, F_SETFL, O_NONBLOCK) < 0) {
135 : close(udp_fd);
136 : udp_fd = -1;
137 : }
138 : #elif defined _WIN32
139 : udp_fd = socket(family, SOCK_DGRAM, 0);
140 : u_long block = 1;
141 : if (udp_fd >= 0 && ioctlsocket(udp_fd, FIONBIO, &block) < 0) {
142 : close(udp_fd);
143 : udp_fd = -1;
144 : }
145 : #else
146 12 : udp_fd = socket(family, SOCK_DGRAM | SOCK_NONBLOCK, 0);
147 : #endif
148 :
149 12 : if (udp_fd < 0) {
150 0 : JAMI_ERR("socket() failed");
151 0 : strErr();
152 0 : return -1;
153 : }
154 :
155 12 : auto bind_addr = dhtnet::ip_utils::getAnyHostAddr(family);
156 12 : if (not bind_addr.isIpv4() and not bind_addr.isIpv6()) {
157 0 : JAMI_ERR("No IPv4/IPv6 host found for family %u", family);
158 0 : close(udp_fd);
159 0 : return -1;
160 : }
161 :
162 12 : bind_addr.setPort(port);
163 12 : JAMI_DBG("use local address: %s", bind_addr.toString(true, true).c_str());
164 12 : if (::bind(udp_fd, bind_addr, bind_addr.getLength()) < 0) {
165 0 : JAMI_ERR("bind() failed");
166 0 : strErr();
167 0 : close(udp_fd);
168 0 : udp_fd = -1;
169 : }
170 :
171 12 : return udp_fd;
172 : }
173 :
174 6 : SocketPair::SocketPair(const char* uri, int localPort)
175 : {
176 6 : openSockets(uri, localPort);
177 6 : }
178 :
179 308 : SocketPair::SocketPair(std::unique_ptr<dhtnet::IceSocket> rtp_sock, std::unique_ptr<dhtnet::IceSocket> rtcp_sock)
180 308 : : rtp_sock_(std::move(rtp_sock))
181 618 : , rtcp_sock_(std::move(rtcp_sock))
182 : {
183 309 : JAMI_DBG("[%p] Creating instance using ICE sockets for comp %d and %d",
184 : this,
185 : rtp_sock_->getCompId(),
186 : rtcp_sock_->getCompId());
187 :
188 310 : rtp_sock_->setOnRecv([this](uint8_t* buf, size_t len) {
189 5146 : std::lock_guard l(dataBuffMutex_);
190 5146 : rtpDataBuff_.emplace_back(buf, buf + len);
191 5146 : cv_.notify_one();
192 5146 : return len;
193 5146 : });
194 310 : rtcp_sock_->setOnRecv([this](uint8_t* buf, size_t len) {
195 240 : std::lock_guard l(dataBuffMutex_);
196 240 : rtcpDataBuff_.emplace_back(buf, buf + len);
197 240 : cv_.notify_one();
198 240 : return len;
199 240 : });
200 309 : }
201 :
202 316 : SocketPair::~SocketPair()
203 : {
204 316 : interrupt();
205 316 : closeSockets();
206 316 : JAMI_DBG("[%p] Instance destroyed", this);
207 316 : }
208 :
209 : bool
210 621 : SocketPair::waitForRTCP(std::chrono::seconds interval)
211 : {
212 621 : std::unique_lock lock(rtcpInfo_mutex_);
213 621 : return cvRtcpPacketReadyToRead_.wait_for(lock, interval, [this] {
214 1888 : return interrupted_ or not listRtcpRRHeader_.empty() or not listRtcpREMBHeader_.empty();
215 1242 : });
216 621 : }
217 :
218 : void
219 9 : SocketPair::saveRtcpRRPacket(uint8_t* buf, size_t len)
220 : {
221 9 : if (len < sizeof(rtcpRRHeader))
222 0 : return;
223 :
224 9 : auto* header = reinterpret_cast<rtcpRRHeader*>(buf);
225 9 : if (header->pt != 201) // 201 = RR PT
226 0 : return;
227 :
228 9 : std::lock_guard lock(rtcpInfo_mutex_);
229 :
230 9 : if (listRtcpRRHeader_.size() >= MAX_LIST_SIZE) {
231 0 : listRtcpRRHeader_.pop_front();
232 : }
233 :
234 9 : listRtcpRRHeader_.emplace_back(*header);
235 :
236 9 : cvRtcpPacketReadyToRead_.notify_one();
237 9 : }
238 :
239 : void
240 159 : SocketPair::saveRtcpREMBPacket(uint8_t* buf, size_t len)
241 : {
242 159 : if (len < sizeof(rtcpREMBHeader))
243 0 : return;
244 :
245 159 : auto* header = reinterpret_cast<rtcpREMBHeader*>(buf);
246 159 : if (header->pt != 206) // 206 = REMB PT
247 0 : return;
248 :
249 159 : if (header->uid != 0x424D4552) // uid must be "REMB"
250 0 : return;
251 :
252 159 : std::lock_guard lock(rtcpInfo_mutex_);
253 :
254 159 : if (listRtcpREMBHeader_.size() >= MAX_LIST_SIZE) {
255 0 : listRtcpREMBHeader_.pop_front();
256 : }
257 :
258 159 : listRtcpREMBHeader_.push_back(*header);
259 :
260 159 : cvRtcpPacketReadyToRead_.notify_one();
261 159 : }
262 :
263 : std::list<rtcpRRHeader>
264 621 : SocketPair::getRtcpRR()
265 : {
266 621 : std::lock_guard lock(rtcpInfo_mutex_);
267 1242 : return std::move(listRtcpRRHeader_);
268 621 : }
269 :
270 : std::list<rtcpREMBHeader>
271 348 : SocketPair::getRtcpREMB()
272 : {
273 348 : std::lock_guard lock(rtcpInfo_mutex_);
274 696 : return std::move(listRtcpREMBHeader_);
275 348 : }
276 :
277 : void
278 316 : SocketPair::createSRTP(const char* out_suite, const char* out_key, const char* in_suite, const char* in_key)
279 : {
280 316 : srtpContext_.reset(new SRTPProtoContext(out_suite, out_key, in_suite, in_key));
281 316 : }
282 :
283 : void
284 632 : SocketPair::interrupt()
285 : {
286 632 : JAMI_WARN("[%p] Interrupting RTP sockets", this);
287 632 : interrupted_ = true;
288 632 : if (rtp_sock_)
289 620 : rtp_sock_->setOnRecv(nullptr);
290 632 : if (rtcp_sock_)
291 620 : rtcp_sock_->setOnRecv(nullptr);
292 632 : cv_.notify_all();
293 632 : cvRtcpPacketReadyToRead_.notify_all();
294 632 : }
295 :
296 : void
297 910 : SocketPair::setReadBlockingMode(bool block)
298 : {
299 910 : JAMI_DBG("[%p] Read operations in blocking mode [%s]", this, block ? "YES" : "NO");
300 910 : readBlockingMode_ = block;
301 910 : cv_.notify_all();
302 910 : cvRtcpPacketReadyToRead_.notify_all();
303 910 : }
304 :
305 : void
306 908 : SocketPair::stopSendOp(bool state)
307 : {
308 908 : noWrite_ = state;
309 908 : }
310 :
311 : void
312 316 : SocketPair::closeSockets()
313 : {
314 316 : if (rtcpHandle_ > 0 and close(rtcpHandle_))
315 0 : strErr();
316 316 : if (rtpHandle_ > 0 and close(rtpHandle_))
317 0 : strErr();
318 316 : }
319 :
320 : void
321 5 : SocketPair::openSockets(const char* uri, int local_rtp_port)
322 : {
323 5 : JAMI_DBG("Creating rtp socket for uri %s on port %d", uri, local_rtp_port);
324 :
325 : char hostname[256];
326 : char path[1024];
327 : int dst_rtp_port;
328 :
329 6 : av_url_split(NULL, 0, NULL, 0, hostname, sizeof(hostname), &dst_rtp_port, path, sizeof(path), uri);
330 :
331 6 : const int local_rtcp_port = local_rtp_port + 1;
332 6 : const int dst_rtcp_port = dst_rtp_port + 1;
333 :
334 6 : rtpDestAddr_ = dhtnet::IpAddr {hostname};
335 6 : rtpDestAddr_.setPort(dst_rtp_port);
336 6 : rtcpDestAddr_ = dhtnet::IpAddr {hostname};
337 6 : rtcpDestAddr_.setPort(dst_rtcp_port);
338 :
339 : // Open local sockets (RTP/RTCP)
340 6 : if ((rtpHandle_ = udp_socket_create(rtpDestAddr_.getFamily(), local_rtp_port)) == -1
341 6 : or (rtcpHandle_ = udp_socket_create(rtcpDestAddr_.getFamily(), local_rtcp_port)) == -1) {
342 0 : closeSockets();
343 0 : JAMI_ERR("[%p] Sockets creation failed", this);
344 0 : throw std::runtime_error("Sockets creation failed");
345 : }
346 :
347 6 : JAMI_WARN("SocketPair: local{%d,%d} / %s{%d,%d}",
348 : local_rtp_port,
349 : local_rtcp_port,
350 : hostname,
351 : dst_rtp_port,
352 : dst_rtcp_port);
353 6 : }
354 :
355 : MediaIOHandle*
356 623 : SocketPair::createIOContext(const uint16_t mtu)
357 : {
358 : unsigned ip_header_size;
359 623 : if (rtp_sock_)
360 611 : ip_header_size = rtp_sock_->getTransportOverhead();
361 12 : else if (rtpDestAddr_.getFamily() == AF_INET6)
362 0 : ip_header_size = 40;
363 : else
364 12 : ip_header_size = 20;
365 : return new MediaIOHandle(
366 623 : mtu - (srtpContext_ ? SRTP_OVERHEAD : 0) - UDP_HEADER_SIZE - ip_header_size,
367 : true,
368 5806 : [](void* sp, uint8_t* buf, int len) { return static_cast<SocketPair*>(sp)->readCallback(buf, len); },
369 5232 : [](void* sp, uint8_t* buf, int len) { return static_cast<SocketPair*>(sp)->writeCallback(buf, len); },
370 : 0,
371 623 : reinterpret_cast<void*>(this));
372 : }
373 :
374 : int
375 5806 : SocketPair::waitForData()
376 : {
377 : // System sockets
378 5806 : if (rtpHandle_ >= 0) {
379 : int ret;
380 : do {
381 86 : if (interrupted_) {
382 4 : errno = EINTR;
383 5 : return -1;
384 : }
385 :
386 82 : if (not readBlockingMode_) {
387 1 : return 0;
388 : }
389 :
390 : // work with system socket
391 81 : struct pollfd p[2] = {{rtpHandle_, POLLIN, 0}, {rtcpHandle_, POLLIN, 0}};
392 81 : ret = poll(p, 2, NET_POLL_TIMEOUT);
393 81 : if (ret > 0) {
394 0 : ret = 0;
395 0 : if (p[0].revents & POLLIN)
396 0 : ret |= static_cast<int>(DataType::RTP);
397 0 : if (p[1].revents & POLLIN)
398 0 : ret |= static_cast<int>(DataType::RTCP);
399 : }
400 81 : } while (!ret or (ret < 0 and errno == EAGAIN));
401 :
402 0 : return ret;
403 : }
404 :
405 : // work with IceSocket
406 : {
407 5801 : std::unique_lock lk(dataBuffMutex_);
408 5801 : cv_.wait(lk, [this] {
409 11449 : return interrupted_ or not rtpDataBuff_.empty() or not rtcpDataBuff_.empty() or not readBlockingMode_;
410 : });
411 5801 : }
412 :
413 5801 : if (interrupted_) {
414 3 : errno = EINTR;
415 3 : return -1;
416 : }
417 :
418 5798 : return static_cast<int>(DataType::RTP) | static_cast<int>(DataType::RTCP);
419 : }
420 :
421 : int
422 5558 : SocketPair::readRtpData(void* buf, int buf_size)
423 : {
424 : // handle system socket
425 5558 : if (rtpHandle_ >= 0) {
426 : struct sockaddr_storage from;
427 0 : socklen_t from_len = sizeof(from);
428 0 : return static_cast<int>(recvfrom(rtpHandle_,
429 : static_cast<char*>(buf),
430 : buf_size,
431 : 0,
432 : reinterpret_cast<struct sockaddr*>(&from),
433 0 : &from_len));
434 : }
435 :
436 : // handle ICE
437 5558 : std::unique_lock lk(dataBuffMutex_);
438 5558 : if (not rtpDataBuff_.empty()) {
439 5143 : auto pkt = std::move(rtpDataBuff_.front());
440 5143 : rtpDataBuff_.pop_front();
441 5143 : lk.unlock(); // to not block our ICE callbacks
442 5143 : int pkt_size = static_cast<int>(pkt.size());
443 5143 : int len = std::min(pkt_size, buf_size);
444 5143 : std::copy_n(pkt.begin(), len, static_cast<char*>(buf));
445 5143 : return len;
446 5143 : }
447 :
448 415 : return 0;
449 5558 : }
450 :
451 : int
452 5798 : SocketPair::readRtcpData(void* buf, int buf_size)
453 : {
454 : // handle system socket
455 5798 : if (rtcpHandle_ >= 0) {
456 : struct sockaddr_storage from;
457 0 : socklen_t from_len = sizeof(from);
458 0 : return static_cast<int>(recvfrom(rtcpHandle_,
459 : static_cast<char*>(buf),
460 : buf_size,
461 : 0,
462 : reinterpret_cast<struct sockaddr*>(&from),
463 0 : &from_len));
464 : }
465 :
466 : // handle ICE
467 5798 : std::unique_lock lk(dataBuffMutex_);
468 5798 : if (not rtcpDataBuff_.empty()) {
469 240 : auto pkt = std::move(rtcpDataBuff_.front());
470 240 : rtcpDataBuff_.pop_front();
471 240 : lk.unlock();
472 240 : int pkt_size = static_cast<int>(pkt.size());
473 240 : int len = std::min(pkt_size, buf_size);
474 240 : std::copy_n(pkt.begin(), len, static_cast<char*>(buf));
475 240 : return len;
476 240 : }
477 :
478 5558 : return 0;
479 5798 : }
480 :
481 : int
482 5806 : SocketPair::readCallback(uint8_t* buf, int buf_size)
483 : {
484 5806 : auto datatype = waitForData();
485 5806 : if (datatype < 0)
486 7 : return datatype;
487 :
488 5799 : int len = 0;
489 5799 : bool fromRTCP = false;
490 :
491 5799 : if (datatype & static_cast<int>(DataType::RTCP)) {
492 5798 : len = readRtcpData(buf, buf_size);
493 5798 : if (len > 0) {
494 240 : auto* header = reinterpret_cast<rtcpRRHeader*>(buf);
495 : // 201 = RR PT
496 240 : if (header->pt == 201) {
497 9 : lastDLSR_ = Swap4Bytes(header->dlsr);
498 : // JAMI_WARN("Read RR, lastDLSR : %d", lastDLSR_);
499 9 : lastRR_time = std::chrono::steady_clock::now();
500 9 : saveRtcpRRPacket(buf, len);
501 : }
502 : // 206 = REMB PT
503 231 : else if (header->pt == 206)
504 159 : saveRtcpREMBPacket(buf, len);
505 : // 200 = SR PT
506 72 : else if (header->pt == 200) {
507 : // not used yet
508 : } else {
509 0 : JAMI_DBG("Unable to read RTCP: unknown packet type %u", header->pt);
510 : }
511 240 : fromRTCP = true;
512 : }
513 : }
514 :
515 : // No RTCP… attempt RTP
516 5799 : if (!len and (datatype & static_cast<int>(DataType::RTP))) {
517 5558 : len = readRtpData(buf, buf_size);
518 5558 : fromRTCP = false;
519 : }
520 :
521 5799 : if (len <= 0)
522 416 : return len;
523 :
524 5383 : if (not fromRTCP && (buf_size < static_cast<int>(MINIMUM_RTP_HEADER_SIZE)))
525 0 : return len;
526 :
527 : // SRTP decrypt
528 5383 : if (not fromRTCP and srtpContext_ and srtpContext_->srtp_in.aes) {
529 5143 : int32_t gradient = 0;
530 5143 : int32_t deltaT = 0;
531 5143 : float abs = 0.0f;
532 5143 : bool res_parse = false;
533 5143 : bool res_delay = false;
534 :
535 5143 : res_parse = parse_RTP_ext(buf, &abs);
536 5143 : bool marker = (buf[1] & 0x80) >> 7;
537 :
538 5143 : if (res_parse)
539 5143 : res_delay = getOneWayDelayGradient(abs, marker, &gradient, &deltaT);
540 :
541 : // rtpDelayCallback_ is not set for audio
542 5143 : if (rtpDelayCallback_ and res_delay)
543 5051 : rtpDelayCallback_(gradient, deltaT);
544 :
545 5143 : auto err = ff_srtp_decrypt(&srtpContext_->srtp_in, buf, &len);
546 5143 : if (packetLossCallback_ and (buf[2] << 8 | buf[3]) != lastSeqNumIn_ + 1)
547 0 : packetLossCallback_();
548 5143 : lastSeqNumIn_ = buf[2] << 8 | buf[3];
549 5143 : if (err < 0)
550 0 : JAMI_WARN("decrypt error %d", err);
551 : }
552 :
553 5383 : if (len != 0)
554 5383 : return len;
555 : else
556 0 : return AVERROR_EOF;
557 : }
558 :
559 : int
560 5391 : SocketPair::writeData(uint8_t* buf, int buf_size)
561 : {
562 5391 : bool isRTCP = RTP_PT_IS_RTCP(buf[1]);
563 :
564 : // System sockets?
565 5391 : if (rtpHandle_ >= 0) {
566 : int fd;
567 : dhtnet::IpAddr* dest_addr;
568 :
569 0 : if (isRTCP) {
570 0 : fd = rtcpHandle_;
571 0 : dest_addr = &rtcpDestAddr_;
572 : } else {
573 0 : fd = rtpHandle_;
574 0 : dest_addr = &rtpDestAddr_;
575 : }
576 :
577 0 : auto ret = ff_network_wait_fd(fd);
578 0 : if (ret < 0)
579 0 : return ret;
580 :
581 0 : if (noWrite_)
582 0 : return buf_size;
583 : return static_cast<int>(
584 0 : ::sendto(fd, reinterpret_cast<const char*>(buf), buf_size, 0, *dest_addr, dest_addr->getLength()));
585 : }
586 :
587 5391 : if (noWrite_)
588 0 : return buf_size;
589 :
590 : // IceSocket
591 5391 : if (isRTCP)
592 240 : return static_cast<int>(rtcp_sock_->send(buf, buf_size));
593 : else
594 5151 : return static_cast<int>(rtp_sock_->send(buf, buf_size));
595 : }
596 :
597 : int
598 5232 : SocketPair::writeCallback(uint8_t* buf, int buf_size)
599 : {
600 5232 : if (noWrite_)
601 0 : return 0;
602 :
603 : int ret;
604 5232 : bool isRTCP = RTP_PT_IS_RTCP(buf[1]);
605 : unsigned int ts_LSB, ts_MSB;
606 : double currentSRTS, currentLatency;
607 :
608 : // Encrypt?
609 5232 : if (not isRTCP and srtpContext_ and srtpContext_->srtp_out.aes) {
610 5151 : buf_size = ff_srtp_encrypt(&srtpContext_->srtp_out,
611 : buf,
612 : buf_size,
613 5151 : srtpContext_->encryptbuf,
614 : sizeof(srtpContext_->encryptbuf));
615 5151 : if (buf_size < 0) {
616 0 : JAMI_WARN("encrypt error %d", buf_size);
617 0 : return buf_size;
618 : }
619 :
620 5151 : buf = srtpContext_->encryptbuf;
621 : }
622 :
623 : // check if we're sending an RR, if so, detect packet loss
624 : // buf_size gives length of buffer, not just header
625 5232 : if (isRTCP && static_cast<unsigned>(buf_size) >= sizeof(rtcpRRHeader)) {
626 9 : auto* header = reinterpret_cast<rtcpRRHeader*>(buf);
627 9 : rtcpPacketLoss_ = (header->pt == 201 && ntohl(header->fraction_lost) & RTCP_RR_FRACTION_MASK);
628 : }
629 :
630 : do {
631 5232 : if (interrupted_)
632 0 : return -EINTR;
633 5232 : ret = writeData(buf, buf_size);
634 5232 : } while (ret < 0 and errno == EAGAIN);
635 :
636 5232 : if (buf[1] == 200) // Sender Report
637 : {
638 72 : auto* header = reinterpret_cast<rtcpSRHeader*>(buf);
639 72 : ts_LSB = Swap4Bytes(header->timestampLSB);
640 72 : ts_MSB = Swap4Bytes(header->timestampMSB);
641 :
642 72 : currentSRTS = ts_MSB + (ts_LSB / pow(2, 32));
643 :
644 72 : if (lastSRTS_ != 0 && lastDLSR_ != 0) {
645 14 : if (histoLatency_.size() >= MAX_LIST_SIZE)
646 0 : histoLatency_.pop_front();
647 :
648 14 : currentLatency = (currentSRTS - lastSRTS_) / 2;
649 : // JAMI_WARN("Current Latency : %f from sender %X", currentLatency, header->ssrc);
650 14 : histoLatency_.push_back(currentLatency);
651 : }
652 :
653 72 : lastSRTS_ = currentSRTS;
654 :
655 : // JAMI_WARN("SENDING NEW RTCP SR !! ");
656 :
657 5160 : } else if (buf[1] == 201) // Receiver Report
658 : {
659 : // auto header = reinterpret_cast<rtcpRRHeader*>(buf);
660 : // JAMI_WARN("SENDING NEW RTCP RR !! ");
661 : }
662 :
663 5232 : return ret < 0 ? -errno : ret;
664 : }
665 :
666 : double
667 9 : SocketPair::getLastLatency()
668 : {
669 9 : if (not histoLatency_.empty())
670 1 : return histoLatency_.back();
671 : else
672 8 : return -1;
673 : }
674 :
675 : void
676 141 : SocketPair::setRtpDelayCallback(std::function<void(int, int)> cb)
677 : {
678 141 : rtpDelayCallback_ = std::move(cb);
679 141 : }
680 :
681 : bool
682 5143 : SocketPair::getOneWayDelayGradient(float sendTS, bool marker, int32_t* gradient, int32_t* deltaT)
683 : {
684 : // Keep only last packet of each frame
685 5143 : if (not marker) {
686 47 : return 0;
687 : }
688 :
689 : // 1st frame
690 5096 : if (lastSendTS_ == 0.0f) {
691 45 : lastSendTS_ = sendTS;
692 45 : lastReceiveTS_ = std::chrono::steady_clock::now();
693 45 : return 0;
694 : }
695 :
696 5051 : int32_t deltaS = static_cast<int32_t>((sendTS - lastSendTS_) * 1000); // milliseconds
697 5051 : if (deltaS < 0)
698 3 : deltaS += 64000;
699 5051 : lastSendTS_ = sendTS;
700 :
701 5051 : std::chrono::steady_clock::time_point arrival_TS = std::chrono::steady_clock::now();
702 : auto deltaR = static_cast<int32_t>(
703 5051 : std::chrono::duration_cast<std::chrono::milliseconds>(arrival_TS - lastReceiveTS_).count());
704 5051 : lastReceiveTS_ = arrival_TS;
705 :
706 5051 : *gradient = deltaR - deltaS;
707 5051 : *deltaT = deltaR;
708 :
709 5051 : return true;
710 : }
711 :
712 : bool
713 5143 : SocketPair::parse_RTP_ext(uint8_t* buf, float* abs)
714 : {
715 5143 : if (not(buf[0] & 0x10))
716 0 : return false;
717 :
718 5143 : uint16_t magic_word = (buf[12] << 8) + buf[13];
719 5143 : if (magic_word != 0xBEDE)
720 0 : return false;
721 :
722 5143 : uint8_t sec = buf[17] >> 2;
723 5143 : uint32_t fract = ((buf[17] & 0x3) << 16 | (buf[18] << 8) | buf[19]) << 14;
724 5143 : float milli = static_cast<float>(fract / pow(2, 32));
725 :
726 5143 : *abs = static_cast<float>(sec) + (milli);
727 5143 : return true;
728 : }
729 :
730 : uint16_t
731 139 : SocketPair::lastSeqValOut()
732 : {
733 139 : if (srtpContext_)
734 139 : return srtpContext_->srtp_out.seq_largest;
735 0 : JAMI_ERR("SRTP context not found.");
736 0 : return 0;
737 : }
738 :
739 : } // namespace jami
|