LCOV - code coverage report
Current view: top level - foo/src/media - socket_pair.cpp (source / functions) Hit Total Coverage
Test: jami-coverage-filtered.info Lines: 288 357 80.7 %
Date: 2026-04-01 09:29:43 Functions: 36 37 97.3 %

          Line data    Source code
       1             : /*
       2             :  *  Copyright (C) 2004-2026 Savoir-faire Linux Inc.
       3             :  *  Copyright (c) 2007 The FFmpeg Project
       4             :  *
       5             :  *  This program is free software: you can redistribute it and/or modify
       6             :  *  it under the terms of the GNU General Public License as published by
       7             :  *  the Free Software Foundation, either version 3 of the License, or
       8             :  *  (at your option) any later version.
       9             :  *
      10             :  *  This program is distributed in the hope that it will be useful,
      11             :  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
      12             :  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
      13             :  *  GNU General Public License for more details.
      14             :  *
      15             :  *  You should have received a copy of the GNU General Public License
      16             :  *  along with this program. If not, see <https://www.gnu.org/licenses/>.
      17             :  */
      18             : 
      19             : #include <dhtnet/ip_utils.h> // MUST BE INCLUDED FIRST
      20             : 
      21             : #include "libav_deps.h" // THEN THIS ONE AFTER
      22             : 
      23             : #include "socket_pair.h"
      24             : #include "logger.h"
      25             : #include "connectivity/security/memory.h"
      26             : 
      27             : #include <dhtnet/ice_socket.h>
      28             : 
      29             : #include <string>
      30             : #include <algorithm>
      31             : 
      32             : extern "C" {
      33             : #include "srtp.h"
      34             : }
      35             : 
      36             : #include <cstring>
      37             : #include <stdexcept>
      38             : #include <unistd.h>
      39             : #include <sys/types.h>
      40             : 
      41             : #ifdef _WIN32
      42             : #define SOCK_NONBLOCK FIONBIO
      43             : #define poll          WSAPoll
      44             : #define close(x)      closesocket(x)
      45             : #endif
      46             : 
      47             : #ifdef __ANDROID__
      48             : #include <asm-generic/fcntl.h>
      49             : #define SOCK_NONBLOCK O_NONBLOCK
      50             : #endif
      51             : 
      52             : #ifdef __APPLE__
      53             : #include <fcntl.h>
      54             : #endif
      55             : 
      56             : // Swap 2 byte, 16 bit values:
      57             : #define Swap2Bytes(val) ((((val) >> 8) & 0x00FF) | (((val) << 8) & 0xFF00))
      58             : 
      59             : // Swap 4 byte, 32 bit values:
      60             : #define Swap4Bytes(val) \
      61             :     ((((val) >> 24) & 0x000000FF) | (((val) >> 8) & 0x0000FF00) | (((val) << 8) & 0x00FF0000) \
      62             :      | (((val) << 24) & 0xFF000000))
      63             : 
      64             : // Swap 8 byte, 64 bit values:
      65             : #define Swap8Bytes(val) \
      66             :     ((((val) >> 56) & 0x00000000000000FF) | (((val) >> 40) & 0x000000000000FF00) \
      67             :      | (((val) >> 24) & 0x0000000000FF0000) | (((val) >> 8) & 0x00000000FF000000) \
      68             :      | (((val) << 8) & 0x000000FF00000000) | (((val) << 24) & 0x0000FF0000000000) \
      69             :      | (((val) << 40) & 0x00FF000000000000) | (((val) << 56) & 0xFF00000000000000))
      70             : 
      71             : namespace jami {
      72             : 
      73             : static constexpr int NET_POLL_TIMEOUT = 100; /* poll() timeout in ms */
      74             : static constexpr int RTP_MAX_PACKET_LENGTH = 2048;
      75             : static constexpr auto UDP_HEADER_SIZE = 8;
      76             : static constexpr auto SRTP_OVERHEAD = 10;
      77             : static constexpr uint32_t RTCP_RR_FRACTION_MASK = 0xFF000000;
      78             : static constexpr unsigned MINIMUM_RTP_HEADER_SIZE = 16;
      79             : 
      80             : enum class DataType : uint8_t { RTP = 1 << 0, RTCP = 1 << 1 };
      81             : 
      82             : class SRTPProtoContext
      83             : {
      84             : public:
      85         316 :     SRTPProtoContext(const char* out_suite, const char* out_key, const char* in_suite, const char* in_key)
      86         316 :     {
      87         316 :         jami_secure_memzero(&srtp_out, sizeof(srtp_out));
      88         316 :         jami_secure_memzero(&srtp_in, sizeof(srtp_in));
      89         315 :         if (out_suite && out_key) {
      90             :             // XXX: see srtp_open from libavformat/srtpproto.c
      91         315 :             if (ff_srtp_set_crypto(&srtp_out, out_suite, out_key) < 0) {
      92           0 :                 srtp_close();
      93           0 :                 throw std::runtime_error("Unable to set crypto on output");
      94             :             }
      95             :         }
      96             : 
      97         316 :         if (in_suite && in_key) {
      98         316 :             if (ff_srtp_set_crypto(&srtp_in, in_suite, in_key) < 0) {
      99           0 :                 srtp_close();
     100           0 :                 throw std::runtime_error("Unable to set crypto on input");
     101             :             }
     102             :         }
     103         316 :     }
     104             : 
     105         316 :     ~SRTPProtoContext() { srtp_close(); }
     106             : 
     107             :     SRTPContext srtp_out {};
     108             :     SRTPContext srtp_in {};
     109             :     uint8_t encryptbuf[RTP_MAX_PACKET_LENGTH];
     110             : 
     111             : private:
     112         316 :     void srtp_close() noexcept
     113             :     {
     114         316 :         ff_srtp_free(&srtp_out);
     115         316 :         ff_srtp_free(&srtp_in);
     116         316 :     }
     117             : };
     118             : 
     119             : static int
     120           0 : ff_network_wait_fd(int fd)
     121             : {
     122           0 :     struct pollfd p = {fd, POLLOUT, 0};
     123           0 :     auto ret = poll(&p, 1, NET_POLL_TIMEOUT);
     124           0 :     return ret < 0 ? errno : p.revents & (POLLOUT | POLLERR | POLLHUP) ? 0 : -EAGAIN;
     125             : }
     126             : 
     127             : static int
     128          12 : udp_socket_create(int family, int port)
     129             : {
     130          12 :     int udp_fd = -1;
     131             : 
     132             : #ifdef __APPLE__
     133             :     udp_fd = socket(family, SOCK_DGRAM, 0);
     134             :     if (udp_fd >= 0 && fcntl(udp_fd, F_SETFL, O_NONBLOCK) < 0) {
     135             :         close(udp_fd);
     136             :         udp_fd = -1;
     137             :     }
     138             : #elif defined _WIN32
     139             :     udp_fd = socket(family, SOCK_DGRAM, 0);
     140             :     u_long block = 1;
     141             :     if (udp_fd >= 0 && ioctlsocket(udp_fd, FIONBIO, &block) < 0) {
     142             :         close(udp_fd);
     143             :         udp_fd = -1;
     144             :     }
     145             : #else
     146          12 :     udp_fd = socket(family, SOCK_DGRAM | SOCK_NONBLOCK, 0);
     147             : #endif
     148             : 
     149          12 :     if (udp_fd < 0) {
     150           0 :         JAMI_ERR("socket() failed");
     151           0 :         strErr();
     152           0 :         return -1;
     153             :     }
     154             : 
     155          12 :     auto bind_addr = dhtnet::ip_utils::getAnyHostAddr(family);
     156          12 :     if (not bind_addr.isIpv4() and not bind_addr.isIpv6()) {
     157           0 :         JAMI_ERR("No IPv4/IPv6 host found for family %u", family);
     158           0 :         close(udp_fd);
     159           0 :         return -1;
     160             :     }
     161             : 
     162          12 :     bind_addr.setPort(port);
     163          12 :     JAMI_DBG("use local address: %s", bind_addr.toString(true, true).c_str());
     164          12 :     if (::bind(udp_fd, bind_addr, bind_addr.getLength()) < 0) {
     165           0 :         JAMI_ERR("bind() failed");
     166           0 :         strErr();
     167           0 :         close(udp_fd);
     168           0 :         udp_fd = -1;
     169             :     }
     170             : 
     171          12 :     return udp_fd;
     172             : }
     173             : 
     174           6 : SocketPair::SocketPair(const char* uri, int localPort)
     175             : {
     176           6 :     openSockets(uri, localPort);
     177           6 : }
     178             : 
     179         308 : SocketPair::SocketPair(std::unique_ptr<dhtnet::IceSocket> rtp_sock, std::unique_ptr<dhtnet::IceSocket> rtcp_sock)
     180         308 :     : rtp_sock_(std::move(rtp_sock))
     181         618 :     , rtcp_sock_(std::move(rtcp_sock))
     182             : {
     183         309 :     JAMI_DBG("[%p] Creating instance using ICE sockets for comp %d and %d",
     184             :              this,
     185             :              rtp_sock_->getCompId(),
     186             :              rtcp_sock_->getCompId());
     187             : 
     188         310 :     rtp_sock_->setOnRecv([this](uint8_t* buf, size_t len) {
     189        5146 :         std::lock_guard l(dataBuffMutex_);
     190        5146 :         rtpDataBuff_.emplace_back(buf, buf + len);
     191        5146 :         cv_.notify_one();
     192        5146 :         return len;
     193        5146 :     });
     194         310 :     rtcp_sock_->setOnRecv([this](uint8_t* buf, size_t len) {
     195         240 :         std::lock_guard l(dataBuffMutex_);
     196         240 :         rtcpDataBuff_.emplace_back(buf, buf + len);
     197         240 :         cv_.notify_one();
     198         240 :         return len;
     199         240 :     });
     200         309 : }
     201             : 
     202         316 : SocketPair::~SocketPair()
     203             : {
     204         316 :     interrupt();
     205         316 :     closeSockets();
     206         316 :     JAMI_DBG("[%p] Instance destroyed", this);
     207         316 : }
     208             : 
     209             : bool
     210         621 : SocketPair::waitForRTCP(std::chrono::seconds interval)
     211             : {
     212         621 :     std::unique_lock lock(rtcpInfo_mutex_);
     213         621 :     return cvRtcpPacketReadyToRead_.wait_for(lock, interval, [this] {
     214        1888 :         return interrupted_ or not listRtcpRRHeader_.empty() or not listRtcpREMBHeader_.empty();
     215        1242 :     });
     216         621 : }
     217             : 
     218             : void
     219           9 : SocketPair::saveRtcpRRPacket(uint8_t* buf, size_t len)
     220             : {
     221           9 :     if (len < sizeof(rtcpRRHeader))
     222           0 :         return;
     223             : 
     224           9 :     auto* header = reinterpret_cast<rtcpRRHeader*>(buf);
     225           9 :     if (header->pt != 201) // 201 = RR PT
     226           0 :         return;
     227             : 
     228           9 :     std::lock_guard lock(rtcpInfo_mutex_);
     229             : 
     230           9 :     if (listRtcpRRHeader_.size() >= MAX_LIST_SIZE) {
     231           0 :         listRtcpRRHeader_.pop_front();
     232             :     }
     233             : 
     234           9 :     listRtcpRRHeader_.emplace_back(*header);
     235             : 
     236           9 :     cvRtcpPacketReadyToRead_.notify_one();
     237           9 : }
     238             : 
     239             : void
     240         159 : SocketPair::saveRtcpREMBPacket(uint8_t* buf, size_t len)
     241             : {
     242         159 :     if (len < sizeof(rtcpREMBHeader))
     243           0 :         return;
     244             : 
     245         159 :     auto* header = reinterpret_cast<rtcpREMBHeader*>(buf);
     246         159 :     if (header->pt != 206) // 206 = REMB PT
     247           0 :         return;
     248             : 
     249         159 :     if (header->uid != 0x424D4552) // uid must be "REMB"
     250           0 :         return;
     251             : 
     252         159 :     std::lock_guard lock(rtcpInfo_mutex_);
     253             : 
     254         159 :     if (listRtcpREMBHeader_.size() >= MAX_LIST_SIZE) {
     255           0 :         listRtcpREMBHeader_.pop_front();
     256             :     }
     257             : 
     258         159 :     listRtcpREMBHeader_.push_back(*header);
     259             : 
     260         159 :     cvRtcpPacketReadyToRead_.notify_one();
     261         159 : }
     262             : 
     263             : std::list<rtcpRRHeader>
     264         621 : SocketPair::getRtcpRR()
     265             : {
     266         621 :     std::lock_guard lock(rtcpInfo_mutex_);
     267        1242 :     return std::move(listRtcpRRHeader_);
     268         621 : }
     269             : 
     270             : std::list<rtcpREMBHeader>
     271         348 : SocketPair::getRtcpREMB()
     272             : {
     273         348 :     std::lock_guard lock(rtcpInfo_mutex_);
     274         696 :     return std::move(listRtcpREMBHeader_);
     275         348 : }
     276             : 
     277             : void
     278         316 : SocketPair::createSRTP(const char* out_suite, const char* out_key, const char* in_suite, const char* in_key)
     279             : {
     280         316 :     srtpContext_.reset(new SRTPProtoContext(out_suite, out_key, in_suite, in_key));
     281         316 : }
     282             : 
     283             : void
     284         632 : SocketPair::interrupt()
     285             : {
     286         632 :     JAMI_WARN("[%p] Interrupting RTP sockets", this);
     287         632 :     interrupted_ = true;
     288         632 :     if (rtp_sock_)
     289         620 :         rtp_sock_->setOnRecv(nullptr);
     290         632 :     if (rtcp_sock_)
     291         620 :         rtcp_sock_->setOnRecv(nullptr);
     292         632 :     cv_.notify_all();
     293         632 :     cvRtcpPacketReadyToRead_.notify_all();
     294         632 : }
     295             : 
     296             : void
     297         910 : SocketPair::setReadBlockingMode(bool block)
     298             : {
     299         910 :     JAMI_DBG("[%p] Read operations in blocking mode [%s]", this, block ? "YES" : "NO");
     300         910 :     readBlockingMode_ = block;
     301         910 :     cv_.notify_all();
     302         910 :     cvRtcpPacketReadyToRead_.notify_all();
     303         910 : }
     304             : 
     305             : void
     306         908 : SocketPair::stopSendOp(bool state)
     307             : {
     308         908 :     noWrite_ = state;
     309         908 : }
     310             : 
     311             : void
     312         316 : SocketPair::closeSockets()
     313             : {
     314         316 :     if (rtcpHandle_ > 0 and close(rtcpHandle_))
     315           0 :         strErr();
     316         316 :     if (rtpHandle_ > 0 and close(rtpHandle_))
     317           0 :         strErr();
     318         316 : }
     319             : 
     320             : void
     321           5 : SocketPair::openSockets(const char* uri, int local_rtp_port)
     322             : {
     323           5 :     JAMI_DBG("Creating rtp socket for uri %s on port %d", uri, local_rtp_port);
     324             : 
     325             :     char hostname[256];
     326             :     char path[1024];
     327             :     int dst_rtp_port;
     328             : 
     329           6 :     av_url_split(NULL, 0, NULL, 0, hostname, sizeof(hostname), &dst_rtp_port, path, sizeof(path), uri);
     330             : 
     331           6 :     const int local_rtcp_port = local_rtp_port + 1;
     332           6 :     const int dst_rtcp_port = dst_rtp_port + 1;
     333             : 
     334           6 :     rtpDestAddr_ = dhtnet::IpAddr {hostname};
     335           6 :     rtpDestAddr_.setPort(dst_rtp_port);
     336           6 :     rtcpDestAddr_ = dhtnet::IpAddr {hostname};
     337           6 :     rtcpDestAddr_.setPort(dst_rtcp_port);
     338             : 
     339             :     // Open local sockets (RTP/RTCP)
     340           6 :     if ((rtpHandle_ = udp_socket_create(rtpDestAddr_.getFamily(), local_rtp_port)) == -1
     341           6 :         or (rtcpHandle_ = udp_socket_create(rtcpDestAddr_.getFamily(), local_rtcp_port)) == -1) {
     342           0 :         closeSockets();
     343           0 :         JAMI_ERR("[%p] Sockets creation failed", this);
     344           0 :         throw std::runtime_error("Sockets creation failed");
     345             :     }
     346             : 
     347           6 :     JAMI_WARN("SocketPair: local{%d,%d} / %s{%d,%d}",
     348             :               local_rtp_port,
     349             :               local_rtcp_port,
     350             :               hostname,
     351             :               dst_rtp_port,
     352             :               dst_rtcp_port);
     353           6 : }
     354             : 
     355             : MediaIOHandle*
     356         623 : SocketPair::createIOContext(const uint16_t mtu)
     357             : {
     358             :     unsigned ip_header_size;
     359         623 :     if (rtp_sock_)
     360         611 :         ip_header_size = rtp_sock_->getTransportOverhead();
     361          12 :     else if (rtpDestAddr_.getFamily() == AF_INET6)
     362           0 :         ip_header_size = 40;
     363             :     else
     364          12 :         ip_header_size = 20;
     365             :     return new MediaIOHandle(
     366         623 :         mtu - (srtpContext_ ? SRTP_OVERHEAD : 0) - UDP_HEADER_SIZE - ip_header_size,
     367             :         true,
     368        5806 :         [](void* sp, uint8_t* buf, int len) { return static_cast<SocketPair*>(sp)->readCallback(buf, len); },
     369        5232 :         [](void* sp, uint8_t* buf, int len) { return static_cast<SocketPair*>(sp)->writeCallback(buf, len); },
     370             :         0,
     371         623 :         reinterpret_cast<void*>(this));
     372             : }
     373             : 
     374             : int
     375        5806 : SocketPair::waitForData()
     376             : {
     377             :     // System sockets
     378        5806 :     if (rtpHandle_ >= 0) {
     379             :         int ret;
     380             :         do {
     381          86 :             if (interrupted_) {
     382           4 :                 errno = EINTR;
     383           5 :                 return -1;
     384             :             }
     385             : 
     386          82 :             if (not readBlockingMode_) {
     387           1 :                 return 0;
     388             :             }
     389             : 
     390             :             // work with system socket
     391          81 :             struct pollfd p[2] = {{rtpHandle_, POLLIN, 0}, {rtcpHandle_, POLLIN, 0}};
     392          81 :             ret = poll(p, 2, NET_POLL_TIMEOUT);
     393          81 :             if (ret > 0) {
     394           0 :                 ret = 0;
     395           0 :                 if (p[0].revents & POLLIN)
     396           0 :                     ret |= static_cast<int>(DataType::RTP);
     397           0 :                 if (p[1].revents & POLLIN)
     398           0 :                     ret |= static_cast<int>(DataType::RTCP);
     399             :             }
     400          81 :         } while (!ret or (ret < 0 and errno == EAGAIN));
     401             : 
     402           0 :         return ret;
     403             :     }
     404             : 
     405             :     // work with IceSocket
     406             :     {
     407        5801 :         std::unique_lock lk(dataBuffMutex_);
     408        5801 :         cv_.wait(lk, [this] {
     409       11449 :             return interrupted_ or not rtpDataBuff_.empty() or not rtcpDataBuff_.empty() or not readBlockingMode_;
     410             :         });
     411        5801 :     }
     412             : 
     413        5801 :     if (interrupted_) {
     414           3 :         errno = EINTR;
     415           3 :         return -1;
     416             :     }
     417             : 
     418        5798 :     return static_cast<int>(DataType::RTP) | static_cast<int>(DataType::RTCP);
     419             : }
     420             : 
     421             : int
     422        5558 : SocketPair::readRtpData(void* buf, int buf_size)
     423             : {
     424             :     // handle system socket
     425        5558 :     if (rtpHandle_ >= 0) {
     426             :         struct sockaddr_storage from;
     427           0 :         socklen_t from_len = sizeof(from);
     428           0 :         return static_cast<int>(recvfrom(rtpHandle_,
     429             :                                          static_cast<char*>(buf),
     430             :                                          buf_size,
     431             :                                          0,
     432             :                                          reinterpret_cast<struct sockaddr*>(&from),
     433           0 :                                          &from_len));
     434             :     }
     435             : 
     436             :     // handle ICE
     437        5558 :     std::unique_lock lk(dataBuffMutex_);
     438        5558 :     if (not rtpDataBuff_.empty()) {
     439        5143 :         auto pkt = std::move(rtpDataBuff_.front());
     440        5143 :         rtpDataBuff_.pop_front();
     441        5143 :         lk.unlock(); // to not block our ICE callbacks
     442        5143 :         int pkt_size = static_cast<int>(pkt.size());
     443        5143 :         int len = std::min(pkt_size, buf_size);
     444        5143 :         std::copy_n(pkt.begin(), len, static_cast<char*>(buf));
     445        5143 :         return len;
     446        5143 :     }
     447             : 
     448         415 :     return 0;
     449        5558 : }
     450             : 
     451             : int
     452        5798 : SocketPair::readRtcpData(void* buf, int buf_size)
     453             : {
     454             :     // handle system socket
     455        5798 :     if (rtcpHandle_ >= 0) {
     456             :         struct sockaddr_storage from;
     457           0 :         socklen_t from_len = sizeof(from);
     458           0 :         return static_cast<int>(recvfrom(rtcpHandle_,
     459             :                                          static_cast<char*>(buf),
     460             :                                          buf_size,
     461             :                                          0,
     462             :                                          reinterpret_cast<struct sockaddr*>(&from),
     463           0 :                                          &from_len));
     464             :     }
     465             : 
     466             :     // handle ICE
     467        5798 :     std::unique_lock lk(dataBuffMutex_);
     468        5798 :     if (not rtcpDataBuff_.empty()) {
     469         240 :         auto pkt = std::move(rtcpDataBuff_.front());
     470         240 :         rtcpDataBuff_.pop_front();
     471         240 :         lk.unlock();
     472         240 :         int pkt_size = static_cast<int>(pkt.size());
     473         240 :         int len = std::min(pkt_size, buf_size);
     474         240 :         std::copy_n(pkt.begin(), len, static_cast<char*>(buf));
     475         240 :         return len;
     476         240 :     }
     477             : 
     478        5558 :     return 0;
     479        5798 : }
     480             : 
     481             : int
     482        5806 : SocketPair::readCallback(uint8_t* buf, int buf_size)
     483             : {
     484        5806 :     auto datatype = waitForData();
     485        5806 :     if (datatype < 0)
     486           7 :         return datatype;
     487             : 
     488        5799 :     int len = 0;
     489        5799 :     bool fromRTCP = false;
     490             : 
     491        5799 :     if (datatype & static_cast<int>(DataType::RTCP)) {
     492        5798 :         len = readRtcpData(buf, buf_size);
     493        5798 :         if (len > 0) {
     494         240 :             auto* header = reinterpret_cast<rtcpRRHeader*>(buf);
     495             :             // 201 = RR PT
     496         240 :             if (header->pt == 201) {
     497           9 :                 lastDLSR_ = Swap4Bytes(header->dlsr);
     498             :                 // JAMI_WARN("Read RR, lastDLSR : %d", lastDLSR_);
     499           9 :                 lastRR_time = std::chrono::steady_clock::now();
     500           9 :                 saveRtcpRRPacket(buf, len);
     501             :             }
     502             :             // 206 = REMB PT
     503         231 :             else if (header->pt == 206)
     504         159 :                 saveRtcpREMBPacket(buf, len);
     505             :             // 200 = SR PT
     506          72 :             else if (header->pt == 200) {
     507             :                 // not used yet
     508             :             } else {
     509           0 :                 JAMI_DBG("Unable to read RTCP: unknown packet type %u", header->pt);
     510             :             }
     511         240 :             fromRTCP = true;
     512             :         }
     513             :     }
     514             : 
     515             :     // No RTCP… attempt RTP
     516        5799 :     if (!len and (datatype & static_cast<int>(DataType::RTP))) {
     517        5558 :         len = readRtpData(buf, buf_size);
     518        5558 :         fromRTCP = false;
     519             :     }
     520             : 
     521        5799 :     if (len <= 0)
     522         416 :         return len;
     523             : 
     524        5383 :     if (not fromRTCP && (buf_size < static_cast<int>(MINIMUM_RTP_HEADER_SIZE)))
     525           0 :         return len;
     526             : 
     527             :     // SRTP decrypt
     528        5383 :     if (not fromRTCP and srtpContext_ and srtpContext_->srtp_in.aes) {
     529        5143 :         int32_t gradient = 0;
     530        5143 :         int32_t deltaT = 0;
     531        5143 :         float abs = 0.0f;
     532        5143 :         bool res_parse = false;
     533        5143 :         bool res_delay = false;
     534             : 
     535        5143 :         res_parse = parse_RTP_ext(buf, &abs);
     536        5143 :         bool marker = (buf[1] & 0x80) >> 7;
     537             : 
     538        5143 :         if (res_parse)
     539        5143 :             res_delay = getOneWayDelayGradient(abs, marker, &gradient, &deltaT);
     540             : 
     541             :         // rtpDelayCallback_ is not set for audio
     542        5143 :         if (rtpDelayCallback_ and res_delay)
     543        5051 :             rtpDelayCallback_(gradient, deltaT);
     544             : 
     545        5143 :         auto err = ff_srtp_decrypt(&srtpContext_->srtp_in, buf, &len);
     546        5143 :         if (packetLossCallback_ and (buf[2] << 8 | buf[3]) != lastSeqNumIn_ + 1)
     547           0 :             packetLossCallback_();
     548        5143 :         lastSeqNumIn_ = buf[2] << 8 | buf[3];
     549        5143 :         if (err < 0)
     550           0 :             JAMI_WARN("decrypt error %d", err);
     551             :     }
     552             : 
     553        5383 :     if (len != 0)
     554        5383 :         return len;
     555             :     else
     556           0 :         return AVERROR_EOF;
     557             : }
     558             : 
     559             : int
     560        5391 : SocketPair::writeData(uint8_t* buf, int buf_size)
     561             : {
     562        5391 :     bool isRTCP = RTP_PT_IS_RTCP(buf[1]);
     563             : 
     564             :     // System sockets?
     565        5391 :     if (rtpHandle_ >= 0) {
     566             :         int fd;
     567             :         dhtnet::IpAddr* dest_addr;
     568             : 
     569           0 :         if (isRTCP) {
     570           0 :             fd = rtcpHandle_;
     571           0 :             dest_addr = &rtcpDestAddr_;
     572             :         } else {
     573           0 :             fd = rtpHandle_;
     574           0 :             dest_addr = &rtpDestAddr_;
     575             :         }
     576             : 
     577           0 :         auto ret = ff_network_wait_fd(fd);
     578           0 :         if (ret < 0)
     579           0 :             return ret;
     580             : 
     581           0 :         if (noWrite_)
     582           0 :             return buf_size;
     583             :         return static_cast<int>(
     584           0 :             ::sendto(fd, reinterpret_cast<const char*>(buf), buf_size, 0, *dest_addr, dest_addr->getLength()));
     585             :     }
     586             : 
     587        5391 :     if (noWrite_)
     588           0 :         return buf_size;
     589             : 
     590             :     // IceSocket
     591        5391 :     if (isRTCP)
     592         240 :         return static_cast<int>(rtcp_sock_->send(buf, buf_size));
     593             :     else
     594        5151 :         return static_cast<int>(rtp_sock_->send(buf, buf_size));
     595             : }
     596             : 
     597             : int
     598        5232 : SocketPair::writeCallback(uint8_t* buf, int buf_size)
     599             : {
     600        5232 :     if (noWrite_)
     601           0 :         return 0;
     602             : 
     603             :     int ret;
     604        5232 :     bool isRTCP = RTP_PT_IS_RTCP(buf[1]);
     605             :     unsigned int ts_LSB, ts_MSB;
     606             :     double currentSRTS, currentLatency;
     607             : 
     608             :     // Encrypt?
     609        5232 :     if (not isRTCP and srtpContext_ and srtpContext_->srtp_out.aes) {
     610        5151 :         buf_size = ff_srtp_encrypt(&srtpContext_->srtp_out,
     611             :                                    buf,
     612             :                                    buf_size,
     613        5151 :                                    srtpContext_->encryptbuf,
     614             :                                    sizeof(srtpContext_->encryptbuf));
     615        5151 :         if (buf_size < 0) {
     616           0 :             JAMI_WARN("encrypt error %d", buf_size);
     617           0 :             return buf_size;
     618             :         }
     619             : 
     620        5151 :         buf = srtpContext_->encryptbuf;
     621             :     }
     622             : 
     623             :     // check if we're sending an RR, if so, detect packet loss
     624             :     // buf_size gives length of buffer, not just header
     625        5232 :     if (isRTCP && static_cast<unsigned>(buf_size) >= sizeof(rtcpRRHeader)) {
     626           9 :         auto* header = reinterpret_cast<rtcpRRHeader*>(buf);
     627           9 :         rtcpPacketLoss_ = (header->pt == 201 && ntohl(header->fraction_lost) & RTCP_RR_FRACTION_MASK);
     628             :     }
     629             : 
     630             :     do {
     631        5232 :         if (interrupted_)
     632           0 :             return -EINTR;
     633        5232 :         ret = writeData(buf, buf_size);
     634        5232 :     } while (ret < 0 and errno == EAGAIN);
     635             : 
     636        5232 :     if (buf[1] == 200) // Sender Report
     637             :     {
     638          72 :         auto* header = reinterpret_cast<rtcpSRHeader*>(buf);
     639          72 :         ts_LSB = Swap4Bytes(header->timestampLSB);
     640          72 :         ts_MSB = Swap4Bytes(header->timestampMSB);
     641             : 
     642          72 :         currentSRTS = ts_MSB + (ts_LSB / pow(2, 32));
     643             : 
     644          72 :         if (lastSRTS_ != 0 && lastDLSR_ != 0) {
     645          14 :             if (histoLatency_.size() >= MAX_LIST_SIZE)
     646           0 :                 histoLatency_.pop_front();
     647             : 
     648          14 :             currentLatency = (currentSRTS - lastSRTS_) / 2;
     649             :             // JAMI_WARN("Current Latency : %f from sender %X", currentLatency, header->ssrc);
     650          14 :             histoLatency_.push_back(currentLatency);
     651             :         }
     652             : 
     653          72 :         lastSRTS_ = currentSRTS;
     654             : 
     655             :         // JAMI_WARN("SENDING NEW RTCP SR !! ");
     656             : 
     657        5160 :     } else if (buf[1] == 201) // Receiver Report
     658             :     {
     659             :         // auto header = reinterpret_cast<rtcpRRHeader*>(buf);
     660             :         // JAMI_WARN("SENDING NEW RTCP RR !! ");
     661             :     }
     662             : 
     663        5232 :     return ret < 0 ? -errno : ret;
     664             : }
     665             : 
     666             : double
     667           9 : SocketPair::getLastLatency()
     668             : {
     669           9 :     if (not histoLatency_.empty())
     670           1 :         return histoLatency_.back();
     671             :     else
     672           8 :         return -1;
     673             : }
     674             : 
     675             : void
     676         141 : SocketPair::setRtpDelayCallback(std::function<void(int, int)> cb)
     677             : {
     678         141 :     rtpDelayCallback_ = std::move(cb);
     679         141 : }
     680             : 
     681             : bool
     682        5143 : SocketPair::getOneWayDelayGradient(float sendTS, bool marker, int32_t* gradient, int32_t* deltaT)
     683             : {
     684             :     // Keep only last packet of each frame
     685        5143 :     if (not marker) {
     686          47 :         return 0;
     687             :     }
     688             : 
     689             :     // 1st frame
     690        5096 :     if (lastSendTS_ == 0.0f) {
     691          45 :         lastSendTS_ = sendTS;
     692          45 :         lastReceiveTS_ = std::chrono::steady_clock::now();
     693          45 :         return 0;
     694             :     }
     695             : 
     696        5051 :     int32_t deltaS = static_cast<int32_t>((sendTS - lastSendTS_) * 1000); // milliseconds
     697        5051 :     if (deltaS < 0)
     698           3 :         deltaS += 64000;
     699        5051 :     lastSendTS_ = sendTS;
     700             : 
     701        5051 :     std::chrono::steady_clock::time_point arrival_TS = std::chrono::steady_clock::now();
     702             :     auto deltaR = static_cast<int32_t>(
     703        5051 :         std::chrono::duration_cast<std::chrono::milliseconds>(arrival_TS - lastReceiveTS_).count());
     704        5051 :     lastReceiveTS_ = arrival_TS;
     705             : 
     706        5051 :     *gradient = deltaR - deltaS;
     707        5051 :     *deltaT = deltaR;
     708             : 
     709        5051 :     return true;
     710             : }
     711             : 
     712             : bool
     713        5143 : SocketPair::parse_RTP_ext(uint8_t* buf, float* abs)
     714             : {
     715        5143 :     if (not(buf[0] & 0x10))
     716           0 :         return false;
     717             : 
     718        5143 :     uint16_t magic_word = (buf[12] << 8) + buf[13];
     719        5143 :     if (magic_word != 0xBEDE)
     720           0 :         return false;
     721             : 
     722        5143 :     uint8_t sec = buf[17] >> 2;
     723        5143 :     uint32_t fract = ((buf[17] & 0x3) << 16 | (buf[18] << 8) | buf[19]) << 14;
     724        5143 :     float milli = static_cast<float>(fract / pow(2, 32));
     725             : 
     726        5143 :     *abs = static_cast<float>(sec) + (milli);
     727        5143 :     return true;
     728             : }
     729             : 
     730             : uint16_t
     731         139 : SocketPair::lastSeqValOut()
     732             : {
     733         139 :     if (srtpContext_)
     734         139 :         return srtpContext_->srtp_out.seq_largest;
     735           0 :     JAMI_ERR("SRTP context not found.");
     736           0 :     return 0;
     737             : }
     738             : 
     739             : } // namespace jami

Generated by: LCOV version 1.14