Line data Source code
1 : /*
2 : * Copyright (C) 2004-2026 Savoir-faire Linux Inc.
3 : * Copyright (c) 2007 The FFmpeg Project
4 : *
5 : * This program is free software: you can redistribute it and/or modify
6 : * it under the terms of the GNU General Public License as published by
7 : * the Free Software Foundation, either version 3 of the License, or
8 : * (at your option) any later version.
9 : *
10 : * This program is distributed in the hope that it will be useful,
11 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 : * GNU General Public License for more details.
14 : *
15 : * You should have received a copy of the GNU General Public License
16 : * along with this program. If not, see <https://www.gnu.org/licenses/>.
17 : */
18 :
19 : #include <dhtnet/ip_utils.h> // MUST BE INCLUDED FIRST
20 :
21 : #include "libav_deps.h" // THEN THIS ONE AFTER
22 :
23 : #include "socket_pair.h"
24 : #include "logger.h"
25 : #include "connectivity/security/memory.h"
26 :
27 : #include <dhtnet/ice_socket.h>
28 :
29 : #include <string>
30 : #include <algorithm>
31 :
32 : extern "C" {
33 : #include "srtp.h"
34 : }
35 :
36 : #include <cstring>
37 : #include <stdexcept>
38 : #include <unistd.h>
39 : #include <sys/types.h>
40 :
41 : #ifdef _WIN32
42 : #define SOCK_NONBLOCK FIONBIO
43 : #define poll WSAPoll
44 : #define close(x) closesocket(x)
45 : #endif
46 :
47 : #ifdef __ANDROID__
48 : #include <asm-generic/fcntl.h>
49 : #define SOCK_NONBLOCK O_NONBLOCK
50 : #endif
51 :
52 : #ifdef __APPLE__
53 : #include <fcntl.h>
54 : #endif
55 :
56 : // Swap 2 byte, 16 bit values:
57 : #define Swap2Bytes(val) ((((val) >> 8) & 0x00FF) | (((val) << 8) & 0xFF00))
58 :
59 : // Swap 4 byte, 32 bit values:
60 : #define Swap4Bytes(val) \
61 : ((((val) >> 24) & 0x000000FF) | (((val) >> 8) & 0x0000FF00) | (((val) << 8) & 0x00FF0000) \
62 : | (((val) << 24) & 0xFF000000))
63 :
64 : // Swap 8 byte, 64 bit values:
65 : #define Swap8Bytes(val) \
66 : ((((val) >> 56) & 0x00000000000000FF) | (((val) >> 40) & 0x000000000000FF00) \
67 : | (((val) >> 24) & 0x0000000000FF0000) | (((val) >> 8) & 0x00000000FF000000) \
68 : | (((val) << 8) & 0x000000FF00000000) | (((val) << 24) & 0x0000FF0000000000) \
69 : | (((val) << 40) & 0x00FF000000000000) | (((val) << 56) & 0xFF00000000000000))
70 :
71 : namespace jami {
72 :
73 : static constexpr int NET_POLL_TIMEOUT = 100; /* poll() timeout in ms */
74 : static constexpr int RTP_MAX_PACKET_LENGTH = 2048;
75 : static constexpr auto UDP_HEADER_SIZE = 8;
76 : static constexpr auto SRTP_OVERHEAD = 10;
77 : static constexpr uint32_t RTCP_RR_FRACTION_MASK = 0xFF000000;
78 : static constexpr unsigned MINIMUM_RTP_HEADER_SIZE = 16;
79 :
80 : enum class DataType : uint8_t { RTP = 1 << 0, RTCP = 1 << 1 };
81 :
82 : class SRTPProtoContext
83 : {
84 : public:
85 122 : SRTPProtoContext(const char* out_suite, const char* out_key, const char* in_suite, const char* in_key)
86 122 : {
87 122 : jami_secure_memzero(&srtp_out, sizeof(srtp_out));
88 122 : jami_secure_memzero(&srtp_in, sizeof(srtp_in));
89 122 : if (out_suite && out_key) {
90 : // XXX: see srtp_open from libavformat/srtpproto.c
91 122 : if (ff_srtp_set_crypto(&srtp_out, out_suite, out_key) < 0) {
92 0 : srtp_close();
93 0 : throw std::runtime_error("Unable to set crypto on output");
94 : }
95 : }
96 :
97 122 : if (in_suite && in_key) {
98 122 : if (ff_srtp_set_crypto(&srtp_in, in_suite, in_key) < 0) {
99 0 : srtp_close();
100 0 : throw std::runtime_error("Unable to set crypto on input");
101 : }
102 : }
103 122 : }
104 :
105 122 : ~SRTPProtoContext() { srtp_close(); }
106 :
107 : SRTPContext srtp_out {};
108 : SRTPContext srtp_in {};
109 : uint8_t encryptbuf[RTP_MAX_PACKET_LENGTH];
110 :
111 : private:
112 122 : void srtp_close() noexcept
113 : {
114 122 : ff_srtp_free(&srtp_out);
115 122 : ff_srtp_free(&srtp_in);
116 122 : }
117 : };
118 :
119 : static int
120 0 : ff_network_wait_fd(int fd)
121 : {
122 0 : struct pollfd p = {fd, POLLOUT, 0};
123 0 : auto ret = poll(&p, 1, NET_POLL_TIMEOUT);
124 0 : return ret < 0 ? errno : p.revents & (POLLOUT | POLLERR | POLLHUP) ? 0 : -EAGAIN;
125 : }
126 :
127 : static int
128 8 : udp_socket_create(int family, int port)
129 : {
130 8 : int udp_fd = -1;
131 :
132 : #ifdef __APPLE__
133 : udp_fd = socket(family, SOCK_DGRAM, 0);
134 : if (udp_fd >= 0 && fcntl(udp_fd, F_SETFL, O_NONBLOCK) < 0) {
135 : close(udp_fd);
136 : udp_fd = -1;
137 : }
138 : #elif defined _WIN32
139 : udp_fd = socket(family, SOCK_DGRAM, 0);
140 : u_long block = 1;
141 : if (udp_fd >= 0 && ioctlsocket(udp_fd, FIONBIO, &block) < 0) {
142 : close(udp_fd);
143 : udp_fd = -1;
144 : }
145 : #else
146 8 : udp_fd = socket(family, SOCK_DGRAM | SOCK_NONBLOCK, 0);
147 : #endif
148 :
149 8 : if (udp_fd < 0) {
150 0 : JAMI_ERR("socket() failed");
151 0 : strErr();
152 0 : return -1;
153 : }
154 :
155 8 : auto bind_addr = dhtnet::ip_utils::getAnyHostAddr(family);
156 8 : if (not bind_addr.isIpv4() and not bind_addr.isIpv6()) {
157 0 : JAMI_ERR("No IPv4/IPv6 host found for family %u", family);
158 0 : close(udp_fd);
159 0 : return -1;
160 : }
161 :
162 8 : bind_addr.setPort(port);
163 8 : JAMI_DBG("use local address: %s", bind_addr.toString(true, true).c_str());
164 8 : if (::bind(udp_fd, bind_addr, bind_addr.getLength()) < 0) {
165 0 : JAMI_ERR("bind() failed");
166 0 : strErr();
167 0 : close(udp_fd);
168 0 : udp_fd = -1;
169 : }
170 :
171 8 : return udp_fd;
172 : }
173 :
174 4 : SocketPair::SocketPair(const char* uri, int localPort)
175 : {
176 4 : openSockets(uri, localPort);
177 4 : }
178 :
179 116 : SocketPair::SocketPair(std::unique_ptr<dhtnet::IceSocket> rtp_sock, std::unique_ptr<dhtnet::IceSocket> rtcp_sock)
180 118 : : rtp_sock_(std::move(rtp_sock))
181 234 : , rtcp_sock_(std::move(rtcp_sock))
182 : {
183 118 : JAMI_DBG("[%p] Creating instance using ICE sockets for comp %d and %d",
184 : this,
185 : rtp_sock_->getCompId(),
186 : rtcp_sock_->getCompId());
187 :
188 118 : rtp_sock_->setOnRecv([this](uint8_t* buf, size_t len) {
189 5 : std::lock_guard l(dataBuffMutex_);
190 5 : rtpDataBuff_.emplace_back(buf, buf + len);
191 5 : cv_.notify_one();
192 5 : return len;
193 5 : });
194 118 : rtcp_sock_->setOnRecv([this](uint8_t* buf, size_t len) {
195 1 : std::lock_guard l(dataBuffMutex_);
196 1 : rtcpDataBuff_.emplace_back(buf, buf + len);
197 1 : cv_.notify_one();
198 1 : return len;
199 1 : });
200 117 : }
201 :
202 122 : SocketPair::~SocketPair()
203 : {
204 122 : interrupt();
205 122 : closeSockets();
206 122 : JAMI_DBG("[%p] Instance destroyed", this);
207 122 : }
208 :
209 : bool
210 162 : SocketPair::waitForRTCP(std::chrono::seconds interval)
211 : {
212 162 : std::unique_lock lock(rtcpInfo_mutex_);
213 162 : return cvRtcpPacketReadyToRead_.wait_for(lock, interval, [this] {
214 549 : return interrupted_ or not listRtcpRRHeader_.empty() or not listRtcpREMBHeader_.empty();
215 324 : });
216 162 : }
217 :
218 : void
219 0 : SocketPair::saveRtcpRRPacket(uint8_t* buf, size_t len)
220 : {
221 0 : if (len < sizeof(rtcpRRHeader))
222 0 : return;
223 :
224 0 : auto* header = reinterpret_cast<rtcpRRHeader*>(buf);
225 0 : if (header->pt != 201) // 201 = RR PT
226 0 : return;
227 :
228 0 : std::lock_guard lock(rtcpInfo_mutex_);
229 :
230 0 : if (listRtcpRRHeader_.size() >= MAX_LIST_SIZE) {
231 0 : listRtcpRRHeader_.pop_front();
232 : }
233 :
234 0 : listRtcpRRHeader_.emplace_back(*header);
235 :
236 0 : cvRtcpPacketReadyToRead_.notify_one();
237 0 : }
238 :
239 : void
240 0 : SocketPair::saveRtcpREMBPacket(uint8_t* buf, size_t len)
241 : {
242 0 : if (len < sizeof(rtcpREMBHeader))
243 0 : return;
244 :
245 0 : auto* header = reinterpret_cast<rtcpREMBHeader*>(buf);
246 0 : if (header->pt != 206) // 206 = REMB PT
247 0 : return;
248 :
249 0 : if (header->uid != 0x424D4552) // uid must be "REMB"
250 0 : return;
251 :
252 0 : std::lock_guard lock(rtcpInfo_mutex_);
253 :
254 0 : if (listRtcpREMBHeader_.size() >= MAX_LIST_SIZE) {
255 0 : listRtcpREMBHeader_.pop_front();
256 : }
257 :
258 0 : listRtcpREMBHeader_.push_back(*header);
259 :
260 0 : cvRtcpPacketReadyToRead_.notify_one();
261 0 : }
262 :
263 : std::list<rtcpRRHeader>
264 162 : SocketPair::getRtcpRR()
265 : {
266 162 : std::lock_guard lock(rtcpInfo_mutex_);
267 324 : return std::move(listRtcpRRHeader_);
268 162 : }
269 :
270 : std::list<rtcpREMBHeader>
271 60 : SocketPair::getRtcpREMB()
272 : {
273 60 : std::lock_guard lock(rtcpInfo_mutex_);
274 120 : return std::move(listRtcpREMBHeader_);
275 60 : }
276 :
277 : void
278 122 : SocketPair::createSRTP(const char* out_suite, const char* out_key, const char* in_suite, const char* in_key)
279 : {
280 122 : srtpContext_.reset(new SRTPProtoContext(out_suite, out_key, in_suite, in_key));
281 122 : }
282 :
283 : void
284 244 : SocketPair::interrupt()
285 : {
286 244 : JAMI_WARN("[%p] Interrupting RTP sockets", this);
287 244 : interrupted_ = true;
288 244 : if (rtp_sock_)
289 236 : rtp_sock_->setOnRecv(nullptr);
290 244 : if (rtcp_sock_)
291 236 : rtcp_sock_->setOnRecv(nullptr);
292 244 : cv_.notify_all();
293 244 : cvRtcpPacketReadyToRead_.notify_all();
294 244 : }
295 :
296 : void
297 328 : SocketPair::setReadBlockingMode(bool block)
298 : {
299 328 : JAMI_DBG("[%p] Read operations in blocking mode [%s]", this, block ? "YES" : "NO");
300 328 : readBlockingMode_ = block;
301 328 : cv_.notify_all();
302 328 : cvRtcpPacketReadyToRead_.notify_all();
303 328 : }
304 :
305 : void
306 313 : SocketPair::stopSendOp(bool state)
307 : {
308 313 : noWrite_ = state;
309 313 : }
310 :
311 : void
312 122 : SocketPair::closeSockets()
313 : {
314 122 : if (rtcpHandle_ > 0 and close(rtcpHandle_))
315 0 : strErr();
316 122 : if (rtpHandle_ > 0 and close(rtpHandle_))
317 0 : strErr();
318 122 : }
319 :
320 : void
321 4 : SocketPair::openSockets(const char* uri, int local_rtp_port)
322 : {
323 4 : JAMI_DBG("Creating rtp socket for uri %s on port %d", uri, local_rtp_port);
324 :
325 : char hostname[256];
326 : char path[1024];
327 : int dst_rtp_port;
328 :
329 4 : av_url_split(NULL, 0, NULL, 0, hostname, sizeof(hostname), &dst_rtp_port, path, sizeof(path), uri);
330 :
331 4 : const int local_rtcp_port = local_rtp_port + 1;
332 4 : const int dst_rtcp_port = dst_rtp_port + 1;
333 :
334 4 : rtpDestAddr_ = dhtnet::IpAddr {hostname};
335 4 : rtpDestAddr_.setPort(dst_rtp_port);
336 4 : rtcpDestAddr_ = dhtnet::IpAddr {hostname};
337 4 : rtcpDestAddr_.setPort(dst_rtcp_port);
338 :
339 : // Open local sockets (RTP/RTCP)
340 4 : if ((rtpHandle_ = udp_socket_create(rtpDestAddr_.getFamily(), local_rtp_port)) == -1
341 4 : or (rtcpHandle_ = udp_socket_create(rtcpDestAddr_.getFamily(), local_rtcp_port)) == -1) {
342 0 : closeSockets();
343 0 : JAMI_ERR("[%p] Sockets creation failed", this);
344 0 : throw std::runtime_error("Sockets creation failed");
345 : }
346 :
347 4 : JAMI_WARN("SocketPair: local{%d,%d} / %s{%d,%d}",
348 : local_rtp_port,
349 : local_rtcp_port,
350 : hostname,
351 : dst_rtp_port,
352 : dst_rtcp_port);
353 4 : }
354 :
355 : MediaIOHandle*
356 223 : SocketPair::createIOContext(const uint16_t mtu)
357 : {
358 : unsigned ip_header_size;
359 223 : if (rtp_sock_)
360 215 : ip_header_size = rtp_sock_->getTransportOverhead();
361 8 : else if (rtpDestAddr_.getFamily() == AF_INET6)
362 0 : ip_header_size = 40;
363 : else
364 8 : ip_header_size = 20;
365 : return new MediaIOHandle(
366 223 : mtu - (srtpContext_ ? SRTP_OVERHEAD : 0) - UDP_HEADER_SIZE - ip_header_size,
367 : true,
368 165 : [](void* sp, uint8_t* buf, int len) { return static_cast<SocketPair*>(sp)->readCallback(buf, len); },
369 6 : [](void* sp, uint8_t* buf, int len) { return static_cast<SocketPair*>(sp)->writeCallback(buf, len); },
370 : 0,
371 223 : reinterpret_cast<void*>(this));
372 : }
373 :
374 : int
375 165 : SocketPair::waitForData()
376 : {
377 : // System sockets
378 165 : if (rtpHandle_ >= 0) {
379 : int ret;
380 : do {
381 85 : if (interrupted_) {
382 4 : errno = EINTR;
383 4 : return -1;
384 : }
385 :
386 81 : if (not readBlockingMode_) {
387 0 : return 0;
388 : }
389 :
390 : // work with system socket
391 81 : struct pollfd p[2] = {{rtpHandle_, POLLIN, 0}, {rtcpHandle_, POLLIN, 0}};
392 81 : ret = poll(p, 2, NET_POLL_TIMEOUT);
393 81 : if (ret > 0) {
394 0 : ret = 0;
395 0 : if (p[0].revents & POLLIN)
396 0 : ret |= static_cast<int>(DataType::RTP);
397 0 : if (p[1].revents & POLLIN)
398 0 : ret |= static_cast<int>(DataType::RTCP);
399 : }
400 81 : } while (!ret or (ret < 0 and errno == EAGAIN));
401 :
402 0 : return ret;
403 : }
404 :
405 : // work with IceSocket
406 : {
407 161 : std::unique_lock lk(dataBuffMutex_);
408 161 : cv_.wait(lk, [this] {
409 288 : return interrupted_ or not rtpDataBuff_.empty() or not rtcpDataBuff_.empty() or not readBlockingMode_;
410 : });
411 161 : }
412 :
413 161 : if (interrupted_) {
414 2 : errno = EINTR;
415 2 : return -1;
416 : }
417 :
418 159 : return static_cast<int>(DataType::RTP) | static_cast<int>(DataType::RTCP);
419 : }
420 :
421 : int
422 158 : SocketPair::readRtpData(void* buf, int buf_size)
423 : {
424 : // handle system socket
425 158 : if (rtpHandle_ >= 0) {
426 : struct sockaddr_storage from;
427 0 : socklen_t from_len = sizeof(from);
428 0 : return static_cast<int>(recvfrom(rtpHandle_,
429 : static_cast<char*>(buf),
430 : buf_size,
431 : 0,
432 : reinterpret_cast<struct sockaddr*>(&from),
433 0 : &from_len));
434 : }
435 :
436 : // handle ICE
437 158 : std::unique_lock lk(dataBuffMutex_);
438 158 : if (not rtpDataBuff_.empty()) {
439 5 : auto pkt = std::move(rtpDataBuff_.front());
440 5 : rtpDataBuff_.pop_front();
441 5 : lk.unlock(); // to not block our ICE callbacks
442 5 : int pkt_size = static_cast<int>(pkt.size());
443 5 : int len = std::min(pkt_size, buf_size);
444 5 : std::copy_n(pkt.begin(), len, static_cast<char*>(buf));
445 5 : return len;
446 5 : }
447 :
448 153 : return 0;
449 158 : }
450 :
451 : int
452 159 : SocketPair::readRtcpData(void* buf, int buf_size)
453 : {
454 : // handle system socket
455 159 : if (rtcpHandle_ >= 0) {
456 : struct sockaddr_storage from;
457 0 : socklen_t from_len = sizeof(from);
458 0 : return static_cast<int>(recvfrom(rtcpHandle_,
459 : static_cast<char*>(buf),
460 : buf_size,
461 : 0,
462 : reinterpret_cast<struct sockaddr*>(&from),
463 0 : &from_len));
464 : }
465 :
466 : // handle ICE
467 159 : std::unique_lock lk(dataBuffMutex_);
468 159 : if (not rtcpDataBuff_.empty()) {
469 1 : auto pkt = std::move(rtcpDataBuff_.front());
470 1 : rtcpDataBuff_.pop_front();
471 1 : lk.unlock();
472 1 : int pkt_size = static_cast<int>(pkt.size());
473 1 : int len = std::min(pkt_size, buf_size);
474 1 : std::copy_n(pkt.begin(), len, static_cast<char*>(buf));
475 1 : return len;
476 1 : }
477 :
478 158 : return 0;
479 159 : }
480 :
481 : int
482 165 : SocketPair::readCallback(uint8_t* buf, int buf_size)
483 : {
484 165 : auto datatype = waitForData();
485 165 : if (datatype < 0)
486 6 : return datatype;
487 :
488 159 : int len = 0;
489 159 : bool fromRTCP = false;
490 :
491 159 : if (datatype & static_cast<int>(DataType::RTCP)) {
492 159 : len = readRtcpData(buf, buf_size);
493 159 : if (len > 0) {
494 1 : auto* header = reinterpret_cast<rtcpRRHeader*>(buf);
495 : // 201 = RR PT
496 1 : if (header->pt == 201) {
497 0 : lastDLSR_ = Swap4Bytes(header->dlsr);
498 : // JAMI_WARN("Read RR, lastDLSR : %d", lastDLSR_);
499 0 : lastRR_time = std::chrono::steady_clock::now();
500 0 : saveRtcpRRPacket(buf, len);
501 : }
502 : // 206 = REMB PT
503 1 : else if (header->pt == 206)
504 0 : saveRtcpREMBPacket(buf, len);
505 : // 200 = SR PT
506 1 : else if (header->pt == 200) {
507 : // not used yet
508 : } else {
509 0 : JAMI_DBG("Unable to read RTCP: unknown packet type %u", header->pt);
510 : }
511 1 : fromRTCP = true;
512 : }
513 : }
514 :
515 : // No RTCP… attempt RTP
516 159 : if (!len and (datatype & static_cast<int>(DataType::RTP))) {
517 158 : len = readRtpData(buf, buf_size);
518 158 : fromRTCP = false;
519 : }
520 :
521 159 : if (len <= 0)
522 153 : return len;
523 :
524 6 : if (not fromRTCP && (buf_size < static_cast<int>(MINIMUM_RTP_HEADER_SIZE)))
525 0 : return len;
526 :
527 : // SRTP decrypt
528 6 : if (not fromRTCP and srtpContext_ and srtpContext_->srtp_in.aes) {
529 5 : int32_t gradient = 0;
530 5 : int32_t deltaT = 0;
531 5 : float abs = 0.0f;
532 5 : bool res_parse = false;
533 5 : bool res_delay = false;
534 :
535 5 : res_parse = parse_RTP_ext(buf, &abs);
536 5 : bool marker = (buf[1] & 0x80) >> 7;
537 :
538 5 : if (res_parse)
539 5 : res_delay = getOneWayDelayGradient(abs, marker, &gradient, &deltaT);
540 :
541 : // rtpDelayCallback_ is not set for audio
542 5 : if (rtpDelayCallback_ and res_delay)
543 3 : rtpDelayCallback_(gradient, deltaT);
544 :
545 5 : auto err = ff_srtp_decrypt(&srtpContext_->srtp_in, buf, &len);
546 5 : if (packetLossCallback_ and (buf[2] << 8 | buf[3]) != lastSeqNumIn_ + 1)
547 0 : packetLossCallback_();
548 5 : lastSeqNumIn_ = buf[2] << 8 | buf[3];
549 5 : if (err < 0)
550 0 : JAMI_WARN("decrypt error %d", err);
551 : }
552 :
553 6 : if (len != 0)
554 6 : return len;
555 : else
556 0 : return AVERROR_EOF;
557 : }
558 :
559 : int
560 6 : SocketPair::writeData(uint8_t* buf, int buf_size)
561 : {
562 6 : bool isRTCP = RTP_PT_IS_RTCP(buf[1]);
563 :
564 : // System sockets?
565 6 : if (rtpHandle_ >= 0) {
566 : int fd;
567 : dhtnet::IpAddr* dest_addr;
568 :
569 0 : if (isRTCP) {
570 0 : fd = rtcpHandle_;
571 0 : dest_addr = &rtcpDestAddr_;
572 : } else {
573 0 : fd = rtpHandle_;
574 0 : dest_addr = &rtpDestAddr_;
575 : }
576 :
577 0 : auto ret = ff_network_wait_fd(fd);
578 0 : if (ret < 0)
579 0 : return ret;
580 :
581 0 : if (noWrite_)
582 0 : return buf_size;
583 : return static_cast<int>(
584 0 : ::sendto(fd, reinterpret_cast<const char*>(buf), buf_size, 0, *dest_addr, dest_addr->getLength()));
585 : }
586 :
587 6 : if (noWrite_)
588 0 : return buf_size;
589 :
590 : // IceSocket
591 6 : if (isRTCP)
592 1 : return static_cast<int>(rtcp_sock_->send(buf, buf_size));
593 : else
594 5 : return static_cast<int>(rtp_sock_->send(buf, buf_size));
595 : }
596 :
597 : int
598 6 : SocketPair::writeCallback(uint8_t* buf, int buf_size)
599 : {
600 6 : if (noWrite_)
601 0 : return 0;
602 :
603 : int ret;
604 6 : bool isRTCP = RTP_PT_IS_RTCP(buf[1]);
605 : unsigned int ts_LSB, ts_MSB;
606 : double currentSRTS, currentLatency;
607 :
608 : // Encrypt?
609 6 : if (not isRTCP and srtpContext_ and srtpContext_->srtp_out.aes) {
610 5 : buf_size = ff_srtp_encrypt(&srtpContext_->srtp_out,
611 : buf,
612 : buf_size,
613 5 : srtpContext_->encryptbuf,
614 : sizeof(srtpContext_->encryptbuf));
615 5 : if (buf_size < 0) {
616 0 : JAMI_WARN("encrypt error %d", buf_size);
617 0 : return buf_size;
618 : }
619 :
620 5 : buf = srtpContext_->encryptbuf;
621 : }
622 :
623 : // check if we're sending an RR, if so, detect packet loss
624 : // buf_size gives length of buffer, not just header
625 6 : if (isRTCP && static_cast<unsigned>(buf_size) >= sizeof(rtcpRRHeader)) {
626 0 : auto* header = reinterpret_cast<rtcpRRHeader*>(buf);
627 0 : rtcpPacketLoss_ = (header->pt == 201 && ntohl(header->fraction_lost) & RTCP_RR_FRACTION_MASK);
628 : }
629 :
630 : do {
631 6 : if (interrupted_)
632 0 : return -EINTR;
633 6 : ret = writeData(buf, buf_size);
634 6 : } while (ret < 0 and errno == EAGAIN);
635 :
636 6 : if (buf[1] == 200) // Sender Report
637 : {
638 1 : auto* header = reinterpret_cast<rtcpSRHeader*>(buf);
639 1 : ts_LSB = Swap4Bytes(header->timestampLSB);
640 1 : ts_MSB = Swap4Bytes(header->timestampMSB);
641 :
642 1 : currentSRTS = ts_MSB + (ts_LSB / pow(2, 32));
643 :
644 1 : if (lastSRTS_ != 0 && lastDLSR_ != 0) {
645 0 : if (histoLatency_.size() >= MAX_LIST_SIZE)
646 0 : histoLatency_.pop_front();
647 :
648 0 : currentLatency = (currentSRTS - lastSRTS_) / 2;
649 : // JAMI_WARN("Current Latency : %f from sender %X", currentLatency, header->ssrc);
650 0 : histoLatency_.push_back(currentLatency);
651 : }
652 :
653 1 : lastSRTS_ = currentSRTS;
654 :
655 : // JAMI_WARN("SENDING NEW RTCP SR !! ");
656 :
657 5 : } else if (buf[1] == 201) // Receiver Report
658 : {
659 : // auto header = reinterpret_cast<rtcpRRHeader*>(buf);
660 : // JAMI_WARN("SENDING NEW RTCP RR !! ");
661 : }
662 :
663 6 : return ret < 0 ? -errno : ret;
664 : }
665 :
666 : double
667 0 : SocketPair::getLastLatency()
668 : {
669 0 : if (not histoLatency_.empty())
670 0 : return histoLatency_.back();
671 : else
672 0 : return -1;
673 : }
674 :
675 : void
676 50 : SocketPair::setRtpDelayCallback(std::function<void(int, int)> cb)
677 : {
678 50 : rtpDelayCallback_ = std::move(cb);
679 50 : }
680 :
681 : bool
682 5 : SocketPair::getOneWayDelayGradient(float sendTS, bool marker, int32_t* gradient, int32_t* deltaT)
683 : {
684 : // Keep only last packet of each frame
685 5 : if (not marker) {
686 1 : return false;
687 : }
688 :
689 : // 1st frame
690 4 : if (lastSendTS_ == 0.0f) {
691 1 : lastSendTS_ = sendTS;
692 1 : lastReceiveTS_ = std::chrono::steady_clock::now();
693 1 : return false;
694 : }
695 :
696 3 : int32_t deltaS = static_cast<int32_t>((sendTS - lastSendTS_) * 1000); // milliseconds
697 3 : if (deltaS < 0)
698 0 : deltaS += 64000;
699 3 : lastSendTS_ = sendTS;
700 :
701 3 : std::chrono::steady_clock::time_point arrival_TS = std::chrono::steady_clock::now();
702 : auto deltaR = static_cast<int32_t>(
703 3 : std::chrono::duration_cast<std::chrono::milliseconds>(arrival_TS - lastReceiveTS_).count());
704 3 : lastReceiveTS_ = arrival_TS;
705 :
706 3 : *gradient = deltaR - deltaS;
707 3 : *deltaT = deltaR;
708 :
709 3 : return true;
710 : }
711 :
712 : bool
713 5 : SocketPair::parse_RTP_ext(uint8_t* buf, float* abs)
714 : {
715 5 : if (not(buf[0] & 0x10))
716 0 : return false;
717 :
718 5 : uint16_t magic_word = (buf[12] << 8) + buf[13];
719 5 : if (magic_word != 0xBEDE)
720 0 : return false;
721 :
722 5 : uint8_t sec = buf[17] >> 2;
723 5 : uint32_t fract = ((buf[17] & 0x3) << 16 | (buf[18] << 8) | buf[19]) << 14;
724 5 : float milli = static_cast<float>(fract / pow(2, 32));
725 :
726 5 : *abs = static_cast<float>(sec) + (milli);
727 5 : return true;
728 : }
729 :
730 : uint16_t
731 35 : SocketPair::lastSeqValOut()
732 : {
733 35 : if (srtpContext_)
734 35 : return srtpContext_->srtp_out.seq_largest;
735 0 : JAMI_ERR("SRTP context not found.");
736 0 : return 0;
737 : }
738 :
739 : } // namespace jami
|