Line data Source code
1 : /* 2 : * Copyright (C) 2004-2026 Savoir-faire Linux Inc. 3 : * 4 : * This program is free software: you can redistribute it and/or modify 5 : * it under the terms of the GNU General Public License as published by 6 : * the Free Software Foundation, either version 3 of the License, or 7 : * (at your option) any later version. 8 : * 9 : * This program is distributed in the hope that it will be useful, 10 : * but WITHOUT ANY WARRANTY; without even the implied warranty of 11 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 12 : * GNU General Public License for more details. 13 : * 14 : * You should have received a copy of the GNU General Public License 15 : * along with this program. If not, see <https://www.gnu.org/licenses/>. 16 : */ 17 : 18 : #include "audio_receive_thread.h" 19 : #include "libav_deps.h" 20 : #include "logger.h" 21 : #include "manager.h" 22 : #include "media_decoder.h" 23 : #include "media_io_handle.h" 24 : #include "ringbufferpool.h" 25 : 26 : #include <memory> 27 : 28 : namespace jami { 29 : 30 175 : AudioReceiveThread::AudioReceiveThread(const std::string& streamId, 31 : const AudioFormat& format, 32 : const std::string& sdp, 33 175 : const uint16_t mtu) 34 175 : : streamId_(streamId) 35 175 : , format_(format) 36 175 : , stream_(sdp) 37 175 : , sdpContext_(new MediaIOHandle(sdp.size(), false, &readFunction, 0, 0, this)) 38 175 : , mtu_(mtu) 39 350 : , loop_(std::bind(&AudioReceiveThread::setup, this), 40 350 : std::bind(&AudioReceiveThread::process, this), 41 700 : std::bind(&AudioReceiveThread::cleanup, this)) 42 175 : {} 43 : 44 525 : AudioReceiveThread::~AudioReceiveThread() 45 : { 46 175 : loop_.join(); 47 350 : } 48 : 49 : bool 50 175 : AudioReceiveThread::setup() 51 : { 52 175 : std::lock_guard lk(mutex_); 53 350 : audioDecoder_.reset(new MediaDecoder([this](std::shared_ptr<MediaFrame>&& frame) mutable { 54 0 : notify(frame); 55 0 : ringbuffer_->put(std::static_pointer_cast<AudioFrame>(frame)); 56 174 : })); 57 175 : audioDecoder_->setContextCallback([this]() { 58 0 : if (recorderCallback_) 59 0 : recorderCallback_(getInfo()); 60 0 : }); 61 174 : audioDecoder_->setInterruptCallback(interruptCb, this); 62 : 63 : // custom_io so the SDP demuxer will not open any UDP connections 64 175 : args_.input = SDP_FILENAME; 65 175 : args_.format = "sdp"; 66 175 : args_.sdp_flags = "custom_io"; 67 : 68 175 : if (stream_.str().empty()) { 69 0 : JAMI_ERR("No SDP loaded"); 70 0 : return false; 71 : } 72 : 73 175 : audioDecoder_->setIOContext(sdpContext_.get()); 74 175 : audioDecoder_->setFEC(true); 75 175 : if (audioDecoder_->openInput(args_)) { 76 0 : JAMI_ERR("Unable to open input \"%s\"", SDP_FILENAME); 77 0 : return false; 78 : } 79 : 80 : // Now replace our custom AVIOContext with one that will read packets 81 175 : audioDecoder_->setIOContext(demuxContext_.get()); 82 175 : if (audioDecoder_->setupAudio()) { 83 0 : JAMI_ERR("decoder IO startup failed"); 84 0 : return false; 85 : } 86 : 87 175 : ringbuffer_ = Manager::instance().getRingBufferPool().createRingBuffer(streamId_); 88 : 89 175 : if (onSuccessfulSetup_) 90 175 : onSuccessfulSetup_(MEDIA_AUDIO, 1); 91 : 92 175 : return true; 93 175 : } 94 : 95 : void 96 0 : AudioReceiveThread::process() 97 : { 98 0 : audioDecoder_->decode(); 99 0 : } 100 : 101 : void 102 175 : AudioReceiveThread::cleanup() 103 : { 104 175 : std::lock_guard lk(mutex_); 105 175 : audioDecoder_.reset(); 106 175 : demuxContext_.reset(); 107 175 : } 108 : 109 : int 110 700 : AudioReceiveThread::readFunction(void* opaque, uint8_t* buf, int buf_size) 111 : { 112 700 : std::istream& is = static_cast<AudioReceiveThread*>(opaque)->stream_; 113 700 : is.read(reinterpret_cast<char*>(buf), buf_size); 114 : 115 700 : auto count = is.gcount(); 116 700 : return count ? static_cast<int>(count) : AVERROR_EOF; 117 : } 118 : 119 : // This callback is used by libav internally to break out of blocking calls 120 : int 121 175 : AudioReceiveThread::interruptCb(void* data) 122 : { 123 175 : auto* context = static_cast<AudioReceiveThread*>(data); 124 175 : return not context->loop_.isRunning(); 125 : } 126 : 127 : void 128 175 : AudioReceiveThread::addIOContext(SocketPair& socketPair) 129 : { 130 175 : demuxContext_.reset(socketPair.createIOContext(mtu_)); 131 175 : } 132 : 133 : void 134 176 : AudioReceiveThread::setRecorderCallback(const std::function<void(const MediaStream& ms)>& cb) 135 : { 136 176 : recorderCallback_ = cb; 137 176 : if (audioDecoder_) 138 1 : audioDecoder_->setContextCallback([this]() { 139 0 : if (recorderCallback_) 140 0 : recorderCallback_(getInfo()); 141 0 : }); 142 176 : } 143 : 144 : MediaStream 145 3 : AudioReceiveThread::getInfo() const 146 : { 147 3 : if (!audioDecoder_) 148 0 : return {}; 149 3 : return audioDecoder_->getStream("a:remote"); 150 : } 151 : 152 : void 153 175 : AudioReceiveThread::startReceiver() 154 : { 155 175 : loop_.start(); 156 175 : } 157 : 158 : void 159 175 : AudioReceiveThread::stopReceiver() 160 : { 161 175 : loop_.stop(); 162 175 : } 163 : 164 : }; // namespace jami