LCOV - code coverage report
Current view: top level - foo/src/media - socket_pair.cpp (source / functions) Hit Total Coverage
Test: jami-coverage-filtered.info Lines: 287 357 80.4 %
Date: 2025-12-18 10:07:43 Functions: 36 37 97.3 %

          Line data    Source code
       1             : /*
       2             :  *  Copyright (C) 2004-2025 Savoir-faire Linux Inc.
       3             :  *  Copyright (c) 2007 The FFmpeg Project
       4             :  *
       5             :  *  This program is free software: you can redistribute it and/or modify
       6             :  *  it under the terms of the GNU General Public License as published by
       7             :  *  the Free Software Foundation, either version 3 of the License, or
       8             :  *  (at your option) any later version.
       9             :  *
      10             :  *  This program is distributed in the hope that it will be useful,
      11             :  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
      12             :  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
      13             :  *  GNU General Public License for more details.
      14             :  *
      15             :  *  You should have received a copy of the GNU General Public License
      16             :  *  along with this program. If not, see <https://www.gnu.org/licenses/>.
      17             :  */
      18             : 
      19             : #include <dhtnet/ip_utils.h> // MUST BE INCLUDED FIRST
      20             : 
      21             : #include "libav_deps.h" // THEN THIS ONE AFTER
      22             : 
      23             : #include "socket_pair.h"
      24             : #include "libav_utils.h"
      25             : #include "logger.h"
      26             : #include "connectivity/security/memory.h"
      27             : 
      28             : #include <dhtnet/ice_socket.h>
      29             : 
      30             : #include <iostream>
      31             : #include <string>
      32             : #include <algorithm>
      33             : #include <iterator>
      34             : 
      35             : extern "C" {
      36             : #include "srtp.h"
      37             : }
      38             : 
      39             : #include <cstring>
      40             : #include <stdexcept>
      41             : #include <unistd.h>
      42             : #include <sys/types.h>
      43             : #include <ciso646> // fix windows compiler bug
      44             : 
      45             : #ifdef _WIN32
      46             : #define SOCK_NONBLOCK FIONBIO
      47             : #define poll          WSAPoll
      48             : #define close(x)      closesocket(x)
      49             : #endif
      50             : 
      51             : #ifdef __ANDROID__
      52             : #include <asm-generic/fcntl.h>
      53             : #define SOCK_NONBLOCK O_NONBLOCK
      54             : #endif
      55             : 
      56             : #ifdef __APPLE__
      57             : #include <fcntl.h>
      58             : #endif
      59             : 
      60             : // Swap 2 byte, 16 bit values:
      61             : #define Swap2Bytes(val) ((((val) >> 8) & 0x00FF) | (((val) << 8) & 0xFF00))
      62             : 
      63             : // Swap 4 byte, 32 bit values:
      64             : #define Swap4Bytes(val) \
      65             :     ((((val) >> 24) & 0x000000FF) | (((val) >> 8) & 0x0000FF00) | (((val) << 8) & 0x00FF0000) \
      66             :      | (((val) << 24) & 0xFF000000))
      67             : 
      68             : // Swap 8 byte, 64 bit values:
      69             : #define Swap8Bytes(val) \
      70             :     ((((val) >> 56) & 0x00000000000000FF) | (((val) >> 40) & 0x000000000000FF00) \
      71             :      | (((val) >> 24) & 0x0000000000FF0000) | (((val) >> 8) & 0x00000000FF000000) \
      72             :      | (((val) << 8) & 0x000000FF00000000) | (((val) << 24) & 0x0000FF0000000000) \
      73             :      | (((val) << 40) & 0x00FF000000000000) | (((val) << 56) & 0xFF00000000000000))
      74             : 
      75             : namespace jami {
      76             : 
      77             : static constexpr int NET_POLL_TIMEOUT = 100; /* poll() timeout in ms */
      78             : static constexpr int RTP_MAX_PACKET_LENGTH = 2048;
      79             : static constexpr auto UDP_HEADER_SIZE = 8;
      80             : static constexpr auto SRTP_OVERHEAD = 10;
      81             : static constexpr uint32_t RTCP_RR_FRACTION_MASK = 0xFF000000;
      82             : static constexpr unsigned MINIMUM_RTP_HEADER_SIZE = 16;
      83             : 
      84             : enum class DataType : unsigned { RTP = 1 << 0, RTCP = 1 << 1 };
      85             : 
      86             : class SRTPProtoContext
      87             : {
      88             : public:
      89         326 :     SRTPProtoContext(const char* out_suite, const char* out_key, const char* in_suite, const char* in_key)
      90         326 :     {
      91         326 :         jami_secure_memzero(&srtp_out, sizeof(srtp_out));
      92         326 :         jami_secure_memzero(&srtp_in, sizeof(srtp_in));
      93         326 :         if (out_suite && out_key) {
      94             :             // XXX: see srtp_open from libavformat/srtpproto.c
      95         326 :             if (ff_srtp_set_crypto(&srtp_out, out_suite, out_key) < 0) {
      96           0 :                 srtp_close();
      97           0 :                 throw std::runtime_error("Unable to set crypto on output");
      98             :             }
      99             :         }
     100             : 
     101         326 :         if (in_suite && in_key) {
     102         326 :             if (ff_srtp_set_crypto(&srtp_in, in_suite, in_key) < 0) {
     103           0 :                 srtp_close();
     104           0 :                 throw std::runtime_error("Unable to set crypto on input");
     105             :             }
     106             :         }
     107         326 :     }
     108             : 
     109         326 :     ~SRTPProtoContext() { srtp_close(); }
     110             : 
     111             :     SRTPContext srtp_out {};
     112             :     SRTPContext srtp_in {};
     113             :     uint8_t encryptbuf[RTP_MAX_PACKET_LENGTH];
     114             : 
     115             : private:
     116         326 :     void srtp_close() noexcept
     117             :     {
     118         326 :         ff_srtp_free(&srtp_out);
     119         326 :         ff_srtp_free(&srtp_in);
     120         326 :     }
     121             : };
     122             : 
     123             : static int
     124           0 : ff_network_wait_fd(int fd)
     125             : {
     126           0 :     struct pollfd p = {fd, POLLOUT, 0};
     127           0 :     auto ret = poll(&p, 1, NET_POLL_TIMEOUT);
     128           0 :     return ret < 0 ? errno : p.revents & (POLLOUT | POLLERR | POLLHUP) ? 0 : -EAGAIN;
     129             : }
     130             : 
     131             : static int
     132          16 : udp_socket_create(int family, int port)
     133             : {
     134          16 :     int udp_fd = -1;
     135             : 
     136             : #ifdef __APPLE__
     137             :     udp_fd = socket(family, SOCK_DGRAM, 0);
     138             :     if (udp_fd >= 0 && fcntl(udp_fd, F_SETFL, O_NONBLOCK) < 0) {
     139             :         close(udp_fd);
     140             :         udp_fd = -1;
     141             :     }
     142             : #elif defined _WIN32
     143             :     udp_fd = socket(family, SOCK_DGRAM, 0);
     144             :     u_long block = 1;
     145             :     if (udp_fd >= 0 && ioctlsocket(udp_fd, FIONBIO, &block) < 0) {
     146             :         close(udp_fd);
     147             :         udp_fd = -1;
     148             :     }
     149             : #else
     150          16 :     udp_fd = socket(family, SOCK_DGRAM | SOCK_NONBLOCK, 0);
     151             : #endif
     152             : 
     153          16 :     if (udp_fd < 0) {
     154           0 :         JAMI_ERR("socket() failed");
     155           0 :         strErr();
     156           0 :         return -1;
     157             :     }
     158             : 
     159          16 :     auto bind_addr = dhtnet::ip_utils::getAnyHostAddr(family);
     160          16 :     if (not bind_addr.isIpv4() and not bind_addr.isIpv6()) {
     161           0 :         JAMI_ERR("No IPv4/IPv6 host found for family %u", family);
     162           0 :         close(udp_fd);
     163           0 :         return -1;
     164             :     }
     165             : 
     166          16 :     bind_addr.setPort(port);
     167          16 :     JAMI_DBG("use local address: %s", bind_addr.toString(true, true).c_str());
     168          16 :     if (::bind(udp_fd, bind_addr, bind_addr.getLength()) < 0) {
     169           0 :         JAMI_ERR("bind() failed");
     170           0 :         strErr();
     171           0 :         close(udp_fd);
     172           0 :         udp_fd = -1;
     173             :     }
     174             : 
     175          16 :     return udp_fd;
     176             : }
     177             : 
     178           7 : SocketPair::SocketPair(const char* uri, int localPort)
     179             : {
     180           8 :     openSockets(uri, localPort);
     181           8 : }
     182             : 
     183         315 : SocketPair::SocketPair(std::unique_ptr<dhtnet::IceSocket> rtp_sock, std::unique_ptr<dhtnet::IceSocket> rtcp_sock)
     184         317 :     : rtp_sock_(std::move(rtp_sock))
     185         632 :     , rtcp_sock_(std::move(rtcp_sock))
     186             : {
     187         317 :     JAMI_DBG("[%p] Creating instance using ICE sockets for comp %d and %d",
     188             :              this,
     189             :              rtp_sock_->getCompId(),
     190             :              rtcp_sock_->getCompId());
     191             : 
     192         318 :     rtp_sock_->setOnRecv([this](uint8_t* buf, size_t len) {
     193        5791 :         std::lock_guard l(dataBuffMutex_);
     194        5791 :         rtpDataBuff_.emplace_back(buf, buf + len);
     195        5791 :         cv_.notify_one();
     196        5791 :         return len;
     197        5791 :     });
     198         318 :     rtcp_sock_->setOnRecv([this](uint8_t* buf, size_t len) {
     199         269 :         std::lock_guard l(dataBuffMutex_);
     200         269 :         rtcpDataBuff_.emplace_back(buf, buf + len);
     201         269 :         cv_.notify_one();
     202         269 :         return len;
     203         269 :     });
     204         318 : }
     205             : 
     206         326 : SocketPair::~SocketPair()
     207             : {
     208         326 :     interrupt();
     209         326 :     closeSockets();
     210         326 :     JAMI_DBG("[%p] Instance destroyed", this);
     211         326 : }
     212             : 
     213             : bool
     214         745 : SocketPair::waitForRTCP(std::chrono::seconds interval)
     215             : {
     216         745 :     std::unique_lock lock(rtcpInfo_mutex_);
     217         745 :     return cvRtcpPacketReadyToRead_.wait_for(lock, interval, [this] {
     218        1991 :         return interrupted_ or not listRtcpRRHeader_.empty() or not listRtcpREMBHeader_.empty();
     219        1489 :     });
     220         745 : }
     221             : 
     222             : void
     223          10 : SocketPair::saveRtcpRRPacket(uint8_t* buf, size_t len)
     224             : {
     225          10 :     if (len < sizeof(rtcpRRHeader))
     226           0 :         return;
     227             : 
     228          10 :     auto header = reinterpret_cast<rtcpRRHeader*>(buf);
     229          10 :     if (header->pt != 201) // 201 = RR PT
     230           0 :         return;
     231             : 
     232          10 :     std::lock_guard lock(rtcpInfo_mutex_);
     233             : 
     234          10 :     if (listRtcpRRHeader_.size() >= MAX_LIST_SIZE) {
     235           0 :         listRtcpRRHeader_.pop_front();
     236             :     }
     237             : 
     238          10 :     listRtcpRRHeader_.emplace_back(*header);
     239             : 
     240          10 :     cvRtcpPacketReadyToRead_.notify_one();
     241          10 : }
     242             : 
     243             : void
     244         182 : SocketPair::saveRtcpREMBPacket(uint8_t* buf, size_t len)
     245             : {
     246         182 :     if (len < sizeof(rtcpREMBHeader))
     247           0 :         return;
     248             : 
     249         182 :     auto header = reinterpret_cast<rtcpREMBHeader*>(buf);
     250         182 :     if (header->pt != 206) // 206 = REMB PT
     251           0 :         return;
     252             : 
     253         182 :     if (header->uid != 0x424D4552) // uid must be "REMB"
     254           0 :         return;
     255             : 
     256         182 :     std::lock_guard lock(rtcpInfo_mutex_);
     257             : 
     258         182 :     if (listRtcpREMBHeader_.size() >= MAX_LIST_SIZE) {
     259           0 :         listRtcpREMBHeader_.pop_front();
     260             :     }
     261             : 
     262         182 :     listRtcpREMBHeader_.push_back(*header);
     263             : 
     264         182 :     cvRtcpPacketReadyToRead_.notify_one();
     265         182 : }
     266             : 
     267             : std::list<rtcpRRHeader>
     268         745 : SocketPair::getRtcpRR()
     269             : {
     270         745 :     std::lock_guard lock(rtcpInfo_mutex_);
     271        1490 :     return std::move(listRtcpRRHeader_);
     272         745 : }
     273             : 
     274             : std::list<rtcpREMBHeader>
     275         382 : SocketPair::getRtcpREMB()
     276             : {
     277         382 :     std::lock_guard lock(rtcpInfo_mutex_);
     278         764 :     return std::move(listRtcpREMBHeader_);
     279         382 : }
     280             : 
     281             : void
     282         326 : SocketPair::createSRTP(const char* out_suite, const char* out_key, const char* in_suite, const char* in_key)
     283             : {
     284         326 :     srtpContext_.reset(new SRTPProtoContext(out_suite, out_key, in_suite, in_key));
     285         326 : }
     286             : 
     287             : void
     288         652 : SocketPair::interrupt()
     289             : {
     290         652 :     JAMI_WARN("[%p] Interrupting RTP sockets", this);
     291         652 :     interrupted_ = true;
     292         652 :     if (rtp_sock_)
     293         636 :         rtp_sock_->setOnRecv(nullptr);
     294         652 :     if (rtcp_sock_)
     295         636 :         rtcp_sock_->setOnRecv(nullptr);
     296         652 :     cv_.notify_all();
     297         652 :     cvRtcpPacketReadyToRead_.notify_all();
     298         652 : }
     299             : 
     300             : void
     301         943 : SocketPair::setReadBlockingMode(bool block)
     302             : {
     303         943 :     JAMI_DBG("[%p] Read operations in blocking mode [%s]", this, block ? "YES" : "NO");
     304         943 :     readBlockingMode_ = block;
     305         943 :     cv_.notify_all();
     306         943 :     cvRtcpPacketReadyToRead_.notify_all();
     307         943 : }
     308             : 
     309             : void
     310         944 : SocketPair::stopSendOp(bool state)
     311             : {
     312         944 :     noWrite_ = state;
     313         944 : }
     314             : 
     315             : void
     316         326 : SocketPair::closeSockets()
     317             : {
     318         326 :     if (rtcpHandle_ > 0 and close(rtcpHandle_))
     319           0 :         strErr();
     320         326 :     if (rtpHandle_ > 0 and close(rtpHandle_))
     321           0 :         strErr();
     322         326 : }
     323             : 
     324             : void
     325           8 : SocketPair::openSockets(const char* uri, int local_rtp_port)
     326             : {
     327           8 :     JAMI_DBG("Creating rtp socket for uri %s on port %d", uri, local_rtp_port);
     328             : 
     329             :     char hostname[256];
     330             :     char path[1024];
     331             :     int dst_rtp_port;
     332             : 
     333           8 :     av_url_split(NULL, 0, NULL, 0, hostname, sizeof(hostname), &dst_rtp_port, path, sizeof(path), uri);
     334             : 
     335           8 :     const int local_rtcp_port = local_rtp_port + 1;
     336           8 :     const int dst_rtcp_port = dst_rtp_port + 1;
     337             : 
     338           8 :     rtpDestAddr_ = dhtnet::IpAddr {hostname};
     339           8 :     rtpDestAddr_.setPort(dst_rtp_port);
     340           8 :     rtcpDestAddr_ = dhtnet::IpAddr {hostname};
     341           8 :     rtcpDestAddr_.setPort(dst_rtcp_port);
     342             : 
     343             :     // Open local sockets (RTP/RTCP)
     344           8 :     if ((rtpHandle_ = udp_socket_create(rtpDestAddr_.getFamily(), local_rtp_port)) == -1
     345           8 :         or (rtcpHandle_ = udp_socket_create(rtcpDestAddr_.getFamily(), local_rtcp_port)) == -1) {
     346           0 :         closeSockets();
     347           0 :         JAMI_ERR("[%p] Sockets creation failed", this);
     348           0 :         throw std::runtime_error("Sockets creation failed");
     349             :     }
     350             : 
     351           8 :     JAMI_WARN("SocketPair: local{%d,%d} / %s{%d,%d}",
     352             :               local_rtp_port,
     353             :               local_rtcp_port,
     354             :               hostname,
     355             :               dst_rtp_port,
     356             :               dst_rtcp_port);
     357           8 : }
     358             : 
     359             : MediaIOHandle*
     360         646 : SocketPair::createIOContext(const uint16_t mtu)
     361             : {
     362             :     unsigned ip_header_size;
     363         646 :     if (rtp_sock_)
     364         630 :         ip_header_size = rtp_sock_->getTransportOverhead();
     365          16 :     else if (rtpDestAddr_.getFamily() == AF_INET6)
     366           0 :         ip_header_size = 40;
     367             :     else
     368          16 :         ip_header_size = 20;
     369             :     return new MediaIOHandle(
     370         646 :         mtu - (srtpContext_ ? SRTP_OVERHEAD : 0) - UDP_HEADER_SIZE - ip_header_size,
     371             :         true,
     372        6486 :         [](void* sp, uint8_t* buf, int len) { return static_cast<SocketPair*>(sp)->readCallback(buf, len); },
     373        5887 :         [](void* sp, uint8_t* buf, int len) { return static_cast<SocketPair*>(sp)->writeCallback(buf, len); },
     374             :         0,
     375         646 :         reinterpret_cast<void*>(this));
     376             : }
     377             : 
     378             : int
     379        6486 : SocketPair::waitForData()
     380             : {
     381             :     // System sockets
     382        6486 :     if (rtpHandle_ >= 0) {
     383             :         int ret;
     384             :         do {
     385          87 :             if (interrupted_) {
     386           4 :                 errno = EINTR;
     387           6 :                 return -1;
     388             :             }
     389             : 
     390          83 :             if (not readBlockingMode_) {
     391           2 :                 return 0;
     392             :             }
     393             : 
     394             :             // work with system socket
     395          81 :             struct pollfd p[2] = {{rtpHandle_, POLLIN, 0}, {rtcpHandle_, POLLIN, 0}};
     396          81 :             ret = poll(p, 2, NET_POLL_TIMEOUT);
     397          81 :             if (ret > 0) {
     398           0 :                 ret = 0;
     399           0 :                 if (p[0].revents & POLLIN)
     400           0 :                     ret |= static_cast<int>(DataType::RTP);
     401           0 :                 if (p[1].revents & POLLIN)
     402           0 :                     ret |= static_cast<int>(DataType::RTCP);
     403             :             }
     404          81 :         } while (!ret or (ret < 0 and errno == EAGAIN));
     405             : 
     406           0 :         return ret;
     407             :     }
     408             : 
     409             :     // work with IceSocket
     410             :     {
     411        6480 :         std::unique_lock lk(dataBuffMutex_);
     412        6480 :         cv_.wait(lk, [this] {
     413       12807 :             return interrupted_ or not rtpDataBuff_.empty() or not rtcpDataBuff_.empty() or not readBlockingMode_;
     414             :         });
     415        6480 :     }
     416             : 
     417        6480 :     if (interrupted_) {
     418          28 :         errno = EINTR;
     419          28 :         return -1;
     420             :     }
     421             : 
     422        6452 :     return static_cast<int>(DataType::RTP) | static_cast<int>(DataType::RTCP);
     423             : }
     424             : 
     425             : int
     426        6184 : SocketPair::readRtpData(void* buf, int buf_size)
     427             : {
     428             :     // handle system socket
     429        6184 :     if (rtpHandle_ >= 0) {
     430             :         struct sockaddr_storage from;
     431           0 :         socklen_t from_len = sizeof(from);
     432           0 :         return recvfrom(rtpHandle_,
     433             :                         static_cast<char*>(buf),
     434             :                         buf_size,
     435             :                         0,
     436             :                         reinterpret_cast<struct sockaddr*>(&from),
     437           0 :                         &from_len);
     438             :     }
     439             : 
     440             :     // handle ICE
     441        6184 :     std::unique_lock lk(dataBuffMutex_);
     442        6184 :     if (not rtpDataBuff_.empty()) {
     443        5790 :         auto pkt = std::move(rtpDataBuff_.front());
     444        5790 :         rtpDataBuff_.pop_front();
     445        5790 :         lk.unlock(); // to not block our ICE callbacks
     446        5790 :         int pkt_size = pkt.size();
     447        5790 :         int len = std::min(pkt_size, buf_size);
     448        5790 :         std::copy_n(pkt.begin(), len, static_cast<char*>(buf));
     449        5790 :         return len;
     450        5790 :     }
     451             : 
     452         393 :     return 0;
     453        6183 : }
     454             : 
     455             : int
     456        6452 : SocketPair::readRtcpData(void* buf, int buf_size)
     457             : {
     458             :     // handle system socket
     459        6452 :     if (rtcpHandle_ >= 0) {
     460             :         struct sockaddr_storage from;
     461           0 :         socklen_t from_len = sizeof(from);
     462           0 :         return recvfrom(rtcpHandle_,
     463             :                         static_cast<char*>(buf),
     464             :                         buf_size,
     465             :                         0,
     466             :                         reinterpret_cast<struct sockaddr*>(&from),
     467           0 :                         &from_len);
     468             :     }
     469             : 
     470             :     // handle ICE
     471        6452 :     std::unique_lock lk(dataBuffMutex_);
     472        6452 :     if (not rtcpDataBuff_.empty()) {
     473         268 :         auto pkt = std::move(rtcpDataBuff_.front());
     474         268 :         rtcpDataBuff_.pop_front();
     475         268 :         lk.unlock();
     476         268 :         int pkt_size = pkt.size();
     477         268 :         int len = std::min(pkt_size, buf_size);
     478         268 :         std::copy_n(pkt.begin(), len, static_cast<char*>(buf));
     479         268 :         return len;
     480         268 :     }
     481             : 
     482        6184 :     return 0;
     483        6452 : }
     484             : 
     485             : int
     486        6486 : SocketPair::readCallback(uint8_t* buf, int buf_size)
     487             : {
     488        6486 :     auto datatype = waitForData();
     489        6486 :     if (datatype < 0)
     490          32 :         return datatype;
     491             : 
     492        6454 :     int len = 0;
     493        6454 :     bool fromRTCP = false;
     494             : 
     495        6454 :     if (datatype & static_cast<int>(DataType::RTCP)) {
     496        6452 :         len = readRtcpData(buf, buf_size);
     497        6452 :         if (len > 0) {
     498         268 :             auto header = reinterpret_cast<rtcpRRHeader*>(buf);
     499             :             // 201 = RR PT
     500         268 :             if (header->pt == 201) {
     501          10 :                 lastDLSR_ = Swap4Bytes(header->dlsr);
     502             :                 // JAMI_WARN("Read RR, lastDLSR : %d", lastDLSR_);
     503          10 :                 lastRR_time = std::chrono::steady_clock::now();
     504          10 :                 saveRtcpRRPacket(buf, len);
     505             :             }
     506             :             // 206 = REMB PT
     507         258 :             else if (header->pt == 206)
     508         182 :                 saveRtcpREMBPacket(buf, len);
     509             :             // 200 = SR PT
     510          76 :             else if (header->pt == 200) {
     511             :                 // not used yet
     512             :             } else {
     513           0 :                 JAMI_DBG("Unable to read RTCP: unknown packet type %u", header->pt);
     514             :             }
     515         268 :             fromRTCP = true;
     516             :         }
     517             :     }
     518             : 
     519             :     // No RTCP… attempt RTP
     520        6454 :     if (!len and (datatype & static_cast<int>(DataType::RTP))) {
     521        6184 :         len = readRtpData(buf, buf_size);
     522        6183 :         fromRTCP = false;
     523             :     }
     524             : 
     525        6453 :     if (len <= 0)
     526         395 :         return len;
     527             : 
     528        6058 :     if (not fromRTCP && (buf_size < static_cast<int>(MINIMUM_RTP_HEADER_SIZE)))
     529           0 :         return len;
     530             : 
     531             :     // SRTP decrypt
     532        6058 :     if (not fromRTCP and srtpContext_ and srtpContext_->srtp_in.aes) {
     533        5790 :         int32_t gradient = 0;
     534        5790 :         int32_t deltaT = 0;
     535        5790 :         float abs = 0.0f;
     536        5790 :         bool res_parse = false;
     537        5790 :         bool res_delay = false;
     538             : 
     539        5790 :         res_parse = parse_RTP_ext(buf, &abs);
     540        5790 :         bool marker = (buf[1] & 0x80) >> 7;
     541             : 
     542        5790 :         if (res_parse)
     543        5790 :             res_delay = getOneWayDelayGradient(abs, marker, &gradient, &deltaT);
     544             : 
     545             :         // rtpDelayCallback_ is not set for audio
     546        5790 :         if (rtpDelayCallback_ and res_delay)
     547        5694 :             rtpDelayCallback_(gradient, deltaT);
     548             : 
     549        5790 :         auto err = ff_srtp_decrypt(&srtpContext_->srtp_in, buf, &len);
     550        5790 :         if (packetLossCallback_ and (buf[2] << 8 | buf[3]) != lastSeqNumIn_ + 1)
     551           0 :             packetLossCallback_();
     552        5790 :         lastSeqNumIn_ = buf[2] << 8 | buf[3];
     553        5790 :         if (err < 0)
     554           0 :             JAMI_WARN("decrypt error %d", err);
     555             :     }
     556             : 
     557        6058 :     if (len != 0)
     558        6058 :         return len;
     559             :     else
     560           0 :         return AVERROR_EOF;
     561             : }
     562             : 
     563             : int
     564        6069 : SocketPair::writeData(uint8_t* buf, int buf_size)
     565             : {
     566        6069 :     bool isRTCP = RTP_PT_IS_RTCP(buf[1]);
     567             : 
     568             :     // System sockets?
     569        6069 :     if (rtpHandle_ >= 0) {
     570             :         int fd;
     571             :         dhtnet::IpAddr* dest_addr;
     572             : 
     573           0 :         if (isRTCP) {
     574           0 :             fd = rtcpHandle_;
     575           0 :             dest_addr = &rtcpDestAddr_;
     576             :         } else {
     577           0 :             fd = rtpHandle_;
     578           0 :             dest_addr = &rtpDestAddr_;
     579             :         }
     580             : 
     581           0 :         auto ret = ff_network_wait_fd(fd);
     582           0 :         if (ret < 0)
     583           0 :             return ret;
     584             : 
     585           0 :         if (noWrite_)
     586           0 :             return buf_size;
     587           0 :         return ::sendto(fd, reinterpret_cast<const char*>(buf), buf_size, 0, *dest_addr, dest_addr->getLength());
     588             :     }
     589             : 
     590        6069 :     if (noWrite_)
     591           0 :         return buf_size;
     592             : 
     593             :     // IceSocket
     594        6069 :     if (isRTCP)
     595         269 :         return rtcp_sock_->send(buf, buf_size);
     596             :     else
     597        5800 :         return rtp_sock_->send(buf, buf_size);
     598             : }
     599             : 
     600             : int
     601        5887 : SocketPair::writeCallback(uint8_t* buf, int buf_size)
     602             : {
     603        5887 :     if (noWrite_)
     604           0 :         return 0;
     605             : 
     606             :     int ret;
     607        5887 :     bool isRTCP = RTP_PT_IS_RTCP(buf[1]);
     608             :     unsigned int ts_LSB, ts_MSB;
     609             :     double currentSRTS, currentLatency;
     610             : 
     611             :     // Encrypt?
     612        5887 :     if (not isRTCP and srtpContext_ and srtpContext_->srtp_out.aes) {
     613        5800 :         buf_size = ff_srtp_encrypt(&srtpContext_->srtp_out,
     614             :                                    buf,
     615             :                                    buf_size,
     616        5800 :                                    srtpContext_->encryptbuf,
     617             :                                    sizeof(srtpContext_->encryptbuf));
     618        5800 :         if (buf_size < 0) {
     619           0 :             JAMI_WARN("encrypt error %d", buf_size);
     620           0 :             return buf_size;
     621             :         }
     622             : 
     623        5800 :         buf = srtpContext_->encryptbuf;
     624             :     }
     625             : 
     626             :     // check if we're sending an RR, if so, detect packet loss
     627             :     // buf_size gives length of buffer, not just header
     628        5887 :     if (isRTCP && static_cast<unsigned>(buf_size) >= sizeof(rtcpRRHeader)) {
     629          10 :         auto header = reinterpret_cast<rtcpRRHeader*>(buf);
     630          10 :         rtcpPacketLoss_ = (header->pt == 201 && ntohl(header->fraction_lost) & RTCP_RR_FRACTION_MASK);
     631             :     }
     632             : 
     633             :     do {
     634        5887 :         if (interrupted_)
     635           0 :             return -EINTR;
     636        5887 :         ret = writeData(buf, buf_size);
     637        5887 :     } while (ret < 0 and errno == EAGAIN);
     638             : 
     639        5887 :     if (buf[1] == 200) // Sender Report
     640             :     {
     641          77 :         auto header = reinterpret_cast<rtcpSRHeader*>(buf);
     642          77 :         ts_LSB = Swap4Bytes(header->timestampLSB);
     643          77 :         ts_MSB = Swap4Bytes(header->timestampMSB);
     644             : 
     645          77 :         currentSRTS = ts_MSB + (ts_LSB / pow(2, 32));
     646             : 
     647          77 :         if (lastSRTS_ != 0 && lastDLSR_ != 0) {
     648          14 :             if (histoLatency_.size() >= MAX_LIST_SIZE)
     649           0 :                 histoLatency_.pop_front();
     650             : 
     651          14 :             currentLatency = (currentSRTS - lastSRTS_) / 2;
     652             :             // JAMI_WARN("Current Latency : %f from sender %X", currentLatency, header->ssrc);
     653          14 :             histoLatency_.push_back(currentLatency);
     654             :         }
     655             : 
     656          77 :         lastSRTS_ = currentSRTS;
     657             : 
     658             :         // JAMI_WARN("SENDING NEW RTCP SR !! ");
     659             : 
     660        5810 :     } else if (buf[1] == 201) // Receiver Report
     661             :     {
     662             :         // auto header = reinterpret_cast<rtcpRRHeader*>(buf);
     663             :         // JAMI_WARN("SENDING NEW RTCP RR !! ");
     664             :     }
     665             : 
     666        5887 :     return ret < 0 ? -errno : ret;
     667             : }
     668             : 
     669             : double
     670          10 : SocketPair::getLastLatency()
     671             : {
     672          10 :     if (not histoLatency_.empty())
     673           0 :         return histoLatency_.back();
     674             :     else
     675          10 :         return -1;
     676             : }
     677             : 
     678             : void
     679         146 : SocketPair::setRtpDelayCallback(std::function<void(int, int)> cb)
     680             : {
     681         146 :     rtpDelayCallback_ = std::move(cb);
     682         146 : }
     683             : 
     684             : bool
     685        5790 : SocketPair::getOneWayDelayGradient(float sendTS, bool marker, int32_t* gradient, int32_t* deltaT)
     686             : {
     687             :     // Keep only last packet of each frame
     688        5790 :     if (not marker) {
     689          49 :         return 0;
     690             :     }
     691             : 
     692             :     // 1st frame
     693        5741 :     if (not lastSendTS_) {
     694          47 :         lastSendTS_ = sendTS;
     695          47 :         lastReceiveTS_ = std::chrono::steady_clock::now();
     696          47 :         return 0;
     697             :     }
     698             : 
     699        5694 :     int32_t deltaS = (sendTS - lastSendTS_) * 1000; // milliseconds
     700        5694 :     if (deltaS < 0)
     701           1 :         deltaS += 64000;
     702        5694 :     lastSendTS_ = sendTS;
     703             : 
     704        5694 :     std::chrono::steady_clock::time_point arrival_TS = std::chrono::steady_clock::now();
     705        5694 :     auto deltaR = std::chrono::duration_cast<std::chrono::milliseconds>(arrival_TS - lastReceiveTS_).count();
     706        5694 :     lastReceiveTS_ = arrival_TS;
     707             : 
     708        5694 :     *gradient = deltaR - deltaS;
     709        5694 :     *deltaT = deltaR;
     710             : 
     711        5694 :     return true;
     712             : }
     713             : 
     714             : bool
     715        5790 : SocketPair::parse_RTP_ext(uint8_t* buf, float* abs)
     716             : {
     717        5790 :     if (not(buf[0] & 0x10))
     718           0 :         return false;
     719             : 
     720        5790 :     uint16_t magic_word = (buf[12] << 8) + buf[13];
     721        5790 :     if (magic_word != 0xBEDE)
     722           0 :         return false;
     723             : 
     724        5790 :     uint8_t sec = buf[17] >> 2;
     725        5790 :     uint32_t fract = ((buf[17] & 0x3) << 16 | (buf[18] << 8) | buf[19]) << 14;
     726        5790 :     float milli = fract / pow(2, 32);
     727             : 
     728        5790 :     *abs = sec + (milli);
     729        5790 :     return true;
     730             : }
     731             : 
     732             : uint16_t
     733         147 : SocketPair::lastSeqValOut()
     734             : {
     735         147 :     if (srtpContext_)
     736         147 :         return srtpContext_->srtp_out.seq_largest;
     737           0 :     JAMI_ERR("SRTP context not found.");
     738           0 :     return 0;
     739             : }
     740             : 
     741             : } // namespace jami

Generated by: LCOV version 1.14