Line data Source code
1 : /*
2 : * Copyright (C) 2004-2024 Savoir-faire Linux Inc.
3 : * Copyright (c) 2007 The FFmpeg Project
4 : *
5 : * Author: Tristan Matthews <tristan.matthews@savoirfairelinux.com>
6 : * Author: Guillaume Roguez <guillaume.roguez@savoirfairelinux.com>
7 : *
8 : * This program is free software; you can redistribute it and/or modify
9 : * it under the terms of the GNU General Public License as published by
10 : * the Free Software Foundation; either version 3 of the License, or
11 : * (at your option) any later version.
12 : *
13 : * This program is distributed in the hope that it will be useful,
14 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 : * GNU General Public License for more details.
17 : *
18 : * You should have received a copy of the GNU General Public License
19 : * along with this program; if not, write to the Free Software
20 : * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
21 : */
22 :
23 : #include <dhtnet/ip_utils.h> // MUST BE INCLUDED FIRST
24 :
25 : #include "libav_deps.h" // THEN THIS ONE AFTER
26 :
27 : #include "socket_pair.h"
28 : #include "libav_utils.h"
29 : #include "logger.h"
30 : #include "connectivity/security/memory.h"
31 :
32 : #include <dhtnet/ice_socket.h>
33 :
34 : #include <iostream>
35 : #include <string>
36 : #include <algorithm>
37 : #include <iterator>
38 :
39 : extern "C" {
40 : #include "srtp.h"
41 : }
42 :
43 : #include <cstring>
44 : #include <stdexcept>
45 : #include <unistd.h>
46 : #include <sys/types.h>
47 : #include <ciso646> // fix windows compiler bug
48 :
49 : #ifdef _WIN32
50 : #define SOCK_NONBLOCK FIONBIO
51 : #define poll WSAPoll
52 : #define close(x) closesocket(x)
53 : #endif
54 :
55 : #ifdef __ANDROID__
56 : #include <asm-generic/fcntl.h>
57 : #define SOCK_NONBLOCK O_NONBLOCK
58 : #endif
59 :
60 : #ifdef __APPLE__
61 : #include <fcntl.h>
62 : #endif
63 :
64 : // Swap 2 byte, 16 bit values:
65 : #define Swap2Bytes(val) ((((val) >> 8) & 0x00FF) | (((val) << 8) & 0xFF00))
66 :
67 : // Swap 4 byte, 32 bit values:
68 : #define Swap4Bytes(val) \
69 : ((((val) >> 24) & 0x000000FF) | (((val) >> 8) & 0x0000FF00) | (((val) << 8) & 0x00FF0000) \
70 : | (((val) << 24) & 0xFF000000))
71 :
72 : // Swap 8 byte, 64 bit values:
73 : #define Swap8Bytes(val) \
74 : ((((val) >> 56) & 0x00000000000000FF) | (((val) >> 40) & 0x000000000000FF00) \
75 : | (((val) >> 24) & 0x0000000000FF0000) | (((val) >> 8) & 0x00000000FF000000) \
76 : | (((val) << 8) & 0x000000FF00000000) | (((val) << 24) & 0x0000FF0000000000) \
77 : | (((val) << 40) & 0x00FF000000000000) | (((val) << 56) & 0xFF00000000000000))
78 :
79 : namespace jami {
80 :
81 : static constexpr int NET_POLL_TIMEOUT = 100; /* poll() timeout in ms */
82 : static constexpr int RTP_MAX_PACKET_LENGTH = 2048;
83 : static constexpr auto UDP_HEADER_SIZE = 8;
84 : static constexpr auto SRTP_OVERHEAD = 10;
85 : static constexpr uint32_t RTCP_RR_FRACTION_MASK = 0xFF000000;
86 : static constexpr unsigned MINIMUM_RTP_HEADER_SIZE = 16;
87 :
88 : enum class DataType : unsigned { RTP = 1 << 0, RTCP = 1 << 1 };
89 :
90 : class SRTPProtoContext
91 : {
92 : public:
93 334 : SRTPProtoContext(const char* out_suite,
94 : const char* out_key,
95 : const char* in_suite,
96 : const char* in_key)
97 334 : {
98 334 : ring_secure_memzero(&srtp_out, sizeof(srtp_out));
99 334 : ring_secure_memzero(&srtp_in, sizeof(srtp_in));
100 334 : if (out_suite && out_key) {
101 : // XXX: see srtp_open from libavformat/srtpproto.c
102 334 : if (ff_srtp_set_crypto(&srtp_out, out_suite, out_key) < 0) {
103 0 : srtp_close();
104 0 : throw std::runtime_error("Could not set crypto on output");
105 : }
106 : }
107 :
108 334 : if (in_suite && in_key) {
109 334 : if (ff_srtp_set_crypto(&srtp_in, in_suite, in_key) < 0) {
110 0 : srtp_close();
111 0 : throw std::runtime_error("Could not set crypto on input");
112 : }
113 : }
114 334 : }
115 :
116 334 : ~SRTPProtoContext() { srtp_close(); }
117 :
118 : SRTPContext srtp_out {};
119 : SRTPContext srtp_in {};
120 : uint8_t encryptbuf[RTP_MAX_PACKET_LENGTH];
121 :
122 : private:
123 334 : void srtp_close() noexcept
124 : {
125 334 : ff_srtp_free(&srtp_out);
126 334 : ff_srtp_free(&srtp_in);
127 334 : }
128 : };
129 :
130 : static int
131 0 : ff_network_wait_fd(int fd)
132 : {
133 0 : struct pollfd p = {fd, POLLOUT, 0};
134 0 : auto ret = poll(&p, 1, NET_POLL_TIMEOUT);
135 0 : return ret < 0 ? errno : p.revents & (POLLOUT | POLLERR | POLLHUP) ? 0 : -EAGAIN;
136 : }
137 :
138 : static int
139 20 : udp_socket_create(int family, int port)
140 : {
141 20 : int udp_fd = -1;
142 :
143 : #ifdef __APPLE__
144 : udp_fd = socket(family, SOCK_DGRAM, 0);
145 : if (udp_fd >= 0 && fcntl(udp_fd, F_SETFL, O_NONBLOCK) < 0) {
146 : close(udp_fd);
147 : udp_fd = -1;
148 : }
149 : #elif defined _WIN32
150 : udp_fd = socket(family, SOCK_DGRAM, 0);
151 : u_long block = 1;
152 : if (udp_fd >= 0 && ioctlsocket(udp_fd, FIONBIO, &block) < 0) {
153 : close(udp_fd);
154 : udp_fd = -1;
155 : }
156 : #else
157 20 : udp_fd = socket(family, SOCK_DGRAM | SOCK_NONBLOCK, 0);
158 : #endif
159 :
160 20 : if (udp_fd < 0) {
161 0 : JAMI_ERR("socket() failed");
162 0 : strErr();
163 0 : return -1;
164 : }
165 :
166 20 : auto bind_addr = dhtnet::ip_utils::getAnyHostAddr(family);
167 20 : if (not bind_addr.isIpv4() and not bind_addr.isIpv6()) {
168 0 : JAMI_ERR("No IPv4/IPv6 host found for family %u", family);
169 0 : close(udp_fd);
170 0 : return -1;
171 : }
172 :
173 20 : bind_addr.setPort(port);
174 20 : JAMI_DBG("use local address: %s", bind_addr.toString(true, true).c_str());
175 20 : if (::bind(udp_fd, bind_addr, bind_addr.getLength()) < 0) {
176 0 : JAMI_ERR("bind() failed");
177 0 : strErr();
178 0 : close(udp_fd);
179 0 : udp_fd = -1;
180 : }
181 :
182 20 : return udp_fd;
183 : }
184 :
185 10 : SocketPair::SocketPair(const char* uri, int localPort)
186 : {
187 10 : openSockets(uri, localPort);
188 10 : }
189 :
190 324 : SocketPair::SocketPair(std::unique_ptr<dhtnet::IceSocket> rtp_sock, std::unique_ptr<dhtnet::IceSocket> rtcp_sock)
191 324 : : rtp_sock_(std::move(rtp_sock))
192 648 : , rtcp_sock_(std::move(rtcp_sock))
193 : {
194 324 : JAMI_DBG("[%p] Creating instance using ICE sockets for comp %d and %d",
195 : this,
196 : rtp_sock_->getCompId(),
197 : rtcp_sock_->getCompId());
198 :
199 324 : rtp_sock_->setOnRecv([this](uint8_t* buf, size_t len) {
200 5435 : std::lock_guard l(dataBuffMutex_);
201 5435 : rtpDataBuff_.emplace_back(buf, buf + len);
202 5435 : cv_.notify_one();
203 5435 : return len;
204 5435 : });
205 324 : rtcp_sock_->setOnRecv([this](uint8_t* buf, size_t len) {
206 250 : std::lock_guard l(dataBuffMutex_);
207 250 : rtcpDataBuff_.emplace_back(buf, buf + len);
208 250 : cv_.notify_one();
209 250 : return len;
210 250 : });
211 324 : }
212 :
213 334 : SocketPair::~SocketPair()
214 : {
215 334 : interrupt();
216 334 : closeSockets();
217 334 : JAMI_DBG("[%p] Instance destroyed", this);
218 334 : }
219 :
220 : bool
221 731 : SocketPair::waitForRTCP(std::chrono::seconds interval)
222 : {
223 731 : std::unique_lock lock(rtcpInfo_mutex_);
224 731 : return cvRtcpPacketReadyToRead_.wait_for(lock, interval, [this] {
225 2048 : return interrupted_ or not listRtcpRRHeader_.empty() or not listRtcpREMBHeader_.empty();
226 1462 : });
227 731 : }
228 :
229 : void
230 10 : SocketPair::saveRtcpRRPacket(uint8_t* buf, size_t len)
231 : {
232 10 : if (len < sizeof(rtcpRRHeader))
233 0 : return;
234 :
235 10 : auto header = reinterpret_cast<rtcpRRHeader*>(buf);
236 10 : if (header->pt != 201) // 201 = RR PT
237 0 : return;
238 :
239 10 : std::lock_guard lock(rtcpInfo_mutex_);
240 :
241 10 : if (listRtcpRRHeader_.size() >= MAX_LIST_SIZE) {
242 0 : listRtcpRRHeader_.pop_front();
243 : }
244 :
245 10 : listRtcpRRHeader_.emplace_back(*header);
246 :
247 10 : cvRtcpPacketReadyToRead_.notify_one();
248 10 : }
249 :
250 : void
251 166 : SocketPair::saveRtcpREMBPacket(uint8_t* buf, size_t len)
252 : {
253 166 : if (len < sizeof(rtcpREMBHeader))
254 0 : return;
255 :
256 166 : auto header = reinterpret_cast<rtcpREMBHeader*>(buf);
257 166 : if (header->pt != 206) // 206 = REMB PT
258 0 : return;
259 :
260 166 : if (header->uid != 0x424D4552) // uid must be "REMB"
261 0 : return;
262 :
263 166 : std::lock_guard lock(rtcpInfo_mutex_);
264 :
265 166 : if (listRtcpREMBHeader_.size() >= MAX_LIST_SIZE) {
266 0 : listRtcpREMBHeader_.pop_front();
267 : }
268 :
269 166 : listRtcpREMBHeader_.push_back(*header);
270 :
271 166 : cvRtcpPacketReadyToRead_.notify_one();
272 166 : }
273 :
274 : std::list<rtcpRRHeader>
275 731 : SocketPair::getRtcpRR()
276 : {
277 731 : std::lock_guard lock(rtcpInfo_mutex_);
278 1462 : return std::move(listRtcpRRHeader_);
279 731 : }
280 :
281 : std::list<rtcpREMBHeader>
282 397 : SocketPair::getRtcpREMB()
283 : {
284 397 : std::lock_guard lock(rtcpInfo_mutex_);
285 794 : return std::move(listRtcpREMBHeader_);
286 397 : }
287 :
288 : void
289 334 : SocketPair::createSRTP(const char* out_suite,
290 : const char* out_key,
291 : const char* in_suite,
292 : const char* in_key)
293 : {
294 334 : srtpContext_.reset(new SRTPProtoContext(out_suite, out_key, in_suite, in_key));
295 334 : }
296 :
297 : void
298 668 : SocketPair::interrupt()
299 : {
300 668 : JAMI_WARN("[%p] Interrupting RTP sockets", this);
301 668 : interrupted_ = true;
302 668 : if (rtp_sock_)
303 648 : rtp_sock_->setOnRecv(nullptr);
304 668 : if (rtcp_sock_)
305 648 : rtcp_sock_->setOnRecv(nullptr);
306 668 : cv_.notify_all();
307 668 : cvRtcpPacketReadyToRead_.notify_all();
308 668 : }
309 :
310 : void
311 968 : SocketPair::setReadBlockingMode(bool block)
312 : {
313 968 : JAMI_DBG("[%p] Read operations in blocking mode [%s]", this, block ? "YES" : "NO");
314 968 : readBlockingMode_ = block;
315 968 : cv_.notify_all();
316 968 : cvRtcpPacketReadyToRead_.notify_all();
317 968 : }
318 :
319 : void
320 972 : SocketPair::stopSendOp(bool state)
321 : {
322 972 : noWrite_ = state;
323 972 : }
324 :
325 : void
326 334 : SocketPair::closeSockets()
327 : {
328 334 : if (rtcpHandle_ > 0 and close(rtcpHandle_))
329 0 : strErr();
330 334 : if (rtpHandle_ > 0 and close(rtpHandle_))
331 0 : strErr();
332 334 : }
333 :
334 : void
335 10 : SocketPair::openSockets(const char* uri, int local_rtp_port)
336 : {
337 10 : JAMI_DBG("Creating rtp socket for uri %s on port %d", uri, local_rtp_port);
338 :
339 : char hostname[256];
340 : char path[1024];
341 : int dst_rtp_port;
342 :
343 10 : av_url_split(NULL, 0, NULL, 0, hostname, sizeof(hostname), &dst_rtp_port, path, sizeof(path), uri);
344 :
345 10 : const int local_rtcp_port = local_rtp_port + 1;
346 10 : const int dst_rtcp_port = dst_rtp_port + 1;
347 :
348 10 : rtpDestAddr_ = dhtnet::IpAddr {hostname};
349 10 : rtpDestAddr_.setPort(dst_rtp_port);
350 10 : rtcpDestAddr_ = dhtnet::IpAddr {hostname};
351 10 : rtcpDestAddr_.setPort(dst_rtcp_port);
352 :
353 : // Open local sockets (RTP/RTCP)
354 10 : if ((rtpHandle_ = udp_socket_create(rtpDestAddr_.getFamily(), local_rtp_port)) == -1
355 10 : or (rtcpHandle_ = udp_socket_create(rtcpDestAddr_.getFamily(), local_rtcp_port)) == -1) {
356 0 : closeSockets();
357 0 : JAMI_ERR("[%p] Sockets creation failed", this);
358 0 : throw std::runtime_error("Sockets creation failed");
359 : }
360 :
361 10 : JAMI_WARN("SocketPair: local{%d,%d} / %s{%d,%d}",
362 : local_rtp_port,
363 : local_rtcp_port,
364 : hostname,
365 : dst_rtp_port,
366 : dst_rtcp_port);
367 10 : }
368 :
369 : MediaIOHandle*
370 669 : SocketPair::createIOContext(const uint16_t mtu)
371 : {
372 : unsigned ip_header_size;
373 669 : if (rtp_sock_)
374 652 : ip_header_size = rtp_sock_->getTransportOverhead();
375 17 : else if (rtpDestAddr_.getFamily() == AF_INET6)
376 0 : ip_header_size = 40;
377 : else
378 17 : ip_header_size = 20;
379 : return new MediaIOHandle(
380 669 : mtu - (srtpContext_ ? SRTP_OVERHEAD : 0) - UDP_HEADER_SIZE - ip_header_size,
381 : true,
382 6125 : [](void* sp, uint8_t* buf, int len) {
383 6125 : return static_cast<SocketPair*>(sp)->readCallback(buf, len);
384 : },
385 5521 : [](void* sp, uint8_t* buf, int len) {
386 5521 : return static_cast<SocketPair*>(sp)->writeCallback(buf, len);
387 : },
388 : 0,
389 669 : reinterpret_cast<void*>(this));
390 : }
391 :
392 : int
393 6125 : SocketPair::waitForData()
394 : {
395 : // System sockets
396 6125 : if (rtpHandle_ >= 0) {
397 : int ret;
398 : do {
399 129 : if (interrupted_) {
400 7 : errno = EINTR;
401 9 : return -1;
402 : }
403 :
404 122 : if (not readBlockingMode_) {
405 2 : return 0;
406 : }
407 :
408 : // work with system socket
409 120 : struct pollfd p[2] = {{rtpHandle_, POLLIN, 0}, {rtcpHandle_, POLLIN, 0}};
410 120 : ret = poll(p, 2, NET_POLL_TIMEOUT);
411 120 : if (ret > 0) {
412 0 : ret = 0;
413 0 : if (p[0].revents & POLLIN)
414 0 : ret |= static_cast<int>(DataType::RTP);
415 0 : if (p[1].revents & POLLIN)
416 0 : ret |= static_cast<int>(DataType::RTCP);
417 : }
418 120 : } while (!ret or (ret < 0 and errno == EAGAIN));
419 :
420 0 : return ret;
421 : }
422 :
423 : // work with IceSocket
424 : {
425 6116 : std::unique_lock lk(dataBuffMutex_);
426 6116 : cv_.wait(lk, [this] {
427 24017 : return interrupted_ or not rtpDataBuff_.empty() or not rtcpDataBuff_.empty()
428 24017 : or not readBlockingMode_;
429 : });
430 6116 : }
431 :
432 6116 : if (interrupted_) {
433 60 : errno = EINTR;
434 60 : return -1;
435 : }
436 :
437 6056 : return static_cast<int>(DataType::RTP) | static_cast<int>(DataType::RTCP);
438 : }
439 :
440 : int
441 5806 : SocketPair::readRtpData(void* buf, int buf_size)
442 : {
443 : // handle system socket
444 5806 : if (rtpHandle_ >= 0) {
445 : struct sockaddr_storage from;
446 0 : socklen_t from_len = sizeof(from);
447 0 : return recvfrom(rtpHandle_,
448 : static_cast<char*>(buf),
449 : buf_size,
450 : 0,
451 : reinterpret_cast<struct sockaddr*>(&from),
452 0 : &from_len);
453 : }
454 :
455 : // handle ICE
456 5806 : std::unique_lock lk(dataBuffMutex_);
457 5806 : if (not rtpDataBuff_.empty()) {
458 5433 : auto pkt = std::move(rtpDataBuff_.front());
459 5433 : rtpDataBuff_.pop_front();
460 5433 : lk.unlock(); // to not block our ICE callbacks
461 5433 : int pkt_size = pkt.size();
462 5433 : int len = std::min(pkt_size, buf_size);
463 5433 : std::copy_n(pkt.begin(), len, static_cast<char*>(buf));
464 5433 : return len;
465 5433 : }
466 :
467 373 : return 0;
468 5806 : }
469 :
470 : int
471 6056 : SocketPair::readRtcpData(void* buf, int buf_size)
472 : {
473 : // handle system socket
474 6056 : if (rtcpHandle_ >= 0) {
475 : struct sockaddr_storage from;
476 0 : socklen_t from_len = sizeof(from);
477 0 : return recvfrom(rtcpHandle_,
478 : static_cast<char*>(buf),
479 : buf_size,
480 : 0,
481 : reinterpret_cast<struct sockaddr*>(&from),
482 0 : &from_len);
483 : }
484 :
485 : // handle ICE
486 6056 : std::unique_lock lk(dataBuffMutex_);
487 6056 : if (not rtcpDataBuff_.empty()) {
488 250 : auto pkt = std::move(rtcpDataBuff_.front());
489 250 : rtcpDataBuff_.pop_front();
490 250 : lk.unlock();
491 250 : int pkt_size = pkt.size();
492 250 : int len = std::min(pkt_size, buf_size);
493 250 : std::copy_n(pkt.begin(), len, static_cast<char*>(buf));
494 250 : return len;
495 250 : }
496 :
497 5806 : return 0;
498 6056 : }
499 :
500 : int
501 6125 : SocketPair::readCallback(uint8_t* buf, int buf_size)
502 : {
503 6125 : auto datatype = waitForData();
504 6125 : if (datatype < 0)
505 67 : return datatype;
506 :
507 6058 : int len = 0;
508 6058 : bool fromRTCP = false;
509 :
510 6058 : if (datatype & static_cast<int>(DataType::RTCP)) {
511 6056 : len = readRtcpData(buf, buf_size);
512 6056 : if (len > 0) {
513 250 : auto header = reinterpret_cast<rtcpRRHeader*>(buf);
514 : // 201 = RR PT
515 250 : if (header->pt == 201) {
516 10 : lastDLSR_ = Swap4Bytes(header->dlsr);
517 : // JAMI_WARN("Read RR, lastDLSR : %d", lastDLSR_);
518 10 : lastRR_time = std::chrono::steady_clock::now();
519 10 : saveRtcpRRPacket(buf, len);
520 : }
521 : // 206 = REMB PT
522 240 : else if (header->pt == 206)
523 166 : saveRtcpREMBPacket(buf, len);
524 : // 200 = SR PT
525 74 : else if (header->pt == 200) {
526 : // not used yet
527 : } else {
528 0 : JAMI_DBG("Can't read RTCP: unknown packet type %u", header->pt);
529 : }
530 250 : fromRTCP = true;
531 : }
532 : }
533 :
534 : // No RTCP... try RTP
535 6058 : if (!len and (datatype & static_cast<int>(DataType::RTP))) {
536 5806 : len = readRtpData(buf, buf_size);
537 5806 : fromRTCP = false;
538 : }
539 :
540 6058 : if (len <= 0)
541 375 : return len;
542 :
543 5683 : if (not fromRTCP && (buf_size < static_cast<int>(MINIMUM_RTP_HEADER_SIZE)))
544 0 : return len;
545 :
546 : // SRTP decrypt
547 5683 : if (not fromRTCP and srtpContext_ and srtpContext_->srtp_in.aes) {
548 5433 : int32_t gradient = 0;
549 5433 : int32_t deltaT = 0;
550 5433 : float abs = 0.0f;
551 5433 : bool res_parse = false;
552 5433 : bool res_delay = false;
553 :
554 5433 : res_parse = parse_RTP_ext(buf, &abs);
555 5433 : bool marker = (buf[1] & 0x80) >> 7;
556 :
557 5433 : if (res_parse)
558 5433 : res_delay = getOneWayDelayGradient(abs, marker, &gradient, &deltaT);
559 :
560 : // rtpDelayCallback_ is not set for audio
561 5433 : if (rtpDelayCallback_ and res_delay)
562 5339 : rtpDelayCallback_(gradient, deltaT);
563 :
564 5433 : auto err = ff_srtp_decrypt(&srtpContext_->srtp_in, buf, &len);
565 5433 : if (packetLossCallback_ and (buf[2] << 8 | buf[3]) != lastSeqNumIn_ + 1)
566 0 : packetLossCallback_();
567 5433 : lastSeqNumIn_ = buf[2] << 8 | buf[3];
568 5433 : if (err < 0)
569 0 : JAMI_WARN("decrypt error %d", err);
570 : }
571 :
572 5683 : if (len != 0)
573 5683 : return len;
574 : else
575 0 : return AVERROR_EOF;
576 : }
577 :
578 : int
579 5687 : SocketPair::writeData(uint8_t* buf, int buf_size)
580 : {
581 5687 : bool isRTCP = RTP_PT_IS_RTCP(buf[1]);
582 :
583 : // System sockets?
584 5687 : if (rtpHandle_ >= 0) {
585 : int fd;
586 : dhtnet::IpAddr* dest_addr;
587 :
588 0 : if (isRTCP) {
589 0 : fd = rtcpHandle_;
590 0 : dest_addr = &rtcpDestAddr_;
591 : } else {
592 0 : fd = rtpHandle_;
593 0 : dest_addr = &rtpDestAddr_;
594 : }
595 :
596 0 : auto ret = ff_network_wait_fd(fd);
597 0 : if (ret < 0)
598 0 : return ret;
599 :
600 0 : if (noWrite_)
601 0 : return buf_size;
602 0 : return ::sendto(fd,
603 : reinterpret_cast<const char*>(buf),
604 : buf_size,
605 : 0,
606 : *dest_addr,
607 0 : dest_addr->getLength());
608 : }
609 :
610 5687 : if (noWrite_)
611 0 : return buf_size;
612 :
613 : // IceSocket
614 5687 : if (isRTCP)
615 250 : return rtcp_sock_->send(buf, buf_size);
616 : else
617 5437 : return rtp_sock_->send(buf, buf_size);
618 : }
619 :
620 : int
621 5521 : SocketPair::writeCallback(uint8_t* buf, int buf_size)
622 : {
623 5521 : if (noWrite_)
624 0 : return 0;
625 :
626 : int ret;
627 5521 : bool isRTCP = RTP_PT_IS_RTCP(buf[1]);
628 : unsigned int ts_LSB, ts_MSB;
629 : double currentSRTS, currentLatency;
630 :
631 : // Encrypt?
632 5521 : if (not isRTCP and srtpContext_ and srtpContext_->srtp_out.aes) {
633 5437 : buf_size = ff_srtp_encrypt(&srtpContext_->srtp_out,
634 : buf,
635 : buf_size,
636 5437 : srtpContext_->encryptbuf,
637 : sizeof(srtpContext_->encryptbuf));
638 5437 : if (buf_size < 0) {
639 0 : JAMI_WARN("encrypt error %d", buf_size);
640 0 : return buf_size;
641 : }
642 :
643 5437 : buf = srtpContext_->encryptbuf;
644 : }
645 :
646 : // check if we're sending an RR, if so, detect packet loss
647 : // buf_size gives length of buffer, not just header
648 5521 : if (isRTCP && static_cast<unsigned>(buf_size) >= sizeof(rtcpRRHeader)) {
649 10 : auto header = reinterpret_cast<rtcpRRHeader*>(buf);
650 10 : rtcpPacketLoss_ = (header->pt == 201
651 10 : && ntohl(header->fraction_lost) & RTCP_RR_FRACTION_MASK);
652 : }
653 :
654 : do {
655 5521 : if (interrupted_)
656 0 : return -EINTR;
657 5521 : ret = writeData(buf, buf_size);
658 5521 : } while (ret < 0 and errno == EAGAIN);
659 :
660 5521 : if (buf[1] == 200) // Sender Report
661 : {
662 74 : auto header = reinterpret_cast<rtcpSRHeader*>(buf);
663 74 : ts_LSB = Swap4Bytes(header->timestampLSB);
664 74 : ts_MSB = Swap4Bytes(header->timestampMSB);
665 :
666 74 : currentSRTS = ts_MSB + (ts_LSB / pow(2, 32));
667 :
668 74 : if (lastSRTS_ != 0 && lastDLSR_ != 0) {
669 15 : if (histoLatency_.size() >= MAX_LIST_SIZE)
670 0 : histoLatency_.pop_front();
671 :
672 15 : currentLatency = (currentSRTS - lastSRTS_) / 2;
673 : // JAMI_WARN("Current Latency : %f from sender %X", currentLatency, header->ssrc);
674 15 : histoLatency_.push_back(currentLatency);
675 : }
676 :
677 74 : lastSRTS_ = currentSRTS;
678 :
679 : // JAMI_WARN("SENDING NEW RTCP SR !! ");
680 :
681 5447 : } else if (buf[1] == 201) // Receiver Report
682 : {
683 : // auto header = reinterpret_cast<rtcpRRHeader*>(buf);
684 : // JAMI_WARN("SENDING NEW RTCP RR !! ");
685 : }
686 :
687 5521 : return ret < 0 ? -errno : ret;
688 : }
689 :
690 : double
691 10 : SocketPair::getLastLatency()
692 : {
693 10 : if (not histoLatency_.empty())
694 0 : return histoLatency_.back();
695 : else
696 10 : return -1;
697 : }
698 :
699 : void
700 147 : SocketPair::setRtpDelayCallback(std::function<void(int, int)> cb)
701 : {
702 147 : rtpDelayCallback_ = std::move(cb);
703 147 : }
704 :
705 : bool
706 5433 : SocketPair::getOneWayDelayGradient(float sendTS, bool marker, int32_t* gradient, int32_t* deltaT)
707 : {
708 : // Keep only last packet of each frame
709 5433 : if (not marker) {
710 48 : return 0;
711 : }
712 :
713 : // 1st frame
714 5385 : if (not lastSendTS_) {
715 46 : lastSendTS_ = sendTS;
716 46 : lastReceiveTS_ = std::chrono::steady_clock::now();
717 46 : return 0;
718 : }
719 :
720 5339 : int32_t deltaS = (sendTS - lastSendTS_) * 1000; // milliseconds
721 5339 : if (deltaS < 0)
722 4 : deltaS += 64000;
723 5339 : lastSendTS_ = sendTS;
724 :
725 5339 : std::chrono::steady_clock::time_point arrival_TS = std::chrono::steady_clock::now();
726 5339 : auto deltaR = std::chrono::duration_cast<std::chrono::milliseconds>(arrival_TS - lastReceiveTS_)
727 5339 : .count();
728 5339 : lastReceiveTS_ = arrival_TS;
729 :
730 5339 : *gradient = deltaR - deltaS;
731 5339 : *deltaT = deltaR;
732 :
733 5339 : return true;
734 : }
735 :
736 : bool
737 5433 : SocketPair::parse_RTP_ext(uint8_t* buf, float* abs)
738 : {
739 5433 : if (not(buf[0] & 0x10))
740 0 : return false;
741 :
742 5433 : uint16_t magic_word = (buf[12] << 8) + buf[13];
743 5433 : if (magic_word != 0xBEDE)
744 0 : return false;
745 :
746 5433 : uint8_t sec = buf[17] >> 2;
747 5433 : uint32_t fract = ((buf[17] & 0x3) << 16 | (buf[18] << 8) | buf[19]) << 14;
748 5433 : float milli = fract / pow(2, 32);
749 :
750 5433 : *abs = sec + (milli);
751 5433 : return true;
752 : }
753 :
754 : uint16_t
755 152 : SocketPair::lastSeqValOut()
756 : {
757 152 : if (srtpContext_)
758 152 : return srtpContext_->srtp_out.seq_largest;
759 0 : JAMI_ERR("SRTP context not found.");
760 0 : return 0;
761 : }
762 :
763 : } // namespace jami
|