Line data Source code
1 : /*
2 : * Copyright (C) 2004-2026 Savoir-faire Linux Inc.
3 : * Copyright (c) 2007 The FFmpeg Project
4 : *
5 : * This program is free software: you can redistribute it and/or modify
6 : * it under the terms of the GNU General Public License as published by
7 : * the Free Software Foundation, either version 3 of the License, or
8 : * (at your option) any later version.
9 : *
10 : * This program is distributed in the hope that it will be useful,
11 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 : * GNU General Public License for more details.
14 : *
15 : * You should have received a copy of the GNU General Public License
16 : * along with this program. If not, see <https://www.gnu.org/licenses/>.
17 : */
18 :
19 : #include <dhtnet/ip_utils.h> // MUST BE INCLUDED FIRST
20 :
21 : #include "libav_deps.h" // THEN THIS ONE AFTER
22 :
23 : #include "socket_pair.h"
24 : #include "libav_utils.h"
25 : #include "logger.h"
26 : #include "connectivity/security/memory.h"
27 :
28 : #include <dhtnet/ice_socket.h>
29 :
30 : #include <iostream>
31 : #include <string>
32 : #include <algorithm>
33 : #include <iterator>
34 :
35 : extern "C" {
36 : #include "srtp.h"
37 : }
38 :
39 : #include <cstring>
40 : #include <stdexcept>
41 : #include <unistd.h>
42 : #include <sys/types.h>
43 :
44 : #ifdef _WIN32
45 : #define SOCK_NONBLOCK FIONBIO
46 : #define poll WSAPoll
47 : #define close(x) closesocket(x)
48 : #endif
49 :
50 : #ifdef __ANDROID__
51 : #include <asm-generic/fcntl.h>
52 : #define SOCK_NONBLOCK O_NONBLOCK
53 : #endif
54 :
55 : #ifdef __APPLE__
56 : #include <fcntl.h>
57 : #endif
58 :
59 : // Swap 2 byte, 16 bit values:
60 : #define Swap2Bytes(val) ((((val) >> 8) & 0x00FF) | (((val) << 8) & 0xFF00))
61 :
62 : // Swap 4 byte, 32 bit values:
63 : #define Swap4Bytes(val) \
64 : ((((val) >> 24) & 0x000000FF) | (((val) >> 8) & 0x0000FF00) | (((val) << 8) & 0x00FF0000) \
65 : | (((val) << 24) & 0xFF000000))
66 :
67 : // Swap 8 byte, 64 bit values:
68 : #define Swap8Bytes(val) \
69 : ((((val) >> 56) & 0x00000000000000FF) | (((val) >> 40) & 0x000000000000FF00) \
70 : | (((val) >> 24) & 0x0000000000FF0000) | (((val) >> 8) & 0x00000000FF000000) \
71 : | (((val) << 8) & 0x000000FF00000000) | (((val) << 24) & 0x0000FF0000000000) \
72 : | (((val) << 40) & 0x00FF000000000000) | (((val) << 56) & 0xFF00000000000000))
73 :
74 : namespace jami {
75 :
76 : static constexpr int NET_POLL_TIMEOUT = 100; /* poll() timeout in ms */
77 : static constexpr int RTP_MAX_PACKET_LENGTH = 2048;
78 : static constexpr auto UDP_HEADER_SIZE = 8;
79 : static constexpr auto SRTP_OVERHEAD = 10;
80 : static constexpr uint32_t RTCP_RR_FRACTION_MASK = 0xFF000000;
81 : static constexpr unsigned MINIMUM_RTP_HEADER_SIZE = 16;
82 :
83 : enum class DataType : unsigned { RTP = 1 << 0, RTCP = 1 << 1 };
84 :
85 : class SRTPProtoContext
86 : {
87 : public:
88 304 : SRTPProtoContext(const char* out_suite, const char* out_key, const char* in_suite, const char* in_key)
89 304 : {
90 304 : jami_secure_memzero(&srtp_out, sizeof(srtp_out));
91 304 : jami_secure_memzero(&srtp_in, sizeof(srtp_in));
92 304 : if (out_suite && out_key) {
93 : // XXX: see srtp_open from libavformat/srtpproto.c
94 304 : if (ff_srtp_set_crypto(&srtp_out, out_suite, out_key) < 0) {
95 0 : srtp_close();
96 0 : throw std::runtime_error("Unable to set crypto on output");
97 : }
98 : }
99 :
100 304 : if (in_suite && in_key) {
101 304 : if (ff_srtp_set_crypto(&srtp_in, in_suite, in_key) < 0) {
102 0 : srtp_close();
103 0 : throw std::runtime_error("Unable to set crypto on input");
104 : }
105 : }
106 303 : }
107 :
108 304 : ~SRTPProtoContext() { srtp_close(); }
109 :
110 : SRTPContext srtp_out {};
111 : SRTPContext srtp_in {};
112 : uint8_t encryptbuf[RTP_MAX_PACKET_LENGTH];
113 :
114 : private:
115 304 : void srtp_close() noexcept
116 : {
117 304 : ff_srtp_free(&srtp_out);
118 304 : ff_srtp_free(&srtp_in);
119 304 : }
120 : };
121 :
122 : static int
123 0 : ff_network_wait_fd(int fd)
124 : {
125 0 : struct pollfd p = {fd, POLLOUT, 0};
126 0 : auto ret = poll(&p, 1, NET_POLL_TIMEOUT);
127 0 : return ret < 0 ? errno : p.revents & (POLLOUT | POLLERR | POLLHUP) ? 0 : -EAGAIN;
128 : }
129 :
130 : static int
131 0 : udp_socket_create(int family, int port)
132 : {
133 0 : int udp_fd = -1;
134 :
135 : #ifdef __APPLE__
136 : udp_fd = socket(family, SOCK_DGRAM, 0);
137 : if (udp_fd >= 0 && fcntl(udp_fd, F_SETFL, O_NONBLOCK) < 0) {
138 : close(udp_fd);
139 : udp_fd = -1;
140 : }
141 : #elif defined _WIN32
142 : udp_fd = socket(family, SOCK_DGRAM, 0);
143 : u_long block = 1;
144 : if (udp_fd >= 0 && ioctlsocket(udp_fd, FIONBIO, &block) < 0) {
145 : close(udp_fd);
146 : udp_fd = -1;
147 : }
148 : #else
149 0 : udp_fd = socket(family, SOCK_DGRAM | SOCK_NONBLOCK, 0);
150 : #endif
151 :
152 0 : if (udp_fd < 0) {
153 0 : JAMI_ERR("socket() failed");
154 0 : strErr();
155 0 : return -1;
156 : }
157 :
158 0 : auto bind_addr = dhtnet::ip_utils::getAnyHostAddr(family);
159 0 : if (not bind_addr.isIpv4() and not bind_addr.isIpv6()) {
160 0 : JAMI_ERR("No IPv4/IPv6 host found for family %u", family);
161 0 : close(udp_fd);
162 0 : return -1;
163 : }
164 :
165 0 : bind_addr.setPort(port);
166 0 : JAMI_DBG("use local address: %s", bind_addr.toString(true, true).c_str());
167 0 : if (::bind(udp_fd, bind_addr, bind_addr.getLength()) < 0) {
168 0 : JAMI_ERR("bind() failed");
169 0 : strErr();
170 0 : close(udp_fd);
171 0 : udp_fd = -1;
172 : }
173 :
174 0 : return udp_fd;
175 : }
176 :
177 0 : SocketPair::SocketPair(const char* uri, int localPort)
178 : {
179 0 : openSockets(uri, localPort);
180 0 : }
181 :
182 299 : SocketPair::SocketPair(std::unique_ptr<dhtnet::IceSocket> rtp_sock, std::unique_ptr<dhtnet::IceSocket> rtcp_sock)
183 304 : : rtp_sock_(std::move(rtp_sock))
184 603 : , rtcp_sock_(std::move(rtcp_sock))
185 : {
186 303 : JAMI_DBG("[%p] Creating instance using ICE sockets for comp %d and %d",
187 : this,
188 : rtp_sock_->getCompId(),
189 : rtcp_sock_->getCompId());
190 :
191 304 : rtp_sock_->setOnRecv([this](uint8_t* buf, size_t len) {
192 5609 : std::lock_guard l(dataBuffMutex_);
193 5609 : rtpDataBuff_.emplace_back(buf, buf + len);
194 5609 : cv_.notify_one();
195 5609 : return len;
196 5609 : });
197 304 : rtcp_sock_->setOnRecv([this](uint8_t* buf, size_t len) {
198 264 : std::lock_guard l(dataBuffMutex_);
199 264 : rtcpDataBuff_.emplace_back(buf, buf + len);
200 264 : cv_.notify_one();
201 264 : return len;
202 264 : });
203 303 : }
204 :
205 304 : SocketPair::~SocketPair()
206 : {
207 304 : interrupt();
208 304 : closeSockets();
209 304 : JAMI_DBG("[%p] Instance destroyed", this);
210 304 : }
211 :
212 : bool
213 648 : SocketPair::waitForRTCP(std::chrono::seconds interval)
214 : {
215 648 : std::unique_lock lock(rtcpInfo_mutex_);
216 648 : return cvRtcpPacketReadyToRead_.wait_for(lock, interval, [this] {
217 1881 : return interrupted_ or not listRtcpRRHeader_.empty() or not listRtcpREMBHeader_.empty();
218 1296 : });
219 648 : }
220 :
221 : void
222 10 : SocketPair::saveRtcpRRPacket(uint8_t* buf, size_t len)
223 : {
224 10 : if (len < sizeof(rtcpRRHeader))
225 0 : return;
226 :
227 10 : auto header = reinterpret_cast<rtcpRRHeader*>(buf);
228 10 : if (header->pt != 201) // 201 = RR PT
229 0 : return;
230 :
231 10 : std::lock_guard lock(rtcpInfo_mutex_);
232 :
233 10 : if (listRtcpRRHeader_.size() >= MAX_LIST_SIZE) {
234 0 : listRtcpRRHeader_.pop_front();
235 : }
236 :
237 10 : listRtcpRRHeader_.emplace_back(*header);
238 :
239 10 : cvRtcpPacketReadyToRead_.notify_one();
240 10 : }
241 :
242 : void
243 181 : SocketPair::saveRtcpREMBPacket(uint8_t* buf, size_t len)
244 : {
245 181 : if (len < sizeof(rtcpREMBHeader))
246 0 : return;
247 :
248 181 : auto header = reinterpret_cast<rtcpREMBHeader*>(buf);
249 181 : if (header->pt != 206) // 206 = REMB PT
250 0 : return;
251 :
252 181 : if (header->uid != 0x424D4552) // uid must be "REMB"
253 0 : return;
254 :
255 181 : std::lock_guard lock(rtcpInfo_mutex_);
256 :
257 181 : if (listRtcpREMBHeader_.size() >= MAX_LIST_SIZE) {
258 0 : listRtcpREMBHeader_.pop_front();
259 : }
260 :
261 181 : listRtcpREMBHeader_.push_back(*header);
262 :
263 181 : cvRtcpPacketReadyToRead_.notify_one();
264 181 : }
265 :
266 : std::list<rtcpRRHeader>
267 647 : SocketPair::getRtcpRR()
268 : {
269 647 : std::lock_guard lock(rtcpInfo_mutex_);
270 1294 : return std::move(listRtcpRRHeader_);
271 648 : }
272 :
273 : std::list<rtcpREMBHeader>
274 372 : SocketPair::getRtcpREMB()
275 : {
276 372 : std::lock_guard lock(rtcpInfo_mutex_);
277 744 : return std::move(listRtcpREMBHeader_);
278 372 : }
279 :
280 : void
281 304 : SocketPair::createSRTP(const char* out_suite, const char* out_key, const char* in_suite, const char* in_key)
282 : {
283 304 : srtpContext_.reset(new SRTPProtoContext(out_suite, out_key, in_suite, in_key));
284 304 : }
285 :
286 : void
287 608 : SocketPair::interrupt()
288 : {
289 608 : JAMI_WARN("[%p] Interrupting RTP sockets", this);
290 608 : interrupted_ = true;
291 608 : if (rtp_sock_)
292 608 : rtp_sock_->setOnRecv(nullptr);
293 608 : if (rtcp_sock_)
294 608 : rtcp_sock_->setOnRecv(nullptr);
295 608 : cv_.notify_all();
296 608 : cvRtcpPacketReadyToRead_.notify_all();
297 608 : }
298 :
299 : void
300 882 : SocketPair::setReadBlockingMode(bool block)
301 : {
302 882 : JAMI_DBG("[%p] Read operations in blocking mode [%s]", this, block ? "YES" : "NO");
303 882 : readBlockingMode_ = block;
304 882 : cv_.notify_all();
305 882 : cvRtcpPacketReadyToRead_.notify_all();
306 882 : }
307 :
308 : void
309 882 : SocketPair::stopSendOp(bool state)
310 : {
311 882 : noWrite_ = state;
312 882 : }
313 :
314 : void
315 304 : SocketPair::closeSockets()
316 : {
317 304 : if (rtcpHandle_ > 0 and close(rtcpHandle_))
318 0 : strErr();
319 304 : if (rtpHandle_ > 0 and close(rtpHandle_))
320 0 : strErr();
321 304 : }
322 :
323 : void
324 0 : SocketPair::openSockets(const char* uri, int local_rtp_port)
325 : {
326 0 : JAMI_DBG("Creating rtp socket for uri %s on port %d", uri, local_rtp_port);
327 :
328 : char hostname[256];
329 : char path[1024];
330 : int dst_rtp_port;
331 :
332 0 : av_url_split(NULL, 0, NULL, 0, hostname, sizeof(hostname), &dst_rtp_port, path, sizeof(path), uri);
333 :
334 0 : const int local_rtcp_port = local_rtp_port + 1;
335 0 : const int dst_rtcp_port = dst_rtp_port + 1;
336 :
337 0 : rtpDestAddr_ = dhtnet::IpAddr {hostname};
338 0 : rtpDestAddr_.setPort(dst_rtp_port);
339 0 : rtcpDestAddr_ = dhtnet::IpAddr {hostname};
340 0 : rtcpDestAddr_.setPort(dst_rtcp_port);
341 :
342 : // Open local sockets (RTP/RTCP)
343 0 : if ((rtpHandle_ = udp_socket_create(rtpDestAddr_.getFamily(), local_rtp_port)) == -1
344 0 : or (rtcpHandle_ = udp_socket_create(rtcpDestAddr_.getFamily(), local_rtcp_port)) == -1) {
345 0 : closeSockets();
346 0 : JAMI_ERR("[%p] Sockets creation failed", this);
347 0 : throw std::runtime_error("Sockets creation failed");
348 : }
349 :
350 0 : JAMI_WARN("SocketPair: local{%d,%d} / %s{%d,%d}",
351 : local_rtp_port,
352 : local_rtcp_port,
353 : hostname,
354 : dst_rtp_port,
355 : dst_rtcp_port);
356 0 : }
357 :
358 : MediaIOHandle*
359 601 : SocketPair::createIOContext(const uint16_t mtu)
360 : {
361 : unsigned ip_header_size;
362 601 : if (rtp_sock_)
363 601 : ip_header_size = rtp_sock_->getTransportOverhead();
364 0 : else if (rtpDestAddr_.getFamily() == AF_INET6)
365 0 : ip_header_size = 40;
366 : else
367 0 : ip_header_size = 20;
368 : return new MediaIOHandle(
369 601 : mtu - (srtpContext_ ? SRTP_OVERHEAD : 0) - UDP_HEADER_SIZE - ip_header_size,
370 : true,
371 6275 : [](void* sp, uint8_t* buf, int len) { return static_cast<SocketPair*>(sp)->readCallback(buf, len); },
372 5725 : [](void* sp, uint8_t* buf, int len) { return static_cast<SocketPair*>(sp)->writeCallback(buf, len); },
373 : 0,
374 600 : reinterpret_cast<void*>(this));
375 : }
376 :
377 : int
378 6276 : SocketPair::waitForData()
379 : {
380 : // System sockets
381 6276 : if (rtpHandle_ >= 0) {
382 : int ret;
383 : do {
384 0 : if (interrupted_) {
385 0 : errno = EINTR;
386 0 : return -1;
387 : }
388 :
389 0 : if (not readBlockingMode_) {
390 0 : return 0;
391 : }
392 :
393 : // work with system socket
394 0 : struct pollfd p[2] = {{rtpHandle_, POLLIN, 0}, {rtcpHandle_, POLLIN, 0}};
395 0 : ret = poll(p, 2, NET_POLL_TIMEOUT);
396 0 : if (ret > 0) {
397 0 : ret = 0;
398 0 : if (p[0].revents & POLLIN)
399 0 : ret |= static_cast<int>(DataType::RTP);
400 0 : if (p[1].revents & POLLIN)
401 0 : ret |= static_cast<int>(DataType::RTCP);
402 : }
403 0 : } while (!ret or (ret < 0 and errno == EAGAIN));
404 :
405 0 : return ret;
406 : }
407 :
408 : // work with IceSocket
409 : {
410 6276 : std::unique_lock lk(dataBuffMutex_);
411 6276 : cv_.wait(lk, [this] {
412 12398 : return interrupted_ or not rtpDataBuff_.empty() or not rtcpDataBuff_.empty() or not readBlockingMode_;
413 : });
414 6276 : }
415 :
416 6275 : if (interrupted_) {
417 13 : errno = EINTR;
418 13 : return -1;
419 : }
420 :
421 6263 : return static_cast<int>(DataType::RTP) | static_cast<int>(DataType::RTCP);
422 : }
423 :
424 : int
425 5999 : SocketPair::readRtpData(void* buf, int buf_size)
426 : {
427 : // handle system socket
428 5999 : if (rtpHandle_ >= 0) {
429 : struct sockaddr_storage from;
430 0 : socklen_t from_len = sizeof(from);
431 0 : return recvfrom(rtpHandle_,
432 : static_cast<char*>(buf),
433 : buf_size,
434 : 0,
435 : reinterpret_cast<struct sockaddr*>(&from),
436 0 : &from_len);
437 : }
438 :
439 : // handle ICE
440 5999 : std::unique_lock lk(dataBuffMutex_);
441 5999 : if (not rtpDataBuff_.empty()) {
442 5609 : auto pkt = std::move(rtpDataBuff_.front());
443 5609 : rtpDataBuff_.pop_front();
444 5609 : lk.unlock(); // to not block our ICE callbacks
445 5609 : int pkt_size = pkt.size();
446 5609 : int len = std::min(pkt_size, buf_size);
447 5609 : std::copy_n(pkt.begin(), len, static_cast<char*>(buf));
448 5609 : return len;
449 5609 : }
450 :
451 390 : return 0;
452 5999 : }
453 :
454 : int
455 6263 : SocketPair::readRtcpData(void* buf, int buf_size)
456 : {
457 : // handle system socket
458 6263 : if (rtcpHandle_ >= 0) {
459 : struct sockaddr_storage from;
460 0 : socklen_t from_len = sizeof(from);
461 0 : return recvfrom(rtcpHandle_,
462 : static_cast<char*>(buf),
463 : buf_size,
464 : 0,
465 : reinterpret_cast<struct sockaddr*>(&from),
466 0 : &from_len);
467 : }
468 :
469 : // handle ICE
470 6263 : std::unique_lock lk(dataBuffMutex_);
471 6263 : if (not rtcpDataBuff_.empty()) {
472 264 : auto pkt = std::move(rtcpDataBuff_.front());
473 264 : rtcpDataBuff_.pop_front();
474 264 : lk.unlock();
475 264 : int pkt_size = pkt.size();
476 264 : int len = std::min(pkt_size, buf_size);
477 264 : std::copy_n(pkt.begin(), len, static_cast<char*>(buf));
478 264 : return len;
479 264 : }
480 :
481 5999 : return 0;
482 6263 : }
483 :
484 : int
485 6276 : SocketPair::readCallback(uint8_t* buf, int buf_size)
486 : {
487 6276 : auto datatype = waitForData();
488 6276 : if (datatype < 0)
489 13 : return datatype;
490 :
491 6263 : int len = 0;
492 6263 : bool fromRTCP = false;
493 :
494 6263 : if (datatype & static_cast<int>(DataType::RTCP)) {
495 6263 : len = readRtcpData(buf, buf_size);
496 6263 : if (len > 0) {
497 264 : auto header = reinterpret_cast<rtcpRRHeader*>(buf);
498 : // 201 = RR PT
499 264 : if (header->pt == 201) {
500 10 : lastDLSR_ = Swap4Bytes(header->dlsr);
501 : // JAMI_WARN("Read RR, lastDLSR : %d", lastDLSR_);
502 10 : lastRR_time = std::chrono::steady_clock::now();
503 10 : saveRtcpRRPacket(buf, len);
504 : }
505 : // 206 = REMB PT
506 254 : else if (header->pt == 206)
507 181 : saveRtcpREMBPacket(buf, len);
508 : // 200 = SR PT
509 73 : else if (header->pt == 200) {
510 : // not used yet
511 : } else {
512 0 : JAMI_DBG("Unable to read RTCP: unknown packet type %u", header->pt);
513 : }
514 264 : fromRTCP = true;
515 : }
516 : }
517 :
518 : // No RTCP… attempt RTP
519 6263 : if (!len and (datatype & static_cast<int>(DataType::RTP))) {
520 5999 : len = readRtpData(buf, buf_size);
521 5999 : fromRTCP = false;
522 : }
523 :
524 6263 : if (len <= 0)
525 390 : return len;
526 :
527 5873 : if (not fromRTCP && (buf_size < static_cast<int>(MINIMUM_RTP_HEADER_SIZE)))
528 0 : return len;
529 :
530 : // SRTP decrypt
531 5873 : if (not fromRTCP and srtpContext_ and srtpContext_->srtp_in.aes) {
532 5609 : int32_t gradient = 0;
533 5609 : int32_t deltaT = 0;
534 5609 : float abs = 0.0f;
535 5609 : bool res_parse = false;
536 5609 : bool res_delay = false;
537 :
538 5609 : res_parse = parse_RTP_ext(buf, &abs);
539 5609 : bool marker = (buf[1] & 0x80) >> 7;
540 :
541 5609 : if (res_parse)
542 5609 : res_delay = getOneWayDelayGradient(abs, marker, &gradient, &deltaT);
543 :
544 : // rtpDelayCallback_ is not set for audio
545 5609 : if (rtpDelayCallback_ and res_delay)
546 5516 : rtpDelayCallback_(gradient, deltaT);
547 :
548 5609 : auto err = ff_srtp_decrypt(&srtpContext_->srtp_in, buf, &len);
549 5609 : if (packetLossCallback_ and (buf[2] << 8 | buf[3]) != lastSeqNumIn_ + 1)
550 0 : packetLossCallback_();
551 5609 : lastSeqNumIn_ = buf[2] << 8 | buf[3];
552 5609 : if (err < 0)
553 0 : JAMI_WARN("decrypt error %d", err);
554 : }
555 :
556 5873 : if (len != 0)
557 5873 : return len;
558 : else
559 0 : return AVERROR_EOF;
560 : }
561 :
562 : int
563 5906 : SocketPair::writeData(uint8_t* buf, int buf_size)
564 : {
565 5906 : bool isRTCP = RTP_PT_IS_RTCP(buf[1]);
566 :
567 : // System sockets?
568 5906 : if (rtpHandle_ >= 0) {
569 : int fd;
570 : dhtnet::IpAddr* dest_addr;
571 :
572 0 : if (isRTCP) {
573 0 : fd = rtcpHandle_;
574 0 : dest_addr = &rtcpDestAddr_;
575 : } else {
576 0 : fd = rtpHandle_;
577 0 : dest_addr = &rtpDestAddr_;
578 : }
579 :
580 0 : auto ret = ff_network_wait_fd(fd);
581 0 : if (ret < 0)
582 0 : return ret;
583 :
584 0 : if (noWrite_)
585 0 : return buf_size;
586 0 : return ::sendto(fd, reinterpret_cast<const char*>(buf), buf_size, 0, *dest_addr, dest_addr->getLength());
587 : }
588 :
589 5906 : if (noWrite_)
590 0 : return buf_size;
591 :
592 : // IceSocket
593 5906 : if (isRTCP)
594 266 : return rtcp_sock_->send(buf, buf_size);
595 : else
596 5640 : return rtp_sock_->send(buf, buf_size);
597 : }
598 :
599 : int
600 5725 : SocketPair::writeCallback(uint8_t* buf, int buf_size)
601 : {
602 5725 : if (noWrite_)
603 0 : return 0;
604 :
605 : int ret;
606 5725 : bool isRTCP = RTP_PT_IS_RTCP(buf[1]);
607 : unsigned int ts_LSB, ts_MSB;
608 : double currentSRTS, currentLatency;
609 :
610 : // Encrypt?
611 5725 : if (not isRTCP and srtpContext_ and srtpContext_->srtp_out.aes) {
612 5640 : buf_size = ff_srtp_encrypt(&srtpContext_->srtp_out,
613 : buf,
614 : buf_size,
615 5640 : srtpContext_->encryptbuf,
616 : sizeof(srtpContext_->encryptbuf));
617 5640 : if (buf_size < 0) {
618 0 : JAMI_WARN("encrypt error %d", buf_size);
619 0 : return buf_size;
620 : }
621 :
622 5640 : buf = srtpContext_->encryptbuf;
623 : }
624 :
625 : // check if we're sending an RR, if so, detect packet loss
626 : // buf_size gives length of buffer, not just header
627 5725 : if (isRTCP && static_cast<unsigned>(buf_size) >= sizeof(rtcpRRHeader)) {
628 10 : auto header = reinterpret_cast<rtcpRRHeader*>(buf);
629 10 : rtcpPacketLoss_ = (header->pt == 201 && ntohl(header->fraction_lost) & RTCP_RR_FRACTION_MASK);
630 : }
631 :
632 : do {
633 5725 : if (interrupted_)
634 0 : return -EINTR;
635 5725 : ret = writeData(buf, buf_size);
636 5725 : } while (ret < 0 and errno == EAGAIN);
637 :
638 5725 : if (buf[1] == 200) // Sender Report
639 : {
640 75 : auto header = reinterpret_cast<rtcpSRHeader*>(buf);
641 75 : ts_LSB = Swap4Bytes(header->timestampLSB);
642 75 : ts_MSB = Swap4Bytes(header->timestampMSB);
643 :
644 75 : currentSRTS = ts_MSB + (ts_LSB / pow(2, 32));
645 :
646 75 : if (lastSRTS_ != 0 && lastDLSR_ != 0) {
647 14 : if (histoLatency_.size() >= MAX_LIST_SIZE)
648 0 : histoLatency_.pop_front();
649 :
650 14 : currentLatency = (currentSRTS - lastSRTS_) / 2;
651 : // JAMI_WARN("Current Latency : %f from sender %X", currentLatency, header->ssrc);
652 14 : histoLatency_.push_back(currentLatency);
653 : }
654 :
655 75 : lastSRTS_ = currentSRTS;
656 :
657 : // JAMI_WARN("SENDING NEW RTCP SR !! ");
658 :
659 5650 : } else if (buf[1] == 201) // Receiver Report
660 : {
661 : // auto header = reinterpret_cast<rtcpRRHeader*>(buf);
662 : // JAMI_WARN("SENDING NEW RTCP RR !! ");
663 : }
664 :
665 5725 : return ret < 0 ? -errno : ret;
666 : }
667 :
668 : double
669 10 : SocketPair::getLastLatency()
670 : {
671 10 : if (not histoLatency_.empty())
672 0 : return histoLatency_.back();
673 : else
674 10 : return -1;
675 : }
676 :
677 : void
678 138 : SocketPair::setRtpDelayCallback(std::function<void(int, int)> cb)
679 : {
680 138 : rtpDelayCallback_ = std::move(cb);
681 138 : }
682 :
683 : bool
684 5609 : SocketPair::getOneWayDelayGradient(float sendTS, bool marker, int32_t* gradient, int32_t* deltaT)
685 : {
686 : // Keep only last packet of each frame
687 5609 : if (not marker) {
688 46 : return 0;
689 : }
690 :
691 : // 1st frame
692 5563 : if (not lastSendTS_) {
693 47 : lastSendTS_ = sendTS;
694 47 : lastReceiveTS_ = std::chrono::steady_clock::now();
695 47 : return 0;
696 : }
697 :
698 5516 : int32_t deltaS = (sendTS - lastSendTS_) * 1000; // milliseconds
699 5516 : if (deltaS < 0)
700 3 : deltaS += 64000;
701 5516 : lastSendTS_ = sendTS;
702 :
703 5516 : std::chrono::steady_clock::time_point arrival_TS = std::chrono::steady_clock::now();
704 5516 : auto deltaR = std::chrono::duration_cast<std::chrono::milliseconds>(arrival_TS - lastReceiveTS_).count();
705 5516 : lastReceiveTS_ = arrival_TS;
706 :
707 5516 : *gradient = deltaR - deltaS;
708 5516 : *deltaT = deltaR;
709 :
710 5516 : return true;
711 : }
712 :
713 : bool
714 5609 : SocketPair::parse_RTP_ext(uint8_t* buf, float* abs)
715 : {
716 5609 : if (not(buf[0] & 0x10))
717 0 : return false;
718 :
719 5609 : uint16_t magic_word = (buf[12] << 8) + buf[13];
720 5609 : if (magic_word != 0xBEDE)
721 0 : return false;
722 :
723 5609 : uint8_t sec = buf[17] >> 2;
724 5609 : uint32_t fract = ((buf[17] & 0x3) << 16 | (buf[18] << 8) | buf[19]) << 14;
725 5609 : float milli = fract / pow(2, 32);
726 :
727 5609 : *abs = sec + (milli);
728 5609 : return true;
729 : }
730 :
731 : uint16_t
732 138 : SocketPair::lastSeqValOut()
733 : {
734 138 : if (srtpContext_)
735 138 : return srtpContext_->srtp_out.seq_largest;
736 0 : JAMI_ERR("SRTP context not found.");
737 0 : return 0;
738 : }
739 :
740 : } // namespace jami
|