LCOV - code coverage report
Current view: top level - src/media - socket_pair.cpp (source / functions) Hit Total Coverage
Test: jami-coverage-filtered.info Lines: 292 363 80.4 %
Date: 2024-12-21 08:56:24 Functions: 36 37 97.3 %

          Line data    Source code
       1             : /*
       2             :  *  Copyright (C) 2004-2024 Savoir-faire Linux Inc.
       3             :  *  Copyright (c) 2007 The FFmpeg Project
       4             :  *
       5             :  *  This program is free software: you can redistribute it and/or modify
       6             :  *  it under the terms of the GNU General Public License as published by
       7             :  *  the Free Software Foundation, either version 3 of the License, or
       8             :  *  (at your option) any later version.
       9             :  *
      10             :  *  This program is distributed in the hope that it will be useful,
      11             :  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
      12             :  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
      13             :  *  GNU General Public License for more details.
      14             :  *
      15             :  *  You should have received a copy of the GNU General Public License
      16             :  *  along with this program. If not, see <https://www.gnu.org/licenses/>.
      17             :  */
      18             : 
      19             : #include <dhtnet/ip_utils.h>   // MUST BE INCLUDED FIRST
      20             : 
      21             : #include "libav_deps.h" // THEN THIS ONE AFTER
      22             : 
      23             : #include "socket_pair.h"
      24             : #include "libav_utils.h"
      25             : #include "logger.h"
      26             : #include "connectivity/security/memory.h"
      27             : 
      28             : #include <dhtnet/ice_socket.h>
      29             : 
      30             : #include <iostream>
      31             : #include <string>
      32             : #include <algorithm>
      33             : #include <iterator>
      34             : 
      35             : extern "C" {
      36             : #include "srtp.h"
      37             : }
      38             : 
      39             : #include <cstring>
      40             : #include <stdexcept>
      41             : #include <unistd.h>
      42             : #include <sys/types.h>
      43             : #include <ciso646> // fix windows compiler bug
      44             : 
      45             : #ifdef _WIN32
      46             : #define SOCK_NONBLOCK FIONBIO
      47             : #define poll          WSAPoll
      48             : #define close(x)      closesocket(x)
      49             : #endif
      50             : 
      51             : #ifdef __ANDROID__
      52             : #include <asm-generic/fcntl.h>
      53             : #define SOCK_NONBLOCK O_NONBLOCK
      54             : #endif
      55             : 
      56             : #ifdef __APPLE__
      57             : #include <fcntl.h>
      58             : #endif
      59             : 
      60             : // Swap 2 byte, 16 bit values:
      61             : #define Swap2Bytes(val) ((((val) >> 8) & 0x00FF) | (((val) << 8) & 0xFF00))
      62             : 
      63             : // Swap 4 byte, 32 bit values:
      64             : #define Swap4Bytes(val) \
      65             :     ((((val) >> 24) & 0x000000FF) | (((val) >> 8) & 0x0000FF00) | (((val) << 8) & 0x00FF0000) \
      66             :      | (((val) << 24) & 0xFF000000))
      67             : 
      68             : // Swap 8 byte, 64 bit values:
      69             : #define Swap8Bytes(val) \
      70             :     ((((val) >> 56) & 0x00000000000000FF) | (((val) >> 40) & 0x000000000000FF00) \
      71             :      | (((val) >> 24) & 0x0000000000FF0000) | (((val) >> 8) & 0x00000000FF000000) \
      72             :      | (((val) << 8) & 0x000000FF00000000) | (((val) << 24) & 0x0000FF0000000000) \
      73             :      | (((val) << 40) & 0x00FF000000000000) | (((val) << 56) & 0xFF00000000000000))
      74             : 
      75             : namespace jami {
      76             : 
      77             : static constexpr int NET_POLL_TIMEOUT = 100; /* poll() timeout in ms */
      78             : static constexpr int RTP_MAX_PACKET_LENGTH = 2048;
      79             : static constexpr auto UDP_HEADER_SIZE = 8;
      80             : static constexpr auto SRTP_OVERHEAD = 10;
      81             : static constexpr uint32_t RTCP_RR_FRACTION_MASK = 0xFF000000;
      82             : static constexpr unsigned MINIMUM_RTP_HEADER_SIZE = 16;
      83             : 
      84             : enum class DataType : unsigned { RTP = 1 << 0, RTCP = 1 << 1 };
      85             : 
      86             : class SRTPProtoContext
      87             : {
      88             : public:
      89         340 :     SRTPProtoContext(const char* out_suite,
      90             :                      const char* out_key,
      91             :                      const char* in_suite,
      92             :                      const char* in_key)
      93         340 :     {
      94         340 :         ring_secure_memzero(&srtp_out, sizeof(srtp_out));
      95         340 :         ring_secure_memzero(&srtp_in, sizeof(srtp_in));
      96         340 :         if (out_suite && out_key) {
      97             :             // XXX: see srtp_open from libavformat/srtpproto.c
      98         340 :             if (ff_srtp_set_crypto(&srtp_out, out_suite, out_key) < 0) {
      99           0 :                 srtp_close();
     100           0 :                 throw std::runtime_error("Unable to set crypto on output");
     101             :             }
     102             :         }
     103             : 
     104         340 :         if (in_suite && in_key) {
     105         340 :             if (ff_srtp_set_crypto(&srtp_in, in_suite, in_key) < 0) {
     106           0 :                 srtp_close();
     107           0 :                 throw std::runtime_error("Unable to set crypto on input");
     108             :             }
     109             :         }
     110         340 :     }
     111             : 
     112         340 :     ~SRTPProtoContext() { srtp_close(); }
     113             : 
     114             :     SRTPContext srtp_out {};
     115             :     SRTPContext srtp_in {};
     116             :     uint8_t encryptbuf[RTP_MAX_PACKET_LENGTH];
     117             : 
     118             : private:
     119         340 :     void srtp_close() noexcept
     120             :     {
     121         340 :         ff_srtp_free(&srtp_out);
     122         340 :         ff_srtp_free(&srtp_in);
     123         340 :     }
     124             : };
     125             : 
     126             : static int
     127           0 : ff_network_wait_fd(int fd)
     128             : {
     129           0 :     struct pollfd p = {fd, POLLOUT, 0};
     130           0 :     auto ret = poll(&p, 1, NET_POLL_TIMEOUT);
     131           0 :     return ret < 0 ? errno : p.revents & (POLLOUT | POLLERR | POLLHUP) ? 0 : -EAGAIN;
     132             : }
     133             : 
     134             : static int
     135          24 : udp_socket_create(int family, int port)
     136             : {
     137          24 :     int udp_fd = -1;
     138             : 
     139             : #ifdef __APPLE__
     140             :     udp_fd = socket(family, SOCK_DGRAM, 0);
     141             :     if (udp_fd >= 0 && fcntl(udp_fd, F_SETFL, O_NONBLOCK) < 0) {
     142             :         close(udp_fd);
     143             :         udp_fd = -1;
     144             :     }
     145             : #elif defined _WIN32
     146             :     udp_fd = socket(family, SOCK_DGRAM, 0);
     147             :     u_long block = 1;
     148             :     if (udp_fd >= 0 && ioctlsocket(udp_fd, FIONBIO, &block) < 0) {
     149             :         close(udp_fd);
     150             :         udp_fd = -1;
     151             :     }
     152             : #else
     153          24 :     udp_fd = socket(family, SOCK_DGRAM | SOCK_NONBLOCK, 0);
     154             : #endif
     155             : 
     156          24 :     if (udp_fd < 0) {
     157           0 :         JAMI_ERR("socket() failed");
     158           0 :         strErr();
     159           0 :         return -1;
     160             :     }
     161             : 
     162          24 :     auto bind_addr = dhtnet::ip_utils::getAnyHostAddr(family);
     163          24 :     if (not bind_addr.isIpv4() and not bind_addr.isIpv6()) {
     164           0 :         JAMI_ERR("No IPv4/IPv6 host found for family %u", family);
     165           0 :         close(udp_fd);
     166           0 :         return -1;
     167             :     }
     168             : 
     169          24 :     bind_addr.setPort(port);
     170          24 :     JAMI_DBG("use local address: %s", bind_addr.toString(true, true).c_str());
     171          24 :     if (::bind(udp_fd, bind_addr, bind_addr.getLength()) < 0) {
     172           0 :         JAMI_ERR("bind() failed");
     173           0 :         strErr();
     174           0 :         close(udp_fd);
     175           0 :         udp_fd = -1;
     176             :     }
     177             : 
     178          24 :     return udp_fd;
     179             : }
     180             : 
     181          12 : SocketPair::SocketPair(const char* uri, int localPort)
     182             : {
     183          12 :     openSockets(uri, localPort);
     184          12 : }
     185             : 
     186         328 : SocketPair::SocketPair(std::unique_ptr<dhtnet::IceSocket> rtp_sock, std::unique_ptr<dhtnet::IceSocket> rtcp_sock)
     187         328 :     : rtp_sock_(std::move(rtp_sock))
     188         656 :     , rtcp_sock_(std::move(rtcp_sock))
     189             : {
     190         328 :     JAMI_DBG("[%p] Creating instance using ICE sockets for comp %d and %d",
     191             :              this,
     192             :              rtp_sock_->getCompId(),
     193             :              rtcp_sock_->getCompId());
     194             : 
     195         328 :     rtp_sock_->setOnRecv([this](uint8_t* buf, size_t len) {
     196        5333 :         std::lock_guard l(dataBuffMutex_);
     197        5333 :         rtpDataBuff_.emplace_back(buf, buf + len);
     198        5333 :         cv_.notify_one();
     199        5333 :         return len;
     200        5333 :     });
     201         328 :     rtcp_sock_->setOnRecv([this](uint8_t* buf, size_t len) {
     202         244 :         std::lock_guard l(dataBuffMutex_);
     203         244 :         rtcpDataBuff_.emplace_back(buf, buf + len);
     204         244 :         cv_.notify_one();
     205         244 :         return len;
     206         244 :     });
     207         328 : }
     208             : 
     209         340 : SocketPair::~SocketPair()
     210             : {
     211         340 :     interrupt();
     212         340 :     closeSockets();
     213         340 :     JAMI_DBG("[%p] Instance destroyed", this);
     214         340 : }
     215             : 
     216             : bool
     217         730 : SocketPair::waitForRTCP(std::chrono::seconds interval)
     218             : {
     219         730 :     std::unique_lock lock(rtcpInfo_mutex_);
     220         730 :     return cvRtcpPacketReadyToRead_.wait_for(lock, interval, [this] {
     221        2152 :         return interrupted_ or not listRtcpRRHeader_.empty() or not listRtcpREMBHeader_.empty();
     222        1460 :     });
     223         730 : }
     224             : 
     225             : void
     226          10 : SocketPair::saveRtcpRRPacket(uint8_t* buf, size_t len)
     227             : {
     228          10 :     if (len < sizeof(rtcpRRHeader))
     229           0 :         return;
     230             : 
     231          10 :     auto header = reinterpret_cast<rtcpRRHeader*>(buf);
     232          10 :     if (header->pt != 201) // 201 = RR PT
     233           0 :         return;
     234             : 
     235          10 :     std::lock_guard lock(rtcpInfo_mutex_);
     236             : 
     237          10 :     if (listRtcpRRHeader_.size() >= MAX_LIST_SIZE) {
     238           0 :         listRtcpRRHeader_.pop_front();
     239             :     }
     240             : 
     241          10 :     listRtcpRRHeader_.emplace_back(*header);
     242             : 
     243          10 :     cvRtcpPacketReadyToRead_.notify_one();
     244          10 : }
     245             : 
     246             : void
     247         163 : SocketPair::saveRtcpREMBPacket(uint8_t* buf, size_t len)
     248             : {
     249         163 :     if (len < sizeof(rtcpREMBHeader))
     250           0 :         return;
     251             : 
     252         163 :     auto header = reinterpret_cast<rtcpREMBHeader*>(buf);
     253         163 :     if (header->pt != 206) // 206 = REMB PT
     254           0 :         return;
     255             : 
     256         163 :     if (header->uid != 0x424D4552) // uid must be "REMB"
     257           0 :         return;
     258             : 
     259         163 :     std::lock_guard lock(rtcpInfo_mutex_);
     260             : 
     261         163 :     if (listRtcpREMBHeader_.size() >= MAX_LIST_SIZE) {
     262           0 :         listRtcpREMBHeader_.pop_front();
     263             :     }
     264             : 
     265         163 :     listRtcpREMBHeader_.push_back(*header);
     266             : 
     267         163 :     cvRtcpPacketReadyToRead_.notify_one();
     268         163 : }
     269             : 
     270             : std::list<rtcpRRHeader>
     271         730 : SocketPair::getRtcpRR()
     272             : {
     273         730 :     std::lock_guard lock(rtcpInfo_mutex_);
     274        1460 :     return std::move(listRtcpRRHeader_);
     275         730 : }
     276             : 
     277             : std::list<rtcpREMBHeader>
     278         393 : SocketPair::getRtcpREMB()
     279             : {
     280         393 :     std::lock_guard lock(rtcpInfo_mutex_);
     281         786 :     return std::move(listRtcpREMBHeader_);
     282         393 : }
     283             : 
     284             : void
     285         340 : SocketPair::createSRTP(const char* out_suite,
     286             :                        const char* out_key,
     287             :                        const char* in_suite,
     288             :                        const char* in_key)
     289             : {
     290         340 :     srtpContext_.reset(new SRTPProtoContext(out_suite, out_key, in_suite, in_key));
     291         340 : }
     292             : 
     293             : void
     294         680 : SocketPair::interrupt()
     295             : {
     296         680 :     JAMI_WARN("[%p] Interrupting RTP sockets", this);
     297         680 :     interrupted_ = true;
     298         680 :     if (rtp_sock_)
     299         656 :         rtp_sock_->setOnRecv(nullptr);
     300         680 :     if (rtcp_sock_)
     301         656 :         rtcp_sock_->setOnRecv(nullptr);
     302         680 :     cv_.notify_all();
     303         680 :     cvRtcpPacketReadyToRead_.notify_all();
     304         680 : }
     305             : 
     306             : void
     307         982 : SocketPair::setReadBlockingMode(bool block)
     308             : {
     309         982 :     JAMI_DBG("[%p] Read operations in blocking mode [%s]", this, block ? "YES" : "NO");
     310         982 :     readBlockingMode_ = block;
     311         982 :     cv_.notify_all();
     312         982 :     cvRtcpPacketReadyToRead_.notify_all();
     313         982 : }
     314             : 
     315             : void
     316         983 : SocketPair::stopSendOp(bool state)
     317             : {
     318         983 :     noWrite_ = state;
     319         983 : }
     320             : 
     321             : void
     322         340 : SocketPair::closeSockets()
     323             : {
     324         340 :     if (rtcpHandle_ > 0 and close(rtcpHandle_))
     325           0 :         strErr();
     326         340 :     if (rtpHandle_ > 0 and close(rtpHandle_))
     327           0 :         strErr();
     328         340 : }
     329             : 
     330             : void
     331          12 : SocketPair::openSockets(const char* uri, int local_rtp_port)
     332             : {
     333          12 :     JAMI_DBG("Creating rtp socket for uri %s on port %d", uri, local_rtp_port);
     334             : 
     335             :     char hostname[256];
     336             :     char path[1024];
     337             :     int dst_rtp_port;
     338             : 
     339          12 :     av_url_split(NULL, 0, NULL, 0, hostname, sizeof(hostname), &dst_rtp_port, path, sizeof(path), uri);
     340             : 
     341          12 :     const int local_rtcp_port = local_rtp_port + 1;
     342          12 :     const int dst_rtcp_port = dst_rtp_port + 1;
     343             : 
     344          12 :     rtpDestAddr_ = dhtnet::IpAddr {hostname};
     345          12 :     rtpDestAddr_.setPort(dst_rtp_port);
     346          12 :     rtcpDestAddr_ = dhtnet::IpAddr {hostname};
     347          12 :     rtcpDestAddr_.setPort(dst_rtcp_port);
     348             : 
     349             :     // Open local sockets (RTP/RTCP)
     350          12 :     if ((rtpHandle_ = udp_socket_create(rtpDestAddr_.getFamily(), local_rtp_port)) == -1
     351          12 :         or (rtcpHandle_ = udp_socket_create(rtcpDestAddr_.getFamily(), local_rtcp_port)) == -1) {
     352           0 :         closeSockets();
     353           0 :         JAMI_ERR("[%p] Sockets creation failed", this);
     354           0 :         throw std::runtime_error("Sockets creation failed");
     355             :     }
     356             : 
     357          12 :     JAMI_WARN("SocketPair: local{%d,%d} / %s{%d,%d}",
     358             :               local_rtp_port,
     359             :               local_rtcp_port,
     360             :               hostname,
     361             :               dst_rtp_port,
     362             :               dst_rtcp_port);
     363          12 : }
     364             : 
     365             : MediaIOHandle*
     366         677 : SocketPair::createIOContext(const uint16_t mtu)
     367             : {
     368             :     unsigned ip_header_size;
     369         677 :     if (rtp_sock_)
     370         657 :         ip_header_size = rtp_sock_->getTransportOverhead();
     371          20 :     else if (rtpDestAddr_.getFamily() == AF_INET6)
     372           0 :         ip_header_size = 40;
     373             :     else
     374          20 :         ip_header_size = 20;
     375             :     return new MediaIOHandle(
     376         677 :         mtu - (srtpContext_ ? SRTP_OVERHEAD : 0) - UDP_HEADER_SIZE - ip_header_size,
     377             :         true,
     378        6030 :         [](void* sp, uint8_t* buf, int len) {
     379        6030 :             return static_cast<SocketPair*>(sp)->readCallback(buf, len);
     380             :         },
     381        5427 :         [](void* sp, uint8_t* buf, int len) {
     382        5427 :             return static_cast<SocketPair*>(sp)->writeCallback(buf, len);
     383             :         },
     384             :         0,
     385         677 :         reinterpret_cast<void*>(this));
     386             : }
     387             : 
     388             : int
     389        6030 : SocketPair::waitForData()
     390             : {
     391             :     // System sockets
     392        6030 :     if (rtpHandle_ >= 0) {
     393             :         int ret;
     394             :         do {
     395         155 :             if (interrupted_) {
     396           8 :                 errno = EINTR;
     397          12 :                 return -1;
     398             :             }
     399             : 
     400         147 :             if (not readBlockingMode_) {
     401           4 :                 return 0;
     402             :             }
     403             : 
     404             :             // work with system socket
     405         143 :             struct pollfd p[2] = {{rtpHandle_, POLLIN, 0}, {rtcpHandle_, POLLIN, 0}};
     406         143 :             ret = poll(p, 2, NET_POLL_TIMEOUT);
     407         143 :             if (ret > 0) {
     408           0 :                 ret = 0;
     409           0 :                 if (p[0].revents & POLLIN)
     410           0 :                     ret |= static_cast<int>(DataType::RTP);
     411           0 :                 if (p[1].revents & POLLIN)
     412           0 :                     ret |= static_cast<int>(DataType::RTCP);
     413             :             }
     414         143 :         } while (!ret or (ret < 0 and errno == EAGAIN));
     415             : 
     416           0 :         return ret;
     417             :     }
     418             : 
     419             :     // work with IceSocket
     420             :     {
     421        6018 :         std::unique_lock lk(dataBuffMutex_);
     422        6018 :         cv_.wait(lk, [this] {
     423       23654 :             return interrupted_ or not rtpDataBuff_.empty() or not rtcpDataBuff_.empty()
     424       23652 :                    or not readBlockingMode_;
     425             :         });
     426        6018 :     }
     427             : 
     428        6018 :     if (interrupted_) {
     429          34 :         errno = EINTR;
     430          34 :         return -1;
     431             :     }
     432             : 
     433        5984 :     return static_cast<int>(DataType::RTP) | static_cast<int>(DataType::RTCP);
     434             : }
     435             : 
     436             : int
     437        5740 : SocketPair::readRtpData(void* buf, int buf_size)
     438             : {
     439             :     // handle system socket
     440        5740 :     if (rtpHandle_ >= 0) {
     441             :         struct sockaddr_storage from;
     442           0 :         socklen_t from_len = sizeof(from);
     443           0 :         return recvfrom(rtpHandle_,
     444             :                         static_cast<char*>(buf),
     445             :                         buf_size,
     446             :                         0,
     447             :                         reinterpret_cast<struct sockaddr*>(&from),
     448           0 :                         &from_len);
     449             :     }
     450             : 
     451             :     // handle ICE
     452        5740 :     std::unique_lock lk(dataBuffMutex_);
     453        5740 :     if (not rtpDataBuff_.empty()) {
     454        5333 :         auto pkt = std::move(rtpDataBuff_.front());
     455        5333 :         rtpDataBuff_.pop_front();
     456        5333 :         lk.unlock(); // to not block our ICE callbacks
     457        5333 :         int pkt_size = pkt.size();
     458        5333 :         int len = std::min(pkt_size, buf_size);
     459        5333 :         std::copy_n(pkt.begin(), len, static_cast<char*>(buf));
     460        5333 :         return len;
     461        5333 :     }
     462             : 
     463         407 :     return 0;
     464        5740 : }
     465             : 
     466             : int
     467        5984 : SocketPair::readRtcpData(void* buf, int buf_size)
     468             : {
     469             :     // handle system socket
     470        5984 :     if (rtcpHandle_ >= 0) {
     471             :         struct sockaddr_storage from;
     472           0 :         socklen_t from_len = sizeof(from);
     473           0 :         return recvfrom(rtcpHandle_,
     474             :                         static_cast<char*>(buf),
     475             :                         buf_size,
     476             :                         0,
     477             :                         reinterpret_cast<struct sockaddr*>(&from),
     478           0 :                         &from_len);
     479             :     }
     480             : 
     481             :     // handle ICE
     482        5984 :     std::unique_lock lk(dataBuffMutex_);
     483        5984 :     if (not rtcpDataBuff_.empty()) {
     484         244 :         auto pkt = std::move(rtcpDataBuff_.front());
     485         244 :         rtcpDataBuff_.pop_front();
     486         244 :         lk.unlock();
     487         244 :         int pkt_size = pkt.size();
     488         244 :         int len = std::min(pkt_size, buf_size);
     489         244 :         std::copy_n(pkt.begin(), len, static_cast<char*>(buf));
     490         244 :         return len;
     491         244 :     }
     492             : 
     493        5740 :     return 0;
     494        5984 : }
     495             : 
     496             : int
     497        6030 : SocketPair::readCallback(uint8_t* buf, int buf_size)
     498             : {
     499        6030 :     auto datatype = waitForData();
     500        6030 :     if (datatype < 0)
     501          42 :         return datatype;
     502             : 
     503        5988 :     int len = 0;
     504        5988 :     bool fromRTCP = false;
     505             : 
     506        5988 :     if (datatype & static_cast<int>(DataType::RTCP)) {
     507        5984 :         len = readRtcpData(buf, buf_size);
     508        5984 :         if (len > 0) {
     509         244 :             auto header = reinterpret_cast<rtcpRRHeader*>(buf);
     510             :             // 201 = RR PT
     511         244 :             if (header->pt == 201) {
     512          10 :                 lastDLSR_ = Swap4Bytes(header->dlsr);
     513             :                 // JAMI_WARN("Read RR, lastDLSR : %d", lastDLSR_);
     514          10 :                 lastRR_time = std::chrono::steady_clock::now();
     515          10 :                 saveRtcpRRPacket(buf, len);
     516             :             }
     517             :             // 206 = REMB PT
     518         234 :             else if (header->pt == 206)
     519         163 :                 saveRtcpREMBPacket(buf, len);
     520             :             // 200 = SR PT
     521          71 :             else if (header->pt == 200) {
     522             :                 // not used yet
     523             :             } else {
     524           0 :                 JAMI_DBG("Unable to read RTCP: unknown packet type %u", header->pt);
     525             :             }
     526         244 :             fromRTCP = true;
     527             :         }
     528             :     }
     529             : 
     530             :     // No RTCP… attempt RTP
     531        5988 :     if (!len and (datatype & static_cast<int>(DataType::RTP))) {
     532        5740 :         len = readRtpData(buf, buf_size);
     533        5740 :         fromRTCP = false;
     534             :     }
     535             : 
     536        5988 :     if (len <= 0)
     537         411 :         return len;
     538             : 
     539        5577 :     if (not fromRTCP && (buf_size < static_cast<int>(MINIMUM_RTP_HEADER_SIZE)))
     540           0 :         return len;
     541             : 
     542             :     // SRTP decrypt
     543        5577 :     if (not fromRTCP and srtpContext_ and srtpContext_->srtp_in.aes) {
     544        5333 :         int32_t gradient = 0;
     545        5333 :         int32_t deltaT = 0;
     546        5333 :         float abs = 0.0f;
     547        5333 :         bool res_parse = false;
     548        5333 :         bool res_delay = false;
     549             : 
     550        5333 :         res_parse = parse_RTP_ext(buf, &abs);
     551        5333 :         bool marker = (buf[1] & 0x80) >> 7;
     552             : 
     553        5333 :         if (res_parse)
     554        5333 :             res_delay = getOneWayDelayGradient(abs, marker, &gradient, &deltaT);
     555             : 
     556             :         // rtpDelayCallback_ is not set for audio
     557        5333 :         if (rtpDelayCallback_ and res_delay)
     558        5240 :             rtpDelayCallback_(gradient, deltaT);
     559             : 
     560        5333 :         auto err = ff_srtp_decrypt(&srtpContext_->srtp_in, buf, &len);
     561        5333 :         if (packetLossCallback_ and (buf[2] << 8 | buf[3]) != lastSeqNumIn_ + 1)
     562           0 :             packetLossCallback_();
     563        5333 :         lastSeqNumIn_ = buf[2] << 8 | buf[3];
     564        5333 :         if (err < 0)
     565           0 :             JAMI_WARN("decrypt error %d", err);
     566             :     }
     567             : 
     568        5577 :     if (len != 0)
     569        5577 :         return len;
     570             :     else
     571           0 :         return AVERROR_EOF;
     572             : }
     573             : 
     574             : int
     575        5590 : SocketPair::writeData(uint8_t* buf, int buf_size)
     576             : {
     577        5590 :     bool isRTCP = RTP_PT_IS_RTCP(buf[1]);
     578             : 
     579             :     // System sockets?
     580        5590 :     if (rtpHandle_ >= 0) {
     581             :         int fd;
     582             :         dhtnet::IpAddr* dest_addr;
     583             : 
     584           0 :         if (isRTCP) {
     585           0 :             fd = rtcpHandle_;
     586           0 :             dest_addr = &rtcpDestAddr_;
     587             :         } else {
     588           0 :             fd = rtpHandle_;
     589           0 :             dest_addr = &rtpDestAddr_;
     590             :         }
     591             : 
     592           0 :         auto ret = ff_network_wait_fd(fd);
     593           0 :         if (ret < 0)
     594           0 :             return ret;
     595             : 
     596           0 :         if (noWrite_)
     597           0 :             return buf_size;
     598           0 :         return ::sendto(fd,
     599             :                         reinterpret_cast<const char*>(buf),
     600             :                         buf_size,
     601             :                         0,
     602             :                         *dest_addr,
     603           0 :                         dest_addr->getLength());
     604             :     }
     605             : 
     606        5590 :     if (noWrite_)
     607           0 :         return buf_size;
     608             : 
     609             :     // IceSocket
     610        5590 :     if (isRTCP)
     611         246 :         return rtcp_sock_->send(buf, buf_size);
     612             :     else
     613        5344 :         return rtp_sock_->send(buf, buf_size);
     614             : }
     615             : 
     616             : int
     617        5427 : SocketPair::writeCallback(uint8_t* buf, int buf_size)
     618             : {
     619        5427 :     if (noWrite_)
     620           0 :         return 0;
     621             : 
     622             :     int ret;
     623        5427 :     bool isRTCP = RTP_PT_IS_RTCP(buf[1]);
     624             :     unsigned int ts_LSB, ts_MSB;
     625             :     double currentSRTS, currentLatency;
     626             : 
     627             :     // Encrypt?
     628        5427 :     if (not isRTCP and srtpContext_ and srtpContext_->srtp_out.aes) {
     629        5344 :         buf_size = ff_srtp_encrypt(&srtpContext_->srtp_out,
     630             :                                    buf,
     631             :                                    buf_size,
     632        5344 :                                    srtpContext_->encryptbuf,
     633             :                                    sizeof(srtpContext_->encryptbuf));
     634        5344 :         if (buf_size < 0) {
     635           0 :             JAMI_WARN("encrypt error %d", buf_size);
     636           0 :             return buf_size;
     637             :         }
     638             : 
     639        5344 :         buf = srtpContext_->encryptbuf;
     640             :     }
     641             : 
     642             :     // check if we're sending an RR, if so, detect packet loss
     643             :     // buf_size gives length of buffer, not just header
     644        5427 :     if (isRTCP && static_cast<unsigned>(buf_size) >= sizeof(rtcpRRHeader)) {
     645          10 :         auto header = reinterpret_cast<rtcpRRHeader*>(buf);
     646          10 :         rtcpPacketLoss_ = (header->pt == 201
     647          10 :                            && ntohl(header->fraction_lost) & RTCP_RR_FRACTION_MASK);
     648             :     }
     649             : 
     650             :     do {
     651        5427 :         if (interrupted_)
     652           0 :             return -EINTR;
     653        5427 :         ret = writeData(buf, buf_size);
     654        5427 :     } while (ret < 0 and errno == EAGAIN);
     655             : 
     656        5427 :     if (buf[1] == 200) // Sender Report
     657             :     {
     658          73 :         auto header = reinterpret_cast<rtcpSRHeader*>(buf);
     659          73 :         ts_LSB = Swap4Bytes(header->timestampLSB);
     660          73 :         ts_MSB = Swap4Bytes(header->timestampMSB);
     661             : 
     662          73 :         currentSRTS = ts_MSB + (ts_LSB / pow(2, 32));
     663             : 
     664          73 :         if (lastSRTS_ != 0 && lastDLSR_ != 0) {
     665          14 :             if (histoLatency_.size() >= MAX_LIST_SIZE)
     666           0 :                 histoLatency_.pop_front();
     667             : 
     668          14 :             currentLatency = (currentSRTS - lastSRTS_) / 2;
     669             :             // JAMI_WARN("Current Latency : %f from sender %X", currentLatency, header->ssrc);
     670          14 :             histoLatency_.push_back(currentLatency);
     671             :         }
     672             : 
     673          73 :         lastSRTS_ = currentSRTS;
     674             : 
     675             :         // JAMI_WARN("SENDING NEW RTCP SR !! ");
     676             : 
     677        5354 :     } else if (buf[1] == 201) // Receiver Report
     678             :     {
     679             :         // auto header = reinterpret_cast<rtcpRRHeader*>(buf);
     680             :         // JAMI_WARN("SENDING NEW RTCP RR !! ");
     681             :     }
     682             : 
     683        5427 :     return ret < 0 ? -errno : ret;
     684             : }
     685             : 
     686             : double
     687          10 : SocketPair::getLastLatency()
     688             : {
     689          10 :     if (not histoLatency_.empty())
     690           0 :         return histoLatency_.back();
     691             :     else
     692          10 :         return -1;
     693             : }
     694             : 
     695             : void
     696         150 : SocketPair::setRtpDelayCallback(std::function<void(int, int)> cb)
     697             : {
     698         150 :     rtpDelayCallback_ = std::move(cb);
     699         150 : }
     700             : 
     701             : bool
     702        5333 : SocketPair::getOneWayDelayGradient(float sendTS, bool marker, int32_t* gradient, int32_t* deltaT)
     703             : {
     704             :     // Keep only last packet of each frame
     705        5333 :     if (not marker) {
     706          46 :         return 0;
     707             :     }
     708             : 
     709             :     // 1st frame
     710        5287 :     if (not lastSendTS_) {
     711          47 :         lastSendTS_ = sendTS;
     712          47 :         lastReceiveTS_ = std::chrono::steady_clock::now();
     713          47 :         return 0;
     714             :     }
     715             : 
     716        5240 :     int32_t deltaS = (sendTS - lastSendTS_) * 1000; // milliseconds
     717        5240 :     if (deltaS < 0)
     718           4 :         deltaS += 64000;
     719        5240 :     lastSendTS_ = sendTS;
     720             : 
     721        5240 :     std::chrono::steady_clock::time_point arrival_TS = std::chrono::steady_clock::now();
     722        5240 :     auto deltaR = std::chrono::duration_cast<std::chrono::milliseconds>(arrival_TS - lastReceiveTS_)
     723        5240 :                       .count();
     724        5240 :     lastReceiveTS_ = arrival_TS;
     725             : 
     726        5240 :     *gradient = deltaR - deltaS;
     727        5240 :     *deltaT = deltaR;
     728             : 
     729        5240 :     return true;
     730             : }
     731             : 
     732             : bool
     733        5333 : SocketPair::parse_RTP_ext(uint8_t* buf, float* abs)
     734             : {
     735        5333 :     if (not(buf[0] & 0x10))
     736           0 :         return false;
     737             : 
     738        5333 :     uint16_t magic_word = (buf[12] << 8) + buf[13];
     739        5333 :     if (magic_word != 0xBEDE)
     740           0 :         return false;
     741             : 
     742        5333 :     uint8_t sec = buf[17] >> 2;
     743        5333 :     uint32_t fract = ((buf[17] & 0x3) << 16 | (buf[18] << 8) | buf[19]) << 14;
     744        5333 :     float milli = fract / pow(2, 32);
     745             : 
     746        5333 :     *abs = sec + (milli);
     747        5333 :     return true;
     748             : }
     749             : 
     750             : uint16_t
     751         152 : SocketPair::lastSeqValOut()
     752             : {
     753         152 :     if (srtpContext_)
     754         152 :         return srtpContext_->srtp_out.seq_largest;
     755           0 :     JAMI_ERR("SRTP context not found.");
     756           0 :     return 0;
     757             : }
     758             : 
     759             : } // namespace jami

Generated by: LCOV version 1.14