Line data Source code
1 : /* 2 : * Copyright (C) 2004-2026 Savoir-faire Linux Inc. 3 : * 4 : * This program is free software: you can redistribute it and/or modify 5 : * it under the terms of the GNU General Public License as published by 6 : * the Free Software Foundation, either version 3 of the License, or 7 : * (at your option) any later version. 8 : * 9 : * This program is distributed in the hope that it will be useful, 10 : * but WITHOUT ANY WARRANTY; without even the implied warranty of 11 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 12 : * GNU General Public License for more details. 13 : * 14 : * You should have received a copy of the GNU General Public License 15 : * along with this program. If not, see <https://www.gnu.org/licenses/>. 16 : */ 17 : 18 : #include "audio_receive_thread.h" 19 : #include "libav_deps.h" 20 : #include "logger.h" 21 : #include "manager.h" 22 : #include "media_decoder.h" 23 : #include "media_io_handle.h" 24 : #include "ringbufferpool.h" 25 : 26 : #include <memory> 27 : 28 : namespace jami { 29 : 30 72 : AudioReceiveThread::AudioReceiveThread(const std::string& streamId, 31 : const AudioFormat& format, 32 : const std::string& sdp, 33 72 : const uint16_t mtu) 34 72 : : streamId_(streamId) 35 72 : , format_(format) 36 72 : , stream_(sdp) 37 72 : , sdpContext_(new MediaIOHandle(sdp.size(), false, &readFunction, 0, 0, this)) 38 72 : , mtu_(mtu) 39 144 : , loop_(std::bind(&AudioReceiveThread::setup, this), 40 144 : std::bind(&AudioReceiveThread::process, this), 41 288 : std::bind(&AudioReceiveThread::cleanup, this)) 42 72 : {} 43 : 44 216 : AudioReceiveThread::~AudioReceiveThread() 45 : { 46 72 : loop_.join(); 47 144 : } 48 : 49 : bool 50 72 : AudioReceiveThread::setup() 51 : { 52 72 : std::lock_guard lk(mutex_); 53 144 : audioDecoder_.reset(new MediaDecoder([this](std::shared_ptr<MediaFrame>&& frame) mutable { 54 0 : notify(frame); 55 0 : ringbuffer_->put(std::static_pointer_cast<AudioFrame>(frame)); 56 72 : })); 57 72 : audioDecoder_->setContextCallback([this]() { 58 0 : if (recorderCallback_) 59 0 : recorderCallback_(getInfo()); 60 0 : }); 61 72 : audioDecoder_->setInterruptCallback(interruptCb, this); 62 : 63 : // custom_io so the SDP demuxer will not open any UDP connections 64 72 : args_.input = SDP_FILENAME; 65 72 : args_.format = "sdp"; 66 72 : args_.sdp_flags = "custom_io"; 67 : 68 72 : if (stream_.str().empty()) { 69 0 : JAMI_ERR("No SDP loaded"); 70 0 : return false; 71 : } 72 : 73 72 : audioDecoder_->setIOContext(sdpContext_.get()); 74 72 : audioDecoder_->setFEC(true); 75 72 : if (audioDecoder_->openInput(args_)) { 76 0 : JAMI_ERR("Unable to open input \"%s\"", SDP_FILENAME); 77 0 : return false; 78 : } 79 : 80 : // Now replace our custom AVIOContext with one that will read packets 81 72 : audioDecoder_->setIOContext(demuxContext_.get()); 82 72 : if (audioDecoder_->setupAudio()) { 83 0 : JAMI_ERR("decoder IO startup failed"); 84 0 : return false; 85 : } 86 : 87 72 : ringbuffer_ = Manager::instance().getRingBufferPool().createRingBuffer(streamId_); 88 : 89 72 : if (onSuccessfulSetup_) 90 72 : onSuccessfulSetup_(MEDIA_AUDIO, 1); 91 : 92 72 : return true; 93 72 : } 94 : 95 : void 96 0 : AudioReceiveThread::process() 97 : { 98 0 : audioDecoder_->decode(); 99 0 : } 100 : 101 : void 102 72 : AudioReceiveThread::cleanup() 103 : { 104 72 : std::lock_guard lk(mutex_); 105 72 : audioDecoder_.reset(); 106 72 : demuxContext_.reset(); 107 72 : } 108 : 109 : int 110 288 : AudioReceiveThread::readFunction(void* opaque, uint8_t* buf, int buf_size) 111 : { 112 288 : std::istream& is = static_cast<AudioReceiveThread*>(opaque)->stream_; 113 288 : is.read(reinterpret_cast<char*>(buf), buf_size); 114 : 115 288 : auto count = is.gcount(); 116 288 : return count ? static_cast<int>(count) : AVERROR_EOF; 117 : } 118 : 119 : // This callback is used by libav internally to break out of blocking calls 120 : int 121 72 : AudioReceiveThread::interruptCb(void* data) 122 : { 123 72 : auto* context = static_cast<AudioReceiveThread*>(data); 124 72 : return not context->loop_.isRunning(); 125 : } 126 : 127 : void 128 72 : AudioReceiveThread::addIOContext(SocketPair& socketPair) 129 : { 130 72 : demuxContext_.reset(socketPair.createIOContext(mtu_)); 131 72 : } 132 : 133 : void 134 72 : AudioReceiveThread::setRecorderCallback(const std::function<void(const MediaStream& ms)>& cb) 135 : { 136 72 : recorderCallback_ = cb; 137 72 : if (audioDecoder_) 138 0 : audioDecoder_->setContextCallback([this]() { 139 0 : if (recorderCallback_) 140 0 : recorderCallback_(getInfo()); 141 0 : }); 142 72 : } 143 : 144 : MediaStream 145 3 : AudioReceiveThread::getInfo() const 146 : { 147 3 : if (!audioDecoder_) 148 0 : return {}; 149 3 : return audioDecoder_->getStream("a:remote"); 150 : } 151 : 152 : void 153 72 : AudioReceiveThread::startReceiver() 154 : { 155 72 : loop_.start(); 156 72 : } 157 : 158 : void 159 72 : AudioReceiveThread::stopReceiver() 160 : { 161 72 : loop_.stop(); 162 72 : } 163 : 164 : }; // namespace jami