Line data Source code
1 : /* 2 : * Copyright (C) 2004-2025 Savoir-faire Linux Inc. 3 : * 4 : * This program is free software: you can redistribute it and/or modify 5 : * it under the terms of the GNU General Public License as published by 6 : * the Free Software Foundation, either version 3 of the License, or 7 : * (at your option) any later version. 8 : * 9 : * This program is distributed in the hope that it will be useful, 10 : * but WITHOUT ANY WARRANTY; without even the implied warranty of 11 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 12 : * GNU General Public License for more details. 13 : * 14 : * You should have received a copy of the GNU General Public License 15 : * along with this program. If not, see <https://www.gnu.org/licenses/>. 16 : */ 17 : 18 : #include "audio_receive_thread.h" 19 : #include "libav_deps.h" 20 : #include "logger.h" 21 : #include "manager.h" 22 : #include "media_decoder.h" 23 : #include "media_io_handle.h" 24 : #include "media_recorder.h" 25 : #include "ringbuffer.h" 26 : #include "ringbufferpool.h" 27 : 28 : #include <memory> 29 : 30 : namespace jami { 31 : 32 180 : AudioReceiveThread::AudioReceiveThread(const std::string& streamId, 33 : const AudioFormat& format, 34 : const std::string& sdp, 35 180 : const uint16_t mtu) 36 180 : : streamId_(streamId) 37 180 : , format_(format) 38 180 : , stream_(sdp) 39 180 : , sdpContext_(new MediaIOHandle(sdp.size(), false, &readFunction, 0, 0, this)) 40 180 : , mtu_(mtu) 41 360 : , loop_(std::bind(&AudioReceiveThread::setup, this), 42 360 : std::bind(&AudioReceiveThread::process, this), 43 720 : std::bind(&AudioReceiveThread::cleanup, this)) 44 180 : {} 45 : 46 540 : AudioReceiveThread::~AudioReceiveThread() 47 : { 48 180 : loop_.join(); 49 360 : } 50 : 51 : bool 52 180 : AudioReceiveThread::setup() 53 : { 54 180 : std::lock_guard lk(mutex_); 55 360 : audioDecoder_.reset(new MediaDecoder([this](std::shared_ptr<MediaFrame>&& frame) mutable { 56 0 : notify(frame); 57 0 : ringbuffer_->put(std::static_pointer_cast<AudioFrame>(frame)); 58 180 : })); 59 180 : audioDecoder_->setContextCallback([this]() { 60 0 : if (recorderCallback_) 61 0 : recorderCallback_(getInfo()); 62 0 : }); 63 180 : audioDecoder_->setInterruptCallback(interruptCb, this); 64 : 65 : // custom_io so the SDP demuxer will not open any UDP connections 66 180 : args_.input = SDP_FILENAME; 67 180 : args_.format = "sdp"; 68 180 : args_.sdp_flags = "custom_io"; 69 : 70 180 : if (stream_.str().empty()) { 71 0 : JAMI_ERR("No SDP loaded"); 72 0 : return false; 73 : } 74 : 75 180 : audioDecoder_->setIOContext(sdpContext_.get()); 76 180 : audioDecoder_->setFEC(true); 77 180 : if (audioDecoder_->openInput(args_)) { 78 0 : JAMI_ERR("Unable to open input \"%s\"", SDP_FILENAME); 79 0 : return false; 80 : } 81 : 82 : // Now replace our custom AVIOContext with one that will read packets 83 180 : audioDecoder_->setIOContext(demuxContext_.get()); 84 180 : if (audioDecoder_->setupAudio()) { 85 0 : JAMI_ERR("decoder IO startup failed"); 86 0 : return false; 87 : } 88 : 89 180 : ringbuffer_ = Manager::instance().getRingBufferPool().createRingBuffer(streamId_); 90 : 91 180 : if (onSuccessfulSetup_) 92 180 : onSuccessfulSetup_(MEDIA_AUDIO, 1); 93 : 94 180 : return true; 95 180 : } 96 : 97 : void 98 0 : AudioReceiveThread::process() 99 : { 100 0 : audioDecoder_->decode(); 101 0 : } 102 : 103 : void 104 180 : AudioReceiveThread::cleanup() 105 : { 106 180 : std::lock_guard lk(mutex_); 107 180 : audioDecoder_.reset(); 108 180 : demuxContext_.reset(); 109 180 : } 110 : 111 : int 112 720 : AudioReceiveThread::readFunction(void* opaque, uint8_t* buf, int buf_size) 113 : { 114 720 : std::istream& is = static_cast<AudioReceiveThread*>(opaque)->stream_; 115 720 : is.read(reinterpret_cast<char*>(buf), buf_size); 116 : 117 720 : auto count = is.gcount(); 118 720 : return count ? count : AVERROR_EOF; 119 : } 120 : 121 : // This callback is used by libav internally to break out of blocking calls 122 : int 123 180 : AudioReceiveThread::interruptCb(void* data) 124 : { 125 180 : auto context = static_cast<AudioReceiveThread*>(data); 126 180 : return not context->loop_.isRunning(); 127 : } 128 : 129 : void 130 180 : AudioReceiveThread::addIOContext(SocketPair& socketPair) 131 : { 132 180 : demuxContext_.reset(socketPair.createIOContext(mtu_)); 133 180 : } 134 : 135 : void 136 181 : AudioReceiveThread::setRecorderCallback(const std::function<void(const MediaStream& ms)>& cb) 137 : { 138 181 : recorderCallback_ = cb; 139 181 : if (audioDecoder_) 140 1 : audioDecoder_->setContextCallback([this]() { 141 0 : if (recorderCallback_) 142 0 : recorderCallback_(getInfo()); 143 0 : }); 144 181 : } 145 : 146 : MediaStream 147 3 : AudioReceiveThread::getInfo() const 148 : { 149 3 : if (!audioDecoder_) 150 0 : return {}; 151 3 : return audioDecoder_->getStream("a:remote"); 152 : } 153 : 154 : void 155 180 : AudioReceiveThread::startReceiver() 156 : { 157 180 : loop_.start(); 158 180 : } 159 : 160 : void 161 180 : AudioReceiveThread::stopReceiver() 162 : { 163 180 : loop_.stop(); 164 180 : } 165 : 166 : }; // namespace jami