Line data Source code
1 : /*
2 : * Copyright (C) 2004-2025 Savoir-faire Linux Inc.
3 : * Copyright (c) 2007 The FFmpeg Project
4 : *
5 : * This program is free software: you can redistribute it and/or modify
6 : * it under the terms of the GNU General Public License as published by
7 : * the Free Software Foundation, either version 3 of the License, or
8 : * (at your option) any later version.
9 : *
10 : * This program is distributed in the hope that it will be useful,
11 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 : * GNU General Public License for more details.
14 : *
15 : * You should have received a copy of the GNU General Public License
16 : * along with this program. If not, see <https://www.gnu.org/licenses/>.
17 : */
18 :
19 : #include <dhtnet/ip_utils.h> // MUST BE INCLUDED FIRST
20 :
21 : #include "libav_deps.h" // THEN THIS ONE AFTER
22 :
23 : #include "socket_pair.h"
24 : #include "libav_utils.h"
25 : #include "logger.h"
26 : #include "connectivity/security/memory.h"
27 :
28 : #include <dhtnet/ice_socket.h>
29 :
30 : #include <iostream>
31 : #include <string>
32 : #include <algorithm>
33 : #include <iterator>
34 :
35 : extern "C" {
36 : #include "srtp.h"
37 : }
38 :
39 : #include <cstring>
40 : #include <stdexcept>
41 : #include <unistd.h>
42 : #include <sys/types.h>
43 : #include <ciso646> // fix windows compiler bug
44 :
45 : #ifdef _WIN32
46 : #define SOCK_NONBLOCK FIONBIO
47 : #define poll WSAPoll
48 : #define close(x) closesocket(x)
49 : #endif
50 :
51 : #ifdef __ANDROID__
52 : #include <asm-generic/fcntl.h>
53 : #define SOCK_NONBLOCK O_NONBLOCK
54 : #endif
55 :
56 : #ifdef __APPLE__
57 : #include <fcntl.h>
58 : #endif
59 :
60 : // Swap 2 byte, 16 bit values:
61 : #define Swap2Bytes(val) ((((val) >> 8) & 0x00FF) | (((val) << 8) & 0xFF00))
62 :
63 : // Swap 4 byte, 32 bit values:
64 : #define Swap4Bytes(val) \
65 : ((((val) >> 24) & 0x000000FF) | (((val) >> 8) & 0x0000FF00) | (((val) << 8) & 0x00FF0000) \
66 : | (((val) << 24) & 0xFF000000))
67 :
68 : // Swap 8 byte, 64 bit values:
69 : #define Swap8Bytes(val) \
70 : ((((val) >> 56) & 0x00000000000000FF) | (((val) >> 40) & 0x000000000000FF00) \
71 : | (((val) >> 24) & 0x0000000000FF0000) | (((val) >> 8) & 0x00000000FF000000) \
72 : | (((val) << 8) & 0x000000FF00000000) | (((val) << 24) & 0x0000FF0000000000) \
73 : | (((val) << 40) & 0x00FF000000000000) | (((val) << 56) & 0xFF00000000000000))
74 :
75 : namespace jami {
76 :
77 : static constexpr int NET_POLL_TIMEOUT = 100; /* poll() timeout in ms */
78 : static constexpr int RTP_MAX_PACKET_LENGTH = 2048;
79 : static constexpr auto UDP_HEADER_SIZE = 8;
80 : static constexpr auto SRTP_OVERHEAD = 10;
81 : static constexpr uint32_t RTCP_RR_FRACTION_MASK = 0xFF000000;
82 : static constexpr unsigned MINIMUM_RTP_HEADER_SIZE = 16;
83 :
84 : enum class DataType : unsigned { RTP = 1 << 0, RTCP = 1 << 1 };
85 :
86 : class SRTPProtoContext
87 : {
88 : public:
89 326 : SRTPProtoContext(const char* out_suite, const char* out_key, const char* in_suite, const char* in_key)
90 326 : {
91 326 : jami_secure_memzero(&srtp_out, sizeof(srtp_out));
92 326 : jami_secure_memzero(&srtp_in, sizeof(srtp_in));
93 326 : if (out_suite && out_key) {
94 : // XXX: see srtp_open from libavformat/srtpproto.c
95 326 : if (ff_srtp_set_crypto(&srtp_out, out_suite, out_key) < 0) {
96 0 : srtp_close();
97 0 : throw std::runtime_error("Unable to set crypto on output");
98 : }
99 : }
100 :
101 326 : if (in_suite && in_key) {
102 326 : if (ff_srtp_set_crypto(&srtp_in, in_suite, in_key) < 0) {
103 0 : srtp_close();
104 0 : throw std::runtime_error("Unable to set crypto on input");
105 : }
106 : }
107 326 : }
108 :
109 326 : ~SRTPProtoContext() { srtp_close(); }
110 :
111 : SRTPContext srtp_out {};
112 : SRTPContext srtp_in {};
113 : uint8_t encryptbuf[RTP_MAX_PACKET_LENGTH];
114 :
115 : private:
116 326 : void srtp_close() noexcept
117 : {
118 326 : ff_srtp_free(&srtp_out);
119 326 : ff_srtp_free(&srtp_in);
120 326 : }
121 : };
122 :
123 : static int
124 0 : ff_network_wait_fd(int fd)
125 : {
126 0 : struct pollfd p = {fd, POLLOUT, 0};
127 0 : auto ret = poll(&p, 1, NET_POLL_TIMEOUT);
128 0 : return ret < 0 ? errno : p.revents & (POLLOUT | POLLERR | POLLHUP) ? 0 : -EAGAIN;
129 : }
130 :
131 : static int
132 16 : udp_socket_create(int family, int port)
133 : {
134 16 : int udp_fd = -1;
135 :
136 : #ifdef __APPLE__
137 : udp_fd = socket(family, SOCK_DGRAM, 0);
138 : if (udp_fd >= 0 && fcntl(udp_fd, F_SETFL, O_NONBLOCK) < 0) {
139 : close(udp_fd);
140 : udp_fd = -1;
141 : }
142 : #elif defined _WIN32
143 : udp_fd = socket(family, SOCK_DGRAM, 0);
144 : u_long block = 1;
145 : if (udp_fd >= 0 && ioctlsocket(udp_fd, FIONBIO, &block) < 0) {
146 : close(udp_fd);
147 : udp_fd = -1;
148 : }
149 : #else
150 16 : udp_fd = socket(family, SOCK_DGRAM | SOCK_NONBLOCK, 0);
151 : #endif
152 :
153 16 : if (udp_fd < 0) {
154 0 : JAMI_ERR("socket() failed");
155 0 : strErr();
156 0 : return -1;
157 : }
158 :
159 16 : auto bind_addr = dhtnet::ip_utils::getAnyHostAddr(family);
160 16 : if (not bind_addr.isIpv4() and not bind_addr.isIpv6()) {
161 0 : JAMI_ERR("No IPv4/IPv6 host found for family %u", family);
162 0 : close(udp_fd);
163 0 : return -1;
164 : }
165 :
166 16 : bind_addr.setPort(port);
167 16 : JAMI_DBG("use local address: %s", bind_addr.toString(true, true).c_str());
168 16 : if (::bind(udp_fd, bind_addr, bind_addr.getLength()) < 0) {
169 0 : JAMI_ERR("bind() failed");
170 0 : strErr();
171 0 : close(udp_fd);
172 0 : udp_fd = -1;
173 : }
174 :
175 16 : return udp_fd;
176 : }
177 :
178 7 : SocketPair::SocketPair(const char* uri, int localPort)
179 : {
180 8 : openSockets(uri, localPort);
181 8 : }
182 :
183 315 : SocketPair::SocketPair(std::unique_ptr<dhtnet::IceSocket> rtp_sock, std::unique_ptr<dhtnet::IceSocket> rtcp_sock)
184 317 : : rtp_sock_(std::move(rtp_sock))
185 632 : , rtcp_sock_(std::move(rtcp_sock))
186 : {
187 317 : JAMI_DBG("[%p] Creating instance using ICE sockets for comp %d and %d",
188 : this,
189 : rtp_sock_->getCompId(),
190 : rtcp_sock_->getCompId());
191 :
192 318 : rtp_sock_->setOnRecv([this](uint8_t* buf, size_t len) {
193 5791 : std::lock_guard l(dataBuffMutex_);
194 5791 : rtpDataBuff_.emplace_back(buf, buf + len);
195 5791 : cv_.notify_one();
196 5791 : return len;
197 5791 : });
198 318 : rtcp_sock_->setOnRecv([this](uint8_t* buf, size_t len) {
199 269 : std::lock_guard l(dataBuffMutex_);
200 269 : rtcpDataBuff_.emplace_back(buf, buf + len);
201 269 : cv_.notify_one();
202 269 : return len;
203 269 : });
204 318 : }
205 :
206 326 : SocketPair::~SocketPair()
207 : {
208 326 : interrupt();
209 326 : closeSockets();
210 326 : JAMI_DBG("[%p] Instance destroyed", this);
211 326 : }
212 :
213 : bool
214 745 : SocketPair::waitForRTCP(std::chrono::seconds interval)
215 : {
216 745 : std::unique_lock lock(rtcpInfo_mutex_);
217 745 : return cvRtcpPacketReadyToRead_.wait_for(lock, interval, [this] {
218 1991 : return interrupted_ or not listRtcpRRHeader_.empty() or not listRtcpREMBHeader_.empty();
219 1489 : });
220 745 : }
221 :
222 : void
223 10 : SocketPair::saveRtcpRRPacket(uint8_t* buf, size_t len)
224 : {
225 10 : if (len < sizeof(rtcpRRHeader))
226 0 : return;
227 :
228 10 : auto header = reinterpret_cast<rtcpRRHeader*>(buf);
229 10 : if (header->pt != 201) // 201 = RR PT
230 0 : return;
231 :
232 10 : std::lock_guard lock(rtcpInfo_mutex_);
233 :
234 10 : if (listRtcpRRHeader_.size() >= MAX_LIST_SIZE) {
235 0 : listRtcpRRHeader_.pop_front();
236 : }
237 :
238 10 : listRtcpRRHeader_.emplace_back(*header);
239 :
240 10 : cvRtcpPacketReadyToRead_.notify_one();
241 10 : }
242 :
243 : void
244 182 : SocketPair::saveRtcpREMBPacket(uint8_t* buf, size_t len)
245 : {
246 182 : if (len < sizeof(rtcpREMBHeader))
247 0 : return;
248 :
249 182 : auto header = reinterpret_cast<rtcpREMBHeader*>(buf);
250 182 : if (header->pt != 206) // 206 = REMB PT
251 0 : return;
252 :
253 182 : if (header->uid != 0x424D4552) // uid must be "REMB"
254 0 : return;
255 :
256 182 : std::lock_guard lock(rtcpInfo_mutex_);
257 :
258 182 : if (listRtcpREMBHeader_.size() >= MAX_LIST_SIZE) {
259 0 : listRtcpREMBHeader_.pop_front();
260 : }
261 :
262 182 : listRtcpREMBHeader_.push_back(*header);
263 :
264 182 : cvRtcpPacketReadyToRead_.notify_one();
265 182 : }
266 :
267 : std::list<rtcpRRHeader>
268 745 : SocketPair::getRtcpRR()
269 : {
270 745 : std::lock_guard lock(rtcpInfo_mutex_);
271 1490 : return std::move(listRtcpRRHeader_);
272 745 : }
273 :
274 : std::list<rtcpREMBHeader>
275 382 : SocketPair::getRtcpREMB()
276 : {
277 382 : std::lock_guard lock(rtcpInfo_mutex_);
278 764 : return std::move(listRtcpREMBHeader_);
279 382 : }
280 :
281 : void
282 326 : SocketPair::createSRTP(const char* out_suite, const char* out_key, const char* in_suite, const char* in_key)
283 : {
284 326 : srtpContext_.reset(new SRTPProtoContext(out_suite, out_key, in_suite, in_key));
285 326 : }
286 :
287 : void
288 652 : SocketPair::interrupt()
289 : {
290 652 : JAMI_WARN("[%p] Interrupting RTP sockets", this);
291 652 : interrupted_ = true;
292 652 : if (rtp_sock_)
293 636 : rtp_sock_->setOnRecv(nullptr);
294 652 : if (rtcp_sock_)
295 636 : rtcp_sock_->setOnRecv(nullptr);
296 652 : cv_.notify_all();
297 652 : cvRtcpPacketReadyToRead_.notify_all();
298 652 : }
299 :
300 : void
301 943 : SocketPair::setReadBlockingMode(bool block)
302 : {
303 943 : JAMI_DBG("[%p] Read operations in blocking mode [%s]", this, block ? "YES" : "NO");
304 943 : readBlockingMode_ = block;
305 943 : cv_.notify_all();
306 943 : cvRtcpPacketReadyToRead_.notify_all();
307 943 : }
308 :
309 : void
310 944 : SocketPair::stopSendOp(bool state)
311 : {
312 944 : noWrite_ = state;
313 944 : }
314 :
315 : void
316 326 : SocketPair::closeSockets()
317 : {
318 326 : if (rtcpHandle_ > 0 and close(rtcpHandle_))
319 0 : strErr();
320 326 : if (rtpHandle_ > 0 and close(rtpHandle_))
321 0 : strErr();
322 326 : }
323 :
324 : void
325 8 : SocketPair::openSockets(const char* uri, int local_rtp_port)
326 : {
327 8 : JAMI_DBG("Creating rtp socket for uri %s on port %d", uri, local_rtp_port);
328 :
329 : char hostname[256];
330 : char path[1024];
331 : int dst_rtp_port;
332 :
333 8 : av_url_split(NULL, 0, NULL, 0, hostname, sizeof(hostname), &dst_rtp_port, path, sizeof(path), uri);
334 :
335 8 : const int local_rtcp_port = local_rtp_port + 1;
336 8 : const int dst_rtcp_port = dst_rtp_port + 1;
337 :
338 8 : rtpDestAddr_ = dhtnet::IpAddr {hostname};
339 8 : rtpDestAddr_.setPort(dst_rtp_port);
340 8 : rtcpDestAddr_ = dhtnet::IpAddr {hostname};
341 8 : rtcpDestAddr_.setPort(dst_rtcp_port);
342 :
343 : // Open local sockets (RTP/RTCP)
344 8 : if ((rtpHandle_ = udp_socket_create(rtpDestAddr_.getFamily(), local_rtp_port)) == -1
345 8 : or (rtcpHandle_ = udp_socket_create(rtcpDestAddr_.getFamily(), local_rtcp_port)) == -1) {
346 0 : closeSockets();
347 0 : JAMI_ERR("[%p] Sockets creation failed", this);
348 0 : throw std::runtime_error("Sockets creation failed");
349 : }
350 :
351 8 : JAMI_WARN("SocketPair: local{%d,%d} / %s{%d,%d}",
352 : local_rtp_port,
353 : local_rtcp_port,
354 : hostname,
355 : dst_rtp_port,
356 : dst_rtcp_port);
357 8 : }
358 :
359 : MediaIOHandle*
360 646 : SocketPair::createIOContext(const uint16_t mtu)
361 : {
362 : unsigned ip_header_size;
363 646 : if (rtp_sock_)
364 630 : ip_header_size = rtp_sock_->getTransportOverhead();
365 16 : else if (rtpDestAddr_.getFamily() == AF_INET6)
366 0 : ip_header_size = 40;
367 : else
368 16 : ip_header_size = 20;
369 : return new MediaIOHandle(
370 646 : mtu - (srtpContext_ ? SRTP_OVERHEAD : 0) - UDP_HEADER_SIZE - ip_header_size,
371 : true,
372 6486 : [](void* sp, uint8_t* buf, int len) { return static_cast<SocketPair*>(sp)->readCallback(buf, len); },
373 5887 : [](void* sp, uint8_t* buf, int len) { return static_cast<SocketPair*>(sp)->writeCallback(buf, len); },
374 : 0,
375 646 : reinterpret_cast<void*>(this));
376 : }
377 :
378 : int
379 6486 : SocketPair::waitForData()
380 : {
381 : // System sockets
382 6486 : if (rtpHandle_ >= 0) {
383 : int ret;
384 : do {
385 87 : if (interrupted_) {
386 4 : errno = EINTR;
387 6 : return -1;
388 : }
389 :
390 83 : if (not readBlockingMode_) {
391 2 : return 0;
392 : }
393 :
394 : // work with system socket
395 81 : struct pollfd p[2] = {{rtpHandle_, POLLIN, 0}, {rtcpHandle_, POLLIN, 0}};
396 81 : ret = poll(p, 2, NET_POLL_TIMEOUT);
397 81 : if (ret > 0) {
398 0 : ret = 0;
399 0 : if (p[0].revents & POLLIN)
400 0 : ret |= static_cast<int>(DataType::RTP);
401 0 : if (p[1].revents & POLLIN)
402 0 : ret |= static_cast<int>(DataType::RTCP);
403 : }
404 81 : } while (!ret or (ret < 0 and errno == EAGAIN));
405 :
406 0 : return ret;
407 : }
408 :
409 : // work with IceSocket
410 : {
411 6480 : std::unique_lock lk(dataBuffMutex_);
412 6480 : cv_.wait(lk, [this] {
413 12807 : return interrupted_ or not rtpDataBuff_.empty() or not rtcpDataBuff_.empty() or not readBlockingMode_;
414 : });
415 6480 : }
416 :
417 6480 : if (interrupted_) {
418 28 : errno = EINTR;
419 28 : return -1;
420 : }
421 :
422 6452 : return static_cast<int>(DataType::RTP) | static_cast<int>(DataType::RTCP);
423 : }
424 :
425 : int
426 6184 : SocketPair::readRtpData(void* buf, int buf_size)
427 : {
428 : // handle system socket
429 6184 : if (rtpHandle_ >= 0) {
430 : struct sockaddr_storage from;
431 0 : socklen_t from_len = sizeof(from);
432 0 : return recvfrom(rtpHandle_,
433 : static_cast<char*>(buf),
434 : buf_size,
435 : 0,
436 : reinterpret_cast<struct sockaddr*>(&from),
437 0 : &from_len);
438 : }
439 :
440 : // handle ICE
441 6184 : std::unique_lock lk(dataBuffMutex_);
442 6184 : if (not rtpDataBuff_.empty()) {
443 5790 : auto pkt = std::move(rtpDataBuff_.front());
444 5790 : rtpDataBuff_.pop_front();
445 5790 : lk.unlock(); // to not block our ICE callbacks
446 5790 : int pkt_size = pkt.size();
447 5790 : int len = std::min(pkt_size, buf_size);
448 5790 : std::copy_n(pkt.begin(), len, static_cast<char*>(buf));
449 5790 : return len;
450 5790 : }
451 :
452 393 : return 0;
453 6183 : }
454 :
455 : int
456 6452 : SocketPair::readRtcpData(void* buf, int buf_size)
457 : {
458 : // handle system socket
459 6452 : if (rtcpHandle_ >= 0) {
460 : struct sockaddr_storage from;
461 0 : socklen_t from_len = sizeof(from);
462 0 : return recvfrom(rtcpHandle_,
463 : static_cast<char*>(buf),
464 : buf_size,
465 : 0,
466 : reinterpret_cast<struct sockaddr*>(&from),
467 0 : &from_len);
468 : }
469 :
470 : // handle ICE
471 6452 : std::unique_lock lk(dataBuffMutex_);
472 6452 : if (not rtcpDataBuff_.empty()) {
473 268 : auto pkt = std::move(rtcpDataBuff_.front());
474 268 : rtcpDataBuff_.pop_front();
475 268 : lk.unlock();
476 268 : int pkt_size = pkt.size();
477 268 : int len = std::min(pkt_size, buf_size);
478 268 : std::copy_n(pkt.begin(), len, static_cast<char*>(buf));
479 268 : return len;
480 268 : }
481 :
482 6184 : return 0;
483 6452 : }
484 :
485 : int
486 6486 : SocketPair::readCallback(uint8_t* buf, int buf_size)
487 : {
488 6486 : auto datatype = waitForData();
489 6486 : if (datatype < 0)
490 32 : return datatype;
491 :
492 6454 : int len = 0;
493 6454 : bool fromRTCP = false;
494 :
495 6454 : if (datatype & static_cast<int>(DataType::RTCP)) {
496 6452 : len = readRtcpData(buf, buf_size);
497 6452 : if (len > 0) {
498 268 : auto header = reinterpret_cast<rtcpRRHeader*>(buf);
499 : // 201 = RR PT
500 268 : if (header->pt == 201) {
501 10 : lastDLSR_ = Swap4Bytes(header->dlsr);
502 : // JAMI_WARN("Read RR, lastDLSR : %d", lastDLSR_);
503 10 : lastRR_time = std::chrono::steady_clock::now();
504 10 : saveRtcpRRPacket(buf, len);
505 : }
506 : // 206 = REMB PT
507 258 : else if (header->pt == 206)
508 182 : saveRtcpREMBPacket(buf, len);
509 : // 200 = SR PT
510 76 : else if (header->pt == 200) {
511 : // not used yet
512 : } else {
513 0 : JAMI_DBG("Unable to read RTCP: unknown packet type %u", header->pt);
514 : }
515 268 : fromRTCP = true;
516 : }
517 : }
518 :
519 : // No RTCP… attempt RTP
520 6454 : if (!len and (datatype & static_cast<int>(DataType::RTP))) {
521 6184 : len = readRtpData(buf, buf_size);
522 6183 : fromRTCP = false;
523 : }
524 :
525 6453 : if (len <= 0)
526 395 : return len;
527 :
528 6058 : if (not fromRTCP && (buf_size < static_cast<int>(MINIMUM_RTP_HEADER_SIZE)))
529 0 : return len;
530 :
531 : // SRTP decrypt
532 6058 : if (not fromRTCP and srtpContext_ and srtpContext_->srtp_in.aes) {
533 5790 : int32_t gradient = 0;
534 5790 : int32_t deltaT = 0;
535 5790 : float abs = 0.0f;
536 5790 : bool res_parse = false;
537 5790 : bool res_delay = false;
538 :
539 5790 : res_parse = parse_RTP_ext(buf, &abs);
540 5790 : bool marker = (buf[1] & 0x80) >> 7;
541 :
542 5790 : if (res_parse)
543 5790 : res_delay = getOneWayDelayGradient(abs, marker, &gradient, &deltaT);
544 :
545 : // rtpDelayCallback_ is not set for audio
546 5790 : if (rtpDelayCallback_ and res_delay)
547 5694 : rtpDelayCallback_(gradient, deltaT);
548 :
549 5790 : auto err = ff_srtp_decrypt(&srtpContext_->srtp_in, buf, &len);
550 5790 : if (packetLossCallback_ and (buf[2] << 8 | buf[3]) != lastSeqNumIn_ + 1)
551 0 : packetLossCallback_();
552 5790 : lastSeqNumIn_ = buf[2] << 8 | buf[3];
553 5790 : if (err < 0)
554 0 : JAMI_WARN("decrypt error %d", err);
555 : }
556 :
557 6058 : if (len != 0)
558 6058 : return len;
559 : else
560 0 : return AVERROR_EOF;
561 : }
562 :
563 : int
564 6069 : SocketPair::writeData(uint8_t* buf, int buf_size)
565 : {
566 6069 : bool isRTCP = RTP_PT_IS_RTCP(buf[1]);
567 :
568 : // System sockets?
569 6069 : if (rtpHandle_ >= 0) {
570 : int fd;
571 : dhtnet::IpAddr* dest_addr;
572 :
573 0 : if (isRTCP) {
574 0 : fd = rtcpHandle_;
575 0 : dest_addr = &rtcpDestAddr_;
576 : } else {
577 0 : fd = rtpHandle_;
578 0 : dest_addr = &rtpDestAddr_;
579 : }
580 :
581 0 : auto ret = ff_network_wait_fd(fd);
582 0 : if (ret < 0)
583 0 : return ret;
584 :
585 0 : if (noWrite_)
586 0 : return buf_size;
587 0 : return ::sendto(fd, reinterpret_cast<const char*>(buf), buf_size, 0, *dest_addr, dest_addr->getLength());
588 : }
589 :
590 6069 : if (noWrite_)
591 0 : return buf_size;
592 :
593 : // IceSocket
594 6069 : if (isRTCP)
595 269 : return rtcp_sock_->send(buf, buf_size);
596 : else
597 5800 : return rtp_sock_->send(buf, buf_size);
598 : }
599 :
600 : int
601 5887 : SocketPair::writeCallback(uint8_t* buf, int buf_size)
602 : {
603 5887 : if (noWrite_)
604 0 : return 0;
605 :
606 : int ret;
607 5887 : bool isRTCP = RTP_PT_IS_RTCP(buf[1]);
608 : unsigned int ts_LSB, ts_MSB;
609 : double currentSRTS, currentLatency;
610 :
611 : // Encrypt?
612 5887 : if (not isRTCP and srtpContext_ and srtpContext_->srtp_out.aes) {
613 5800 : buf_size = ff_srtp_encrypt(&srtpContext_->srtp_out,
614 : buf,
615 : buf_size,
616 5800 : srtpContext_->encryptbuf,
617 : sizeof(srtpContext_->encryptbuf));
618 5800 : if (buf_size < 0) {
619 0 : JAMI_WARN("encrypt error %d", buf_size);
620 0 : return buf_size;
621 : }
622 :
623 5800 : buf = srtpContext_->encryptbuf;
624 : }
625 :
626 : // check if we're sending an RR, if so, detect packet loss
627 : // buf_size gives length of buffer, not just header
628 5887 : if (isRTCP && static_cast<unsigned>(buf_size) >= sizeof(rtcpRRHeader)) {
629 10 : auto header = reinterpret_cast<rtcpRRHeader*>(buf);
630 10 : rtcpPacketLoss_ = (header->pt == 201 && ntohl(header->fraction_lost) & RTCP_RR_FRACTION_MASK);
631 : }
632 :
633 : do {
634 5887 : if (interrupted_)
635 0 : return -EINTR;
636 5887 : ret = writeData(buf, buf_size);
637 5887 : } while (ret < 0 and errno == EAGAIN);
638 :
639 5887 : if (buf[1] == 200) // Sender Report
640 : {
641 77 : auto header = reinterpret_cast<rtcpSRHeader*>(buf);
642 77 : ts_LSB = Swap4Bytes(header->timestampLSB);
643 77 : ts_MSB = Swap4Bytes(header->timestampMSB);
644 :
645 77 : currentSRTS = ts_MSB + (ts_LSB / pow(2, 32));
646 :
647 77 : if (lastSRTS_ != 0 && lastDLSR_ != 0) {
648 14 : if (histoLatency_.size() >= MAX_LIST_SIZE)
649 0 : histoLatency_.pop_front();
650 :
651 14 : currentLatency = (currentSRTS - lastSRTS_) / 2;
652 : // JAMI_WARN("Current Latency : %f from sender %X", currentLatency, header->ssrc);
653 14 : histoLatency_.push_back(currentLatency);
654 : }
655 :
656 77 : lastSRTS_ = currentSRTS;
657 :
658 : // JAMI_WARN("SENDING NEW RTCP SR !! ");
659 :
660 5810 : } else if (buf[1] == 201) // Receiver Report
661 : {
662 : // auto header = reinterpret_cast<rtcpRRHeader*>(buf);
663 : // JAMI_WARN("SENDING NEW RTCP RR !! ");
664 : }
665 :
666 5887 : return ret < 0 ? -errno : ret;
667 : }
668 :
669 : double
670 10 : SocketPair::getLastLatency()
671 : {
672 10 : if (not histoLatency_.empty())
673 0 : return histoLatency_.back();
674 : else
675 10 : return -1;
676 : }
677 :
678 : void
679 146 : SocketPair::setRtpDelayCallback(std::function<void(int, int)> cb)
680 : {
681 146 : rtpDelayCallback_ = std::move(cb);
682 146 : }
683 :
684 : bool
685 5790 : SocketPair::getOneWayDelayGradient(float sendTS, bool marker, int32_t* gradient, int32_t* deltaT)
686 : {
687 : // Keep only last packet of each frame
688 5790 : if (not marker) {
689 49 : return 0;
690 : }
691 :
692 : // 1st frame
693 5741 : if (not lastSendTS_) {
694 47 : lastSendTS_ = sendTS;
695 47 : lastReceiveTS_ = std::chrono::steady_clock::now();
696 47 : return 0;
697 : }
698 :
699 5694 : int32_t deltaS = (sendTS - lastSendTS_) * 1000; // milliseconds
700 5694 : if (deltaS < 0)
701 1 : deltaS += 64000;
702 5694 : lastSendTS_ = sendTS;
703 :
704 5694 : std::chrono::steady_clock::time_point arrival_TS = std::chrono::steady_clock::now();
705 5694 : auto deltaR = std::chrono::duration_cast<std::chrono::milliseconds>(arrival_TS - lastReceiveTS_).count();
706 5694 : lastReceiveTS_ = arrival_TS;
707 :
708 5694 : *gradient = deltaR - deltaS;
709 5694 : *deltaT = deltaR;
710 :
711 5694 : return true;
712 : }
713 :
714 : bool
715 5790 : SocketPair::parse_RTP_ext(uint8_t* buf, float* abs)
716 : {
717 5790 : if (not(buf[0] & 0x10))
718 0 : return false;
719 :
720 5790 : uint16_t magic_word = (buf[12] << 8) + buf[13];
721 5790 : if (magic_word != 0xBEDE)
722 0 : return false;
723 :
724 5790 : uint8_t sec = buf[17] >> 2;
725 5790 : uint32_t fract = ((buf[17] & 0x3) << 16 | (buf[18] << 8) | buf[19]) << 14;
726 5790 : float milli = fract / pow(2, 32);
727 :
728 5790 : *abs = sec + (milli);
729 5790 : return true;
730 : }
731 :
732 : uint16_t
733 147 : SocketPair::lastSeqValOut()
734 : {
735 147 : if (srtpContext_)
736 147 : return srtpContext_->srtp_out.seq_largest;
737 0 : JAMI_ERR("SRTP context not found.");
738 0 : return 0;
739 : }
740 :
741 : } // namespace jami
|