Line data Source code
1 : /*
2 : * Copyright (C) 2004-2025 Savoir-faire Linux Inc.
3 : * Copyright (c) 2007 The FFmpeg Project
4 : *
5 : * This program is free software: you can redistribute it and/or modify
6 : * it under the terms of the GNU General Public License as published by
7 : * the Free Software Foundation, either version 3 of the License, or
8 : * (at your option) any later version.
9 : *
10 : * This program is distributed in the hope that it will be useful,
11 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 : * GNU General Public License for more details.
14 : *
15 : * You should have received a copy of the GNU General Public License
16 : * along with this program. If not, see <https://www.gnu.org/licenses/>.
17 : */
18 :
19 : #include <dhtnet/ip_utils.h> // MUST BE INCLUDED FIRST
20 :
21 : #include "libav_deps.h" // THEN THIS ONE AFTER
22 :
23 : #include "socket_pair.h"
24 : #include "libav_utils.h"
25 : #include "logger.h"
26 : #include "connectivity/security/memory.h"
27 :
28 : #include <dhtnet/ice_socket.h>
29 :
30 : #include <iostream>
31 : #include <string>
32 : #include <algorithm>
33 : #include <iterator>
34 :
35 : extern "C" {
36 : #include "srtp.h"
37 : }
38 :
39 : #include <cstring>
40 : #include <stdexcept>
41 : #include <unistd.h>
42 : #include <sys/types.h>
43 : #include <ciso646> // fix windows compiler bug
44 :
45 : #ifdef _WIN32
46 : #define SOCK_NONBLOCK FIONBIO
47 : #define poll WSAPoll
48 : #define close(x) closesocket(x)
49 : #endif
50 :
51 : #ifdef __ANDROID__
52 : #include <asm-generic/fcntl.h>
53 : #define SOCK_NONBLOCK O_NONBLOCK
54 : #endif
55 :
56 : #ifdef __APPLE__
57 : #include <fcntl.h>
58 : #endif
59 :
60 : // Swap 2 byte, 16 bit values:
61 : #define Swap2Bytes(val) ((((val) >> 8) & 0x00FF) | (((val) << 8) & 0xFF00))
62 :
63 : // Swap 4 byte, 32 bit values:
64 : #define Swap4Bytes(val) \
65 : ((((val) >> 24) & 0x000000FF) | (((val) >> 8) & 0x0000FF00) | (((val) << 8) & 0x00FF0000) \
66 : | (((val) << 24) & 0xFF000000))
67 :
68 : // Swap 8 byte, 64 bit values:
69 : #define Swap8Bytes(val) \
70 : ((((val) >> 56) & 0x00000000000000FF) | (((val) >> 40) & 0x000000000000FF00) \
71 : | (((val) >> 24) & 0x0000000000FF0000) | (((val) >> 8) & 0x00000000FF000000) \
72 : | (((val) << 8) & 0x000000FF00000000) | (((val) << 24) & 0x0000FF0000000000) \
73 : | (((val) << 40) & 0x00FF000000000000) | (((val) << 56) & 0xFF00000000000000))
74 :
75 : namespace jami {
76 :
77 : static constexpr int NET_POLL_TIMEOUT = 100; /* poll() timeout in ms */
78 : static constexpr int RTP_MAX_PACKET_LENGTH = 2048;
79 : static constexpr auto UDP_HEADER_SIZE = 8;
80 : static constexpr auto SRTP_OVERHEAD = 10;
81 : static constexpr uint32_t RTCP_RR_FRACTION_MASK = 0xFF000000;
82 : static constexpr unsigned MINIMUM_RTP_HEADER_SIZE = 16;
83 :
84 : enum class DataType : unsigned { RTP = 1 << 0, RTCP = 1 << 1 };
85 :
86 : class SRTPProtoContext
87 : {
88 : public:
89 340 : SRTPProtoContext(const char* out_suite,
90 : const char* out_key,
91 : const char* in_suite,
92 : const char* in_key)
93 340 : {
94 340 : jami_secure_memzero(&srtp_out, sizeof(srtp_out));
95 340 : jami_secure_memzero(&srtp_in, sizeof(srtp_in));
96 340 : if (out_suite && out_key) {
97 : // XXX: see srtp_open from libavformat/srtpproto.c
98 340 : if (ff_srtp_set_crypto(&srtp_out, out_suite, out_key) < 0) {
99 0 : srtp_close();
100 0 : throw std::runtime_error("Unable to set crypto on output");
101 : }
102 : }
103 :
104 340 : if (in_suite && in_key) {
105 340 : if (ff_srtp_set_crypto(&srtp_in, in_suite, in_key) < 0) {
106 0 : srtp_close();
107 0 : throw std::runtime_error("Unable to set crypto on input");
108 : }
109 : }
110 340 : }
111 :
112 340 : ~SRTPProtoContext() { srtp_close(); }
113 :
114 : SRTPContext srtp_out {};
115 : SRTPContext srtp_in {};
116 : uint8_t encryptbuf[RTP_MAX_PACKET_LENGTH];
117 :
118 : private:
119 340 : void srtp_close() noexcept
120 : {
121 340 : ff_srtp_free(&srtp_out);
122 340 : ff_srtp_free(&srtp_in);
123 340 : }
124 : };
125 :
126 : static int
127 0 : ff_network_wait_fd(int fd)
128 : {
129 0 : struct pollfd p = {fd, POLLOUT, 0};
130 0 : auto ret = poll(&p, 1, NET_POLL_TIMEOUT);
131 0 : return ret < 0 ? errno : p.revents & (POLLOUT | POLLERR | POLLHUP) ? 0 : -EAGAIN;
132 : }
133 :
134 : static int
135 32 : udp_socket_create(int family, int port)
136 : {
137 32 : int udp_fd = -1;
138 :
139 : #ifdef __APPLE__
140 : udp_fd = socket(family, SOCK_DGRAM, 0);
141 : if (udp_fd >= 0 && fcntl(udp_fd, F_SETFL, O_NONBLOCK) < 0) {
142 : close(udp_fd);
143 : udp_fd = -1;
144 : }
145 : #elif defined _WIN32
146 : udp_fd = socket(family, SOCK_DGRAM, 0);
147 : u_long block = 1;
148 : if (udp_fd >= 0 && ioctlsocket(udp_fd, FIONBIO, &block) < 0) {
149 : close(udp_fd);
150 : udp_fd = -1;
151 : }
152 : #else
153 32 : udp_fd = socket(family, SOCK_DGRAM | SOCK_NONBLOCK, 0);
154 : #endif
155 :
156 32 : if (udp_fd < 0) {
157 0 : JAMI_ERR("socket() failed");
158 0 : strErr();
159 0 : return -1;
160 : }
161 :
162 32 : auto bind_addr = dhtnet::ip_utils::getAnyHostAddr(family);
163 32 : if (not bind_addr.isIpv4() and not bind_addr.isIpv6()) {
164 0 : JAMI_ERR("No IPv4/IPv6 host found for family %u", family);
165 0 : close(udp_fd);
166 0 : return -1;
167 : }
168 :
169 32 : bind_addr.setPort(port);
170 32 : JAMI_DBG("use local address: %s", bind_addr.toString(true, true).c_str());
171 32 : if (::bind(udp_fd, bind_addr, bind_addr.getLength()) < 0) {
172 0 : JAMI_ERR("bind() failed");
173 0 : strErr();
174 0 : close(udp_fd);
175 0 : udp_fd = -1;
176 : }
177 :
178 32 : return udp_fd;
179 : }
180 :
181 16 : SocketPair::SocketPair(const char* uri, int localPort)
182 : {
183 16 : openSockets(uri, localPort);
184 16 : }
185 :
186 324 : SocketPair::SocketPair(std::unique_ptr<dhtnet::IceSocket> rtp_sock, std::unique_ptr<dhtnet::IceSocket> rtcp_sock)
187 324 : : rtp_sock_(std::move(rtp_sock))
188 648 : , rtcp_sock_(std::move(rtcp_sock))
189 : {
190 324 : JAMI_DBG("[%p] Creating instance using ICE sockets for comp %d and %d",
191 : this,
192 : rtp_sock_->getCompId(),
193 : rtcp_sock_->getCompId());
194 :
195 324 : rtp_sock_->setOnRecv([this](uint8_t* buf, size_t len) {
196 5638 : std::lock_guard l(dataBuffMutex_);
197 5638 : rtpDataBuff_.emplace_back(buf, buf + len);
198 5638 : cv_.notify_one();
199 5638 : return len;
200 5638 : });
201 324 : rtcp_sock_->setOnRecv([this](uint8_t* buf, size_t len) {
202 265 : std::lock_guard l(dataBuffMutex_);
203 265 : rtcpDataBuff_.emplace_back(buf, buf + len);
204 265 : cv_.notify_one();
205 265 : return len;
206 265 : });
207 324 : }
208 :
209 340 : SocketPair::~SocketPair()
210 : {
211 340 : interrupt();
212 340 : closeSockets();
213 340 : JAMI_DBG("[%p] Instance destroyed", this);
214 340 : }
215 :
216 : bool
217 758 : SocketPair::waitForRTCP(std::chrono::seconds interval)
218 : {
219 758 : std::unique_lock lock(rtcpInfo_mutex_);
220 758 : return cvRtcpPacketReadyToRead_.wait_for(lock, interval, [this] {
221 2072 : return interrupted_ or not listRtcpRRHeader_.empty() or not listRtcpREMBHeader_.empty();
222 1516 : });
223 758 : }
224 :
225 : void
226 10 : SocketPair::saveRtcpRRPacket(uint8_t* buf, size_t len)
227 : {
228 10 : if (len < sizeof(rtcpRRHeader))
229 0 : return;
230 :
231 10 : auto header = reinterpret_cast<rtcpRRHeader*>(buf);
232 10 : if (header->pt != 201) // 201 = RR PT
233 0 : return;
234 :
235 10 : std::lock_guard lock(rtcpInfo_mutex_);
236 :
237 10 : if (listRtcpRRHeader_.size() >= MAX_LIST_SIZE) {
238 0 : listRtcpRRHeader_.pop_front();
239 : }
240 :
241 10 : listRtcpRRHeader_.emplace_back(*header);
242 :
243 10 : cvRtcpPacketReadyToRead_.notify_one();
244 10 : }
245 :
246 : void
247 180 : SocketPair::saveRtcpREMBPacket(uint8_t* buf, size_t len)
248 : {
249 180 : if (len < sizeof(rtcpREMBHeader))
250 0 : return;
251 :
252 180 : auto header = reinterpret_cast<rtcpREMBHeader*>(buf);
253 180 : if (header->pt != 206) // 206 = REMB PT
254 0 : return;
255 :
256 180 : if (header->uid != 0x424D4552) // uid must be "REMB"
257 0 : return;
258 :
259 180 : std::lock_guard lock(rtcpInfo_mutex_);
260 :
261 180 : if (listRtcpREMBHeader_.size() >= MAX_LIST_SIZE) {
262 0 : listRtcpREMBHeader_.pop_front();
263 : }
264 :
265 180 : listRtcpREMBHeader_.push_back(*header);
266 :
267 180 : cvRtcpPacketReadyToRead_.notify_one();
268 180 : }
269 :
270 : std::list<rtcpRRHeader>
271 758 : SocketPair::getRtcpRR()
272 : {
273 758 : std::lock_guard lock(rtcpInfo_mutex_);
274 1516 : return std::move(listRtcpRRHeader_);
275 758 : }
276 :
277 : std::list<rtcpREMBHeader>
278 417 : SocketPair::getRtcpREMB()
279 : {
280 417 : std::lock_guard lock(rtcpInfo_mutex_);
281 834 : return std::move(listRtcpREMBHeader_);
282 417 : }
283 :
284 : void
285 340 : SocketPair::createSRTP(const char* out_suite,
286 : const char* out_key,
287 : const char* in_suite,
288 : const char* in_key)
289 : {
290 340 : srtpContext_.reset(new SRTPProtoContext(out_suite, out_key, in_suite, in_key));
291 340 : }
292 :
293 : void
294 680 : SocketPair::interrupt()
295 : {
296 680 : JAMI_WARN("[%p] Interrupting RTP sockets", this);
297 680 : interrupted_ = true;
298 680 : if (rtp_sock_)
299 648 : rtp_sock_->setOnRecv(nullptr);
300 680 : if (rtcp_sock_)
301 648 : rtcp_sock_->setOnRecv(nullptr);
302 680 : cv_.notify_all();
303 680 : cvRtcpPacketReadyToRead_.notify_all();
304 680 : }
305 :
306 : void
307 984 : SocketPair::setReadBlockingMode(bool block)
308 : {
309 984 : JAMI_DBG("[%p] Read operations in blocking mode [%s]", this, block ? "YES" : "NO");
310 984 : readBlockingMode_ = block;
311 984 : cv_.notify_all();
312 984 : cvRtcpPacketReadyToRead_.notify_all();
313 984 : }
314 :
315 : void
316 991 : SocketPair::stopSendOp(bool state)
317 : {
318 991 : noWrite_ = state;
319 991 : }
320 :
321 : void
322 340 : SocketPair::closeSockets()
323 : {
324 340 : if (rtcpHandle_ > 0 and close(rtcpHandle_))
325 0 : strErr();
326 340 : if (rtpHandle_ > 0 and close(rtpHandle_))
327 0 : strErr();
328 340 : }
329 :
330 : void
331 16 : SocketPair::openSockets(const char* uri, int local_rtp_port)
332 : {
333 16 : JAMI_DBG("Creating rtp socket for uri %s on port %d", uri, local_rtp_port);
334 :
335 : char hostname[256];
336 : char path[1024];
337 : int dst_rtp_port;
338 :
339 16 : av_url_split(NULL, 0, NULL, 0, hostname, sizeof(hostname), &dst_rtp_port, path, sizeof(path), uri);
340 :
341 16 : const int local_rtcp_port = local_rtp_port + 1;
342 16 : const int dst_rtcp_port = dst_rtp_port + 1;
343 :
344 16 : rtpDestAddr_ = dhtnet::IpAddr {hostname};
345 16 : rtpDestAddr_.setPort(dst_rtp_port);
346 16 : rtcpDestAddr_ = dhtnet::IpAddr {hostname};
347 16 : rtcpDestAddr_.setPort(dst_rtcp_port);
348 :
349 : // Open local sockets (RTP/RTCP)
350 16 : if ((rtpHandle_ = udp_socket_create(rtpDestAddr_.getFamily(), local_rtp_port)) == -1
351 16 : or (rtcpHandle_ = udp_socket_create(rtcpDestAddr_.getFamily(), local_rtcp_port)) == -1) {
352 0 : closeSockets();
353 0 : JAMI_ERR("[%p] Sockets creation failed", this);
354 0 : throw std::runtime_error("Sockets creation failed");
355 : }
356 :
357 16 : JAMI_WARN("SocketPair: local{%d,%d} / %s{%d,%d}",
358 : local_rtp_port,
359 : local_rtcp_port,
360 : hostname,
361 : dst_rtp_port,
362 : dst_rtcp_port);
363 16 : }
364 :
365 : MediaIOHandle*
366 683 : SocketPair::createIOContext(const uint16_t mtu)
367 : {
368 : unsigned ip_header_size;
369 683 : if (rtp_sock_)
370 655 : ip_header_size = rtp_sock_->getTransportOverhead();
371 28 : else if (rtpDestAddr_.getFamily() == AF_INET6)
372 0 : ip_header_size = 40;
373 : else
374 28 : ip_header_size = 20;
375 : return new MediaIOHandle(
376 683 : mtu - (srtpContext_ ? SRTP_OVERHEAD : 0) - UDP_HEADER_SIZE - ip_header_size,
377 : true,
378 6360 : [](void* sp, uint8_t* buf, int len) {
379 6360 : return static_cast<SocketPair*>(sp)->readCallback(buf, len);
380 : },
381 5731 : [](void* sp, uint8_t* buf, int len) {
382 5731 : return static_cast<SocketPair*>(sp)->writeCallback(buf, len);
383 : },
384 : 0,
385 683 : reinterpret_cast<void*>(this));
386 : }
387 :
388 : int
389 6360 : SocketPair::waitForData()
390 : {
391 : // System sockets
392 6360 : if (rtpHandle_ >= 0) {
393 : int ret;
394 : do {
395 111 : if (interrupted_) {
396 8 : errno = EINTR;
397 15 : return -1;
398 : }
399 :
400 103 : if (not readBlockingMode_) {
401 7 : return 0;
402 : }
403 :
404 : // work with system socket
405 96 : struct pollfd p[2] = {{rtpHandle_, POLLIN, 0}, {rtcpHandle_, POLLIN, 0}};
406 96 : ret = poll(p, 2, NET_POLL_TIMEOUT);
407 96 : if (ret > 0) {
408 0 : ret = 0;
409 0 : if (p[0].revents & POLLIN)
410 0 : ret |= static_cast<int>(DataType::RTP);
411 0 : if (p[1].revents & POLLIN)
412 0 : ret |= static_cast<int>(DataType::RTCP);
413 : }
414 96 : } while (!ret or (ret < 0 and errno == EAGAIN));
415 :
416 0 : return ret;
417 : }
418 :
419 : // work with IceSocket
420 : {
421 6345 : std::unique_lock lk(dataBuffMutex_);
422 6345 : cv_.wait(lk, [this] {
423 25028 : return interrupted_ or not rtpDataBuff_.empty() or not rtcpDataBuff_.empty()
424 25028 : or not readBlockingMode_;
425 : });
426 6345 : }
427 :
428 6345 : if (interrupted_) {
429 30 : errno = EINTR;
430 30 : return -1;
431 : }
432 :
433 6315 : return static_cast<int>(DataType::RTP) | static_cast<int>(DataType::RTCP);
434 : }
435 :
436 : int
437 6050 : SocketPair::readRtpData(void* buf, int buf_size)
438 : {
439 : // handle system socket
440 6050 : if (rtpHandle_ >= 0) {
441 : struct sockaddr_storage from;
442 0 : socklen_t from_len = sizeof(from);
443 0 : return recvfrom(rtpHandle_,
444 : static_cast<char*>(buf),
445 : buf_size,
446 : 0,
447 : reinterpret_cast<struct sockaddr*>(&from),
448 0 : &from_len);
449 : }
450 :
451 : // handle ICE
452 6050 : std::unique_lock lk(dataBuffMutex_);
453 6050 : if (not rtpDataBuff_.empty()) {
454 5637 : auto pkt = std::move(rtpDataBuff_.front());
455 5637 : rtpDataBuff_.pop_front();
456 5637 : lk.unlock(); // to not block our ICE callbacks
457 5637 : int pkt_size = pkt.size();
458 5637 : int len = std::min(pkt_size, buf_size);
459 5637 : std::copy_n(pkt.begin(), len, static_cast<char*>(buf));
460 5637 : return len;
461 5637 : }
462 :
463 413 : return 0;
464 6050 : }
465 :
466 : int
467 6315 : SocketPair::readRtcpData(void* buf, int buf_size)
468 : {
469 : // handle system socket
470 6315 : if (rtcpHandle_ >= 0) {
471 : struct sockaddr_storage from;
472 0 : socklen_t from_len = sizeof(from);
473 0 : return recvfrom(rtcpHandle_,
474 : static_cast<char*>(buf),
475 : buf_size,
476 : 0,
477 : reinterpret_cast<struct sockaddr*>(&from),
478 0 : &from_len);
479 : }
480 :
481 : // handle ICE
482 6315 : std::unique_lock lk(dataBuffMutex_);
483 6315 : if (not rtcpDataBuff_.empty()) {
484 265 : auto pkt = std::move(rtcpDataBuff_.front());
485 265 : rtcpDataBuff_.pop_front();
486 265 : lk.unlock();
487 265 : int pkt_size = pkt.size();
488 265 : int len = std::min(pkt_size, buf_size);
489 265 : std::copy_n(pkt.begin(), len, static_cast<char*>(buf));
490 265 : return len;
491 265 : }
492 :
493 6050 : return 0;
494 6315 : }
495 :
496 : int
497 6360 : SocketPair::readCallback(uint8_t* buf, int buf_size)
498 : {
499 6360 : auto datatype = waitForData();
500 6360 : if (datatype < 0)
501 38 : return datatype;
502 :
503 6322 : int len = 0;
504 6322 : bool fromRTCP = false;
505 :
506 6322 : if (datatype & static_cast<int>(DataType::RTCP)) {
507 6315 : len = readRtcpData(buf, buf_size);
508 6315 : if (len > 0) {
509 265 : auto header = reinterpret_cast<rtcpRRHeader*>(buf);
510 : // 201 = RR PT
511 265 : if (header->pt == 201) {
512 10 : lastDLSR_ = Swap4Bytes(header->dlsr);
513 : // JAMI_WARN("Read RR, lastDLSR : %d", lastDLSR_);
514 10 : lastRR_time = std::chrono::steady_clock::now();
515 10 : saveRtcpRRPacket(buf, len);
516 : }
517 : // 206 = REMB PT
518 255 : else if (header->pt == 206)
519 180 : saveRtcpREMBPacket(buf, len);
520 : // 200 = SR PT
521 75 : else if (header->pt == 200) {
522 : // not used yet
523 : } else {
524 0 : JAMI_DBG("Unable to read RTCP: unknown packet type %u", header->pt);
525 : }
526 265 : fromRTCP = true;
527 : }
528 : }
529 :
530 : // No RTCP… attempt RTP
531 6322 : if (!len and (datatype & static_cast<int>(DataType::RTP))) {
532 6050 : len = readRtpData(buf, buf_size);
533 6050 : fromRTCP = false;
534 : }
535 :
536 6322 : if (len <= 0)
537 420 : return len;
538 :
539 5902 : if (not fromRTCP && (buf_size < static_cast<int>(MINIMUM_RTP_HEADER_SIZE)))
540 0 : return len;
541 :
542 : // SRTP decrypt
543 5902 : if (not fromRTCP and srtpContext_ and srtpContext_->srtp_in.aes) {
544 5637 : int32_t gradient = 0;
545 5637 : int32_t deltaT = 0;
546 5637 : float abs = 0.0f;
547 5637 : bool res_parse = false;
548 5637 : bool res_delay = false;
549 :
550 5637 : res_parse = parse_RTP_ext(buf, &abs);
551 5637 : bool marker = (buf[1] & 0x80) >> 7;
552 :
553 5637 : if (res_parse)
554 5637 : res_delay = getOneWayDelayGradient(abs, marker, &gradient, &deltaT);
555 :
556 : // rtpDelayCallback_ is not set for audio
557 5637 : if (rtpDelayCallback_ and res_delay)
558 5540 : rtpDelayCallback_(gradient, deltaT);
559 :
560 5637 : auto err = ff_srtp_decrypt(&srtpContext_->srtp_in, buf, &len);
561 5637 : if (packetLossCallback_ and (buf[2] << 8 | buf[3]) != lastSeqNumIn_ + 1)
562 0 : packetLossCallback_();
563 5637 : lastSeqNumIn_ = buf[2] << 8 | buf[3];
564 5637 : if (err < 0)
565 0 : JAMI_WARN("decrypt error %d", err);
566 : }
567 :
568 5902 : if (len != 0)
569 5902 : return len;
570 : else
571 0 : return AVERROR_EOF;
572 : }
573 :
574 : int
575 5911 : SocketPair::writeData(uint8_t* buf, int buf_size)
576 : {
577 5911 : bool isRTCP = RTP_PT_IS_RTCP(buf[1]);
578 :
579 : // System sockets?
580 5911 : if (rtpHandle_ >= 0) {
581 : int fd;
582 : dhtnet::IpAddr* dest_addr;
583 :
584 0 : if (isRTCP) {
585 0 : fd = rtcpHandle_;
586 0 : dest_addr = &rtcpDestAddr_;
587 : } else {
588 0 : fd = rtpHandle_;
589 0 : dest_addr = &rtpDestAddr_;
590 : }
591 :
592 0 : auto ret = ff_network_wait_fd(fd);
593 0 : if (ret < 0)
594 0 : return ret;
595 :
596 0 : if (noWrite_)
597 0 : return buf_size;
598 0 : return ::sendto(fd,
599 : reinterpret_cast<const char*>(buf),
600 : buf_size,
601 : 0,
602 : *dest_addr,
603 0 : dest_addr->getLength());
604 : }
605 :
606 5911 : if (noWrite_)
607 0 : return buf_size;
608 :
609 : // IceSocket
610 5911 : if (isRTCP)
611 265 : return rtcp_sock_->send(buf, buf_size);
612 : else
613 5646 : return rtp_sock_->send(buf, buf_size);
614 : }
615 :
616 : int
617 5731 : SocketPair::writeCallback(uint8_t* buf, int buf_size)
618 : {
619 5731 : if (noWrite_)
620 0 : return 0;
621 :
622 : int ret;
623 5731 : bool isRTCP = RTP_PT_IS_RTCP(buf[1]);
624 : unsigned int ts_LSB, ts_MSB;
625 : double currentSRTS, currentLatency;
626 :
627 : // Encrypt?
628 5731 : if (not isRTCP and srtpContext_ and srtpContext_->srtp_out.aes) {
629 5646 : buf_size = ff_srtp_encrypt(&srtpContext_->srtp_out,
630 : buf,
631 : buf_size,
632 5646 : srtpContext_->encryptbuf,
633 : sizeof(srtpContext_->encryptbuf));
634 5646 : if (buf_size < 0) {
635 0 : JAMI_WARN("encrypt error %d", buf_size);
636 0 : return buf_size;
637 : }
638 :
639 5646 : buf = srtpContext_->encryptbuf;
640 : }
641 :
642 : // check if we're sending an RR, if so, detect packet loss
643 : // buf_size gives length of buffer, not just header
644 5731 : if (isRTCP && static_cast<unsigned>(buf_size) >= sizeof(rtcpRRHeader)) {
645 10 : auto header = reinterpret_cast<rtcpRRHeader*>(buf);
646 10 : rtcpPacketLoss_ = (header->pt == 201
647 10 : && ntohl(header->fraction_lost) & RTCP_RR_FRACTION_MASK);
648 : }
649 :
650 : do {
651 5731 : if (interrupted_)
652 0 : return -EINTR;
653 5731 : ret = writeData(buf, buf_size);
654 5731 : } while (ret < 0 and errno == EAGAIN);
655 :
656 5731 : if (buf[1] == 200) // Sender Report
657 : {
658 75 : auto header = reinterpret_cast<rtcpSRHeader*>(buf);
659 75 : ts_LSB = Swap4Bytes(header->timestampLSB);
660 75 : ts_MSB = Swap4Bytes(header->timestampMSB);
661 :
662 75 : currentSRTS = ts_MSB + (ts_LSB / pow(2, 32));
663 :
664 75 : if (lastSRTS_ != 0 && lastDLSR_ != 0) {
665 15 : if (histoLatency_.size() >= MAX_LIST_SIZE)
666 0 : histoLatency_.pop_front();
667 :
668 15 : currentLatency = (currentSRTS - lastSRTS_) / 2;
669 : // JAMI_WARN("Current Latency : %f from sender %X", currentLatency, header->ssrc);
670 15 : histoLatency_.push_back(currentLatency);
671 : }
672 :
673 75 : lastSRTS_ = currentSRTS;
674 :
675 : // JAMI_WARN("SENDING NEW RTCP SR !! ");
676 :
677 5656 : } else if (buf[1] == 201) // Receiver Report
678 : {
679 : // auto header = reinterpret_cast<rtcpRRHeader*>(buf);
680 : // JAMI_WARN("SENDING NEW RTCP RR !! ");
681 : }
682 :
683 5731 : return ret < 0 ? -errno : ret;
684 : }
685 :
686 : double
687 10 : SocketPair::getLastLatency()
688 : {
689 10 : if (not histoLatency_.empty())
690 0 : return histoLatency_.back();
691 : else
692 10 : return -1;
693 : }
694 :
695 : void
696 150 : SocketPair::setRtpDelayCallback(std::function<void(int, int)> cb)
697 : {
698 150 : rtpDelayCallback_ = std::move(cb);
699 150 : }
700 :
701 : bool
702 5637 : SocketPair::getOneWayDelayGradient(float sendTS, bool marker, int32_t* gradient, int32_t* deltaT)
703 : {
704 : // Keep only last packet of each frame
705 5637 : if (not marker) {
706 49 : return 0;
707 : }
708 :
709 : // 1st frame
710 5588 : if (not lastSendTS_) {
711 48 : lastSendTS_ = sendTS;
712 48 : lastReceiveTS_ = std::chrono::steady_clock::now();
713 48 : return 0;
714 : }
715 :
716 5540 : int32_t deltaS = (sendTS - lastSendTS_) * 1000; // milliseconds
717 5540 : if (deltaS < 0)
718 7 : deltaS += 64000;
719 5540 : lastSendTS_ = sendTS;
720 :
721 5540 : std::chrono::steady_clock::time_point arrival_TS = std::chrono::steady_clock::now();
722 5540 : auto deltaR = std::chrono::duration_cast<std::chrono::milliseconds>(arrival_TS - lastReceiveTS_)
723 5540 : .count();
724 5540 : lastReceiveTS_ = arrival_TS;
725 :
726 5540 : *gradient = deltaR - deltaS;
727 5540 : *deltaT = deltaR;
728 :
729 5540 : return true;
730 : }
731 :
732 : bool
733 5637 : SocketPair::parse_RTP_ext(uint8_t* buf, float* abs)
734 : {
735 5637 : if (not(buf[0] & 0x10))
736 0 : return false;
737 :
738 5637 : uint16_t magic_word = (buf[12] << 8) + buf[13];
739 5637 : if (magic_word != 0xBEDE)
740 0 : return false;
741 :
742 5637 : uint8_t sec = buf[17] >> 2;
743 5637 : uint32_t fract = ((buf[17] & 0x3) << 16 | (buf[18] << 8) | buf[19]) << 14;
744 5637 : float milli = fract / pow(2, 32);
745 :
746 5637 : *abs = sec + (milli);
747 5637 : return true;
748 : }
749 :
750 : uint16_t
751 157 : SocketPair::lastSeqValOut()
752 : {
753 157 : if (srtpContext_)
754 157 : return srtpContext_->srtp_out.seq_largest;
755 0 : JAMI_ERR("SRTP context not found.");
756 0 : return 0;
757 : }
758 :
759 : } // namespace jami
|