Line data Source code
1 : /*
2 : * Copyright (C) 2004-2024 Savoir-faire Linux Inc.
3 : * Copyright (c) 2007 The FFmpeg Project
4 : *
5 : * This program is free software: you can redistribute it and/or modify
6 : * it under the terms of the GNU General Public License as published by
7 : * the Free Software Foundation, either version 3 of the License, or
8 : * (at your option) any later version.
9 : *
10 : * This program is distributed in the hope that it will be useful,
11 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 : * GNU General Public License for more details.
14 : *
15 : * You should have received a copy of the GNU General Public License
16 : * along with this program. If not, see <https://www.gnu.org/licenses/>.
17 : */
18 :
19 : #include <dhtnet/ip_utils.h> // MUST BE INCLUDED FIRST
20 :
21 : #include "libav_deps.h" // THEN THIS ONE AFTER
22 :
23 : #include "socket_pair.h"
24 : #include "libav_utils.h"
25 : #include "logger.h"
26 : #include "connectivity/security/memory.h"
27 :
28 : #include <dhtnet/ice_socket.h>
29 :
30 : #include <iostream>
31 : #include <string>
32 : #include <algorithm>
33 : #include <iterator>
34 :
35 : extern "C" {
36 : #include "srtp.h"
37 : }
38 :
39 : #include <cstring>
40 : #include <stdexcept>
41 : #include <unistd.h>
42 : #include <sys/types.h>
43 : #include <ciso646> // fix windows compiler bug
44 :
45 : #ifdef _WIN32
46 : #define SOCK_NONBLOCK FIONBIO
47 : #define poll WSAPoll
48 : #define close(x) closesocket(x)
49 : #endif
50 :
51 : #ifdef __ANDROID__
52 : #include <asm-generic/fcntl.h>
53 : #define SOCK_NONBLOCK O_NONBLOCK
54 : #endif
55 :
56 : #ifdef __APPLE__
57 : #include <fcntl.h>
58 : #endif
59 :
60 : // Swap 2 byte, 16 bit values:
61 : #define Swap2Bytes(val) ((((val) >> 8) & 0x00FF) | (((val) << 8) & 0xFF00))
62 :
63 : // Swap 4 byte, 32 bit values:
64 : #define Swap4Bytes(val) \
65 : ((((val) >> 24) & 0x000000FF) | (((val) >> 8) & 0x0000FF00) | (((val) << 8) & 0x00FF0000) \
66 : | (((val) << 24) & 0xFF000000))
67 :
68 : // Swap 8 byte, 64 bit values:
69 : #define Swap8Bytes(val) \
70 : ((((val) >> 56) & 0x00000000000000FF) | (((val) >> 40) & 0x000000000000FF00) \
71 : | (((val) >> 24) & 0x0000000000FF0000) | (((val) >> 8) & 0x00000000FF000000) \
72 : | (((val) << 8) & 0x000000FF00000000) | (((val) << 24) & 0x0000FF0000000000) \
73 : | (((val) << 40) & 0x00FF000000000000) | (((val) << 56) & 0xFF00000000000000))
74 :
75 : namespace jami {
76 :
77 : static constexpr int NET_POLL_TIMEOUT = 100; /* poll() timeout in ms */
78 : static constexpr int RTP_MAX_PACKET_LENGTH = 2048;
79 : static constexpr auto UDP_HEADER_SIZE = 8;
80 : static constexpr auto SRTP_OVERHEAD = 10;
81 : static constexpr uint32_t RTCP_RR_FRACTION_MASK = 0xFF000000;
82 : static constexpr unsigned MINIMUM_RTP_HEADER_SIZE = 16;
83 :
84 : enum class DataType : unsigned { RTP = 1 << 0, RTCP = 1 << 1 };
85 :
86 : class SRTPProtoContext
87 : {
88 : public:
89 340 : SRTPProtoContext(const char* out_suite,
90 : const char* out_key,
91 : const char* in_suite,
92 : const char* in_key)
93 340 : {
94 340 : ring_secure_memzero(&srtp_out, sizeof(srtp_out));
95 340 : ring_secure_memzero(&srtp_in, sizeof(srtp_in));
96 340 : if (out_suite && out_key) {
97 : // XXX: see srtp_open from libavformat/srtpproto.c
98 340 : if (ff_srtp_set_crypto(&srtp_out, out_suite, out_key) < 0) {
99 0 : srtp_close();
100 0 : throw std::runtime_error("Unable to set crypto on output");
101 : }
102 : }
103 :
104 340 : if (in_suite && in_key) {
105 340 : if (ff_srtp_set_crypto(&srtp_in, in_suite, in_key) < 0) {
106 0 : srtp_close();
107 0 : throw std::runtime_error("Unable to set crypto on input");
108 : }
109 : }
110 340 : }
111 :
112 340 : ~SRTPProtoContext() { srtp_close(); }
113 :
114 : SRTPContext srtp_out {};
115 : SRTPContext srtp_in {};
116 : uint8_t encryptbuf[RTP_MAX_PACKET_LENGTH];
117 :
118 : private:
119 340 : void srtp_close() noexcept
120 : {
121 340 : ff_srtp_free(&srtp_out);
122 340 : ff_srtp_free(&srtp_in);
123 340 : }
124 : };
125 :
126 : static int
127 0 : ff_network_wait_fd(int fd)
128 : {
129 0 : struct pollfd p = {fd, POLLOUT, 0};
130 0 : auto ret = poll(&p, 1, NET_POLL_TIMEOUT);
131 0 : return ret < 0 ? errno : p.revents & (POLLOUT | POLLERR | POLLHUP) ? 0 : -EAGAIN;
132 : }
133 :
134 : static int
135 24 : udp_socket_create(int family, int port)
136 : {
137 24 : int udp_fd = -1;
138 :
139 : #ifdef __APPLE__
140 : udp_fd = socket(family, SOCK_DGRAM, 0);
141 : if (udp_fd >= 0 && fcntl(udp_fd, F_SETFL, O_NONBLOCK) < 0) {
142 : close(udp_fd);
143 : udp_fd = -1;
144 : }
145 : #elif defined _WIN32
146 : udp_fd = socket(family, SOCK_DGRAM, 0);
147 : u_long block = 1;
148 : if (udp_fd >= 0 && ioctlsocket(udp_fd, FIONBIO, &block) < 0) {
149 : close(udp_fd);
150 : udp_fd = -1;
151 : }
152 : #else
153 24 : udp_fd = socket(family, SOCK_DGRAM | SOCK_NONBLOCK, 0);
154 : #endif
155 :
156 24 : if (udp_fd < 0) {
157 0 : JAMI_ERR("socket() failed");
158 0 : strErr();
159 0 : return -1;
160 : }
161 :
162 24 : auto bind_addr = dhtnet::ip_utils::getAnyHostAddr(family);
163 24 : if (not bind_addr.isIpv4() and not bind_addr.isIpv6()) {
164 0 : JAMI_ERR("No IPv4/IPv6 host found for family %u", family);
165 0 : close(udp_fd);
166 0 : return -1;
167 : }
168 :
169 24 : bind_addr.setPort(port);
170 24 : JAMI_DBG("use local address: %s", bind_addr.toString(true, true).c_str());
171 24 : if (::bind(udp_fd, bind_addr, bind_addr.getLength()) < 0) {
172 0 : JAMI_ERR("bind() failed");
173 0 : strErr();
174 0 : close(udp_fd);
175 0 : udp_fd = -1;
176 : }
177 :
178 24 : return udp_fd;
179 : }
180 :
181 12 : SocketPair::SocketPair(const char* uri, int localPort)
182 : {
183 12 : openSockets(uri, localPort);
184 12 : }
185 :
186 328 : SocketPair::SocketPair(std::unique_ptr<dhtnet::IceSocket> rtp_sock, std::unique_ptr<dhtnet::IceSocket> rtcp_sock)
187 328 : : rtp_sock_(std::move(rtp_sock))
188 656 : , rtcp_sock_(std::move(rtcp_sock))
189 : {
190 328 : JAMI_DBG("[%p] Creating instance using ICE sockets for comp %d and %d",
191 : this,
192 : rtp_sock_->getCompId(),
193 : rtcp_sock_->getCompId());
194 :
195 328 : rtp_sock_->setOnRecv([this](uint8_t* buf, size_t len) {
196 5333 : std::lock_guard l(dataBuffMutex_);
197 5333 : rtpDataBuff_.emplace_back(buf, buf + len);
198 5333 : cv_.notify_one();
199 5333 : return len;
200 5333 : });
201 328 : rtcp_sock_->setOnRecv([this](uint8_t* buf, size_t len) {
202 244 : std::lock_guard l(dataBuffMutex_);
203 244 : rtcpDataBuff_.emplace_back(buf, buf + len);
204 244 : cv_.notify_one();
205 244 : return len;
206 244 : });
207 328 : }
208 :
209 340 : SocketPair::~SocketPair()
210 : {
211 340 : interrupt();
212 340 : closeSockets();
213 340 : JAMI_DBG("[%p] Instance destroyed", this);
214 340 : }
215 :
216 : bool
217 730 : SocketPair::waitForRTCP(std::chrono::seconds interval)
218 : {
219 730 : std::unique_lock lock(rtcpInfo_mutex_);
220 730 : return cvRtcpPacketReadyToRead_.wait_for(lock, interval, [this] {
221 2152 : return interrupted_ or not listRtcpRRHeader_.empty() or not listRtcpREMBHeader_.empty();
222 1460 : });
223 730 : }
224 :
225 : void
226 10 : SocketPair::saveRtcpRRPacket(uint8_t* buf, size_t len)
227 : {
228 10 : if (len < sizeof(rtcpRRHeader))
229 0 : return;
230 :
231 10 : auto header = reinterpret_cast<rtcpRRHeader*>(buf);
232 10 : if (header->pt != 201) // 201 = RR PT
233 0 : return;
234 :
235 10 : std::lock_guard lock(rtcpInfo_mutex_);
236 :
237 10 : if (listRtcpRRHeader_.size() >= MAX_LIST_SIZE) {
238 0 : listRtcpRRHeader_.pop_front();
239 : }
240 :
241 10 : listRtcpRRHeader_.emplace_back(*header);
242 :
243 10 : cvRtcpPacketReadyToRead_.notify_one();
244 10 : }
245 :
246 : void
247 163 : SocketPair::saveRtcpREMBPacket(uint8_t* buf, size_t len)
248 : {
249 163 : if (len < sizeof(rtcpREMBHeader))
250 0 : return;
251 :
252 163 : auto header = reinterpret_cast<rtcpREMBHeader*>(buf);
253 163 : if (header->pt != 206) // 206 = REMB PT
254 0 : return;
255 :
256 163 : if (header->uid != 0x424D4552) // uid must be "REMB"
257 0 : return;
258 :
259 163 : std::lock_guard lock(rtcpInfo_mutex_);
260 :
261 163 : if (listRtcpREMBHeader_.size() >= MAX_LIST_SIZE) {
262 0 : listRtcpREMBHeader_.pop_front();
263 : }
264 :
265 163 : listRtcpREMBHeader_.push_back(*header);
266 :
267 163 : cvRtcpPacketReadyToRead_.notify_one();
268 163 : }
269 :
270 : std::list<rtcpRRHeader>
271 730 : SocketPair::getRtcpRR()
272 : {
273 730 : std::lock_guard lock(rtcpInfo_mutex_);
274 1460 : return std::move(listRtcpRRHeader_);
275 730 : }
276 :
277 : std::list<rtcpREMBHeader>
278 393 : SocketPair::getRtcpREMB()
279 : {
280 393 : std::lock_guard lock(rtcpInfo_mutex_);
281 786 : return std::move(listRtcpREMBHeader_);
282 393 : }
283 :
284 : void
285 340 : SocketPair::createSRTP(const char* out_suite,
286 : const char* out_key,
287 : const char* in_suite,
288 : const char* in_key)
289 : {
290 340 : srtpContext_.reset(new SRTPProtoContext(out_suite, out_key, in_suite, in_key));
291 340 : }
292 :
293 : void
294 680 : SocketPair::interrupt()
295 : {
296 680 : JAMI_WARN("[%p] Interrupting RTP sockets", this);
297 680 : interrupted_ = true;
298 680 : if (rtp_sock_)
299 656 : rtp_sock_->setOnRecv(nullptr);
300 680 : if (rtcp_sock_)
301 656 : rtcp_sock_->setOnRecv(nullptr);
302 680 : cv_.notify_all();
303 680 : cvRtcpPacketReadyToRead_.notify_all();
304 680 : }
305 :
306 : void
307 982 : SocketPair::setReadBlockingMode(bool block)
308 : {
309 982 : JAMI_DBG("[%p] Read operations in blocking mode [%s]", this, block ? "YES" : "NO");
310 982 : readBlockingMode_ = block;
311 982 : cv_.notify_all();
312 982 : cvRtcpPacketReadyToRead_.notify_all();
313 982 : }
314 :
315 : void
316 983 : SocketPair::stopSendOp(bool state)
317 : {
318 983 : noWrite_ = state;
319 983 : }
320 :
321 : void
322 340 : SocketPair::closeSockets()
323 : {
324 340 : if (rtcpHandle_ > 0 and close(rtcpHandle_))
325 0 : strErr();
326 340 : if (rtpHandle_ > 0 and close(rtpHandle_))
327 0 : strErr();
328 340 : }
329 :
330 : void
331 12 : SocketPair::openSockets(const char* uri, int local_rtp_port)
332 : {
333 12 : JAMI_DBG("Creating rtp socket for uri %s on port %d", uri, local_rtp_port);
334 :
335 : char hostname[256];
336 : char path[1024];
337 : int dst_rtp_port;
338 :
339 12 : av_url_split(NULL, 0, NULL, 0, hostname, sizeof(hostname), &dst_rtp_port, path, sizeof(path), uri);
340 :
341 12 : const int local_rtcp_port = local_rtp_port + 1;
342 12 : const int dst_rtcp_port = dst_rtp_port + 1;
343 :
344 12 : rtpDestAddr_ = dhtnet::IpAddr {hostname};
345 12 : rtpDestAddr_.setPort(dst_rtp_port);
346 12 : rtcpDestAddr_ = dhtnet::IpAddr {hostname};
347 12 : rtcpDestAddr_.setPort(dst_rtcp_port);
348 :
349 : // Open local sockets (RTP/RTCP)
350 12 : if ((rtpHandle_ = udp_socket_create(rtpDestAddr_.getFamily(), local_rtp_port)) == -1
351 12 : or (rtcpHandle_ = udp_socket_create(rtcpDestAddr_.getFamily(), local_rtcp_port)) == -1) {
352 0 : closeSockets();
353 0 : JAMI_ERR("[%p] Sockets creation failed", this);
354 0 : throw std::runtime_error("Sockets creation failed");
355 : }
356 :
357 12 : JAMI_WARN("SocketPair: local{%d,%d} / %s{%d,%d}",
358 : local_rtp_port,
359 : local_rtcp_port,
360 : hostname,
361 : dst_rtp_port,
362 : dst_rtcp_port);
363 12 : }
364 :
365 : MediaIOHandle*
366 677 : SocketPair::createIOContext(const uint16_t mtu)
367 : {
368 : unsigned ip_header_size;
369 677 : if (rtp_sock_)
370 657 : ip_header_size = rtp_sock_->getTransportOverhead();
371 20 : else if (rtpDestAddr_.getFamily() == AF_INET6)
372 0 : ip_header_size = 40;
373 : else
374 20 : ip_header_size = 20;
375 : return new MediaIOHandle(
376 677 : mtu - (srtpContext_ ? SRTP_OVERHEAD : 0) - UDP_HEADER_SIZE - ip_header_size,
377 : true,
378 6030 : [](void* sp, uint8_t* buf, int len) {
379 6030 : return static_cast<SocketPair*>(sp)->readCallback(buf, len);
380 : },
381 5427 : [](void* sp, uint8_t* buf, int len) {
382 5427 : return static_cast<SocketPair*>(sp)->writeCallback(buf, len);
383 : },
384 : 0,
385 677 : reinterpret_cast<void*>(this));
386 : }
387 :
388 : int
389 6030 : SocketPair::waitForData()
390 : {
391 : // System sockets
392 6030 : if (rtpHandle_ >= 0) {
393 : int ret;
394 : do {
395 155 : if (interrupted_) {
396 8 : errno = EINTR;
397 12 : return -1;
398 : }
399 :
400 147 : if (not readBlockingMode_) {
401 4 : return 0;
402 : }
403 :
404 : // work with system socket
405 143 : struct pollfd p[2] = {{rtpHandle_, POLLIN, 0}, {rtcpHandle_, POLLIN, 0}};
406 143 : ret = poll(p, 2, NET_POLL_TIMEOUT);
407 143 : if (ret > 0) {
408 0 : ret = 0;
409 0 : if (p[0].revents & POLLIN)
410 0 : ret |= static_cast<int>(DataType::RTP);
411 0 : if (p[1].revents & POLLIN)
412 0 : ret |= static_cast<int>(DataType::RTCP);
413 : }
414 143 : } while (!ret or (ret < 0 and errno == EAGAIN));
415 :
416 0 : return ret;
417 : }
418 :
419 : // work with IceSocket
420 : {
421 6018 : std::unique_lock lk(dataBuffMutex_);
422 6018 : cv_.wait(lk, [this] {
423 23654 : return interrupted_ or not rtpDataBuff_.empty() or not rtcpDataBuff_.empty()
424 23652 : or not readBlockingMode_;
425 : });
426 6018 : }
427 :
428 6018 : if (interrupted_) {
429 34 : errno = EINTR;
430 34 : return -1;
431 : }
432 :
433 5984 : return static_cast<int>(DataType::RTP) | static_cast<int>(DataType::RTCP);
434 : }
435 :
436 : int
437 5740 : SocketPair::readRtpData(void* buf, int buf_size)
438 : {
439 : // handle system socket
440 5740 : if (rtpHandle_ >= 0) {
441 : struct sockaddr_storage from;
442 0 : socklen_t from_len = sizeof(from);
443 0 : return recvfrom(rtpHandle_,
444 : static_cast<char*>(buf),
445 : buf_size,
446 : 0,
447 : reinterpret_cast<struct sockaddr*>(&from),
448 0 : &from_len);
449 : }
450 :
451 : // handle ICE
452 5740 : std::unique_lock lk(dataBuffMutex_);
453 5740 : if (not rtpDataBuff_.empty()) {
454 5333 : auto pkt = std::move(rtpDataBuff_.front());
455 5333 : rtpDataBuff_.pop_front();
456 5333 : lk.unlock(); // to not block our ICE callbacks
457 5333 : int pkt_size = pkt.size();
458 5333 : int len = std::min(pkt_size, buf_size);
459 5333 : std::copy_n(pkt.begin(), len, static_cast<char*>(buf));
460 5333 : return len;
461 5333 : }
462 :
463 407 : return 0;
464 5740 : }
465 :
466 : int
467 5984 : SocketPair::readRtcpData(void* buf, int buf_size)
468 : {
469 : // handle system socket
470 5984 : if (rtcpHandle_ >= 0) {
471 : struct sockaddr_storage from;
472 0 : socklen_t from_len = sizeof(from);
473 0 : return recvfrom(rtcpHandle_,
474 : static_cast<char*>(buf),
475 : buf_size,
476 : 0,
477 : reinterpret_cast<struct sockaddr*>(&from),
478 0 : &from_len);
479 : }
480 :
481 : // handle ICE
482 5984 : std::unique_lock lk(dataBuffMutex_);
483 5984 : if (not rtcpDataBuff_.empty()) {
484 244 : auto pkt = std::move(rtcpDataBuff_.front());
485 244 : rtcpDataBuff_.pop_front();
486 244 : lk.unlock();
487 244 : int pkt_size = pkt.size();
488 244 : int len = std::min(pkt_size, buf_size);
489 244 : std::copy_n(pkt.begin(), len, static_cast<char*>(buf));
490 244 : return len;
491 244 : }
492 :
493 5740 : return 0;
494 5984 : }
495 :
496 : int
497 6030 : SocketPair::readCallback(uint8_t* buf, int buf_size)
498 : {
499 6030 : auto datatype = waitForData();
500 6030 : if (datatype < 0)
501 42 : return datatype;
502 :
503 5988 : int len = 0;
504 5988 : bool fromRTCP = false;
505 :
506 5988 : if (datatype & static_cast<int>(DataType::RTCP)) {
507 5984 : len = readRtcpData(buf, buf_size);
508 5984 : if (len > 0) {
509 244 : auto header = reinterpret_cast<rtcpRRHeader*>(buf);
510 : // 201 = RR PT
511 244 : if (header->pt == 201) {
512 10 : lastDLSR_ = Swap4Bytes(header->dlsr);
513 : // JAMI_WARN("Read RR, lastDLSR : %d", lastDLSR_);
514 10 : lastRR_time = std::chrono::steady_clock::now();
515 10 : saveRtcpRRPacket(buf, len);
516 : }
517 : // 206 = REMB PT
518 234 : else if (header->pt == 206)
519 163 : saveRtcpREMBPacket(buf, len);
520 : // 200 = SR PT
521 71 : else if (header->pt == 200) {
522 : // not used yet
523 : } else {
524 0 : JAMI_DBG("Unable to read RTCP: unknown packet type %u", header->pt);
525 : }
526 244 : fromRTCP = true;
527 : }
528 : }
529 :
530 : // No RTCP… attempt RTP
531 5988 : if (!len and (datatype & static_cast<int>(DataType::RTP))) {
532 5740 : len = readRtpData(buf, buf_size);
533 5740 : fromRTCP = false;
534 : }
535 :
536 5988 : if (len <= 0)
537 411 : return len;
538 :
539 5577 : if (not fromRTCP && (buf_size < static_cast<int>(MINIMUM_RTP_HEADER_SIZE)))
540 0 : return len;
541 :
542 : // SRTP decrypt
543 5577 : if (not fromRTCP and srtpContext_ and srtpContext_->srtp_in.aes) {
544 5333 : int32_t gradient = 0;
545 5333 : int32_t deltaT = 0;
546 5333 : float abs = 0.0f;
547 5333 : bool res_parse = false;
548 5333 : bool res_delay = false;
549 :
550 5333 : res_parse = parse_RTP_ext(buf, &abs);
551 5333 : bool marker = (buf[1] & 0x80) >> 7;
552 :
553 5333 : if (res_parse)
554 5333 : res_delay = getOneWayDelayGradient(abs, marker, &gradient, &deltaT);
555 :
556 : // rtpDelayCallback_ is not set for audio
557 5333 : if (rtpDelayCallback_ and res_delay)
558 5240 : rtpDelayCallback_(gradient, deltaT);
559 :
560 5333 : auto err = ff_srtp_decrypt(&srtpContext_->srtp_in, buf, &len);
561 5333 : if (packetLossCallback_ and (buf[2] << 8 | buf[3]) != lastSeqNumIn_ + 1)
562 0 : packetLossCallback_();
563 5333 : lastSeqNumIn_ = buf[2] << 8 | buf[3];
564 5333 : if (err < 0)
565 0 : JAMI_WARN("decrypt error %d", err);
566 : }
567 :
568 5577 : if (len != 0)
569 5577 : return len;
570 : else
571 0 : return AVERROR_EOF;
572 : }
573 :
574 : int
575 5590 : SocketPair::writeData(uint8_t* buf, int buf_size)
576 : {
577 5590 : bool isRTCP = RTP_PT_IS_RTCP(buf[1]);
578 :
579 : // System sockets?
580 5590 : if (rtpHandle_ >= 0) {
581 : int fd;
582 : dhtnet::IpAddr* dest_addr;
583 :
584 0 : if (isRTCP) {
585 0 : fd = rtcpHandle_;
586 0 : dest_addr = &rtcpDestAddr_;
587 : } else {
588 0 : fd = rtpHandle_;
589 0 : dest_addr = &rtpDestAddr_;
590 : }
591 :
592 0 : auto ret = ff_network_wait_fd(fd);
593 0 : if (ret < 0)
594 0 : return ret;
595 :
596 0 : if (noWrite_)
597 0 : return buf_size;
598 0 : return ::sendto(fd,
599 : reinterpret_cast<const char*>(buf),
600 : buf_size,
601 : 0,
602 : *dest_addr,
603 0 : dest_addr->getLength());
604 : }
605 :
606 5590 : if (noWrite_)
607 0 : return buf_size;
608 :
609 : // IceSocket
610 5590 : if (isRTCP)
611 246 : return rtcp_sock_->send(buf, buf_size);
612 : else
613 5344 : return rtp_sock_->send(buf, buf_size);
614 : }
615 :
616 : int
617 5427 : SocketPair::writeCallback(uint8_t* buf, int buf_size)
618 : {
619 5427 : if (noWrite_)
620 0 : return 0;
621 :
622 : int ret;
623 5427 : bool isRTCP = RTP_PT_IS_RTCP(buf[1]);
624 : unsigned int ts_LSB, ts_MSB;
625 : double currentSRTS, currentLatency;
626 :
627 : // Encrypt?
628 5427 : if (not isRTCP and srtpContext_ and srtpContext_->srtp_out.aes) {
629 5344 : buf_size = ff_srtp_encrypt(&srtpContext_->srtp_out,
630 : buf,
631 : buf_size,
632 5344 : srtpContext_->encryptbuf,
633 : sizeof(srtpContext_->encryptbuf));
634 5344 : if (buf_size < 0) {
635 0 : JAMI_WARN("encrypt error %d", buf_size);
636 0 : return buf_size;
637 : }
638 :
639 5344 : buf = srtpContext_->encryptbuf;
640 : }
641 :
642 : // check if we're sending an RR, if so, detect packet loss
643 : // buf_size gives length of buffer, not just header
644 5427 : if (isRTCP && static_cast<unsigned>(buf_size) >= sizeof(rtcpRRHeader)) {
645 10 : auto header = reinterpret_cast<rtcpRRHeader*>(buf);
646 10 : rtcpPacketLoss_ = (header->pt == 201
647 10 : && ntohl(header->fraction_lost) & RTCP_RR_FRACTION_MASK);
648 : }
649 :
650 : do {
651 5427 : if (interrupted_)
652 0 : return -EINTR;
653 5427 : ret = writeData(buf, buf_size);
654 5427 : } while (ret < 0 and errno == EAGAIN);
655 :
656 5427 : if (buf[1] == 200) // Sender Report
657 : {
658 73 : auto header = reinterpret_cast<rtcpSRHeader*>(buf);
659 73 : ts_LSB = Swap4Bytes(header->timestampLSB);
660 73 : ts_MSB = Swap4Bytes(header->timestampMSB);
661 :
662 73 : currentSRTS = ts_MSB + (ts_LSB / pow(2, 32));
663 :
664 73 : if (lastSRTS_ != 0 && lastDLSR_ != 0) {
665 14 : if (histoLatency_.size() >= MAX_LIST_SIZE)
666 0 : histoLatency_.pop_front();
667 :
668 14 : currentLatency = (currentSRTS - lastSRTS_) / 2;
669 : // JAMI_WARN("Current Latency : %f from sender %X", currentLatency, header->ssrc);
670 14 : histoLatency_.push_back(currentLatency);
671 : }
672 :
673 73 : lastSRTS_ = currentSRTS;
674 :
675 : // JAMI_WARN("SENDING NEW RTCP SR !! ");
676 :
677 5354 : } else if (buf[1] == 201) // Receiver Report
678 : {
679 : // auto header = reinterpret_cast<rtcpRRHeader*>(buf);
680 : // JAMI_WARN("SENDING NEW RTCP RR !! ");
681 : }
682 :
683 5427 : return ret < 0 ? -errno : ret;
684 : }
685 :
686 : double
687 10 : SocketPair::getLastLatency()
688 : {
689 10 : if (not histoLatency_.empty())
690 0 : return histoLatency_.back();
691 : else
692 10 : return -1;
693 : }
694 :
695 : void
696 150 : SocketPair::setRtpDelayCallback(std::function<void(int, int)> cb)
697 : {
698 150 : rtpDelayCallback_ = std::move(cb);
699 150 : }
700 :
701 : bool
702 5333 : SocketPair::getOneWayDelayGradient(float sendTS, bool marker, int32_t* gradient, int32_t* deltaT)
703 : {
704 : // Keep only last packet of each frame
705 5333 : if (not marker) {
706 46 : return 0;
707 : }
708 :
709 : // 1st frame
710 5287 : if (not lastSendTS_) {
711 47 : lastSendTS_ = sendTS;
712 47 : lastReceiveTS_ = std::chrono::steady_clock::now();
713 47 : return 0;
714 : }
715 :
716 5240 : int32_t deltaS = (sendTS - lastSendTS_) * 1000; // milliseconds
717 5240 : if (deltaS < 0)
718 4 : deltaS += 64000;
719 5240 : lastSendTS_ = sendTS;
720 :
721 5240 : std::chrono::steady_clock::time_point arrival_TS = std::chrono::steady_clock::now();
722 5240 : auto deltaR = std::chrono::duration_cast<std::chrono::milliseconds>(arrival_TS - lastReceiveTS_)
723 5240 : .count();
724 5240 : lastReceiveTS_ = arrival_TS;
725 :
726 5240 : *gradient = deltaR - deltaS;
727 5240 : *deltaT = deltaR;
728 :
729 5240 : return true;
730 : }
731 :
732 : bool
733 5333 : SocketPair::parse_RTP_ext(uint8_t* buf, float* abs)
734 : {
735 5333 : if (not(buf[0] & 0x10))
736 0 : return false;
737 :
738 5333 : uint16_t magic_word = (buf[12] << 8) + buf[13];
739 5333 : if (magic_word != 0xBEDE)
740 0 : return false;
741 :
742 5333 : uint8_t sec = buf[17] >> 2;
743 5333 : uint32_t fract = ((buf[17] & 0x3) << 16 | (buf[18] << 8) | buf[19]) << 14;
744 5333 : float milli = fract / pow(2, 32);
745 :
746 5333 : *abs = sec + (milli);
747 5333 : return true;
748 : }
749 :
750 : uint16_t
751 152 : SocketPair::lastSeqValOut()
752 : {
753 152 : if (srtpContext_)
754 152 : return srtpContext_->srtp_out.seq_largest;
755 0 : JAMI_ERR("SRTP context not found.");
756 0 : return 0;
757 : }
758 :
759 : } // namespace jami
|