Line data Source code
1 : /* 2 : * Copyright (C) 2004-2024 Savoir-faire Linux Inc. 3 : * 4 : * This program is free software: you can redistribute it and/or modify 5 : * it under the terms of the GNU General Public License as published by 6 : * the Free Software Foundation, either version 3 of the License, or 7 : * (at your option) any later version. 8 : * 9 : * This program is distributed in the hope that it will be useful, 10 : * but WITHOUT ANY WARRANTY; without even the implied warranty of 11 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 12 : * GNU General Public License for more details. 13 : * 14 : * You should have received a copy of the GNU General Public License 15 : * along with this program. If not, see <https://www.gnu.org/licenses/>. 16 : */ 17 : 18 : #include "audio_receive_thread.h" 19 : #include "libav_deps.h" 20 : #include "logger.h" 21 : #include "manager.h" 22 : #include "media_decoder.h" 23 : #include "media_io_handle.h" 24 : #include "media_recorder.h" 25 : #include "ringbuffer.h" 26 : #include "ringbufferpool.h" 27 : 28 : #include <memory> 29 : 30 : namespace jami { 31 : 32 190 : AudioReceiveThread::AudioReceiveThread(const std::string& streamId, 33 : const AudioFormat& format, 34 : const std::string& sdp, 35 190 : const uint16_t mtu) 36 190 : : streamId_(streamId) 37 190 : , format_(format) 38 190 : , stream_(sdp) 39 190 : , sdpContext_(new MediaIOHandle(sdp.size(), false, &readFunction, 0, 0, this)) 40 190 : , mtu_(mtu) 41 380 : , loop_(std::bind(&AudioReceiveThread::setup, this), 42 380 : std::bind(&AudioReceiveThread::process, this), 43 760 : std::bind(&AudioReceiveThread::cleanup, this)) 44 190 : {} 45 : 46 570 : AudioReceiveThread::~AudioReceiveThread() 47 : { 48 190 : loop_.join(); 49 380 : } 50 : 51 : bool 52 190 : AudioReceiveThread::setup() 53 : { 54 190 : std::lock_guard lk(mutex_); 55 380 : audioDecoder_.reset(new MediaDecoder([this](std::shared_ptr<MediaFrame>&& frame) mutable { 56 0 : notify(frame); 57 0 : ringbuffer_->put(std::static_pointer_cast<AudioFrame>(frame)); 58 190 : })); 59 190 : audioDecoder_->setContextCallback([this]() { 60 0 : if (recorderCallback_) 61 0 : recorderCallback_(getInfo()); 62 0 : }); 63 190 : audioDecoder_->setInterruptCallback(interruptCb, this); 64 : 65 : // custom_io so the SDP demuxer will not open any UDP connections 66 190 : args_.input = SDP_FILENAME; 67 190 : args_.format = "sdp"; 68 190 : args_.sdp_flags = "custom_io"; 69 : 70 190 : if (stream_.str().empty()) { 71 0 : JAMI_ERR("No SDP loaded"); 72 0 : return false; 73 : } 74 : 75 190 : audioDecoder_->setIOContext(sdpContext_.get()); 76 190 : audioDecoder_->setFEC(true); 77 190 : if (audioDecoder_->openInput(args_)) { 78 0 : JAMI_ERR("Unable to open input \"%s\"", SDP_FILENAME); 79 0 : return false; 80 : } 81 : 82 : // Now replace our custom AVIOContext with one that will read packets 83 190 : audioDecoder_->setIOContext(demuxContext_.get()); 84 190 : if (audioDecoder_->setupAudio()) { 85 0 : JAMI_ERR("decoder IO startup failed"); 86 0 : return false; 87 : } 88 : 89 190 : ringbuffer_ = Manager::instance().getRingBufferPool().createRingBuffer(streamId_); 90 : 91 190 : if (onSuccessfulSetup_) 92 190 : onSuccessfulSetup_(MEDIA_AUDIO, 1); 93 : 94 190 : return true; 95 190 : } 96 : 97 : void 98 0 : AudioReceiveThread::process() 99 : { 100 0 : audioDecoder_->decode(); 101 0 : } 102 : 103 : void 104 190 : AudioReceiveThread::cleanup() 105 : { 106 190 : std::lock_guard lk(mutex_); 107 190 : audioDecoder_.reset(); 108 190 : demuxContext_.reset(); 109 190 : } 110 : 111 : int 112 760 : AudioReceiveThread::readFunction(void* opaque, uint8_t* buf, int buf_size) 113 : { 114 760 : std::istream& is = static_cast<AudioReceiveThread*>(opaque)->stream_; 115 760 : is.read(reinterpret_cast<char*>(buf), buf_size); 116 : 117 760 : auto count = is.gcount(); 118 760 : return count ? count : AVERROR_EOF; 119 : } 120 : 121 : // This callback is used by libav internally to break out of blocking calls 122 : int 123 190 : AudioReceiveThread::interruptCb(void* data) 124 : { 125 190 : auto context = static_cast<AudioReceiveThread*>(data); 126 190 : return not context->loop_.isRunning(); 127 : } 128 : 129 : void 130 190 : AudioReceiveThread::addIOContext(SocketPair& socketPair) 131 : { 132 190 : demuxContext_.reset(socketPair.createIOContext(mtu_)); 133 190 : } 134 : 135 : void 136 196 : AudioReceiveThread::setRecorderCallback( 137 : const std::function<void(const MediaStream& ms)>& cb) 138 : { 139 196 : recorderCallback_ = cb; 140 196 : if (audioDecoder_) 141 6 : audioDecoder_->setContextCallback([this]() { 142 0 : if (recorderCallback_) 143 0 : recorderCallback_(getInfo()); 144 0 : }); 145 196 : } 146 : 147 : MediaStream 148 6 : AudioReceiveThread::getInfo() const 149 : { 150 6 : if (!audioDecoder_) 151 0 : return {}; 152 6 : return audioDecoder_->getStream("a:remote"); 153 : } 154 : 155 : void 156 190 : AudioReceiveThread::startReceiver() 157 : { 158 190 : loop_.start(); 159 190 : } 160 : 161 : void 162 190 : AudioReceiveThread::stopReceiver() 163 : { 164 190 : loop_.stop(); 165 190 : } 166 : 167 : }; // namespace jami