LCOV - code coverage report
Current view: top level - src/media - socket_pair.cpp (source / functions) Coverage Total Hit
Test: jami-coverage-filtered.info Lines: 79.9 % 358 286
Test Date: 2026-06-13 09:18:46 Functions: 74.6 % 67 50

            Line data    Source code
       1              : /*
       2              :  *  Copyright (C) 2004-2026 Savoir-faire Linux Inc.
       3              :  *  Copyright (c) 2007 The FFmpeg Project
       4              :  *
       5              :  *  This program is free software: you can redistribute it and/or modify
       6              :  *  it under the terms of the GNU General Public License as published by
       7              :  *  the Free Software Foundation, either version 3 of the License, or
       8              :  *  (at your option) any later version.
       9              :  *
      10              :  *  This program is distributed in the hope that it will be useful,
      11              :  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
      12              :  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
      13              :  *  GNU General Public License for more details.
      14              :  *
      15              :  *  You should have received a copy of the GNU General Public License
      16              :  *  along with this program. If not, see <https://www.gnu.org/licenses/>.
      17              :  */
      18              : 
      19              : #include <dhtnet/ip_utils.h> // MUST BE INCLUDED FIRST
      20              : 
      21              : #include "libav_deps.h" // THEN THIS ONE AFTER
      22              : 
      23              : #include "socket_pair.h"
      24              : #include "logger.h"
      25              : #include "connectivity/security/memory.h"
      26              : 
      27              : #include <dhtnet/ice_socket.h>
      28              : 
      29              : #include <string>
      30              : #include <algorithm>
      31              : 
      32              : extern "C" {
      33              : #include "srtp.h"
      34              : }
      35              : 
      36              : #include <cstring>
      37              : #include <stdexcept>
      38              : #include <unistd.h>
      39              : #include <sys/types.h>
      40              : 
      41              : #ifdef _WIN32
      42              : #define SOCK_NONBLOCK FIONBIO
      43              : #define poll          WSAPoll
      44              : #define close(x)      closesocket(x)
      45              : #endif
      46              : 
      47              : #ifdef __ANDROID__
      48              : #include <asm-generic/fcntl.h>
      49              : #define SOCK_NONBLOCK O_NONBLOCK
      50              : #endif
      51              : 
      52              : #ifdef __APPLE__
      53              : #include <fcntl.h>
      54              : #endif
      55              : 
      56              : // Swap 2 byte, 16 bit values:
      57              : #define Swap2Bytes(val) ((((val) >> 8) & 0x00FF) | (((val) << 8) & 0xFF00))
      58              : 
      59              : // Swap 4 byte, 32 bit values:
      60              : #define Swap4Bytes(val) \
      61              :     ((((val) >> 24) & 0x000000FF) | (((val) >> 8) & 0x0000FF00) | (((val) << 8) & 0x00FF0000) \
      62              :      | (((val) << 24) & 0xFF000000))
      63              : 
      64              : // Swap 8 byte, 64 bit values:
      65              : #define Swap8Bytes(val) \
      66              :     ((((val) >> 56) & 0x00000000000000FF) | (((val) >> 40) & 0x000000000000FF00) \
      67              :      | (((val) >> 24) & 0x0000000000FF0000) | (((val) >> 8) & 0x00000000FF000000) \
      68              :      | (((val) << 8) & 0x000000FF00000000) | (((val) << 24) & 0x0000FF0000000000) \
      69              :      | (((val) << 40) & 0x00FF000000000000) | (((val) << 56) & 0xFF00000000000000))
      70              : 
      71              : namespace jami {
      72              : 
      73              : static constexpr int NET_POLL_TIMEOUT = 100; /* poll() timeout in ms */
      74              : static constexpr int RTP_MAX_PACKET_LENGTH = 2048;
      75              : static constexpr auto UDP_HEADER_SIZE = 8;
      76              : static constexpr auto SRTP_OVERHEAD = 10;
      77              : static constexpr uint32_t RTCP_RR_FRACTION_MASK = 0xFF000000;
      78              : static constexpr unsigned MINIMUM_RTP_HEADER_SIZE = 16;
      79              : 
      80              : enum class DataType : uint8_t { RTP = 1 << 0, RTCP = 1 << 1 };
      81              : 
      82              : class SRTPProtoContext
      83              : {
      84              : public:
      85          318 :     SRTPProtoContext(const char* out_suite, const char* out_key, const char* in_suite, const char* in_key)
      86          318 :     {
      87          318 :         jami_secure_memzero(&srtp_out, sizeof(srtp_out));
      88          318 :         jami_secure_memzero(&srtp_in, sizeof(srtp_in));
      89          318 :         if (out_suite && out_key) {
      90              :             // XXX: see srtp_open from libavformat/srtpproto.c
      91          318 :             if (ff_srtp_set_crypto(&srtp_out, out_suite, out_key) < 0) {
      92            0 :                 srtp_close();
      93            0 :                 throw std::runtime_error("Unable to set crypto on output");
      94              :             }
      95              :         }
      96              : 
      97          318 :         if (in_suite && in_key) {
      98          318 :             if (ff_srtp_set_crypto(&srtp_in, in_suite, in_key) < 0) {
      99            0 :                 srtp_close();
     100            0 :                 throw std::runtime_error("Unable to set crypto on input");
     101              :             }
     102              :         }
     103          318 :     }
     104              : 
     105          318 :     ~SRTPProtoContext() { srtp_close(); }
     106              : 
     107              :     SRTPContext srtp_out {};
     108              :     SRTPContext srtp_in {};
     109              :     uint8_t encryptbuf[RTP_MAX_PACKET_LENGTH];
     110              : 
     111              : private:
     112          318 :     void srtp_close() noexcept
     113              :     {
     114          318 :         ff_srtp_free(&srtp_out);
     115          318 :         ff_srtp_free(&srtp_in);
     116          318 :     }
     117              : };
     118              : 
     119              : static int
     120            0 : ff_network_wait_fd(int fd)
     121              : {
     122            0 :     struct pollfd p = {fd, POLLOUT, 0};
     123            0 :     auto ret = poll(&p, 1, NET_POLL_TIMEOUT);
     124            0 :     return ret < 0 ? errno : p.revents & (POLLOUT | POLLERR | POLLHUP) ? 0 : -EAGAIN;
     125              : }
     126              : 
     127              : static int
     128            8 : udp_socket_create(int family, int port)
     129              : {
     130            8 :     int udp_fd = -1;
     131              : 
     132              : #ifdef __APPLE__
     133              :     udp_fd = socket(family, SOCK_DGRAM, 0);
     134              :     if (udp_fd >= 0 && fcntl(udp_fd, F_SETFL, O_NONBLOCK) < 0) {
     135              :         close(udp_fd);
     136              :         udp_fd = -1;
     137              :     }
     138              : #elif defined _WIN32
     139              :     udp_fd = socket(family, SOCK_DGRAM, 0);
     140              :     u_long block = 1;
     141              :     if (udp_fd >= 0 && ioctlsocket(udp_fd, FIONBIO, &block) < 0) {
     142              :         close(udp_fd);
     143              :         udp_fd = -1;
     144              :     }
     145              : #else
     146            8 :     udp_fd = socket(family, SOCK_DGRAM | SOCK_NONBLOCK, 0);
     147              : #endif
     148              : 
     149            8 :     if (udp_fd < 0) {
     150            0 :         JAMI_ERROR("socket() failed");
     151            0 :         strErr();
     152            0 :         return -1;
     153              :     }
     154              : 
     155            8 :     auto bind_addr = dhtnet::ip_utils::getAnyHostAddr(family);
     156            8 :     if (not bind_addr.isIpv4() and not bind_addr.isIpv6()) {
     157            0 :         JAMI_ERROR("No IPv4/IPv6 host found for family {}", family);
     158            0 :         close(udp_fd);
     159            0 :         return -1;
     160              :     }
     161              : 
     162            8 :     bind_addr.setPort(port);
     163           32 :     JAMI_LOG("use local address: {}", bind_addr.toString(true, true));
     164            8 :     if (::bind(udp_fd, bind_addr, bind_addr.getLength()) < 0) {
     165            0 :         JAMI_ERROR("bind() failed");
     166            0 :         strErr();
     167            0 :         close(udp_fd);
     168            0 :         udp_fd = -1;
     169              :     }
     170              : 
     171            8 :     return udp_fd;
     172              : }
     173              : 
     174            4 : SocketPair::SocketPair(const char* uri, int localPort)
     175              : {
     176            4 :     openSockets(uri, localPort);
     177            4 : }
     178              : 
     179          314 : SocketPair::SocketPair(std::unique_ptr<dhtnet::IceSocket> rtp_sock, std::unique_ptr<dhtnet::IceSocket> rtcp_sock)
     180          313 :     : rtp_sock_(std::move(rtp_sock))
     181          628 :     , rtcp_sock_(std::move(rtcp_sock))
     182              : {
     183         1255 :     JAMI_LOG("[{}] Creating instance using ICE sockets for comp {} and {}",
     184              :              fmt::ptr(this),
     185              :              rtp_sock_->getCompId(),
     186              :              rtcp_sock_->getCompId());
     187              : 
     188          313 :     rtp_sock_->setOnRecv([this](uint8_t* buf, size_t len) {
     189         5678 :         std::lock_guard l(dataBuffMutex_);
     190         5678 :         rtpDataBuff_.emplace_back(buf, buf + len);
     191         5678 :         cv_.notify_one();
     192         5678 :         return len;
     193         5678 :     });
     194          314 :     rtcp_sock_->setOnRecv([this](uint8_t* buf, size_t len) {
     195          261 :         std::lock_guard l(dataBuffMutex_);
     196          261 :         rtcpDataBuff_.emplace_back(buf, buf + len);
     197          261 :         cv_.notify_one();
     198          261 :         return len;
     199          261 :     });
     200          314 : }
     201              : 
     202          318 : SocketPair::~SocketPair()
     203              : {
     204          318 :     interrupt();
     205          318 :     closeSockets();
     206         1272 :     JAMI_LOG("[{}] Instance destroyed", fmt::ptr(this));
     207          318 : }
     208              : 
     209              : bool
     210          658 : SocketPair::waitForRTCP(std::chrono::seconds interval)
     211              : {
     212          658 :     std::unique_lock lock(rtcpInfo_mutex_);
     213          658 :     return cvRtcpPacketReadyToRead_.wait_for(lock, interval, [this] {
     214         2038 :         return interrupted_ or not listRtcpRRHeader_.empty() or not listRtcpREMBHeader_.empty();
     215         1316 :     });
     216          658 : }
     217              : 
     218              : void
     219           10 : SocketPair::saveRtcpRRPacket(uint8_t* buf, size_t len)
     220              : {
     221           10 :     if (len < sizeof(rtcpRRHeader))
     222            0 :         return;
     223              : 
     224           10 :     auto* header = reinterpret_cast<rtcpRRHeader*>(buf);
     225           10 :     if (header->pt != 201) // 201 = RR PT
     226            0 :         return;
     227              : 
     228           10 :     std::lock_guard lock(rtcpInfo_mutex_);
     229              : 
     230           10 :     if (listRtcpRRHeader_.size() >= MAX_LIST_SIZE) {
     231            0 :         listRtcpRRHeader_.pop_front();
     232              :     }
     233              : 
     234           10 :     listRtcpRRHeader_.emplace_back(*header);
     235              : 
     236           10 :     cvRtcpPacketReadyToRead_.notify_one();
     237           10 : }
     238              : 
     239              : void
     240          176 : SocketPair::saveRtcpREMBPacket(uint8_t* buf, size_t len)
     241              : {
     242          176 :     if (len < sizeof(rtcpREMBHeader))
     243            0 :         return;
     244              : 
     245          176 :     auto* header = reinterpret_cast<rtcpREMBHeader*>(buf);
     246          176 :     if (header->pt != 206) // 206 = REMB PT
     247            0 :         return;
     248              : 
     249          176 :     if (header->uid != 0x424D4552) // uid must be "REMB"
     250            0 :         return;
     251              : 
     252          176 :     std::lock_guard lock(rtcpInfo_mutex_);
     253              : 
     254          176 :     if (listRtcpREMBHeader_.size() >= MAX_LIST_SIZE) {
     255            0 :         listRtcpREMBHeader_.pop_front();
     256              :     }
     257              : 
     258          176 :     listRtcpREMBHeader_.push_back(*header);
     259              : 
     260          176 :     cvRtcpPacketReadyToRead_.notify_one();
     261          176 : }
     262              : 
     263              : std::list<rtcpRRHeader>
     264          658 : SocketPair::getRtcpRR()
     265              : {
     266          658 :     std::lock_guard lock(rtcpInfo_mutex_);
     267         1316 :     return std::move(listRtcpRRHeader_);
     268          658 : }
     269              : 
     270              : std::list<rtcpREMBHeader>
     271          373 : SocketPair::getRtcpREMB()
     272              : {
     273          373 :     std::lock_guard lock(rtcpInfo_mutex_);
     274          746 :     return std::move(listRtcpREMBHeader_);
     275          373 : }
     276              : 
     277              : void
     278          317 : SocketPair::createSRTP(const char* out_suite, const char* out_key, const char* in_suite, const char* in_key)
     279              : {
     280          317 :     srtpContext_.reset(new SRTPProtoContext(out_suite, out_key, in_suite, in_key));
     281          318 : }
     282              : 
     283              : void
     284          636 : SocketPair::interrupt()
     285              : {
     286         2544 :     JAMI_WARNING("[{}] Interrupting RTP sockets", fmt::ptr(this));
     287          636 :     interrupted_ = true;
     288          636 :     if (rtp_sock_)
     289          628 :         rtp_sock_->setOnRecv(nullptr);
     290          636 :     if (rtcp_sock_)
     291          628 :         rtcp_sock_->setOnRecv(nullptr);
     292          636 :     cv_.notify_all();
     293          636 :     cvRtcpPacketReadyToRead_.notify_all();
     294          636 : }
     295              : 
     296              : void
     297          917 : SocketPair::setReadBlockingMode(bool block)
     298              : {
     299         3668 :     JAMI_LOG("[{}] Read operations in blocking mode [{}]", fmt::ptr(this), block ? "YES" : "NO");
     300          917 :     readBlockingMode_ = block;
     301          917 :     cv_.notify_all();
     302          917 :     cvRtcpPacketReadyToRead_.notify_all();
     303          917 : }
     304              : 
     305              : void
     306          915 : SocketPair::stopSendOp(bool state)
     307              : {
     308          915 :     noWrite_ = state;
     309          915 : }
     310              : 
     311              : void
     312          318 : SocketPair::closeSockets()
     313              : {
     314          318 :     if (rtcpHandle_ > 0 and close(rtcpHandle_))
     315            0 :         strErr();
     316          318 :     if (rtpHandle_ > 0 and close(rtpHandle_))
     317            0 :         strErr();
     318          318 : }
     319              : 
     320              : void
     321            4 : SocketPair::openSockets(const char* uri, int local_rtp_port)
     322              : {
     323           16 :     JAMI_LOG("[{}] Creating rtp socket for uri {} on port {}", fmt::ptr(this), uri, local_rtp_port);
     324              : 
     325              :     char hostname[256];
     326              :     char path[1024];
     327              :     int dst_rtp_port;
     328              : 
     329            4 :     av_url_split(NULL, 0, NULL, 0, hostname, sizeof(hostname), &dst_rtp_port, path, sizeof(path), uri);
     330              : 
     331            4 :     const int local_rtcp_port = local_rtp_port + 1;
     332            4 :     const int dst_rtcp_port = dst_rtp_port + 1;
     333              : 
     334            4 :     rtpDestAddr_ = dhtnet::IpAddr {hostname};
     335            4 :     rtpDestAddr_.setPort(dst_rtp_port);
     336            4 :     rtcpDestAddr_ = dhtnet::IpAddr {hostname};
     337            4 :     rtcpDestAddr_.setPort(dst_rtcp_port);
     338              : 
     339              :     // Open local sockets (RTP/RTCP)
     340            4 :     if ((rtpHandle_ = udp_socket_create(rtpDestAddr_.getFamily(), local_rtp_port)) == -1
     341            4 :         or (rtcpHandle_ = udp_socket_create(rtcpDestAddr_.getFamily(), local_rtcp_port)) == -1) {
     342            0 :         closeSockets();
     343            0 :         JAMI_ERROR("[{}] Sockets creation failed", fmt::ptr(this));
     344            0 :         throw std::runtime_error("Sockets creation failed");
     345              :     }
     346              : 
     347           16 :     JAMI_WARNING("SocketPair: local({},{}) / {}({},{})",
     348              :                  local_rtp_port,
     349              :                  local_rtcp_port,
     350              :                  hostname,
     351              :                  dst_rtp_port,
     352              :                  dst_rtcp_port);
     353            4 : }
     354              : 
     355              : MediaIOHandle*
     356          628 : SocketPair::createIOContext(const uint16_t mtu)
     357              : {
     358              :     unsigned ip_header_size;
     359          628 :     if (rtp_sock_)
     360          620 :         ip_header_size = rtp_sock_->getTransportOverhead();
     361            8 :     else if (rtpDestAddr_.getFamily() == AF_INET6)
     362            0 :         ip_header_size = 40;
     363              :     else
     364            8 :         ip_header_size = 20;
     365              :     return new MediaIOHandle(
     366          628 :         mtu - (srtpContext_ ? SRTP_OVERHEAD : 0) - UDP_HEADER_SIZE - ip_header_size,
     367              :         true,
     368         6377 :         [](void* sp, uint8_t* buf, int len) { return static_cast<SocketPair*>(sp)->readCallback(buf, len); },
     369         6397 :         [](void* sp, uint8_t* buf, int len) { return static_cast<SocketPair*>(sp)->writeCallback(buf, len); },
     370              :         0,
     371         1884 :         reinterpret_cast<void*>(this));
     372              : }
     373              : 
     374              : int
     375         6377 : SocketPair::waitForData()
     376              : {
     377              :     // System sockets
     378         6377 :     if (rtpHandle_ >= 0) {
     379              :         int ret;
     380              :         do {
     381           85 :             if (interrupted_) {
     382            4 :                 errno = EINTR;
     383            4 :                 return -1;
     384              :             }
     385              : 
     386           81 :             if (not readBlockingMode_) {
     387            0 :                 return 0;
     388              :             }
     389              : 
     390              :             // work with system socket
     391           81 :             struct pollfd p[2] = {{rtpHandle_, POLLIN, 0}, {rtcpHandle_, POLLIN, 0}};
     392           81 :             ret = poll(p, 2, NET_POLL_TIMEOUT);
     393           81 :             if (ret > 0) {
     394            0 :                 ret = 0;
     395            0 :                 if (p[0].revents & POLLIN)
     396            0 :                     ret |= static_cast<int>(DataType::RTP);
     397            0 :                 if (p[1].revents & POLLIN)
     398            0 :                     ret |= static_cast<int>(DataType::RTCP);
     399              :             }
     400           81 :         } while (!ret or (ret < 0 and errno == EAGAIN));
     401              : 
     402            0 :         return ret;
     403              :     }
     404              : 
     405              :     // work with IceSocket
     406              :     {
     407         6373 :         std::unique_lock lk(dataBuffMutex_);
     408         6373 :         cv_.wait(lk, [this] {
     409        12584 :             return interrupted_ or not rtpDataBuff_.empty() or not rtcpDataBuff_.empty() or not readBlockingMode_;
     410              :         });
     411         6373 :     }
     412              : 
     413         6373 :     if (interrupted_) {
     414            1 :         errno = EINTR;
     415            1 :         return -1;
     416              :     }
     417              : 
     418         6371 :     return static_cast<int>(DataType::RTP) | static_cast<int>(DataType::RTCP);
     419              : }
     420              : 
     421              : int
     422         6111 : SocketPair::readRtpData(void* buf, int buf_size)
     423              : {
     424              :     // handle system socket
     425         6111 :     if (rtpHandle_ >= 0) {
     426              :         struct sockaddr_storage from;
     427            0 :         socklen_t from_len = sizeof(from);
     428            0 :         return static_cast<int>(recvfrom(rtpHandle_,
     429              :                                          static_cast<char*>(buf),
     430              :                                          buf_size,
     431              :                                          0,
     432              :                                          reinterpret_cast<struct sockaddr*>(&from),
     433            0 :                                          &from_len));
     434              :     }
     435              : 
     436              :     // handle ICE
     437         6111 :     std::unique_lock lk(dataBuffMutex_);
     438         6111 :     if (not rtpDataBuff_.empty()) {
     439         5675 :         auto pkt = std::move(rtpDataBuff_.front());
     440         5675 :         rtpDataBuff_.pop_front();
     441         5675 :         lk.unlock(); // to not block our ICE callbacks
     442         5675 :         int pkt_size = static_cast<int>(pkt.size());
     443         5675 :         int len = std::min(pkt_size, buf_size);
     444         5675 :         std::copy_n(pkt.begin(), len, static_cast<char*>(buf));
     445         5675 :         return len;
     446         5675 :     }
     447              : 
     448          436 :     return 0;
     449         6111 : }
     450              : 
     451              : int
     452         6371 : SocketPair::readRtcpData(void* buf, int buf_size)
     453              : {
     454              :     // handle system socket
     455         6371 :     if (rtcpHandle_ >= 0) {
     456              :         struct sockaddr_storage from;
     457            0 :         socklen_t from_len = sizeof(from);
     458            0 :         return static_cast<int>(recvfrom(rtcpHandle_,
     459              :                                          static_cast<char*>(buf),
     460              :                                          buf_size,
     461              :                                          0,
     462              :                                          reinterpret_cast<struct sockaddr*>(&from),
     463            0 :                                          &from_len));
     464              :     }
     465              : 
     466              :     // handle ICE
     467         6371 :     std::unique_lock lk(dataBuffMutex_);
     468         6372 :     if (not rtcpDataBuff_.empty()) {
     469          261 :         auto pkt = std::move(rtcpDataBuff_.front());
     470          261 :         rtcpDataBuff_.pop_front();
     471          261 :         lk.unlock();
     472          261 :         int pkt_size = static_cast<int>(pkt.size());
     473          261 :         int len = std::min(pkt_size, buf_size);
     474          261 :         std::copy_n(pkt.begin(), len, static_cast<char*>(buf));
     475          261 :         return len;
     476          261 :     }
     477              : 
     478         6111 :     return 0;
     479         6372 : }
     480              : 
     481              : int
     482         6377 : SocketPair::readCallback(uint8_t* buf, int buf_size)
     483              : {
     484         6377 :     auto datatype = waitForData();
     485         6376 :     if (datatype < 0)
     486            5 :         return datatype;
     487              : 
     488         6371 :     int len = 0;
     489         6371 :     bool fromRTCP = false;
     490              : 
     491         6371 :     if (datatype & static_cast<int>(DataType::RTCP)) {
     492         6371 :         len = readRtcpData(buf, buf_size);
     493         6372 :         if (len > 0) {
     494          261 :             auto* header = reinterpret_cast<rtcpRRHeader*>(buf);
     495              :             // 201 = RR PT
     496          261 :             if (header->pt == 201) {
     497           10 :                 lastDLSR_ = Swap4Bytes(header->dlsr);
     498              :                 // JAMI_WARN("Read RR, lastDLSR : %d", lastDLSR_);
     499           10 :                 lastRR_time = std::chrono::steady_clock::now();
     500           10 :                 saveRtcpRRPacket(buf, len);
     501              :             }
     502              :             // 206 = REMB PT
     503          251 :             else if (header->pt == 206)
     504          176 :                 saveRtcpREMBPacket(buf, len);
     505              :             // 200 = SR PT
     506           75 :             else if (header->pt == 200) {
     507              :                 // not used yet
     508              :             } else {
     509            0 :                 unsigned pt = header->pt;
     510            0 :                 JAMI_LOG("Unable to read RTCP: unknown packet type {}", pt);
     511              :             }
     512          261 :             fromRTCP = true;
     513              :         }
     514              :     }
     515              : 
     516              :     // No RTCP… attempt RTP
     517         6372 :     if (!len and (datatype & static_cast<int>(DataType::RTP))) {
     518         6111 :         len = readRtpData(buf, buf_size);
     519         6111 :         fromRTCP = false;
     520              :     }
     521              : 
     522         6372 :     if (len <= 0)
     523          436 :         return len;
     524              : 
     525         5936 :     if (not fromRTCP && (buf_size < static_cast<int>(MINIMUM_RTP_HEADER_SIZE)))
     526            0 :         return len;
     527              : 
     528              :     // SRTP decrypt
     529         5936 :     if (not fromRTCP and srtpContext_ and srtpContext_->srtp_in.aes) {
     530         5675 :         int32_t gradient = 0;
     531         5675 :         int32_t deltaT = 0;
     532         5675 :         float abs = 0.0f;
     533         5675 :         bool res_parse = false;
     534         5675 :         bool res_delay = false;
     535              : 
     536         5675 :         res_parse = parse_RTP_ext(buf, &abs);
     537         5675 :         bool marker = (buf[1] & 0x80) >> 7;
     538              : 
     539         5675 :         if (res_parse)
     540         5675 :             res_delay = getOneWayDelayGradient(abs, marker, &gradient, &deltaT);
     541              : 
     542              :         // rtpDelayCallback_ is not set for audio
     543         5675 :         if (rtpDelayCallback_ and res_delay)
     544         5579 :             rtpDelayCallback_(gradient, deltaT);
     545              : 
     546         5675 :         auto err = ff_srtp_decrypt(&srtpContext_->srtp_in, buf, &len);
     547         5675 :         if (packetLossCallback_ and (buf[2] << 8 | buf[3]) != lastSeqNumIn_ + 1)
     548            0 :             packetLossCallback_();
     549         5675 :         lastSeqNumIn_ = buf[2] << 8 | buf[3];
     550         5675 :         if (err < 0)
     551            0 :             JAMI_WARNING("decrypt error {}", err);
     552              :     }
     553              : 
     554         5936 :     if (len != 0)
     555         5936 :         return len;
     556              :     else
     557            0 :         return AVERROR_EOF;
     558              : }
     559              : 
     560              : int
     561         5945 : SocketPair::writeData(uint8_t* buf, int buf_size)
     562              : {
     563         5945 :     bool isRTCP = RTP_PT_IS_RTCP(buf[1]);
     564              : 
     565              :     // System sockets?
     566         5945 :     if (rtpHandle_ >= 0) {
     567              :         int fd;
     568              :         dhtnet::IpAddr* dest_addr;
     569              : 
     570            0 :         if (isRTCP) {
     571            0 :             fd = rtcpHandle_;
     572            0 :             dest_addr = &rtcpDestAddr_;
     573              :         } else {
     574            0 :             fd = rtpHandle_;
     575            0 :             dest_addr = &rtpDestAddr_;
     576              :         }
     577              : 
     578            0 :         auto ret = ff_network_wait_fd(fd);
     579            0 :         if (ret < 0)
     580            0 :             return ret;
     581              : 
     582            0 :         if (noWrite_)
     583            0 :             return buf_size;
     584              :         return static_cast<int>(
     585            0 :             ::sendto(fd, reinterpret_cast<const char*>(buf), buf_size, 0, *dest_addr, dest_addr->getLength()));
     586              :     }
     587              : 
     588         5945 :     if (noWrite_)
     589            0 :         return buf_size;
     590              : 
     591              :     // IceSocket
     592         5945 :     if (isRTCP)
     593          261 :         return static_cast<int>(rtcp_sock_->send(buf, buf_size));
     594              :     else
     595         5684 :         return static_cast<int>(rtp_sock_->send(buf, buf_size));
     596              : }
     597              : 
     598              : int
     599         5769 : SocketPair::writeCallback(uint8_t* buf, int buf_size)
     600              : {
     601         5769 :     if (noWrite_)
     602            0 :         return 0;
     603              : 
     604              :     int ret;
     605         5769 :     bool isRTCP = RTP_PT_IS_RTCP(buf[1]);
     606              :     unsigned int ts_LSB, ts_MSB;
     607              :     double currentSRTS, currentLatency;
     608              : 
     609              :     // Encrypt?
     610         5769 :     if (not isRTCP and srtpContext_ and srtpContext_->srtp_out.aes) {
     611         5684 :         buf_size = ff_srtp_encrypt(&srtpContext_->srtp_out,
     612              :                                    buf,
     613              :                                    buf_size,
     614         5684 :                                    srtpContext_->encryptbuf,
     615              :                                    sizeof(srtpContext_->encryptbuf));
     616         5684 :         if (buf_size < 0) {
     617            0 :             JAMI_WARNING("encrypt error {}", buf_size);
     618            0 :             return buf_size;
     619              :         }
     620              : 
     621         5684 :         buf = srtpContext_->encryptbuf;
     622              :     }
     623              : 
     624              :     // check if we're sending an RR, if so, detect packet loss
     625              :     // buf_size gives length of buffer, not just header
     626         5769 :     if (isRTCP && static_cast<unsigned>(buf_size) >= sizeof(rtcpRRHeader)) {
     627           10 :         auto* header = reinterpret_cast<rtcpRRHeader*>(buf);
     628           10 :         rtcpPacketLoss_ = (header->pt == 201 && ntohl(header->fraction_lost) & RTCP_RR_FRACTION_MASK);
     629              :     }
     630              : 
     631              :     do {
     632         5769 :         if (interrupted_)
     633            0 :             return -EINTR;
     634         5769 :         ret = writeData(buf, buf_size);
     635         5769 :     } while (ret < 0 and errno == EAGAIN);
     636              : 
     637         5769 :     if (buf[1] == 200) // Sender Report
     638              :     {
     639           75 :         auto* header = reinterpret_cast<rtcpSRHeader*>(buf);
     640           75 :         ts_LSB = Swap4Bytes(header->timestampLSB);
     641           75 :         ts_MSB = Swap4Bytes(header->timestampMSB);
     642              : 
     643           75 :         currentSRTS = ts_MSB + (ts_LSB / pow(2, 32));
     644              : 
     645           75 :         if (lastSRTS_ != 0 && lastDLSR_ != 0) {
     646           14 :             if (histoLatency_.size() >= MAX_LIST_SIZE)
     647            0 :                 histoLatency_.pop_front();
     648              : 
     649           14 :             currentLatency = (currentSRTS - lastSRTS_) / 2;
     650              :             // JAMI_WARN("Current Latency : %f from sender %X", currentLatency, header->ssrc);
     651           14 :             histoLatency_.push_back(currentLatency);
     652              :         }
     653              : 
     654           75 :         lastSRTS_ = currentSRTS;
     655              : 
     656              :         // JAMI_WARN("SENDING NEW RTCP SR !! ");
     657              : 
     658         5694 :     } else if (buf[1] == 201) // Receiver Report
     659              :     {
     660              :         // auto header = reinterpret_cast<rtcpRRHeader*>(buf);
     661              :         // JAMI_WARN("SENDING NEW RTCP RR !! ");
     662              :     }
     663              : 
     664         5769 :     return ret < 0 ? -errno : ret;
     665              : }
     666              : 
     667              : double
     668           10 : SocketPair::getLastLatency()
     669              : {
     670           10 :     if (not histoLatency_.empty())
     671            0 :         return histoLatency_.back();
     672              :     else
     673           10 :         return -1;
     674              : }
     675              : 
     676              : void
     677          142 : SocketPair::setRtpDelayCallback(std::function<void(int, int)> cb)
     678              : {
     679          142 :     rtpDelayCallback_ = std::move(cb);
     680          142 : }
     681              : 
     682              : bool
     683         5675 : SocketPair::getOneWayDelayGradient(float sendTS, bool marker, int32_t* gradient, int32_t* deltaT)
     684              : {
     685              :     // Keep only last packet of each frame
     686         5675 :     if (not marker) {
     687           49 :         return false;
     688              :     }
     689              : 
     690              :     // 1st frame
     691         5626 :     if (lastSendTS_ == 0.0f) {
     692           47 :         lastSendTS_ = sendTS;
     693           47 :         lastReceiveTS_ = std::chrono::steady_clock::now();
     694           47 :         return false;
     695              :     }
     696              : 
     697         5579 :     int32_t deltaS = static_cast<int32_t>((sendTS - lastSendTS_) * 1000); // milliseconds
     698         5579 :     if (deltaS < 0)
     699            4 :         deltaS += 64000;
     700         5579 :     lastSendTS_ = sendTS;
     701              : 
     702         5579 :     std::chrono::steady_clock::time_point arrival_TS = std::chrono::steady_clock::now();
     703              :     auto deltaR = static_cast<int32_t>(
     704         5579 :         std::chrono::duration_cast<std::chrono::milliseconds>(arrival_TS - lastReceiveTS_).count());
     705         5579 :     lastReceiveTS_ = arrival_TS;
     706              : 
     707         5579 :     *gradient = deltaR - deltaS;
     708         5579 :     *deltaT = deltaR;
     709              : 
     710         5579 :     return true;
     711              : }
     712              : 
     713              : bool
     714         5675 : SocketPair::parse_RTP_ext(uint8_t* buf, float* abs)
     715              : {
     716         5675 :     if (not(buf[0] & 0x10))
     717            0 :         return false;
     718              : 
     719         5675 :     uint16_t magic_word = (buf[12] << 8) + buf[13];
     720         5675 :     if (magic_word != 0xBEDE)
     721            0 :         return false;
     722              : 
     723         5675 :     uint8_t sec = buf[17] >> 2;
     724         5675 :     uint32_t fract = ((buf[17] & 0x3) << 16 | (buf[18] << 8) | buf[19]) << 14;
     725         5675 :     float milli = static_cast<float>(fract / pow(2, 32));
     726              : 
     727         5675 :     *abs = static_cast<float>(sec) + (milli);
     728         5675 :     return true;
     729              : }
     730              : 
     731              : uint16_t
     732          140 : SocketPair::lastSeqValOut()
     733              : {
     734          140 :     if (srtpContext_)
     735          140 :         return srtpContext_->srtp_out.seq_largest;
     736            0 :     JAMI_ERROR("SRTP context not found.");
     737            0 :     return 0;
     738              : }
     739              : 
     740              : } // namespace jami
        

Generated by: LCOV version 2.0-1