Line data Source code
1 : /* 2 : * Copyright (C) 2004-2026 Savoir-faire Linux Inc. 3 : * 4 : * This program is free software: you can redistribute it and/or modify 5 : * it under the terms of the GNU General Public License as published by 6 : * the Free Software Foundation, either version 3 of the License, or 7 : * (at your option) any later version. 8 : * 9 : * This program is distributed in the hope that it will be useful, 10 : * but WITHOUT ANY WARRANTY; without even the implied warranty of 11 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 12 : * GNU General Public License for more details. 13 : * 14 : * You should have received a copy of the GNU General Public License 15 : * along with this program. If not, see <https://www.gnu.org/licenses/>. 16 : */ 17 : 18 : #include "audio_receive_thread.h" 19 : #include "libav_deps.h" 20 : #include "logger.h" 21 : #include "manager.h" 22 : #include "media_decoder.h" 23 : #include "media_io_handle.h" 24 : #include "ringbufferpool.h" 25 : 26 : #include <memory> 27 : 28 : namespace jami { 29 : 30 166 : AudioReceiveThread::AudioReceiveThread(const std::string& streamId, 31 : const AudioFormat& format, 32 : const std::string& sdp, 33 166 : const uint16_t mtu) 34 166 : : streamId_(streamId) 35 166 : , format_(format) 36 166 : , stream_(sdp) 37 166 : , sdpContext_(new MediaIOHandle(sdp.size(), false, &readFunction, 0, 0, this)) 38 166 : , mtu_(mtu) 39 332 : , loop_(std::bind(&AudioReceiveThread::setup, this), 40 332 : std::bind(&AudioReceiveThread::process, this), 41 664 : std::bind(&AudioReceiveThread::cleanup, this)) 42 166 : {} 43 : 44 498 : AudioReceiveThread::~AudioReceiveThread() 45 : { 46 166 : loop_.join(); 47 332 : } 48 : 49 : bool 50 166 : AudioReceiveThread::setup() 51 : { 52 166 : std::lock_guard lk(mutex_); 53 332 : audioDecoder_.reset(new MediaDecoder([this](std::shared_ptr<MediaFrame>&& frame) mutable { 54 0 : notify(frame); 55 0 : ringbuffer_->put(std::static_pointer_cast<AudioFrame>(frame)); 56 166 : })); 57 166 : audioDecoder_->setContextCallback([this]() { 58 0 : if (recorderCallback_) 59 0 : recorderCallback_(getInfo()); 60 0 : }); 61 166 : audioDecoder_->setInterruptCallback(interruptCb, this); 62 : 63 : // custom_io so the SDP demuxer will not open any UDP connections 64 166 : args_.input = SDP_FILENAME; 65 166 : args_.format = "sdp"; 66 166 : args_.sdp_flags = "custom_io"; 67 : 68 166 : if (stream_.str().empty()) { 69 0 : JAMI_ERR("No SDP loaded"); 70 0 : return false; 71 : } 72 : 73 166 : audioDecoder_->setIOContext(sdpContext_.get()); 74 166 : audioDecoder_->setFEC(true); 75 166 : if (audioDecoder_->openInput(args_)) { 76 0 : JAMI_ERR("Unable to open input \"%s\"", SDP_FILENAME); 77 0 : return false; 78 : } 79 : 80 : // Now replace our custom AVIOContext with one that will read packets 81 166 : audioDecoder_->setIOContext(demuxContext_.get()); 82 166 : if (audioDecoder_->setupAudio()) { 83 0 : JAMI_ERR("decoder IO startup failed"); 84 0 : return false; 85 : } 86 : 87 166 : ringbuffer_ = Manager::instance().getRingBufferPool().createRingBuffer(streamId_); 88 : 89 166 : if (onSuccessfulSetup_) 90 166 : onSuccessfulSetup_(MEDIA_AUDIO, 1); 91 : 92 166 : return true; 93 166 : } 94 : 95 : void 96 0 : AudioReceiveThread::process() 97 : { 98 0 : audioDecoder_->decode(); 99 0 : } 100 : 101 : void 102 166 : AudioReceiveThread::cleanup() 103 : { 104 166 : std::lock_guard lk(mutex_); 105 166 : audioDecoder_.reset(); 106 166 : demuxContext_.reset(); 107 166 : } 108 : 109 : int 110 664 : AudioReceiveThread::readFunction(void* opaque, uint8_t* buf, int buf_size) 111 : { 112 664 : std::istream& is = static_cast<AudioReceiveThread*>(opaque)->stream_; 113 664 : is.read(reinterpret_cast<char*>(buf), buf_size); 114 : 115 664 : auto count = is.gcount(); 116 664 : return count ? static_cast<int>(count) : AVERROR_EOF; 117 : } 118 : 119 : // This callback is used by libav internally to break out of blocking calls 120 : int 121 166 : AudioReceiveThread::interruptCb(void* data) 122 : { 123 166 : auto* context = static_cast<AudioReceiveThread*>(data); 124 166 : return not context->loop_.isRunning(); 125 : } 126 : 127 : void 128 166 : AudioReceiveThread::addIOContext(SocketPair& socketPair) 129 : { 130 166 : demuxContext_.reset(socketPair.createIOContext(mtu_)); 131 166 : } 132 : 133 : void 134 167 : AudioReceiveThread::setRecorderCallback(const std::function<void(const MediaStream& ms)>& cb) 135 : { 136 167 : recorderCallback_ = cb; 137 167 : if (audioDecoder_) 138 1 : audioDecoder_->setContextCallback([this]() { 139 0 : if (recorderCallback_) 140 0 : recorderCallback_(getInfo()); 141 0 : }); 142 167 : } 143 : 144 : MediaStream 145 2 : AudioReceiveThread::getInfo() const 146 : { 147 2 : if (!audioDecoder_) 148 0 : return {}; 149 2 : return audioDecoder_->getStream("a:remote"); 150 : } 151 : 152 : void 153 166 : AudioReceiveThread::startReceiver() 154 : { 155 166 : loop_.start(); 156 166 : } 157 : 158 : void 159 166 : AudioReceiveThread::stopReceiver() 160 : { 161 166 : loop_.stop(); 162 166 : } 163 : 164 : }; // namespace jami