Line data Source code
1 : /*
2 : * Copyright (C) 2004-2026 Savoir-faire Linux Inc.
3 : * Copyright (c) 2007 The FFmpeg Project
4 : *
5 : * This program is free software: you can redistribute it and/or modify
6 : * it under the terms of the GNU General Public License as published by
7 : * the Free Software Foundation, either version 3 of the License, or
8 : * (at your option) any later version.
9 : *
10 : * This program is distributed in the hope that it will be useful,
11 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 : * GNU General Public License for more details.
14 : *
15 : * You should have received a copy of the GNU General Public License
16 : * along with this program. If not, see <https://www.gnu.org/licenses/>.
17 : */
18 :
19 : #include <dhtnet/ip_utils.h> // MUST BE INCLUDED FIRST
20 :
21 : #include "libav_deps.h" // THEN THIS ONE AFTER
22 :
23 : #include "socket_pair.h"
24 : #include "logger.h"
25 : #include "connectivity/security/memory.h"
26 :
27 : #include <dhtnet/ice_socket.h>
28 :
29 : #include <string>
30 : #include <algorithm>
31 :
32 : extern "C" {
33 : #include "srtp.h"
34 : }
35 :
36 : #include <cstring>
37 : #include <stdexcept>
38 : #include <unistd.h>
39 : #include <sys/types.h>
40 :
41 : #ifdef _WIN32
42 : #define SOCK_NONBLOCK FIONBIO
43 : #define poll WSAPoll
44 : #define close(x) closesocket(x)
45 : #endif
46 :
47 : #ifdef __ANDROID__
48 : #include <asm-generic/fcntl.h>
49 : #define SOCK_NONBLOCK O_NONBLOCK
50 : #endif
51 :
52 : #ifdef __APPLE__
53 : #include <fcntl.h>
54 : #endif
55 :
56 : // Swap 2 byte, 16 bit values:
57 : #define Swap2Bytes(val) ((((val) >> 8) & 0x00FF) | (((val) << 8) & 0xFF00))
58 :
59 : // Swap 4 byte, 32 bit values:
60 : #define Swap4Bytes(val) \
61 : ((((val) >> 24) & 0x000000FF) | (((val) >> 8) & 0x0000FF00) | (((val) << 8) & 0x00FF0000) \
62 : | (((val) << 24) & 0xFF000000))
63 :
64 : // Swap 8 byte, 64 bit values:
65 : #define Swap8Bytes(val) \
66 : ((((val) >> 56) & 0x00000000000000FF) | (((val) >> 40) & 0x000000000000FF00) \
67 : | (((val) >> 24) & 0x0000000000FF0000) | (((val) >> 8) & 0x00000000FF000000) \
68 : | (((val) << 8) & 0x000000FF00000000) | (((val) << 24) & 0x0000FF0000000000) \
69 : | (((val) << 40) & 0x00FF000000000000) | (((val) << 56) & 0xFF00000000000000))
70 :
71 : namespace jami {
72 :
73 : static constexpr int NET_POLL_TIMEOUT = 100; /* poll() timeout in ms */
74 : static constexpr int RTP_MAX_PACKET_LENGTH = 2048;
75 : static constexpr auto UDP_HEADER_SIZE = 8;
76 : static constexpr auto SRTP_OVERHEAD = 10;
77 : static constexpr uint32_t RTCP_RR_FRACTION_MASK = 0xFF000000;
78 : static constexpr unsigned MINIMUM_RTP_HEADER_SIZE = 16;
79 :
80 : enum class DataType : uint8_t { RTP = 1 << 0, RTCP = 1 << 1 };
81 :
82 : class SRTPProtoContext
83 : {
84 : public:
85 318 : SRTPProtoContext(const char* out_suite, const char* out_key, const char* in_suite, const char* in_key)
86 318 : {
87 318 : jami_secure_memzero(&srtp_out, sizeof(srtp_out));
88 318 : jami_secure_memzero(&srtp_in, sizeof(srtp_in));
89 318 : if (out_suite && out_key) {
90 : // XXX: see srtp_open from libavformat/srtpproto.c
91 318 : if (ff_srtp_set_crypto(&srtp_out, out_suite, out_key) < 0) {
92 0 : srtp_close();
93 0 : throw std::runtime_error("Unable to set crypto on output");
94 : }
95 : }
96 :
97 318 : if (in_suite && in_key) {
98 318 : if (ff_srtp_set_crypto(&srtp_in, in_suite, in_key) < 0) {
99 0 : srtp_close();
100 0 : throw std::runtime_error("Unable to set crypto on input");
101 : }
102 : }
103 318 : }
104 :
105 318 : ~SRTPProtoContext() { srtp_close(); }
106 :
107 : SRTPContext srtp_out {};
108 : SRTPContext srtp_in {};
109 : uint8_t encryptbuf[RTP_MAX_PACKET_LENGTH];
110 :
111 : private:
112 318 : void srtp_close() noexcept
113 : {
114 318 : ff_srtp_free(&srtp_out);
115 318 : ff_srtp_free(&srtp_in);
116 318 : }
117 : };
118 :
119 : static int
120 0 : ff_network_wait_fd(int fd)
121 : {
122 0 : struct pollfd p = {fd, POLLOUT, 0};
123 0 : auto ret = poll(&p, 1, NET_POLL_TIMEOUT);
124 0 : return ret < 0 ? errno : p.revents & (POLLOUT | POLLERR | POLLHUP) ? 0 : -EAGAIN;
125 : }
126 :
127 : static int
128 8 : udp_socket_create(int family, int port)
129 : {
130 8 : int udp_fd = -1;
131 :
132 : #ifdef __APPLE__
133 : udp_fd = socket(family, SOCK_DGRAM, 0);
134 : if (udp_fd >= 0 && fcntl(udp_fd, F_SETFL, O_NONBLOCK) < 0) {
135 : close(udp_fd);
136 : udp_fd = -1;
137 : }
138 : #elif defined _WIN32
139 : udp_fd = socket(family, SOCK_DGRAM, 0);
140 : u_long block = 1;
141 : if (udp_fd >= 0 && ioctlsocket(udp_fd, FIONBIO, &block) < 0) {
142 : close(udp_fd);
143 : udp_fd = -1;
144 : }
145 : #else
146 8 : udp_fd = socket(family, SOCK_DGRAM | SOCK_NONBLOCK, 0);
147 : #endif
148 :
149 8 : if (udp_fd < 0) {
150 0 : JAMI_ERROR("socket() failed");
151 0 : strErr();
152 0 : return -1;
153 : }
154 :
155 8 : auto bind_addr = dhtnet::ip_utils::getAnyHostAddr(family);
156 8 : if (not bind_addr.isIpv4() and not bind_addr.isIpv6()) {
157 0 : JAMI_ERROR("No IPv4/IPv6 host found for family {}", family);
158 0 : close(udp_fd);
159 0 : return -1;
160 : }
161 :
162 8 : bind_addr.setPort(port);
163 32 : JAMI_LOG("use local address: {}", bind_addr.toString(true, true));
164 8 : if (::bind(udp_fd, bind_addr, bind_addr.getLength()) < 0) {
165 0 : JAMI_ERROR("bind() failed");
166 0 : strErr();
167 0 : close(udp_fd);
168 0 : udp_fd = -1;
169 : }
170 :
171 8 : return udp_fd;
172 : }
173 :
174 4 : SocketPair::SocketPair(const char* uri, int localPort)
175 : {
176 4 : openSockets(uri, localPort);
177 4 : }
178 :
179 314 : SocketPair::SocketPair(std::unique_ptr<dhtnet::IceSocket> rtp_sock, std::unique_ptr<dhtnet::IceSocket> rtcp_sock)
180 313 : : rtp_sock_(std::move(rtp_sock))
181 628 : , rtcp_sock_(std::move(rtcp_sock))
182 : {
183 1255 : JAMI_LOG("[{}] Creating instance using ICE sockets for comp {} and {}",
184 : fmt::ptr(this),
185 : rtp_sock_->getCompId(),
186 : rtcp_sock_->getCompId());
187 :
188 313 : rtp_sock_->setOnRecv([this](uint8_t* buf, size_t len) {
189 5678 : std::lock_guard l(dataBuffMutex_);
190 5678 : rtpDataBuff_.emplace_back(buf, buf + len);
191 5678 : cv_.notify_one();
192 5678 : return len;
193 5678 : });
194 314 : rtcp_sock_->setOnRecv([this](uint8_t* buf, size_t len) {
195 261 : std::lock_guard l(dataBuffMutex_);
196 261 : rtcpDataBuff_.emplace_back(buf, buf + len);
197 261 : cv_.notify_one();
198 261 : return len;
199 261 : });
200 314 : }
201 :
202 318 : SocketPair::~SocketPair()
203 : {
204 318 : interrupt();
205 318 : closeSockets();
206 1272 : JAMI_LOG("[{}] Instance destroyed", fmt::ptr(this));
207 318 : }
208 :
209 : bool
210 658 : SocketPair::waitForRTCP(std::chrono::seconds interval)
211 : {
212 658 : std::unique_lock lock(rtcpInfo_mutex_);
213 658 : return cvRtcpPacketReadyToRead_.wait_for(lock, interval, [this] {
214 2038 : return interrupted_ or not listRtcpRRHeader_.empty() or not listRtcpREMBHeader_.empty();
215 1316 : });
216 658 : }
217 :
218 : void
219 10 : SocketPair::saveRtcpRRPacket(uint8_t* buf, size_t len)
220 : {
221 10 : if (len < sizeof(rtcpRRHeader))
222 0 : return;
223 :
224 10 : auto* header = reinterpret_cast<rtcpRRHeader*>(buf);
225 10 : if (header->pt != 201) // 201 = RR PT
226 0 : return;
227 :
228 10 : std::lock_guard lock(rtcpInfo_mutex_);
229 :
230 10 : if (listRtcpRRHeader_.size() >= MAX_LIST_SIZE) {
231 0 : listRtcpRRHeader_.pop_front();
232 : }
233 :
234 10 : listRtcpRRHeader_.emplace_back(*header);
235 :
236 10 : cvRtcpPacketReadyToRead_.notify_one();
237 10 : }
238 :
239 : void
240 176 : SocketPair::saveRtcpREMBPacket(uint8_t* buf, size_t len)
241 : {
242 176 : if (len < sizeof(rtcpREMBHeader))
243 0 : return;
244 :
245 176 : auto* header = reinterpret_cast<rtcpREMBHeader*>(buf);
246 176 : if (header->pt != 206) // 206 = REMB PT
247 0 : return;
248 :
249 176 : if (header->uid != 0x424D4552) // uid must be "REMB"
250 0 : return;
251 :
252 176 : std::lock_guard lock(rtcpInfo_mutex_);
253 :
254 176 : if (listRtcpREMBHeader_.size() >= MAX_LIST_SIZE) {
255 0 : listRtcpREMBHeader_.pop_front();
256 : }
257 :
258 176 : listRtcpREMBHeader_.push_back(*header);
259 :
260 176 : cvRtcpPacketReadyToRead_.notify_one();
261 176 : }
262 :
263 : std::list<rtcpRRHeader>
264 658 : SocketPair::getRtcpRR()
265 : {
266 658 : std::lock_guard lock(rtcpInfo_mutex_);
267 1316 : return std::move(listRtcpRRHeader_);
268 658 : }
269 :
270 : std::list<rtcpREMBHeader>
271 373 : SocketPair::getRtcpREMB()
272 : {
273 373 : std::lock_guard lock(rtcpInfo_mutex_);
274 746 : return std::move(listRtcpREMBHeader_);
275 373 : }
276 :
277 : void
278 317 : SocketPair::createSRTP(const char* out_suite, const char* out_key, const char* in_suite, const char* in_key)
279 : {
280 317 : srtpContext_.reset(new SRTPProtoContext(out_suite, out_key, in_suite, in_key));
281 318 : }
282 :
283 : void
284 636 : SocketPair::interrupt()
285 : {
286 2544 : JAMI_WARNING("[{}] Interrupting RTP sockets", fmt::ptr(this));
287 636 : interrupted_ = true;
288 636 : if (rtp_sock_)
289 628 : rtp_sock_->setOnRecv(nullptr);
290 636 : if (rtcp_sock_)
291 628 : rtcp_sock_->setOnRecv(nullptr);
292 636 : cv_.notify_all();
293 636 : cvRtcpPacketReadyToRead_.notify_all();
294 636 : }
295 :
296 : void
297 917 : SocketPair::setReadBlockingMode(bool block)
298 : {
299 3668 : JAMI_LOG("[{}] Read operations in blocking mode [{}]", fmt::ptr(this), block ? "YES" : "NO");
300 917 : readBlockingMode_ = block;
301 917 : cv_.notify_all();
302 917 : cvRtcpPacketReadyToRead_.notify_all();
303 917 : }
304 :
305 : void
306 915 : SocketPair::stopSendOp(bool state)
307 : {
308 915 : noWrite_ = state;
309 915 : }
310 :
311 : void
312 318 : SocketPair::closeSockets()
313 : {
314 318 : if (rtcpHandle_ > 0 and close(rtcpHandle_))
315 0 : strErr();
316 318 : if (rtpHandle_ > 0 and close(rtpHandle_))
317 0 : strErr();
318 318 : }
319 :
320 : void
321 4 : SocketPair::openSockets(const char* uri, int local_rtp_port)
322 : {
323 16 : JAMI_LOG("[{}] Creating rtp socket for uri {} on port {}", fmt::ptr(this), uri, local_rtp_port);
324 :
325 : char hostname[256];
326 : char path[1024];
327 : int dst_rtp_port;
328 :
329 4 : av_url_split(NULL, 0, NULL, 0, hostname, sizeof(hostname), &dst_rtp_port, path, sizeof(path), uri);
330 :
331 4 : const int local_rtcp_port = local_rtp_port + 1;
332 4 : const int dst_rtcp_port = dst_rtp_port + 1;
333 :
334 4 : rtpDestAddr_ = dhtnet::IpAddr {hostname};
335 4 : rtpDestAddr_.setPort(dst_rtp_port);
336 4 : rtcpDestAddr_ = dhtnet::IpAddr {hostname};
337 4 : rtcpDestAddr_.setPort(dst_rtcp_port);
338 :
339 : // Open local sockets (RTP/RTCP)
340 4 : if ((rtpHandle_ = udp_socket_create(rtpDestAddr_.getFamily(), local_rtp_port)) == -1
341 4 : or (rtcpHandle_ = udp_socket_create(rtcpDestAddr_.getFamily(), local_rtcp_port)) == -1) {
342 0 : closeSockets();
343 0 : JAMI_ERROR("[{}] Sockets creation failed", fmt::ptr(this));
344 0 : throw std::runtime_error("Sockets creation failed");
345 : }
346 :
347 16 : JAMI_WARNING("SocketPair: local({},{}) / {}({},{})",
348 : local_rtp_port,
349 : local_rtcp_port,
350 : hostname,
351 : dst_rtp_port,
352 : dst_rtcp_port);
353 4 : }
354 :
355 : MediaIOHandle*
356 628 : SocketPair::createIOContext(const uint16_t mtu)
357 : {
358 : unsigned ip_header_size;
359 628 : if (rtp_sock_)
360 620 : ip_header_size = rtp_sock_->getTransportOverhead();
361 8 : else if (rtpDestAddr_.getFamily() == AF_INET6)
362 0 : ip_header_size = 40;
363 : else
364 8 : ip_header_size = 20;
365 : return new MediaIOHandle(
366 628 : mtu - (srtpContext_ ? SRTP_OVERHEAD : 0) - UDP_HEADER_SIZE - ip_header_size,
367 : true,
368 6377 : [](void* sp, uint8_t* buf, int len) { return static_cast<SocketPair*>(sp)->readCallback(buf, len); },
369 6397 : [](void* sp, uint8_t* buf, int len) { return static_cast<SocketPair*>(sp)->writeCallback(buf, len); },
370 : 0,
371 1884 : reinterpret_cast<void*>(this));
372 : }
373 :
374 : int
375 6377 : SocketPair::waitForData()
376 : {
377 : // System sockets
378 6377 : if (rtpHandle_ >= 0) {
379 : int ret;
380 : do {
381 85 : if (interrupted_) {
382 4 : errno = EINTR;
383 4 : return -1;
384 : }
385 :
386 81 : if (not readBlockingMode_) {
387 0 : return 0;
388 : }
389 :
390 : // work with system socket
391 81 : struct pollfd p[2] = {{rtpHandle_, POLLIN, 0}, {rtcpHandle_, POLLIN, 0}};
392 81 : ret = poll(p, 2, NET_POLL_TIMEOUT);
393 81 : if (ret > 0) {
394 0 : ret = 0;
395 0 : if (p[0].revents & POLLIN)
396 0 : ret |= static_cast<int>(DataType::RTP);
397 0 : if (p[1].revents & POLLIN)
398 0 : ret |= static_cast<int>(DataType::RTCP);
399 : }
400 81 : } while (!ret or (ret < 0 and errno == EAGAIN));
401 :
402 0 : return ret;
403 : }
404 :
405 : // work with IceSocket
406 : {
407 6373 : std::unique_lock lk(dataBuffMutex_);
408 6373 : cv_.wait(lk, [this] {
409 12584 : return interrupted_ or not rtpDataBuff_.empty() or not rtcpDataBuff_.empty() or not readBlockingMode_;
410 : });
411 6373 : }
412 :
413 6373 : if (interrupted_) {
414 1 : errno = EINTR;
415 1 : return -1;
416 : }
417 :
418 6371 : return static_cast<int>(DataType::RTP) | static_cast<int>(DataType::RTCP);
419 : }
420 :
421 : int
422 6111 : SocketPair::readRtpData(void* buf, int buf_size)
423 : {
424 : // handle system socket
425 6111 : if (rtpHandle_ >= 0) {
426 : struct sockaddr_storage from;
427 0 : socklen_t from_len = sizeof(from);
428 0 : return static_cast<int>(recvfrom(rtpHandle_,
429 : static_cast<char*>(buf),
430 : buf_size,
431 : 0,
432 : reinterpret_cast<struct sockaddr*>(&from),
433 0 : &from_len));
434 : }
435 :
436 : // handle ICE
437 6111 : std::unique_lock lk(dataBuffMutex_);
438 6111 : if (not rtpDataBuff_.empty()) {
439 5675 : auto pkt = std::move(rtpDataBuff_.front());
440 5675 : rtpDataBuff_.pop_front();
441 5675 : lk.unlock(); // to not block our ICE callbacks
442 5675 : int pkt_size = static_cast<int>(pkt.size());
443 5675 : int len = std::min(pkt_size, buf_size);
444 5675 : std::copy_n(pkt.begin(), len, static_cast<char*>(buf));
445 5675 : return len;
446 5675 : }
447 :
448 436 : return 0;
449 6111 : }
450 :
451 : int
452 6371 : SocketPair::readRtcpData(void* buf, int buf_size)
453 : {
454 : // handle system socket
455 6371 : if (rtcpHandle_ >= 0) {
456 : struct sockaddr_storage from;
457 0 : socklen_t from_len = sizeof(from);
458 0 : return static_cast<int>(recvfrom(rtcpHandle_,
459 : static_cast<char*>(buf),
460 : buf_size,
461 : 0,
462 : reinterpret_cast<struct sockaddr*>(&from),
463 0 : &from_len));
464 : }
465 :
466 : // handle ICE
467 6371 : std::unique_lock lk(dataBuffMutex_);
468 6372 : if (not rtcpDataBuff_.empty()) {
469 261 : auto pkt = std::move(rtcpDataBuff_.front());
470 261 : rtcpDataBuff_.pop_front();
471 261 : lk.unlock();
472 261 : int pkt_size = static_cast<int>(pkt.size());
473 261 : int len = std::min(pkt_size, buf_size);
474 261 : std::copy_n(pkt.begin(), len, static_cast<char*>(buf));
475 261 : return len;
476 261 : }
477 :
478 6111 : return 0;
479 6372 : }
480 :
481 : int
482 6377 : SocketPair::readCallback(uint8_t* buf, int buf_size)
483 : {
484 6377 : auto datatype = waitForData();
485 6376 : if (datatype < 0)
486 5 : return datatype;
487 :
488 6371 : int len = 0;
489 6371 : bool fromRTCP = false;
490 :
491 6371 : if (datatype & static_cast<int>(DataType::RTCP)) {
492 6371 : len = readRtcpData(buf, buf_size);
493 6372 : if (len > 0) {
494 261 : auto* header = reinterpret_cast<rtcpRRHeader*>(buf);
495 : // 201 = RR PT
496 261 : if (header->pt == 201) {
497 10 : lastDLSR_ = Swap4Bytes(header->dlsr);
498 : // JAMI_WARN("Read RR, lastDLSR : %d", lastDLSR_);
499 10 : lastRR_time = std::chrono::steady_clock::now();
500 10 : saveRtcpRRPacket(buf, len);
501 : }
502 : // 206 = REMB PT
503 251 : else if (header->pt == 206)
504 176 : saveRtcpREMBPacket(buf, len);
505 : // 200 = SR PT
506 75 : else if (header->pt == 200) {
507 : // not used yet
508 : } else {
509 0 : unsigned pt = header->pt;
510 0 : JAMI_LOG("Unable to read RTCP: unknown packet type {}", pt);
511 : }
512 261 : fromRTCP = true;
513 : }
514 : }
515 :
516 : // No RTCP… attempt RTP
517 6372 : if (!len and (datatype & static_cast<int>(DataType::RTP))) {
518 6111 : len = readRtpData(buf, buf_size);
519 6111 : fromRTCP = false;
520 : }
521 :
522 6372 : if (len <= 0)
523 436 : return len;
524 :
525 5936 : if (not fromRTCP && (buf_size < static_cast<int>(MINIMUM_RTP_HEADER_SIZE)))
526 0 : return len;
527 :
528 : // SRTP decrypt
529 5936 : if (not fromRTCP and srtpContext_ and srtpContext_->srtp_in.aes) {
530 5675 : int32_t gradient = 0;
531 5675 : int32_t deltaT = 0;
532 5675 : float abs = 0.0f;
533 5675 : bool res_parse = false;
534 5675 : bool res_delay = false;
535 :
536 5675 : res_parse = parse_RTP_ext(buf, &abs);
537 5675 : bool marker = (buf[1] & 0x80) >> 7;
538 :
539 5675 : if (res_parse)
540 5675 : res_delay = getOneWayDelayGradient(abs, marker, &gradient, &deltaT);
541 :
542 : // rtpDelayCallback_ is not set for audio
543 5675 : if (rtpDelayCallback_ and res_delay)
544 5579 : rtpDelayCallback_(gradient, deltaT);
545 :
546 5675 : auto err = ff_srtp_decrypt(&srtpContext_->srtp_in, buf, &len);
547 5675 : if (packetLossCallback_ and (buf[2] << 8 | buf[3]) != lastSeqNumIn_ + 1)
548 0 : packetLossCallback_();
549 5675 : lastSeqNumIn_ = buf[2] << 8 | buf[3];
550 5675 : if (err < 0)
551 0 : JAMI_WARNING("decrypt error {}", err);
552 : }
553 :
554 5936 : if (len != 0)
555 5936 : return len;
556 : else
557 0 : return AVERROR_EOF;
558 : }
559 :
560 : int
561 5945 : SocketPair::writeData(uint8_t* buf, int buf_size)
562 : {
563 5945 : bool isRTCP = RTP_PT_IS_RTCP(buf[1]);
564 :
565 : // System sockets?
566 5945 : if (rtpHandle_ >= 0) {
567 : int fd;
568 : dhtnet::IpAddr* dest_addr;
569 :
570 0 : if (isRTCP) {
571 0 : fd = rtcpHandle_;
572 0 : dest_addr = &rtcpDestAddr_;
573 : } else {
574 0 : fd = rtpHandle_;
575 0 : dest_addr = &rtpDestAddr_;
576 : }
577 :
578 0 : auto ret = ff_network_wait_fd(fd);
579 0 : if (ret < 0)
580 0 : return ret;
581 :
582 0 : if (noWrite_)
583 0 : return buf_size;
584 : return static_cast<int>(
585 0 : ::sendto(fd, reinterpret_cast<const char*>(buf), buf_size, 0, *dest_addr, dest_addr->getLength()));
586 : }
587 :
588 5945 : if (noWrite_)
589 0 : return buf_size;
590 :
591 : // IceSocket
592 5945 : if (isRTCP)
593 261 : return static_cast<int>(rtcp_sock_->send(buf, buf_size));
594 : else
595 5684 : return static_cast<int>(rtp_sock_->send(buf, buf_size));
596 : }
597 :
598 : int
599 5769 : SocketPair::writeCallback(uint8_t* buf, int buf_size)
600 : {
601 5769 : if (noWrite_)
602 0 : return 0;
603 :
604 : int ret;
605 5769 : bool isRTCP = RTP_PT_IS_RTCP(buf[1]);
606 : unsigned int ts_LSB, ts_MSB;
607 : double currentSRTS, currentLatency;
608 :
609 : // Encrypt?
610 5769 : if (not isRTCP and srtpContext_ and srtpContext_->srtp_out.aes) {
611 5684 : buf_size = ff_srtp_encrypt(&srtpContext_->srtp_out,
612 : buf,
613 : buf_size,
614 5684 : srtpContext_->encryptbuf,
615 : sizeof(srtpContext_->encryptbuf));
616 5684 : if (buf_size < 0) {
617 0 : JAMI_WARNING("encrypt error {}", buf_size);
618 0 : return buf_size;
619 : }
620 :
621 5684 : buf = srtpContext_->encryptbuf;
622 : }
623 :
624 : // check if we're sending an RR, if so, detect packet loss
625 : // buf_size gives length of buffer, not just header
626 5769 : if (isRTCP && static_cast<unsigned>(buf_size) >= sizeof(rtcpRRHeader)) {
627 10 : auto* header = reinterpret_cast<rtcpRRHeader*>(buf);
628 10 : rtcpPacketLoss_ = (header->pt == 201 && ntohl(header->fraction_lost) & RTCP_RR_FRACTION_MASK);
629 : }
630 :
631 : do {
632 5769 : if (interrupted_)
633 0 : return -EINTR;
634 5769 : ret = writeData(buf, buf_size);
635 5769 : } while (ret < 0 and errno == EAGAIN);
636 :
637 5769 : if (buf[1] == 200) // Sender Report
638 : {
639 75 : auto* header = reinterpret_cast<rtcpSRHeader*>(buf);
640 75 : ts_LSB = Swap4Bytes(header->timestampLSB);
641 75 : ts_MSB = Swap4Bytes(header->timestampMSB);
642 :
643 75 : currentSRTS = ts_MSB + (ts_LSB / pow(2, 32));
644 :
645 75 : if (lastSRTS_ != 0 && lastDLSR_ != 0) {
646 14 : if (histoLatency_.size() >= MAX_LIST_SIZE)
647 0 : histoLatency_.pop_front();
648 :
649 14 : currentLatency = (currentSRTS - lastSRTS_) / 2;
650 : // JAMI_WARN("Current Latency : %f from sender %X", currentLatency, header->ssrc);
651 14 : histoLatency_.push_back(currentLatency);
652 : }
653 :
654 75 : lastSRTS_ = currentSRTS;
655 :
656 : // JAMI_WARN("SENDING NEW RTCP SR !! ");
657 :
658 5694 : } else if (buf[1] == 201) // Receiver Report
659 : {
660 : // auto header = reinterpret_cast<rtcpRRHeader*>(buf);
661 : // JAMI_WARN("SENDING NEW RTCP RR !! ");
662 : }
663 :
664 5769 : return ret < 0 ? -errno : ret;
665 : }
666 :
667 : double
668 10 : SocketPair::getLastLatency()
669 : {
670 10 : if (not histoLatency_.empty())
671 0 : return histoLatency_.back();
672 : else
673 10 : return -1;
674 : }
675 :
676 : void
677 142 : SocketPair::setRtpDelayCallback(std::function<void(int, int)> cb)
678 : {
679 142 : rtpDelayCallback_ = std::move(cb);
680 142 : }
681 :
682 : bool
683 5675 : SocketPair::getOneWayDelayGradient(float sendTS, bool marker, int32_t* gradient, int32_t* deltaT)
684 : {
685 : // Keep only last packet of each frame
686 5675 : if (not marker) {
687 49 : return false;
688 : }
689 :
690 : // 1st frame
691 5626 : if (lastSendTS_ == 0.0f) {
692 47 : lastSendTS_ = sendTS;
693 47 : lastReceiveTS_ = std::chrono::steady_clock::now();
694 47 : return false;
695 : }
696 :
697 5579 : int32_t deltaS = static_cast<int32_t>((sendTS - lastSendTS_) * 1000); // milliseconds
698 5579 : if (deltaS < 0)
699 4 : deltaS += 64000;
700 5579 : lastSendTS_ = sendTS;
701 :
702 5579 : std::chrono::steady_clock::time_point arrival_TS = std::chrono::steady_clock::now();
703 : auto deltaR = static_cast<int32_t>(
704 5579 : std::chrono::duration_cast<std::chrono::milliseconds>(arrival_TS - lastReceiveTS_).count());
705 5579 : lastReceiveTS_ = arrival_TS;
706 :
707 5579 : *gradient = deltaR - deltaS;
708 5579 : *deltaT = deltaR;
709 :
710 5579 : return true;
711 : }
712 :
713 : bool
714 5675 : SocketPair::parse_RTP_ext(uint8_t* buf, float* abs)
715 : {
716 5675 : if (not(buf[0] & 0x10))
717 0 : return false;
718 :
719 5675 : uint16_t magic_word = (buf[12] << 8) + buf[13];
720 5675 : if (magic_word != 0xBEDE)
721 0 : return false;
722 :
723 5675 : uint8_t sec = buf[17] >> 2;
724 5675 : uint32_t fract = ((buf[17] & 0x3) << 16 | (buf[18] << 8) | buf[19]) << 14;
725 5675 : float milli = static_cast<float>(fract / pow(2, 32));
726 :
727 5675 : *abs = static_cast<float>(sec) + (milli);
728 5675 : return true;
729 : }
730 :
731 : uint16_t
732 140 : SocketPair::lastSeqValOut()
733 : {
734 140 : if (srtpContext_)
735 140 : return srtpContext_->srtp_out.seq_largest;
736 0 : JAMI_ERROR("SRTP context not found.");
737 0 : return 0;
738 : }
739 :
740 : } // namespace jami
|