Line data Source code
1 : /*
2 : * Copyright (C) 2004-2026 Savoir-faire Linux Inc.
3 : *
4 : * This program is free software: you can redistribute it and/or modify
5 : * it under the terms of the GNU General Public License as published by
6 : * the Free Software Foundation, either version 3 of the License, or
7 : * (at your option) any later version.
8 : *
9 : * This program is distributed in the hope that it will be useful,
10 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 : * GNU General Public License for more details.
13 : *
14 : * You should have received a copy of the GNU General Public License
15 : * along with this program. If not, see <https://www.gnu.org/licenses/>.
16 : */
17 :
18 : #include "audio_receive_thread.h"
19 : #include "libav_deps.h"
20 : #include "logger.h"
21 : #include "manager.h"
22 : #include "media_decoder.h"
23 : #include "media_io_handle.h"
24 : #include "ringbufferpool.h"
25 :
26 : #include <memory>
27 :
28 : namespace jami {
29 :
30 176 : AudioReceiveThread::AudioReceiveThread(const std::string& streamId,
31 : const AudioFormat& format,
32 : const std::string& sdp,
33 176 : const uint16_t mtu)
34 176 : : streamId_(streamId)
35 176 : , format_(format)
36 176 : , stream_(sdp)
37 176 : , sdpContext_(new MediaIOHandle(sdp.size(), false, &readFunction, 0, 0, this))
38 176 : , mtu_(mtu)
39 352 : , loop_(std::bind(&AudioReceiveThread::setup, this),
40 352 : std::bind(&AudioReceiveThread::process, this),
41 704 : std::bind(&AudioReceiveThread::cleanup, this))
42 176 : {}
43 :
44 528 : AudioReceiveThread::~AudioReceiveThread()
45 : {
46 176 : loop_.join();
47 352 : }
48 :
49 : bool
50 176 : AudioReceiveThread::setup()
51 : {
52 176 : std::lock_guard lk(mutex_);
53 176 : audioDecoder_.reset(new MediaDecoder([this](std::shared_ptr<MediaFrame>&& frame) mutable {
54 0 : notify(frame);
55 0 : ringbuffer_->put(std::static_pointer_cast<AudioFrame>(frame));
56 352 : }));
57 176 : audioDecoder_->setContextCallback([this]() {
58 0 : if (recorderCallback_)
59 0 : recorderCallback_(getInfo());
60 0 : });
61 176 : audioDecoder_->setInterruptCallback(interruptCb, this);
62 :
63 : // custom_io so the SDP demuxer will not open any UDP connections
64 176 : args_.input = SDP_FILENAME;
65 176 : args_.format = "sdp";
66 176 : args_.sdp_flags = "custom_io";
67 :
68 175 : if (stream_.str().empty()) {
69 0 : JAMI_ERROR("No SDP loaded");
70 0 : return false;
71 : }
72 :
73 176 : audioDecoder_->setIOContext(sdpContext_.get());
74 175 : audioDecoder_->setFEC(true);
75 175 : if (audioDecoder_->openInput(args_)) {
76 0 : JAMI_ERROR("Unable to open input \"{}\"", SDP_FILENAME);
77 0 : return false;
78 : }
79 :
80 : // Now replace our custom AVIOContext with one that will read packets
81 176 : audioDecoder_->setIOContext(demuxContext_.get());
82 176 : if (audioDecoder_->setupAudio()) {
83 0 : JAMI_ERROR("decoder IO startup failed");
84 0 : return false;
85 : }
86 :
87 176 : ringbuffer_ = Manager::instance().getRingBufferPool().createRingBuffer(streamId_);
88 :
89 176 : if (onSuccessfulSetup_)
90 176 : onSuccessfulSetup_(MEDIA_AUDIO, 1);
91 :
92 176 : return true;
93 176 : }
94 :
95 : void
96 0 : AudioReceiveThread::process()
97 : {
98 0 : audioDecoder_->decode();
99 0 : }
100 :
101 : void
102 176 : AudioReceiveThread::cleanup()
103 : {
104 176 : std::lock_guard lk(mutex_);
105 176 : audioDecoder_.reset();
106 176 : demuxContext_.reset();
107 176 : }
108 :
109 : int
110 565 : AudioReceiveThread::readFunction(void* opaque, uint8_t* buf, int buf_size)
111 : {
112 565 : std::istream& is = static_cast<AudioReceiveThread*>(opaque)->stream_;
113 565 : is.read(reinterpret_cast<char*>(buf), buf_size);
114 :
115 565 : auto count = is.gcount();
116 565 : return count ? static_cast<int>(count) : AVERROR_EOF;
117 : }
118 :
119 : // This callback is used by libav internally to break out of blocking calls
120 : int
121 176 : AudioReceiveThread::interruptCb(void* data)
122 : {
123 176 : auto* context = static_cast<AudioReceiveThread*>(data);
124 176 : return not context->loop_.isRunning();
125 : }
126 :
127 : void
128 176 : AudioReceiveThread::addIOContext(SocketPair& socketPair)
129 : {
130 176 : demuxContext_.reset(socketPair.createIOContext(mtu_));
131 176 : }
132 :
133 : void
134 177 : AudioReceiveThread::setRecorderCallback(const std::function<void(const MediaStream& ms)>& cb)
135 : {
136 177 : recorderCallback_ = cb;
137 177 : if (audioDecoder_)
138 1 : audioDecoder_->setContextCallback([this]() {
139 0 : if (recorderCallback_)
140 0 : recorderCallback_(getInfo());
141 0 : });
142 177 : }
143 :
144 : MediaStream
145 3 : AudioReceiveThread::getInfo() const
146 : {
147 3 : if (!audioDecoder_)
148 0 : return {};
149 9 : return audioDecoder_->getStream("a:remote");
150 : }
151 :
152 : void
153 176 : AudioReceiveThread::startReceiver()
154 : {
155 176 : loop_.start();
156 176 : }
157 :
158 : void
159 176 : AudioReceiveThread::stopReceiver()
160 : {
161 176 : loop_.stop();
162 176 : }
163 :
164 : }; // namespace jami
|