Line data Source code
1 : /* 2 : * Copyright (C) 2004-2024 Savoir-faire Linux Inc. 3 : * 4 : * Author: Tristan Matthews <tristan.matthews@savoirfairelinux.com> 5 : * Author: Philippe Gorley <philippe.gorley@savoirfairelinux.com> 6 : * 7 : * This program is free software; you can redistribute it and/or modify 8 : * it under the terms of the GNU General Public License as published by 9 : * the Free Software Foundation; either version 3 of the License, or 10 : * (at your option) any later version. 11 : * 12 : * This program is distributed in the hope that it will be useful, 13 : * but WITHOUT ANY WARRANTY; without even the implied warranty of 14 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 15 : * GNU General Public License for more details. 16 : * 17 : * You should have received a copy of the GNU General Public License 18 : * along with this program; if not, write to the Free Software 19 : * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. 20 : */ 21 : 22 : #include "audio_receive_thread.h" 23 : #include "libav_deps.h" 24 : #include "logger.h" 25 : #include "manager.h" 26 : #include "media_decoder.h" 27 : #include "media_io_handle.h" 28 : #include "media_recorder.h" 29 : #include "ringbuffer.h" 30 : #include "ringbufferpool.h" 31 : 32 : #include <memory> 33 : 34 : namespace jami { 35 : 36 187 : AudioReceiveThread::AudioReceiveThread(const std::string& streamId, 37 : const AudioFormat& format, 38 : const std::string& sdp, 39 187 : const uint16_t mtu) 40 187 : : streamId_(streamId) 41 187 : , format_(format) 42 187 : , stream_(sdp) 43 187 : , sdpContext_(new MediaIOHandle(sdp.size(), false, &readFunction, 0, 0, this)) 44 187 : , mtu_(mtu) 45 374 : , loop_(std::bind(&AudioReceiveThread::setup, this), 46 374 : std::bind(&AudioReceiveThread::process, this), 47 748 : std::bind(&AudioReceiveThread::cleanup, this)) 48 187 : {} 49 : 50 561 : AudioReceiveThread::~AudioReceiveThread() 51 : { 52 187 : loop_.join(); 53 374 : } 54 : 55 : bool 56 187 : AudioReceiveThread::setup() 57 : { 58 187 : std::lock_guard lk(mutex_); 59 374 : audioDecoder_.reset(new MediaDecoder([this](std::shared_ptr<MediaFrame>&& frame) mutable { 60 0 : notify(frame); 61 0 : ringbuffer_->put(std::static_pointer_cast<AudioFrame>(frame)); 62 187 : })); 63 187 : audioDecoder_->setContextCallback([this]() { 64 0 : if (recorderCallback_) 65 0 : recorderCallback_(getInfo()); 66 0 : }); 67 187 : audioDecoder_->setInterruptCallback(interruptCb, this); 68 : 69 : // custom_io so the SDP demuxer will not open any UDP connections 70 187 : args_.input = SDP_FILENAME; 71 187 : args_.format = "sdp"; 72 187 : args_.sdp_flags = "custom_io"; 73 : 74 187 : if (stream_.str().empty()) { 75 0 : JAMI_ERR("No SDP loaded"); 76 0 : return false; 77 : } 78 : 79 187 : audioDecoder_->setIOContext(sdpContext_.get()); 80 187 : audioDecoder_->setFEC(true); 81 187 : if (audioDecoder_->openInput(args_)) { 82 0 : JAMI_ERR("Could not open input \"%s\"", SDP_FILENAME); 83 0 : return false; 84 : } 85 : 86 : // Now replace our custom AVIOContext with one that will read packets 87 187 : audioDecoder_->setIOContext(demuxContext_.get()); 88 187 : if (audioDecoder_->setupAudio()) { 89 0 : JAMI_ERR("decoder IO startup failed"); 90 0 : return false; 91 : } 92 : 93 187 : ringbuffer_ = Manager::instance().getRingBufferPool().createRingBuffer(streamId_); 94 187 : Manager::instance().getRingBufferPool().bindHalfDuplexOut(RingBufferPool::DEFAULT_ID, streamId_); 95 : 96 187 : if (onSuccessfulSetup_) 97 187 : onSuccessfulSetup_(MEDIA_AUDIO, 1); 98 : 99 187 : return true; 100 187 : } 101 : 102 : void 103 1 : AudioReceiveThread::process() 104 : { 105 1 : audioDecoder_->decode(); 106 1 : } 107 : 108 : void 109 187 : AudioReceiveThread::cleanup() 110 : { 111 187 : std::lock_guard lk(mutex_); 112 187 : audioDecoder_.reset(); 113 187 : demuxContext_.reset(); 114 187 : } 115 : 116 : int 117 748 : AudioReceiveThread::readFunction(void* opaque, uint8_t* buf, int buf_size) 118 : { 119 748 : std::istream& is = static_cast<AudioReceiveThread*>(opaque)->stream_; 120 748 : is.read(reinterpret_cast<char*>(buf), buf_size); 121 : 122 748 : auto count = is.gcount(); 123 748 : return count ? count : AVERROR_EOF; 124 : } 125 : 126 : // This callback is used by libav internally to break out of blocking calls 127 : int 128 187 : AudioReceiveThread::interruptCb(void* data) 129 : { 130 187 : auto context = static_cast<AudioReceiveThread*>(data); 131 187 : return not context->loop_.isRunning(); 132 : } 133 : 134 : void 135 187 : AudioReceiveThread::addIOContext(SocketPair& socketPair) 136 : { 137 187 : demuxContext_.reset(socketPair.createIOContext(mtu_)); 138 187 : } 139 : 140 : void 141 195 : AudioReceiveThread::setRecorderCallback( 142 : const std::function<void(const MediaStream& ms)>& cb) 143 : { 144 195 : recorderCallback_ = cb; 145 195 : if (audioDecoder_) 146 7 : audioDecoder_->setContextCallback([this]() { 147 0 : if (recorderCallback_) 148 0 : recorderCallback_(getInfo()); 149 0 : }); 150 195 : } 151 : 152 : MediaStream 153 9 : AudioReceiveThread::getInfo() const 154 : { 155 9 : if (!audioDecoder_) 156 1 : return {}; 157 8 : return audioDecoder_->getStream("a:remote"); 158 : } 159 : 160 : void 161 187 : AudioReceiveThread::startReceiver() 162 : { 163 187 : loop_.start(); 164 187 : } 165 : 166 : void 167 187 : AudioReceiveThread::stopReceiver() 168 : { 169 187 : loop_.stop(); 170 187 : } 171 : 172 : }; // namespace jami