LCOV - code coverage report
Current view: top level - src/media - socket_pair.cpp (source / functions) Hit Total Coverage
Test: jami-coverage-filtered.info Lines: 292 363 80.4 %
Date: 2024-03-29 09:30:40 Functions: 36 37 97.3 %

          Line data    Source code
       1             : /*
       2             :  *  Copyright (C) 2004-2024 Savoir-faire Linux Inc.
       3             :  *  Copyright (c) 2007 The FFmpeg Project
       4             :  *
       5             :  *  Author: Tristan Matthews <tristan.matthews@savoirfairelinux.com>
       6             :  *  Author: Guillaume Roguez <guillaume.roguez@savoirfairelinux.com>
       7             :  *
       8             :  *  This program is free software; you can redistribute it and/or modify
       9             :  *  it under the terms of the GNU General Public License as published by
      10             :  *  the Free Software Foundation; either version 3 of the License, or
      11             :  *  (at your option) any later version.
      12             :  *
      13             :  *  This program is distributed in the hope that it will be useful,
      14             :  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
      15             :  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
      16             :  *  GNU General Public License for more details.
      17             :  *
      18             :  *  You should have received a copy of the GNU General Public License
      19             :  *  along with this program; if not, write to the Free Software
      20             :  *  Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301 USA.
      21             :  */
      22             : 
      23             : #include <dhtnet/ip_utils.h>   // MUST BE INCLUDED FIRST
      24             : 
      25             : #include "libav_deps.h" // THEN THIS ONE AFTER
      26             : 
      27             : #include "socket_pair.h"
      28             : #include "libav_utils.h"
      29             : #include "logger.h"
      30             : #include "connectivity/security/memory.h"
      31             : 
      32             : #include <dhtnet/ice_socket.h>
      33             : 
      34             : #include <iostream>
      35             : #include <string>
      36             : #include <algorithm>
      37             : #include <iterator>
      38             : 
      39             : extern "C" {
      40             : #include "srtp.h"
      41             : }
      42             : 
      43             : #include <cstring>
      44             : #include <stdexcept>
      45             : #include <unistd.h>
      46             : #include <sys/types.h>
      47             : #include <ciso646> // fix windows compiler bug
      48             : 
      49             : #ifdef _WIN32
      50             : #define SOCK_NONBLOCK FIONBIO
      51             : #define poll          WSAPoll
      52             : #define close(x)      closesocket(x)
      53             : #endif
      54             : 
      55             : #ifdef __ANDROID__
      56             : #include <asm-generic/fcntl.h>
      57             : #define SOCK_NONBLOCK O_NONBLOCK
      58             : #endif
      59             : 
      60             : #ifdef __APPLE__
      61             : #include <fcntl.h>
      62             : #endif
      63             : 
      64             : // Swap 2 byte, 16 bit values:
      65             : #define Swap2Bytes(val) ((((val) >> 8) & 0x00FF) | (((val) << 8) & 0xFF00))
      66             : 
      67             : // Swap 4 byte, 32 bit values:
      68             : #define Swap4Bytes(val) \
      69             :     ((((val) >> 24) & 0x000000FF) | (((val) >> 8) & 0x0000FF00) | (((val) << 8) & 0x00FF0000) \
      70             :      | (((val) << 24) & 0xFF000000))
      71             : 
      72             : // Swap 8 byte, 64 bit values:
      73             : #define Swap8Bytes(val) \
      74             :     ((((val) >> 56) & 0x00000000000000FF) | (((val) >> 40) & 0x000000000000FF00) \
      75             :      | (((val) >> 24) & 0x0000000000FF0000) | (((val) >> 8) & 0x00000000FF000000) \
      76             :      | (((val) << 8) & 0x000000FF00000000) | (((val) << 24) & 0x0000FF0000000000) \
      77             :      | (((val) << 40) & 0x00FF000000000000) | (((val) << 56) & 0xFF00000000000000))
      78             : 
      79             : namespace jami {
      80             : 
      81             : static constexpr int NET_POLL_TIMEOUT = 100; /* poll() timeout in ms */
      82             : static constexpr int RTP_MAX_PACKET_LENGTH = 2048;
      83             : static constexpr auto UDP_HEADER_SIZE = 8;
      84             : static constexpr auto SRTP_OVERHEAD = 10;
      85             : static constexpr uint32_t RTCP_RR_FRACTION_MASK = 0xFF000000;
      86             : static constexpr unsigned MINIMUM_RTP_HEADER_SIZE = 16;
      87             : 
      88             : enum class DataType : unsigned { RTP = 1 << 0, RTCP = 1 << 1 };
      89             : 
      90             : class SRTPProtoContext
      91             : {
      92             : public:
      93         334 :     SRTPProtoContext(const char* out_suite,
      94             :                      const char* out_key,
      95             :                      const char* in_suite,
      96             :                      const char* in_key)
      97         334 :     {
      98         334 :         ring_secure_memzero(&srtp_out, sizeof(srtp_out));
      99         334 :         ring_secure_memzero(&srtp_in, sizeof(srtp_in));
     100         334 :         if (out_suite && out_key) {
     101             :             // XXX: see srtp_open from libavformat/srtpproto.c
     102         334 :             if (ff_srtp_set_crypto(&srtp_out, out_suite, out_key) < 0) {
     103           0 :                 srtp_close();
     104           0 :                 throw std::runtime_error("Could not set crypto on output");
     105             :             }
     106             :         }
     107             : 
     108         334 :         if (in_suite && in_key) {
     109         334 :             if (ff_srtp_set_crypto(&srtp_in, in_suite, in_key) < 0) {
     110           0 :                 srtp_close();
     111           0 :                 throw std::runtime_error("Could not set crypto on input");
     112             :             }
     113             :         }
     114         334 :     }
     115             : 
     116         334 :     ~SRTPProtoContext() { srtp_close(); }
     117             : 
     118             :     SRTPContext srtp_out {};
     119             :     SRTPContext srtp_in {};
     120             :     uint8_t encryptbuf[RTP_MAX_PACKET_LENGTH];
     121             : 
     122             : private:
     123         334 :     void srtp_close() noexcept
     124             :     {
     125         334 :         ff_srtp_free(&srtp_out);
     126         334 :         ff_srtp_free(&srtp_in);
     127         334 :     }
     128             : };
     129             : 
     130             : static int
     131           0 : ff_network_wait_fd(int fd)
     132             : {
     133           0 :     struct pollfd p = {fd, POLLOUT, 0};
     134           0 :     auto ret = poll(&p, 1, NET_POLL_TIMEOUT);
     135           0 :     return ret < 0 ? errno : p.revents & (POLLOUT | POLLERR | POLLHUP) ? 0 : -EAGAIN;
     136             : }
     137             : 
     138             : static int
     139          20 : udp_socket_create(int family, int port)
     140             : {
     141          20 :     int udp_fd = -1;
     142             : 
     143             : #ifdef __APPLE__
     144             :     udp_fd = socket(family, SOCK_DGRAM, 0);
     145             :     if (udp_fd >= 0 && fcntl(udp_fd, F_SETFL, O_NONBLOCK) < 0) {
     146             :         close(udp_fd);
     147             :         udp_fd = -1;
     148             :     }
     149             : #elif defined _WIN32
     150             :     udp_fd = socket(family, SOCK_DGRAM, 0);
     151             :     u_long block = 1;
     152             :     if (udp_fd >= 0 && ioctlsocket(udp_fd, FIONBIO, &block) < 0) {
     153             :         close(udp_fd);
     154             :         udp_fd = -1;
     155             :     }
     156             : #else
     157          20 :     udp_fd = socket(family, SOCK_DGRAM | SOCK_NONBLOCK, 0);
     158             : #endif
     159             : 
     160          20 :     if (udp_fd < 0) {
     161           0 :         JAMI_ERR("socket() failed");
     162           0 :         strErr();
     163           0 :         return -1;
     164             :     }
     165             : 
     166          20 :     auto bind_addr = dhtnet::ip_utils::getAnyHostAddr(family);
     167          20 :     if (not bind_addr.isIpv4() and not bind_addr.isIpv6()) {
     168           0 :         JAMI_ERR("No IPv4/IPv6 host found for family %u", family);
     169           0 :         close(udp_fd);
     170           0 :         return -1;
     171             :     }
     172             : 
     173          20 :     bind_addr.setPort(port);
     174          20 :     JAMI_DBG("use local address: %s", bind_addr.toString(true, true).c_str());
     175          20 :     if (::bind(udp_fd, bind_addr, bind_addr.getLength()) < 0) {
     176           0 :         JAMI_ERR("bind() failed");
     177           0 :         strErr();
     178           0 :         close(udp_fd);
     179           0 :         udp_fd = -1;
     180             :     }
     181             : 
     182          20 :     return udp_fd;
     183             : }
     184             : 
     185          10 : SocketPair::SocketPair(const char* uri, int localPort)
     186             : {
     187          10 :     openSockets(uri, localPort);
     188          10 : }
     189             : 
     190         324 : SocketPair::SocketPair(std::unique_ptr<dhtnet::IceSocket> rtp_sock, std::unique_ptr<dhtnet::IceSocket> rtcp_sock)
     191         324 :     : rtp_sock_(std::move(rtp_sock))
     192         648 :     , rtcp_sock_(std::move(rtcp_sock))
     193             : {
     194         324 :     JAMI_DBG("[%p] Creating instance using ICE sockets for comp %d and %d",
     195             :              this,
     196             :              rtp_sock_->getCompId(),
     197             :              rtcp_sock_->getCompId());
     198             : 
     199         324 :     rtp_sock_->setOnRecv([this](uint8_t* buf, size_t len) {
     200        5435 :         std::lock_guard l(dataBuffMutex_);
     201        5435 :         rtpDataBuff_.emplace_back(buf, buf + len);
     202        5435 :         cv_.notify_one();
     203        5435 :         return len;
     204        5435 :     });
     205         324 :     rtcp_sock_->setOnRecv([this](uint8_t* buf, size_t len) {
     206         250 :         std::lock_guard l(dataBuffMutex_);
     207         250 :         rtcpDataBuff_.emplace_back(buf, buf + len);
     208         250 :         cv_.notify_one();
     209         250 :         return len;
     210         250 :     });
     211         324 : }
     212             : 
     213         334 : SocketPair::~SocketPair()
     214             : {
     215         334 :     interrupt();
     216         334 :     closeSockets();
     217         334 :     JAMI_DBG("[%p] Instance destroyed", this);
     218         334 : }
     219             : 
     220             : bool
     221         731 : SocketPair::waitForRTCP(std::chrono::seconds interval)
     222             : {
     223         731 :     std::unique_lock lock(rtcpInfo_mutex_);
     224         731 :     return cvRtcpPacketReadyToRead_.wait_for(lock, interval, [this] {
     225        2048 :         return interrupted_ or not listRtcpRRHeader_.empty() or not listRtcpREMBHeader_.empty();
     226        1462 :     });
     227         731 : }
     228             : 
     229             : void
     230          10 : SocketPair::saveRtcpRRPacket(uint8_t* buf, size_t len)
     231             : {
     232          10 :     if (len < sizeof(rtcpRRHeader))
     233           0 :         return;
     234             : 
     235          10 :     auto header = reinterpret_cast<rtcpRRHeader*>(buf);
     236          10 :     if (header->pt != 201) // 201 = RR PT
     237           0 :         return;
     238             : 
     239          10 :     std::lock_guard lock(rtcpInfo_mutex_);
     240             : 
     241          10 :     if (listRtcpRRHeader_.size() >= MAX_LIST_SIZE) {
     242           0 :         listRtcpRRHeader_.pop_front();
     243             :     }
     244             : 
     245          10 :     listRtcpRRHeader_.emplace_back(*header);
     246             : 
     247          10 :     cvRtcpPacketReadyToRead_.notify_one();
     248          10 : }
     249             : 
     250             : void
     251         166 : SocketPair::saveRtcpREMBPacket(uint8_t* buf, size_t len)
     252             : {
     253         166 :     if (len < sizeof(rtcpREMBHeader))
     254           0 :         return;
     255             : 
     256         166 :     auto header = reinterpret_cast<rtcpREMBHeader*>(buf);
     257         166 :     if (header->pt != 206) // 206 = REMB PT
     258           0 :         return;
     259             : 
     260         166 :     if (header->uid != 0x424D4552) // uid must be "REMB"
     261           0 :         return;
     262             : 
     263         166 :     std::lock_guard lock(rtcpInfo_mutex_);
     264             : 
     265         166 :     if (listRtcpREMBHeader_.size() >= MAX_LIST_SIZE) {
     266           0 :         listRtcpREMBHeader_.pop_front();
     267             :     }
     268             : 
     269         166 :     listRtcpREMBHeader_.push_back(*header);
     270             : 
     271         166 :     cvRtcpPacketReadyToRead_.notify_one();
     272         166 : }
     273             : 
     274             : std::list<rtcpRRHeader>
     275         731 : SocketPair::getRtcpRR()
     276             : {
     277         731 :     std::lock_guard lock(rtcpInfo_mutex_);
     278        1462 :     return std::move(listRtcpRRHeader_);
     279         731 : }
     280             : 
     281             : std::list<rtcpREMBHeader>
     282         397 : SocketPair::getRtcpREMB()
     283             : {
     284         397 :     std::lock_guard lock(rtcpInfo_mutex_);
     285         794 :     return std::move(listRtcpREMBHeader_);
     286         397 : }
     287             : 
     288             : void
     289         334 : SocketPair::createSRTP(const char* out_suite,
     290             :                        const char* out_key,
     291             :                        const char* in_suite,
     292             :                        const char* in_key)
     293             : {
     294         334 :     srtpContext_.reset(new SRTPProtoContext(out_suite, out_key, in_suite, in_key));
     295         334 : }
     296             : 
     297             : void
     298         668 : SocketPair::interrupt()
     299             : {
     300         668 :     JAMI_WARN("[%p] Interrupting RTP sockets", this);
     301         668 :     interrupted_ = true;
     302         668 :     if (rtp_sock_)
     303         648 :         rtp_sock_->setOnRecv(nullptr);
     304         668 :     if (rtcp_sock_)
     305         648 :         rtcp_sock_->setOnRecv(nullptr);
     306         668 :     cv_.notify_all();
     307         668 :     cvRtcpPacketReadyToRead_.notify_all();
     308         668 : }
     309             : 
     310             : void
     311         968 : SocketPair::setReadBlockingMode(bool block)
     312             : {
     313         968 :     JAMI_DBG("[%p] Read operations in blocking mode [%s]", this, block ? "YES" : "NO");
     314         968 :     readBlockingMode_ = block;
     315         968 :     cv_.notify_all();
     316         968 :     cvRtcpPacketReadyToRead_.notify_all();
     317         968 : }
     318             : 
     319             : void
     320         972 : SocketPair::stopSendOp(bool state)
     321             : {
     322         972 :     noWrite_ = state;
     323         972 : }
     324             : 
     325             : void
     326         334 : SocketPair::closeSockets()
     327             : {
     328         334 :     if (rtcpHandle_ > 0 and close(rtcpHandle_))
     329           0 :         strErr();
     330         334 :     if (rtpHandle_ > 0 and close(rtpHandle_))
     331           0 :         strErr();
     332         334 : }
     333             : 
     334             : void
     335          10 : SocketPair::openSockets(const char* uri, int local_rtp_port)
     336             : {
     337          10 :     JAMI_DBG("Creating rtp socket for uri %s on port %d", uri, local_rtp_port);
     338             : 
     339             :     char hostname[256];
     340             :     char path[1024];
     341             :     int dst_rtp_port;
     342             : 
     343          10 :     av_url_split(NULL, 0, NULL, 0, hostname, sizeof(hostname), &dst_rtp_port, path, sizeof(path), uri);
     344             : 
     345          10 :     const int local_rtcp_port = local_rtp_port + 1;
     346          10 :     const int dst_rtcp_port = dst_rtp_port + 1;
     347             : 
     348          10 :     rtpDestAddr_ = dhtnet::IpAddr {hostname};
     349          10 :     rtpDestAddr_.setPort(dst_rtp_port);
     350          10 :     rtcpDestAddr_ = dhtnet::IpAddr {hostname};
     351          10 :     rtcpDestAddr_.setPort(dst_rtcp_port);
     352             : 
     353             :     // Open local sockets (RTP/RTCP)
     354          10 :     if ((rtpHandle_ = udp_socket_create(rtpDestAddr_.getFamily(), local_rtp_port)) == -1
     355          10 :         or (rtcpHandle_ = udp_socket_create(rtcpDestAddr_.getFamily(), local_rtcp_port)) == -1) {
     356           0 :         closeSockets();
     357           0 :         JAMI_ERR("[%p] Sockets creation failed", this);
     358           0 :         throw std::runtime_error("Sockets creation failed");
     359             :     }
     360             : 
     361          10 :     JAMI_WARN("SocketPair: local{%d,%d} / %s{%d,%d}",
     362             :               local_rtp_port,
     363             :               local_rtcp_port,
     364             :               hostname,
     365             :               dst_rtp_port,
     366             :               dst_rtcp_port);
     367          10 : }
     368             : 
     369             : MediaIOHandle*
     370         669 : SocketPair::createIOContext(const uint16_t mtu)
     371             : {
     372             :     unsigned ip_header_size;
     373         669 :     if (rtp_sock_)
     374         652 :         ip_header_size = rtp_sock_->getTransportOverhead();
     375          17 :     else if (rtpDestAddr_.getFamily() == AF_INET6)
     376           0 :         ip_header_size = 40;
     377             :     else
     378          17 :         ip_header_size = 20;
     379             :     return new MediaIOHandle(
     380         669 :         mtu - (srtpContext_ ? SRTP_OVERHEAD : 0) - UDP_HEADER_SIZE - ip_header_size,
     381             :         true,
     382        6125 :         [](void* sp, uint8_t* buf, int len) {
     383        6125 :             return static_cast<SocketPair*>(sp)->readCallback(buf, len);
     384             :         },
     385        5521 :         [](void* sp, uint8_t* buf, int len) {
     386        5521 :             return static_cast<SocketPair*>(sp)->writeCallback(buf, len);
     387             :         },
     388             :         0,
     389         669 :         reinterpret_cast<void*>(this));
     390             : }
     391             : 
     392             : int
     393        6125 : SocketPair::waitForData()
     394             : {
     395             :     // System sockets
     396        6125 :     if (rtpHandle_ >= 0) {
     397             :         int ret;
     398             :         do {
     399         129 :             if (interrupted_) {
     400           7 :                 errno = EINTR;
     401           9 :                 return -1;
     402             :             }
     403             : 
     404         122 :             if (not readBlockingMode_) {
     405           2 :                 return 0;
     406             :             }
     407             : 
     408             :             // work with system socket
     409         120 :             struct pollfd p[2] = {{rtpHandle_, POLLIN, 0}, {rtcpHandle_, POLLIN, 0}};
     410         120 :             ret = poll(p, 2, NET_POLL_TIMEOUT);
     411         120 :             if (ret > 0) {
     412           0 :                 ret = 0;
     413           0 :                 if (p[0].revents & POLLIN)
     414           0 :                     ret |= static_cast<int>(DataType::RTP);
     415           0 :                 if (p[1].revents & POLLIN)
     416           0 :                     ret |= static_cast<int>(DataType::RTCP);
     417             :             }
     418         120 :         } while (!ret or (ret < 0 and errno == EAGAIN));
     419             : 
     420           0 :         return ret;
     421             :     }
     422             : 
     423             :     // work with IceSocket
     424             :     {
     425        6116 :         std::unique_lock lk(dataBuffMutex_);
     426        6116 :         cv_.wait(lk, [this] {
     427       24017 :             return interrupted_ or not rtpDataBuff_.empty() or not rtcpDataBuff_.empty()
     428       24017 :                    or not readBlockingMode_;
     429             :         });
     430        6116 :     }
     431             : 
     432        6116 :     if (interrupted_) {
     433          60 :         errno = EINTR;
     434          60 :         return -1;
     435             :     }
     436             : 
     437        6056 :     return static_cast<int>(DataType::RTP) | static_cast<int>(DataType::RTCP);
     438             : }
     439             : 
     440             : int
     441        5806 : SocketPair::readRtpData(void* buf, int buf_size)
     442             : {
     443             :     // handle system socket
     444        5806 :     if (rtpHandle_ >= 0) {
     445             :         struct sockaddr_storage from;
     446           0 :         socklen_t from_len = sizeof(from);
     447           0 :         return recvfrom(rtpHandle_,
     448             :                         static_cast<char*>(buf),
     449             :                         buf_size,
     450             :                         0,
     451             :                         reinterpret_cast<struct sockaddr*>(&from),
     452           0 :                         &from_len);
     453             :     }
     454             : 
     455             :     // handle ICE
     456        5806 :     std::unique_lock lk(dataBuffMutex_);
     457        5806 :     if (not rtpDataBuff_.empty()) {
     458        5433 :         auto pkt = std::move(rtpDataBuff_.front());
     459        5433 :         rtpDataBuff_.pop_front();
     460        5433 :         lk.unlock(); // to not block our ICE callbacks
     461        5433 :         int pkt_size = pkt.size();
     462        5433 :         int len = std::min(pkt_size, buf_size);
     463        5433 :         std::copy_n(pkt.begin(), len, static_cast<char*>(buf));
     464        5433 :         return len;
     465        5433 :     }
     466             : 
     467         373 :     return 0;
     468        5806 : }
     469             : 
     470             : int
     471        6056 : SocketPair::readRtcpData(void* buf, int buf_size)
     472             : {
     473             :     // handle system socket
     474        6056 :     if (rtcpHandle_ >= 0) {
     475             :         struct sockaddr_storage from;
     476           0 :         socklen_t from_len = sizeof(from);
     477           0 :         return recvfrom(rtcpHandle_,
     478             :                         static_cast<char*>(buf),
     479             :                         buf_size,
     480             :                         0,
     481             :                         reinterpret_cast<struct sockaddr*>(&from),
     482           0 :                         &from_len);
     483             :     }
     484             : 
     485             :     // handle ICE
     486        6056 :     std::unique_lock lk(dataBuffMutex_);
     487        6056 :     if (not rtcpDataBuff_.empty()) {
     488         250 :         auto pkt = std::move(rtcpDataBuff_.front());
     489         250 :         rtcpDataBuff_.pop_front();
     490         250 :         lk.unlock();
     491         250 :         int pkt_size = pkt.size();
     492         250 :         int len = std::min(pkt_size, buf_size);
     493         250 :         std::copy_n(pkt.begin(), len, static_cast<char*>(buf));
     494         250 :         return len;
     495         250 :     }
     496             : 
     497        5806 :     return 0;
     498        6056 : }
     499             : 
     500             : int
     501        6125 : SocketPair::readCallback(uint8_t* buf, int buf_size)
     502             : {
     503        6125 :     auto datatype = waitForData();
     504        6125 :     if (datatype < 0)
     505          67 :         return datatype;
     506             : 
     507        6058 :     int len = 0;
     508        6058 :     bool fromRTCP = false;
     509             : 
     510        6058 :     if (datatype & static_cast<int>(DataType::RTCP)) {
     511        6056 :         len = readRtcpData(buf, buf_size);
     512        6056 :         if (len > 0) {
     513         250 :             auto header = reinterpret_cast<rtcpRRHeader*>(buf);
     514             :             // 201 = RR PT
     515         250 :             if (header->pt == 201) {
     516          10 :                 lastDLSR_ = Swap4Bytes(header->dlsr);
     517             :                 // JAMI_WARN("Read RR, lastDLSR : %d", lastDLSR_);
     518          10 :                 lastRR_time = std::chrono::steady_clock::now();
     519          10 :                 saveRtcpRRPacket(buf, len);
     520             :             }
     521             :             // 206 = REMB PT
     522         240 :             else if (header->pt == 206)
     523         166 :                 saveRtcpREMBPacket(buf, len);
     524             :             // 200 = SR PT
     525          74 :             else if (header->pt == 200) {
     526             :                 // not used yet
     527             :             } else {
     528           0 :                 JAMI_DBG("Can't read RTCP: unknown packet type %u", header->pt);
     529             :             }
     530         250 :             fromRTCP = true;
     531             :         }
     532             :     }
     533             : 
     534             :     // No RTCP... try RTP
     535        6058 :     if (!len and (datatype & static_cast<int>(DataType::RTP))) {
     536        5806 :         len = readRtpData(buf, buf_size);
     537        5806 :         fromRTCP = false;
     538             :     }
     539             : 
     540        6058 :     if (len <= 0)
     541         375 :         return len;
     542             : 
     543        5683 :     if (not fromRTCP && (buf_size < static_cast<int>(MINIMUM_RTP_HEADER_SIZE)))
     544           0 :         return len;
     545             : 
     546             :     // SRTP decrypt
     547        5683 :     if (not fromRTCP and srtpContext_ and srtpContext_->srtp_in.aes) {
     548        5433 :         int32_t gradient = 0;
     549        5433 :         int32_t deltaT = 0;
     550        5433 :         float abs = 0.0f;
     551        5433 :         bool res_parse = false;
     552        5433 :         bool res_delay = false;
     553             : 
     554        5433 :         res_parse = parse_RTP_ext(buf, &abs);
     555        5433 :         bool marker = (buf[1] & 0x80) >> 7;
     556             : 
     557        5433 :         if (res_parse)
     558        5433 :             res_delay = getOneWayDelayGradient(abs, marker, &gradient, &deltaT);
     559             : 
     560             :         // rtpDelayCallback_ is not set for audio
     561        5433 :         if (rtpDelayCallback_ and res_delay)
     562        5339 :             rtpDelayCallback_(gradient, deltaT);
     563             : 
     564        5433 :         auto err = ff_srtp_decrypt(&srtpContext_->srtp_in, buf, &len);
     565        5433 :         if (packetLossCallback_ and (buf[2] << 8 | buf[3]) != lastSeqNumIn_ + 1)
     566           0 :             packetLossCallback_();
     567        5433 :         lastSeqNumIn_ = buf[2] << 8 | buf[3];
     568        5433 :         if (err < 0)
     569           0 :             JAMI_WARN("decrypt error %d", err);
     570             :     }
     571             : 
     572        5683 :     if (len != 0)
     573        5683 :         return len;
     574             :     else
     575           0 :         return AVERROR_EOF;
     576             : }
     577             : 
     578             : int
     579        5687 : SocketPair::writeData(uint8_t* buf, int buf_size)
     580             : {
     581        5687 :     bool isRTCP = RTP_PT_IS_RTCP(buf[1]);
     582             : 
     583             :     // System sockets?
     584        5687 :     if (rtpHandle_ >= 0) {
     585             :         int fd;
     586             :         dhtnet::IpAddr* dest_addr;
     587             : 
     588           0 :         if (isRTCP) {
     589           0 :             fd = rtcpHandle_;
     590           0 :             dest_addr = &rtcpDestAddr_;
     591             :         } else {
     592           0 :             fd = rtpHandle_;
     593           0 :             dest_addr = &rtpDestAddr_;
     594             :         }
     595             : 
     596           0 :         auto ret = ff_network_wait_fd(fd);
     597           0 :         if (ret < 0)
     598           0 :             return ret;
     599             : 
     600           0 :         if (noWrite_)
     601           0 :             return buf_size;
     602           0 :         return ::sendto(fd,
     603             :                         reinterpret_cast<const char*>(buf),
     604             :                         buf_size,
     605             :                         0,
     606             :                         *dest_addr,
     607           0 :                         dest_addr->getLength());
     608             :     }
     609             : 
     610        5687 :     if (noWrite_)
     611           0 :         return buf_size;
     612             : 
     613             :     // IceSocket
     614        5687 :     if (isRTCP)
     615         250 :         return rtcp_sock_->send(buf, buf_size);
     616             :     else
     617        5437 :         return rtp_sock_->send(buf, buf_size);
     618             : }
     619             : 
     620             : int
     621        5521 : SocketPair::writeCallback(uint8_t* buf, int buf_size)
     622             : {
     623        5521 :     if (noWrite_)
     624           0 :         return 0;
     625             : 
     626             :     int ret;
     627        5521 :     bool isRTCP = RTP_PT_IS_RTCP(buf[1]);
     628             :     unsigned int ts_LSB, ts_MSB;
     629             :     double currentSRTS, currentLatency;
     630             : 
     631             :     // Encrypt?
     632        5521 :     if (not isRTCP and srtpContext_ and srtpContext_->srtp_out.aes) {
     633        5437 :         buf_size = ff_srtp_encrypt(&srtpContext_->srtp_out,
     634             :                                    buf,
     635             :                                    buf_size,
     636        5437 :                                    srtpContext_->encryptbuf,
     637             :                                    sizeof(srtpContext_->encryptbuf));
     638        5437 :         if (buf_size < 0) {
     639           0 :             JAMI_WARN("encrypt error %d", buf_size);
     640           0 :             return buf_size;
     641             :         }
     642             : 
     643        5437 :         buf = srtpContext_->encryptbuf;
     644             :     }
     645             : 
     646             :     // check if we're sending an RR, if so, detect packet loss
     647             :     // buf_size gives length of buffer, not just header
     648        5521 :     if (isRTCP && static_cast<unsigned>(buf_size) >= sizeof(rtcpRRHeader)) {
     649          10 :         auto header = reinterpret_cast<rtcpRRHeader*>(buf);
     650          10 :         rtcpPacketLoss_ = (header->pt == 201
     651          10 :                            && ntohl(header->fraction_lost) & RTCP_RR_FRACTION_MASK);
     652             :     }
     653             : 
     654             :     do {
     655        5521 :         if (interrupted_)
     656           0 :             return -EINTR;
     657        5521 :         ret = writeData(buf, buf_size);
     658        5521 :     } while (ret < 0 and errno == EAGAIN);
     659             : 
     660        5521 :     if (buf[1] == 200) // Sender Report
     661             :     {
     662          74 :         auto header = reinterpret_cast<rtcpSRHeader*>(buf);
     663          74 :         ts_LSB = Swap4Bytes(header->timestampLSB);
     664          74 :         ts_MSB = Swap4Bytes(header->timestampMSB);
     665             : 
     666          74 :         currentSRTS = ts_MSB + (ts_LSB / pow(2, 32));
     667             : 
     668          74 :         if (lastSRTS_ != 0 && lastDLSR_ != 0) {
     669          15 :             if (histoLatency_.size() >= MAX_LIST_SIZE)
     670           0 :                 histoLatency_.pop_front();
     671             : 
     672          15 :             currentLatency = (currentSRTS - lastSRTS_) / 2;
     673             :             // JAMI_WARN("Current Latency : %f from sender %X", currentLatency, header->ssrc);
     674          15 :             histoLatency_.push_back(currentLatency);
     675             :         }
     676             : 
     677          74 :         lastSRTS_ = currentSRTS;
     678             : 
     679             :         // JAMI_WARN("SENDING NEW RTCP SR !! ");
     680             : 
     681        5447 :     } else if (buf[1] == 201) // Receiver Report
     682             :     {
     683             :         // auto header = reinterpret_cast<rtcpRRHeader*>(buf);
     684             :         // JAMI_WARN("SENDING NEW RTCP RR !! ");
     685             :     }
     686             : 
     687        5521 :     return ret < 0 ? -errno : ret;
     688             : }
     689             : 
     690             : double
     691          10 : SocketPair::getLastLatency()
     692             : {
     693          10 :     if (not histoLatency_.empty())
     694           0 :         return histoLatency_.back();
     695             :     else
     696          10 :         return -1;
     697             : }
     698             : 
     699             : void
     700         147 : SocketPair::setRtpDelayCallback(std::function<void(int, int)> cb)
     701             : {
     702         147 :     rtpDelayCallback_ = std::move(cb);
     703         147 : }
     704             : 
     705             : bool
     706        5433 : SocketPair::getOneWayDelayGradient(float sendTS, bool marker, int32_t* gradient, int32_t* deltaT)
     707             : {
     708             :     // Keep only last packet of each frame
     709        5433 :     if (not marker) {
     710          48 :         return 0;
     711             :     }
     712             : 
     713             :     // 1st frame
     714        5385 :     if (not lastSendTS_) {
     715          46 :         lastSendTS_ = sendTS;
     716          46 :         lastReceiveTS_ = std::chrono::steady_clock::now();
     717          46 :         return 0;
     718             :     }
     719             : 
     720        5339 :     int32_t deltaS = (sendTS - lastSendTS_) * 1000; // milliseconds
     721        5339 :     if (deltaS < 0)
     722           4 :         deltaS += 64000;
     723        5339 :     lastSendTS_ = sendTS;
     724             : 
     725        5339 :     std::chrono::steady_clock::time_point arrival_TS = std::chrono::steady_clock::now();
     726        5339 :     auto deltaR = std::chrono::duration_cast<std::chrono::milliseconds>(arrival_TS - lastReceiveTS_)
     727        5339 :                       .count();
     728        5339 :     lastReceiveTS_ = arrival_TS;
     729             : 
     730        5339 :     *gradient = deltaR - deltaS;
     731        5339 :     *deltaT = deltaR;
     732             : 
     733        5339 :     return true;
     734             : }
     735             : 
     736             : bool
     737        5433 : SocketPair::parse_RTP_ext(uint8_t* buf, float* abs)
     738             : {
     739        5433 :     if (not(buf[0] & 0x10))
     740           0 :         return false;
     741             : 
     742        5433 :     uint16_t magic_word = (buf[12] << 8) + buf[13];
     743        5433 :     if (magic_word != 0xBEDE)
     744           0 :         return false;
     745             : 
     746        5433 :     uint8_t sec = buf[17] >> 2;
     747        5433 :     uint32_t fract = ((buf[17] & 0x3) << 16 | (buf[18] << 8) | buf[19]) << 14;
     748        5433 :     float milli = fract / pow(2, 32);
     749             : 
     750        5433 :     *abs = sec + (milli);
     751        5433 :     return true;
     752             : }
     753             : 
     754             : uint16_t
     755         152 : SocketPair::lastSeqValOut()
     756             : {
     757         152 :     if (srtpContext_)
     758         152 :         return srtpContext_->srtp_out.seq_largest;
     759           0 :     JAMI_ERR("SRTP context not found.");
     760           0 :     return 0;
     761             : }
     762             : 
     763             : } // namespace jami

Generated by: LCOV version 1.14