LCOV - code coverage report
Current view: top level - foo/src/media - socket_pair.cpp (source / functions) Hit Total Coverage
Test: jami-coverage-filtered.info Lines: 250 357 70.0 %
Date: 2026-02-28 10:41:24 Functions: 33 37 89.2 %

          Line data    Source code
       1             : /*
       2             :  *  Copyright (C) 2004-2026 Savoir-faire Linux Inc.
       3             :  *  Copyright (c) 2007 The FFmpeg Project
       4             :  *
       5             :  *  This program is free software: you can redistribute it and/or modify
       6             :  *  it under the terms of the GNU General Public License as published by
       7             :  *  the Free Software Foundation, either version 3 of the License, or
       8             :  *  (at your option) any later version.
       9             :  *
      10             :  *  This program is distributed in the hope that it will be useful,
      11             :  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
      12             :  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
      13             :  *  GNU General Public License for more details.
      14             :  *
      15             :  *  You should have received a copy of the GNU General Public License
      16             :  *  along with this program. If not, see <https://www.gnu.org/licenses/>.
      17             :  */
      18             : 
      19             : #include <dhtnet/ip_utils.h> // MUST BE INCLUDED FIRST
      20             : 
      21             : #include "libav_deps.h" // THEN THIS ONE AFTER
      22             : 
      23             : #include "socket_pair.h"
      24             : #include "libav_utils.h"
      25             : #include "logger.h"
      26             : #include "connectivity/security/memory.h"
      27             : 
      28             : #include <dhtnet/ice_socket.h>
      29             : 
      30             : #include <iostream>
      31             : #include <string>
      32             : #include <algorithm>
      33             : #include <iterator>
      34             : 
      35             : extern "C" {
      36             : #include "srtp.h"
      37             : }
      38             : 
      39             : #include <cstring>
      40             : #include <stdexcept>
      41             : #include <unistd.h>
      42             : #include <sys/types.h>
      43             : 
      44             : #ifdef _WIN32
      45             : #define SOCK_NONBLOCK FIONBIO
      46             : #define poll          WSAPoll
      47             : #define close(x)      closesocket(x)
      48             : #endif
      49             : 
      50             : #ifdef __ANDROID__
      51             : #include <asm-generic/fcntl.h>
      52             : #define SOCK_NONBLOCK O_NONBLOCK
      53             : #endif
      54             : 
      55             : #ifdef __APPLE__
      56             : #include <fcntl.h>
      57             : #endif
      58             : 
      59             : // Swap 2 byte, 16 bit values:
      60             : #define Swap2Bytes(val) ((((val) >> 8) & 0x00FF) | (((val) << 8) & 0xFF00))
      61             : 
      62             : // Swap 4 byte, 32 bit values:
      63             : #define Swap4Bytes(val) \
      64             :     ((((val) >> 24) & 0x000000FF) | (((val) >> 8) & 0x0000FF00) | (((val) << 8) & 0x00FF0000) \
      65             :      | (((val) << 24) & 0xFF000000))
      66             : 
      67             : // Swap 8 byte, 64 bit values:
      68             : #define Swap8Bytes(val) \
      69             :     ((((val) >> 56) & 0x00000000000000FF) | (((val) >> 40) & 0x000000000000FF00) \
      70             :      | (((val) >> 24) & 0x0000000000FF0000) | (((val) >> 8) & 0x00000000FF000000) \
      71             :      | (((val) << 8) & 0x000000FF00000000) | (((val) << 24) & 0x0000FF0000000000) \
      72             :      | (((val) << 40) & 0x00FF000000000000) | (((val) << 56) & 0xFF00000000000000))
      73             : 
      74             : namespace jami {
      75             : 
      76             : static constexpr int NET_POLL_TIMEOUT = 100; /* poll() timeout in ms */
      77             : static constexpr int RTP_MAX_PACKET_LENGTH = 2048;
      78             : static constexpr auto UDP_HEADER_SIZE = 8;
      79             : static constexpr auto SRTP_OVERHEAD = 10;
      80             : static constexpr uint32_t RTCP_RR_FRACTION_MASK = 0xFF000000;
      81             : static constexpr unsigned MINIMUM_RTP_HEADER_SIZE = 16;
      82             : 
      83             : enum class DataType : unsigned { RTP = 1 << 0, RTCP = 1 << 1 };
      84             : 
      85             : class SRTPProtoContext
      86             : {
      87             : public:
      88         304 :     SRTPProtoContext(const char* out_suite, const char* out_key, const char* in_suite, const char* in_key)
      89         304 :     {
      90         304 :         jami_secure_memzero(&srtp_out, sizeof(srtp_out));
      91         304 :         jami_secure_memzero(&srtp_in, sizeof(srtp_in));
      92         304 :         if (out_suite && out_key) {
      93             :             // XXX: see srtp_open from libavformat/srtpproto.c
      94         304 :             if (ff_srtp_set_crypto(&srtp_out, out_suite, out_key) < 0) {
      95           0 :                 srtp_close();
      96           0 :                 throw std::runtime_error("Unable to set crypto on output");
      97             :             }
      98             :         }
      99             : 
     100         304 :         if (in_suite && in_key) {
     101         304 :             if (ff_srtp_set_crypto(&srtp_in, in_suite, in_key) < 0) {
     102           0 :                 srtp_close();
     103           0 :                 throw std::runtime_error("Unable to set crypto on input");
     104             :             }
     105             :         }
     106         303 :     }
     107             : 
     108         304 :     ~SRTPProtoContext() { srtp_close(); }
     109             : 
     110             :     SRTPContext srtp_out {};
     111             :     SRTPContext srtp_in {};
     112             :     uint8_t encryptbuf[RTP_MAX_PACKET_LENGTH];
     113             : 
     114             : private:
     115         304 :     void srtp_close() noexcept
     116             :     {
     117         304 :         ff_srtp_free(&srtp_out);
     118         304 :         ff_srtp_free(&srtp_in);
     119         304 :     }
     120             : };
     121             : 
     122             : static int
     123           0 : ff_network_wait_fd(int fd)
     124             : {
     125           0 :     struct pollfd p = {fd, POLLOUT, 0};
     126           0 :     auto ret = poll(&p, 1, NET_POLL_TIMEOUT);
     127           0 :     return ret < 0 ? errno : p.revents & (POLLOUT | POLLERR | POLLHUP) ? 0 : -EAGAIN;
     128             : }
     129             : 
     130             : static int
     131           0 : udp_socket_create(int family, int port)
     132             : {
     133           0 :     int udp_fd = -1;
     134             : 
     135             : #ifdef __APPLE__
     136             :     udp_fd = socket(family, SOCK_DGRAM, 0);
     137             :     if (udp_fd >= 0 && fcntl(udp_fd, F_SETFL, O_NONBLOCK) < 0) {
     138             :         close(udp_fd);
     139             :         udp_fd = -1;
     140             :     }
     141             : #elif defined _WIN32
     142             :     udp_fd = socket(family, SOCK_DGRAM, 0);
     143             :     u_long block = 1;
     144             :     if (udp_fd >= 0 && ioctlsocket(udp_fd, FIONBIO, &block) < 0) {
     145             :         close(udp_fd);
     146             :         udp_fd = -1;
     147             :     }
     148             : #else
     149           0 :     udp_fd = socket(family, SOCK_DGRAM | SOCK_NONBLOCK, 0);
     150             : #endif
     151             : 
     152           0 :     if (udp_fd < 0) {
     153           0 :         JAMI_ERR("socket() failed");
     154           0 :         strErr();
     155           0 :         return -1;
     156             :     }
     157             : 
     158           0 :     auto bind_addr = dhtnet::ip_utils::getAnyHostAddr(family);
     159           0 :     if (not bind_addr.isIpv4() and not bind_addr.isIpv6()) {
     160           0 :         JAMI_ERR("No IPv4/IPv6 host found for family %u", family);
     161           0 :         close(udp_fd);
     162           0 :         return -1;
     163             :     }
     164             : 
     165           0 :     bind_addr.setPort(port);
     166           0 :     JAMI_DBG("use local address: %s", bind_addr.toString(true, true).c_str());
     167           0 :     if (::bind(udp_fd, bind_addr, bind_addr.getLength()) < 0) {
     168           0 :         JAMI_ERR("bind() failed");
     169           0 :         strErr();
     170           0 :         close(udp_fd);
     171           0 :         udp_fd = -1;
     172             :     }
     173             : 
     174           0 :     return udp_fd;
     175             : }
     176             : 
     177           0 : SocketPair::SocketPair(const char* uri, int localPort)
     178             : {
     179           0 :     openSockets(uri, localPort);
     180           0 : }
     181             : 
     182         299 : SocketPair::SocketPair(std::unique_ptr<dhtnet::IceSocket> rtp_sock, std::unique_ptr<dhtnet::IceSocket> rtcp_sock)
     183         304 :     : rtp_sock_(std::move(rtp_sock))
     184         603 :     , rtcp_sock_(std::move(rtcp_sock))
     185             : {
     186         303 :     JAMI_DBG("[%p] Creating instance using ICE sockets for comp %d and %d",
     187             :              this,
     188             :              rtp_sock_->getCompId(),
     189             :              rtcp_sock_->getCompId());
     190             : 
     191         304 :     rtp_sock_->setOnRecv([this](uint8_t* buf, size_t len) {
     192        5609 :         std::lock_guard l(dataBuffMutex_);
     193        5609 :         rtpDataBuff_.emplace_back(buf, buf + len);
     194        5609 :         cv_.notify_one();
     195        5609 :         return len;
     196        5609 :     });
     197         304 :     rtcp_sock_->setOnRecv([this](uint8_t* buf, size_t len) {
     198         264 :         std::lock_guard l(dataBuffMutex_);
     199         264 :         rtcpDataBuff_.emplace_back(buf, buf + len);
     200         264 :         cv_.notify_one();
     201         264 :         return len;
     202         264 :     });
     203         303 : }
     204             : 
     205         304 : SocketPair::~SocketPair()
     206             : {
     207         304 :     interrupt();
     208         304 :     closeSockets();
     209         304 :     JAMI_DBG("[%p] Instance destroyed", this);
     210         304 : }
     211             : 
     212             : bool
     213         648 : SocketPair::waitForRTCP(std::chrono::seconds interval)
     214             : {
     215         648 :     std::unique_lock lock(rtcpInfo_mutex_);
     216         648 :     return cvRtcpPacketReadyToRead_.wait_for(lock, interval, [this] {
     217        1881 :         return interrupted_ or not listRtcpRRHeader_.empty() or not listRtcpREMBHeader_.empty();
     218        1296 :     });
     219         648 : }
     220             : 
     221             : void
     222          10 : SocketPair::saveRtcpRRPacket(uint8_t* buf, size_t len)
     223             : {
     224          10 :     if (len < sizeof(rtcpRRHeader))
     225           0 :         return;
     226             : 
     227          10 :     auto header = reinterpret_cast<rtcpRRHeader*>(buf);
     228          10 :     if (header->pt != 201) // 201 = RR PT
     229           0 :         return;
     230             : 
     231          10 :     std::lock_guard lock(rtcpInfo_mutex_);
     232             : 
     233          10 :     if (listRtcpRRHeader_.size() >= MAX_LIST_SIZE) {
     234           0 :         listRtcpRRHeader_.pop_front();
     235             :     }
     236             : 
     237          10 :     listRtcpRRHeader_.emplace_back(*header);
     238             : 
     239          10 :     cvRtcpPacketReadyToRead_.notify_one();
     240          10 : }
     241             : 
     242             : void
     243         181 : SocketPair::saveRtcpREMBPacket(uint8_t* buf, size_t len)
     244             : {
     245         181 :     if (len < sizeof(rtcpREMBHeader))
     246           0 :         return;
     247             : 
     248         181 :     auto header = reinterpret_cast<rtcpREMBHeader*>(buf);
     249         181 :     if (header->pt != 206) // 206 = REMB PT
     250           0 :         return;
     251             : 
     252         181 :     if (header->uid != 0x424D4552) // uid must be "REMB"
     253           0 :         return;
     254             : 
     255         181 :     std::lock_guard lock(rtcpInfo_mutex_);
     256             : 
     257         181 :     if (listRtcpREMBHeader_.size() >= MAX_LIST_SIZE) {
     258           0 :         listRtcpREMBHeader_.pop_front();
     259             :     }
     260             : 
     261         181 :     listRtcpREMBHeader_.push_back(*header);
     262             : 
     263         181 :     cvRtcpPacketReadyToRead_.notify_one();
     264         181 : }
     265             : 
     266             : std::list<rtcpRRHeader>
     267         647 : SocketPair::getRtcpRR()
     268             : {
     269         647 :     std::lock_guard lock(rtcpInfo_mutex_);
     270        1294 :     return std::move(listRtcpRRHeader_);
     271         648 : }
     272             : 
     273             : std::list<rtcpREMBHeader>
     274         372 : SocketPair::getRtcpREMB()
     275             : {
     276         372 :     std::lock_guard lock(rtcpInfo_mutex_);
     277         744 :     return std::move(listRtcpREMBHeader_);
     278         372 : }
     279             : 
     280             : void
     281         304 : SocketPair::createSRTP(const char* out_suite, const char* out_key, const char* in_suite, const char* in_key)
     282             : {
     283         304 :     srtpContext_.reset(new SRTPProtoContext(out_suite, out_key, in_suite, in_key));
     284         304 : }
     285             : 
     286             : void
     287         608 : SocketPair::interrupt()
     288             : {
     289         608 :     JAMI_WARN("[%p] Interrupting RTP sockets", this);
     290         608 :     interrupted_ = true;
     291         608 :     if (rtp_sock_)
     292         608 :         rtp_sock_->setOnRecv(nullptr);
     293         608 :     if (rtcp_sock_)
     294         608 :         rtcp_sock_->setOnRecv(nullptr);
     295         608 :     cv_.notify_all();
     296         608 :     cvRtcpPacketReadyToRead_.notify_all();
     297         608 : }
     298             : 
     299             : void
     300         882 : SocketPair::setReadBlockingMode(bool block)
     301             : {
     302         882 :     JAMI_DBG("[%p] Read operations in blocking mode [%s]", this, block ? "YES" : "NO");
     303         882 :     readBlockingMode_ = block;
     304         882 :     cv_.notify_all();
     305         882 :     cvRtcpPacketReadyToRead_.notify_all();
     306         882 : }
     307             : 
     308             : void
     309         882 : SocketPair::stopSendOp(bool state)
     310             : {
     311         882 :     noWrite_ = state;
     312         882 : }
     313             : 
     314             : void
     315         304 : SocketPair::closeSockets()
     316             : {
     317         304 :     if (rtcpHandle_ > 0 and close(rtcpHandle_))
     318           0 :         strErr();
     319         304 :     if (rtpHandle_ > 0 and close(rtpHandle_))
     320           0 :         strErr();
     321         304 : }
     322             : 
     323             : void
     324           0 : SocketPair::openSockets(const char* uri, int local_rtp_port)
     325             : {
     326           0 :     JAMI_DBG("Creating rtp socket for uri %s on port %d", uri, local_rtp_port);
     327             : 
     328             :     char hostname[256];
     329             :     char path[1024];
     330             :     int dst_rtp_port;
     331             : 
     332           0 :     av_url_split(NULL, 0, NULL, 0, hostname, sizeof(hostname), &dst_rtp_port, path, sizeof(path), uri);
     333             : 
     334           0 :     const int local_rtcp_port = local_rtp_port + 1;
     335           0 :     const int dst_rtcp_port = dst_rtp_port + 1;
     336             : 
     337           0 :     rtpDestAddr_ = dhtnet::IpAddr {hostname};
     338           0 :     rtpDestAddr_.setPort(dst_rtp_port);
     339           0 :     rtcpDestAddr_ = dhtnet::IpAddr {hostname};
     340           0 :     rtcpDestAddr_.setPort(dst_rtcp_port);
     341             : 
     342             :     // Open local sockets (RTP/RTCP)
     343           0 :     if ((rtpHandle_ = udp_socket_create(rtpDestAddr_.getFamily(), local_rtp_port)) == -1
     344           0 :         or (rtcpHandle_ = udp_socket_create(rtcpDestAddr_.getFamily(), local_rtcp_port)) == -1) {
     345           0 :         closeSockets();
     346           0 :         JAMI_ERR("[%p] Sockets creation failed", this);
     347           0 :         throw std::runtime_error("Sockets creation failed");
     348             :     }
     349             : 
     350           0 :     JAMI_WARN("SocketPair: local{%d,%d} / %s{%d,%d}",
     351             :               local_rtp_port,
     352             :               local_rtcp_port,
     353             :               hostname,
     354             :               dst_rtp_port,
     355             :               dst_rtcp_port);
     356           0 : }
     357             : 
     358             : MediaIOHandle*
     359         601 : SocketPair::createIOContext(const uint16_t mtu)
     360             : {
     361             :     unsigned ip_header_size;
     362         601 :     if (rtp_sock_)
     363         601 :         ip_header_size = rtp_sock_->getTransportOverhead();
     364           0 :     else if (rtpDestAddr_.getFamily() == AF_INET6)
     365           0 :         ip_header_size = 40;
     366             :     else
     367           0 :         ip_header_size = 20;
     368             :     return new MediaIOHandle(
     369         601 :         mtu - (srtpContext_ ? SRTP_OVERHEAD : 0) - UDP_HEADER_SIZE - ip_header_size,
     370             :         true,
     371        6275 :         [](void* sp, uint8_t* buf, int len) { return static_cast<SocketPair*>(sp)->readCallback(buf, len); },
     372        5725 :         [](void* sp, uint8_t* buf, int len) { return static_cast<SocketPair*>(sp)->writeCallback(buf, len); },
     373             :         0,
     374         600 :         reinterpret_cast<void*>(this));
     375             : }
     376             : 
     377             : int
     378        6276 : SocketPair::waitForData()
     379             : {
     380             :     // System sockets
     381        6276 :     if (rtpHandle_ >= 0) {
     382             :         int ret;
     383             :         do {
     384           0 :             if (interrupted_) {
     385           0 :                 errno = EINTR;
     386           0 :                 return -1;
     387             :             }
     388             : 
     389           0 :             if (not readBlockingMode_) {
     390           0 :                 return 0;
     391             :             }
     392             : 
     393             :             // work with system socket
     394           0 :             struct pollfd p[2] = {{rtpHandle_, POLLIN, 0}, {rtcpHandle_, POLLIN, 0}};
     395           0 :             ret = poll(p, 2, NET_POLL_TIMEOUT);
     396           0 :             if (ret > 0) {
     397           0 :                 ret = 0;
     398           0 :                 if (p[0].revents & POLLIN)
     399           0 :                     ret |= static_cast<int>(DataType::RTP);
     400           0 :                 if (p[1].revents & POLLIN)
     401           0 :                     ret |= static_cast<int>(DataType::RTCP);
     402             :             }
     403           0 :         } while (!ret or (ret < 0 and errno == EAGAIN));
     404             : 
     405           0 :         return ret;
     406             :     }
     407             : 
     408             :     // work with IceSocket
     409             :     {
     410        6276 :         std::unique_lock lk(dataBuffMutex_);
     411        6276 :         cv_.wait(lk, [this] {
     412       12398 :             return interrupted_ or not rtpDataBuff_.empty() or not rtcpDataBuff_.empty() or not readBlockingMode_;
     413             :         });
     414        6276 :     }
     415             : 
     416        6275 :     if (interrupted_) {
     417          13 :         errno = EINTR;
     418          13 :         return -1;
     419             :     }
     420             : 
     421        6263 :     return static_cast<int>(DataType::RTP) | static_cast<int>(DataType::RTCP);
     422             : }
     423             : 
     424             : int
     425        5999 : SocketPair::readRtpData(void* buf, int buf_size)
     426             : {
     427             :     // handle system socket
     428        5999 :     if (rtpHandle_ >= 0) {
     429             :         struct sockaddr_storage from;
     430           0 :         socklen_t from_len = sizeof(from);
     431           0 :         return recvfrom(rtpHandle_,
     432             :                         static_cast<char*>(buf),
     433             :                         buf_size,
     434             :                         0,
     435             :                         reinterpret_cast<struct sockaddr*>(&from),
     436           0 :                         &from_len);
     437             :     }
     438             : 
     439             :     // handle ICE
     440        5999 :     std::unique_lock lk(dataBuffMutex_);
     441        5999 :     if (not rtpDataBuff_.empty()) {
     442        5609 :         auto pkt = std::move(rtpDataBuff_.front());
     443        5609 :         rtpDataBuff_.pop_front();
     444        5609 :         lk.unlock(); // to not block our ICE callbacks
     445        5609 :         int pkt_size = pkt.size();
     446        5609 :         int len = std::min(pkt_size, buf_size);
     447        5609 :         std::copy_n(pkt.begin(), len, static_cast<char*>(buf));
     448        5609 :         return len;
     449        5609 :     }
     450             : 
     451         390 :     return 0;
     452        5999 : }
     453             : 
     454             : int
     455        6263 : SocketPair::readRtcpData(void* buf, int buf_size)
     456             : {
     457             :     // handle system socket
     458        6263 :     if (rtcpHandle_ >= 0) {
     459             :         struct sockaddr_storage from;
     460           0 :         socklen_t from_len = sizeof(from);
     461           0 :         return recvfrom(rtcpHandle_,
     462             :                         static_cast<char*>(buf),
     463             :                         buf_size,
     464             :                         0,
     465             :                         reinterpret_cast<struct sockaddr*>(&from),
     466           0 :                         &from_len);
     467             :     }
     468             : 
     469             :     // handle ICE
     470        6263 :     std::unique_lock lk(dataBuffMutex_);
     471        6263 :     if (not rtcpDataBuff_.empty()) {
     472         264 :         auto pkt = std::move(rtcpDataBuff_.front());
     473         264 :         rtcpDataBuff_.pop_front();
     474         264 :         lk.unlock();
     475         264 :         int pkt_size = pkt.size();
     476         264 :         int len = std::min(pkt_size, buf_size);
     477         264 :         std::copy_n(pkt.begin(), len, static_cast<char*>(buf));
     478         264 :         return len;
     479         264 :     }
     480             : 
     481        5999 :     return 0;
     482        6263 : }
     483             : 
     484             : int
     485        6276 : SocketPair::readCallback(uint8_t* buf, int buf_size)
     486             : {
     487        6276 :     auto datatype = waitForData();
     488        6276 :     if (datatype < 0)
     489          13 :         return datatype;
     490             : 
     491        6263 :     int len = 0;
     492        6263 :     bool fromRTCP = false;
     493             : 
     494        6263 :     if (datatype & static_cast<int>(DataType::RTCP)) {
     495        6263 :         len = readRtcpData(buf, buf_size);
     496        6263 :         if (len > 0) {
     497         264 :             auto header = reinterpret_cast<rtcpRRHeader*>(buf);
     498             :             // 201 = RR PT
     499         264 :             if (header->pt == 201) {
     500          10 :                 lastDLSR_ = Swap4Bytes(header->dlsr);
     501             :                 // JAMI_WARN("Read RR, lastDLSR : %d", lastDLSR_);
     502          10 :                 lastRR_time = std::chrono::steady_clock::now();
     503          10 :                 saveRtcpRRPacket(buf, len);
     504             :             }
     505             :             // 206 = REMB PT
     506         254 :             else if (header->pt == 206)
     507         181 :                 saveRtcpREMBPacket(buf, len);
     508             :             // 200 = SR PT
     509          73 :             else if (header->pt == 200) {
     510             :                 // not used yet
     511             :             } else {
     512           0 :                 JAMI_DBG("Unable to read RTCP: unknown packet type %u", header->pt);
     513             :             }
     514         264 :             fromRTCP = true;
     515             :         }
     516             :     }
     517             : 
     518             :     // No RTCP… attempt RTP
     519        6263 :     if (!len and (datatype & static_cast<int>(DataType::RTP))) {
     520        5999 :         len = readRtpData(buf, buf_size);
     521        5999 :         fromRTCP = false;
     522             :     }
     523             : 
     524        6263 :     if (len <= 0)
     525         390 :         return len;
     526             : 
     527        5873 :     if (not fromRTCP && (buf_size < static_cast<int>(MINIMUM_RTP_HEADER_SIZE)))
     528           0 :         return len;
     529             : 
     530             :     // SRTP decrypt
     531        5873 :     if (not fromRTCP and srtpContext_ and srtpContext_->srtp_in.aes) {
     532        5609 :         int32_t gradient = 0;
     533        5609 :         int32_t deltaT = 0;
     534        5609 :         float abs = 0.0f;
     535        5609 :         bool res_parse = false;
     536        5609 :         bool res_delay = false;
     537             : 
     538        5609 :         res_parse = parse_RTP_ext(buf, &abs);
     539        5609 :         bool marker = (buf[1] & 0x80) >> 7;
     540             : 
     541        5609 :         if (res_parse)
     542        5609 :             res_delay = getOneWayDelayGradient(abs, marker, &gradient, &deltaT);
     543             : 
     544             :         // rtpDelayCallback_ is not set for audio
     545        5609 :         if (rtpDelayCallback_ and res_delay)
     546        5516 :             rtpDelayCallback_(gradient, deltaT);
     547             : 
     548        5609 :         auto err = ff_srtp_decrypt(&srtpContext_->srtp_in, buf, &len);
     549        5609 :         if (packetLossCallback_ and (buf[2] << 8 | buf[3]) != lastSeqNumIn_ + 1)
     550           0 :             packetLossCallback_();
     551        5609 :         lastSeqNumIn_ = buf[2] << 8 | buf[3];
     552        5609 :         if (err < 0)
     553           0 :             JAMI_WARN("decrypt error %d", err);
     554             :     }
     555             : 
     556        5873 :     if (len != 0)
     557        5873 :         return len;
     558             :     else
     559           0 :         return AVERROR_EOF;
     560             : }
     561             : 
     562             : int
     563        5906 : SocketPair::writeData(uint8_t* buf, int buf_size)
     564             : {
     565        5906 :     bool isRTCP = RTP_PT_IS_RTCP(buf[1]);
     566             : 
     567             :     // System sockets?
     568        5906 :     if (rtpHandle_ >= 0) {
     569             :         int fd;
     570             :         dhtnet::IpAddr* dest_addr;
     571             : 
     572           0 :         if (isRTCP) {
     573           0 :             fd = rtcpHandle_;
     574           0 :             dest_addr = &rtcpDestAddr_;
     575             :         } else {
     576           0 :             fd = rtpHandle_;
     577           0 :             dest_addr = &rtpDestAddr_;
     578             :         }
     579             : 
     580           0 :         auto ret = ff_network_wait_fd(fd);
     581           0 :         if (ret < 0)
     582           0 :             return ret;
     583             : 
     584           0 :         if (noWrite_)
     585           0 :             return buf_size;
     586           0 :         return ::sendto(fd, reinterpret_cast<const char*>(buf), buf_size, 0, *dest_addr, dest_addr->getLength());
     587             :     }
     588             : 
     589        5906 :     if (noWrite_)
     590           0 :         return buf_size;
     591             : 
     592             :     // IceSocket
     593        5906 :     if (isRTCP)
     594         266 :         return rtcp_sock_->send(buf, buf_size);
     595             :     else
     596        5640 :         return rtp_sock_->send(buf, buf_size);
     597             : }
     598             : 
     599             : int
     600        5725 : SocketPair::writeCallback(uint8_t* buf, int buf_size)
     601             : {
     602        5725 :     if (noWrite_)
     603           0 :         return 0;
     604             : 
     605             :     int ret;
     606        5725 :     bool isRTCP = RTP_PT_IS_RTCP(buf[1]);
     607             :     unsigned int ts_LSB, ts_MSB;
     608             :     double currentSRTS, currentLatency;
     609             : 
     610             :     // Encrypt?
     611        5725 :     if (not isRTCP and srtpContext_ and srtpContext_->srtp_out.aes) {
     612        5640 :         buf_size = ff_srtp_encrypt(&srtpContext_->srtp_out,
     613             :                                    buf,
     614             :                                    buf_size,
     615        5640 :                                    srtpContext_->encryptbuf,
     616             :                                    sizeof(srtpContext_->encryptbuf));
     617        5640 :         if (buf_size < 0) {
     618           0 :             JAMI_WARN("encrypt error %d", buf_size);
     619           0 :             return buf_size;
     620             :         }
     621             : 
     622        5640 :         buf = srtpContext_->encryptbuf;
     623             :     }
     624             : 
     625             :     // check if we're sending an RR, if so, detect packet loss
     626             :     // buf_size gives length of buffer, not just header
     627        5725 :     if (isRTCP && static_cast<unsigned>(buf_size) >= sizeof(rtcpRRHeader)) {
     628          10 :         auto header = reinterpret_cast<rtcpRRHeader*>(buf);
     629          10 :         rtcpPacketLoss_ = (header->pt == 201 && ntohl(header->fraction_lost) & RTCP_RR_FRACTION_MASK);
     630             :     }
     631             : 
     632             :     do {
     633        5725 :         if (interrupted_)
     634           0 :             return -EINTR;
     635        5725 :         ret = writeData(buf, buf_size);
     636        5725 :     } while (ret < 0 and errno == EAGAIN);
     637             : 
     638        5725 :     if (buf[1] == 200) // Sender Report
     639             :     {
     640          75 :         auto header = reinterpret_cast<rtcpSRHeader*>(buf);
     641          75 :         ts_LSB = Swap4Bytes(header->timestampLSB);
     642          75 :         ts_MSB = Swap4Bytes(header->timestampMSB);
     643             : 
     644          75 :         currentSRTS = ts_MSB + (ts_LSB / pow(2, 32));
     645             : 
     646          75 :         if (lastSRTS_ != 0 && lastDLSR_ != 0) {
     647          14 :             if (histoLatency_.size() >= MAX_LIST_SIZE)
     648           0 :                 histoLatency_.pop_front();
     649             : 
     650          14 :             currentLatency = (currentSRTS - lastSRTS_) / 2;
     651             :             // JAMI_WARN("Current Latency : %f from sender %X", currentLatency, header->ssrc);
     652          14 :             histoLatency_.push_back(currentLatency);
     653             :         }
     654             : 
     655          75 :         lastSRTS_ = currentSRTS;
     656             : 
     657             :         // JAMI_WARN("SENDING NEW RTCP SR !! ");
     658             : 
     659        5650 :     } else if (buf[1] == 201) // Receiver Report
     660             :     {
     661             :         // auto header = reinterpret_cast<rtcpRRHeader*>(buf);
     662             :         // JAMI_WARN("SENDING NEW RTCP RR !! ");
     663             :     }
     664             : 
     665        5725 :     return ret < 0 ? -errno : ret;
     666             : }
     667             : 
     668             : double
     669          10 : SocketPair::getLastLatency()
     670             : {
     671          10 :     if (not histoLatency_.empty())
     672           0 :         return histoLatency_.back();
     673             :     else
     674          10 :         return -1;
     675             : }
     676             : 
     677             : void
     678         138 : SocketPair::setRtpDelayCallback(std::function<void(int, int)> cb)
     679             : {
     680         138 :     rtpDelayCallback_ = std::move(cb);
     681         138 : }
     682             : 
     683             : bool
     684        5609 : SocketPair::getOneWayDelayGradient(float sendTS, bool marker, int32_t* gradient, int32_t* deltaT)
     685             : {
     686             :     // Keep only last packet of each frame
     687        5609 :     if (not marker) {
     688          46 :         return 0;
     689             :     }
     690             : 
     691             :     // 1st frame
     692        5563 :     if (not lastSendTS_) {
     693          47 :         lastSendTS_ = sendTS;
     694          47 :         lastReceiveTS_ = std::chrono::steady_clock::now();
     695          47 :         return 0;
     696             :     }
     697             : 
     698        5516 :     int32_t deltaS = (sendTS - lastSendTS_) * 1000; // milliseconds
     699        5516 :     if (deltaS < 0)
     700           3 :         deltaS += 64000;
     701        5516 :     lastSendTS_ = sendTS;
     702             : 
     703        5516 :     std::chrono::steady_clock::time_point arrival_TS = std::chrono::steady_clock::now();
     704        5516 :     auto deltaR = std::chrono::duration_cast<std::chrono::milliseconds>(arrival_TS - lastReceiveTS_).count();
     705        5516 :     lastReceiveTS_ = arrival_TS;
     706             : 
     707        5516 :     *gradient = deltaR - deltaS;
     708        5516 :     *deltaT = deltaR;
     709             : 
     710        5516 :     return true;
     711             : }
     712             : 
     713             : bool
     714        5609 : SocketPair::parse_RTP_ext(uint8_t* buf, float* abs)
     715             : {
     716        5609 :     if (not(buf[0] & 0x10))
     717           0 :         return false;
     718             : 
     719        5609 :     uint16_t magic_word = (buf[12] << 8) + buf[13];
     720        5609 :     if (magic_word != 0xBEDE)
     721           0 :         return false;
     722             : 
     723        5609 :     uint8_t sec = buf[17] >> 2;
     724        5609 :     uint32_t fract = ((buf[17] & 0x3) << 16 | (buf[18] << 8) | buf[19]) << 14;
     725        5609 :     float milli = fract / pow(2, 32);
     726             : 
     727        5609 :     *abs = sec + (milli);
     728        5609 :     return true;
     729             : }
     730             : 
     731             : uint16_t
     732         138 : SocketPair::lastSeqValOut()
     733             : {
     734         138 :     if (srtpContext_)
     735         138 :         return srtpContext_->srtp_out.seq_largest;
     736           0 :     JAMI_ERR("SRTP context not found.");
     737           0 :     return 0;
     738             : }
     739             : 
     740             : } // namespace jami

Generated by: LCOV version 1.14