Line data Source code
1 : /*
2 : * Copyright (C) 2004-2026 Savoir-faire Linux Inc.
3 : *
4 : * This program is free software: you can redistribute it and/or modify
5 : * it under the terms of the GNU General Public License as published by
6 : * the Free Software Foundation, either version 3 of the License, or
7 : * (at your option) any later version.
8 : *
9 : * This program is distributed in the hope that it will be useful,
10 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 : * GNU General Public License for more details.
13 : *
14 : * You should have received a copy of the GNU General Public License
15 : * along with this program. If not, see <https://www.gnu.org/licenses/>.
16 : */
17 :
18 : #include "libav_deps.h" // MUST BE INCLUDED FIRST
19 : #include "video_receive_thread.h"
20 : #include "media/media_decoder.h"
21 : #include "socket_pair.h"
22 : #include "manager.h"
23 : #include "logger.h"
24 :
25 : extern "C" {
26 : #include <libavutil/display.h>
27 : }
28 :
29 : #include <unistd.h>
30 :
31 : namespace jami {
32 : namespace video {
33 :
34 : using std::string;
35 :
36 131 : VideoReceiveThread::VideoReceiveThread(const std::string& id, bool useSink, const std::string& sdp, uint16_t mtu)
37 : : VideoGenerator::VideoGenerator()
38 131 : , args_()
39 131 : , id_(id)
40 131 : , useSink_(useSink)
41 131 : , stream_(sdp)
42 131 : , sdpContext_(stream_.str().size(), false, &readFunction, 0, 0, this)
43 131 : , sink_ {Manager::instance().createSinkClient(id)}
44 131 : , mtu_(mtu)
45 262 : , loop_(std::bind(&VideoReceiveThread::setup, this),
46 262 : std::bind(&VideoReceiveThread::decodeFrame, this),
47 524 : std::bind(&VideoReceiveThread::cleanup, this))
48 : {
49 131 : JAMI_DBG("[%p] Instance created", this);
50 131 : }
51 :
52 393 : VideoReceiveThread::~VideoReceiveThread()
53 : {
54 131 : loop_.join();
55 131 : JAMI_DBG("[%p] Instance destroyed", this);
56 262 : }
57 :
58 : void
59 131 : VideoReceiveThread::startLoop()
60 : {
61 131 : JAMI_DBG("[%p] Starting receiver’s loop", this);
62 131 : loop_.start();
63 131 : }
64 :
65 : void
66 264 : VideoReceiveThread::stopLoop()
67 : {
68 264 : if (loop_.isStopping())
69 142 : return;
70 122 : JAMI_DBG("[%p] Stopping receiver’s loop and waiting for the thread to exit…", this);
71 122 : loop_.stop();
72 122 : loop_.join();
73 122 : JAMI_DBG("[%p] Receiver’s thread exited", this);
74 : }
75 :
76 : // We do this setup here instead of the constructor because we don't want the
77 : // main thread to block while this executes, so it happens in the video thread.
78 : bool
79 131 : VideoReceiveThread::setup()
80 : {
81 131 : JAMI_DBG("[%p] Setting up video receiver", this);
82 :
83 262 : videoDecoder_.reset(new MediaDecoder([this](const std::shared_ptr<MediaFrame>& frame) mutable {
84 5308 : libav_utils::AVBufferPtr displayMatrix;
85 : {
86 5308 : std::lock_guard l(rotationMtx_);
87 5308 : if (displayMatrix_)
88 5308 : displayMatrix.reset(av_buffer_ref(displayMatrix_.get()));
89 5308 : }
90 5308 : if (displayMatrix)
91 5308 : av_frame_new_side_data_from_buf(frame->pointer(), AV_FRAME_DATA_DISPLAYMATRIX, displayMatrix.release());
92 5308 : publishFrame(std::static_pointer_cast<VideoFrame>(frame));
93 5439 : }));
94 131 : videoDecoder_->setContextCallback([this]() {
95 30 : if (recorderCallback_)
96 30 : recorderCallback_(getInfo());
97 30 : });
98 131 : videoDecoder_->setResolutionChangedCallback([this](int width, int height) {
99 0 : dstWidth_ = width;
100 0 : dstHeight_ = height;
101 0 : sink_->setFrameSize(dstWidth_, dstHeight_);
102 0 : });
103 :
104 131 : dstWidth_ = static_cast<int>(args_.width);
105 131 : dstHeight_ = static_cast<int>(args_.height);
106 :
107 131 : static const std::string SDP_FILENAME = "dummyFilename";
108 131 : if (args_.input.empty()) {
109 131 : args_.format = "sdp";
110 131 : args_.input = SDP_FILENAME;
111 0 : } else if (args_.input.substr(0, strlen("/dev/video")) == "/dev/video") {
112 : // it's a v4l device if starting with /dev/video
113 : // FIXME: This is not a robust way of checking if we mean to use a
114 : // v4l2 device
115 0 : args_.format = "video4linux2";
116 : }
117 :
118 131 : videoDecoder_->setInterruptCallback(interruptCb, this);
119 :
120 131 : if (args_.input == SDP_FILENAME) {
121 : // Force custom_io so the SDP demuxer will not open any UDP connections
122 : // We need it to use ICE transport.
123 131 : args_.sdp_flags = "custom_io";
124 :
125 131 : if (stream_.str().empty()) {
126 0 : JAMI_ERR("No SDP loaded");
127 0 : return false;
128 : }
129 :
130 131 : videoDecoder_->setIOContext(&sdpContext_);
131 : }
132 :
133 131 : args_.disable_dts_probe_delay = true;
134 :
135 131 : if (videoDecoder_->openInput(args_)) {
136 0 : JAMI_ERR("Unable to open input \"%s\"", args_.input.c_str());
137 0 : return false;
138 : }
139 :
140 131 : videoDecoder_->setKeyFrameRequestCb(keyFrameRequestCallback_);
141 :
142 131 : if (args_.input == SDP_FILENAME) {
143 : // Now replace our custom AVIOContext with one that will read packets
144 131 : videoDecoder_->setIOContext(demuxContext_.get());
145 : }
146 131 : return true;
147 : }
148 :
149 : void
150 131 : VideoReceiveThread::cleanup()
151 : {
152 131 : JAMI_DBG("[%p] Stopping receiver", this);
153 :
154 131 : detach(sink_.get());
155 131 : sink_->stop();
156 :
157 131 : videoDecoder_.reset();
158 131 : }
159 :
160 : // This callback is used by libav internally to break out of blocking calls
161 : int
162 179 : VideoReceiveThread::interruptCb(void* data)
163 : {
164 179 : auto* const context = static_cast<VideoReceiveThread*>(data);
165 179 : return not context->loop_.isRunning();
166 : }
167 :
168 : int
169 524 : VideoReceiveThread::readFunction(void* opaque, uint8_t* buf, int buf_size)
170 : {
171 524 : std::istream& is = static_cast<VideoReceiveThread*>(opaque)->stream_;
172 524 : is.read(reinterpret_cast<char*>(buf), buf_size);
173 :
174 524 : auto count = is.gcount();
175 524 : if (count != 0)
176 131 : return static_cast<int>(count);
177 : else
178 393 : return AVERROR_EOF;
179 : }
180 :
181 : void
182 131 : VideoReceiveThread::addIOContext(SocketPair& socketPair)
183 : {
184 131 : demuxContext_.reset(socketPair.createIOContext(mtu_));
185 131 : }
186 :
187 : void
188 132 : VideoReceiveThread::setRecorderCallback(const std::function<void(const MediaStream& ms)>& cb)
189 : {
190 132 : recorderCallback_ = cb;
191 132 : if (videoDecoder_)
192 1 : videoDecoder_->setContextCallback([this]() {
193 1 : if (recorderCallback_)
194 1 : recorderCallback_(getInfo());
195 1 : });
196 132 : }
197 :
198 : void
199 5667 : VideoReceiveThread::decodeFrame()
200 : {
201 5667 : if (not loop_.isRunning())
202 0 : return;
203 :
204 5667 : if (not isVideoConfigured_) {
205 131 : if (!configureVideoOutput()) {
206 0 : JAMI_ERROR("[{:p}] Failed to configure video output", fmt::ptr(this));
207 0 : return;
208 : } else {
209 524 : JAMI_LOG("[{:p}] Decoder configured, starting decoding", fmt::ptr(this));
210 : }
211 : }
212 5667 : auto status = videoDecoder_->decode();
213 5667 : if (status == MediaDemuxer::Status::EndOfFile) {
214 420 : JAMI_LOG("[{:p}] End of file", fmt::ptr(this));
215 105 : loop_.stop();
216 5562 : } else if (status == MediaDemuxer::Status::ReadError) {
217 0 : JAMI_ERROR("[{:p}] Decoding error: %s", fmt::ptr(this), MediaDemuxer::getStatusStr(status));
218 5562 : } else if (status == MediaDemuxer::Status::FallBack) {
219 0 : if (keyFrameRequestCallback_)
220 0 : keyFrameRequestCallback_();
221 : }
222 : }
223 :
224 : bool
225 131 : VideoReceiveThread::configureVideoOutput()
226 : {
227 131 : assert(not isVideoConfigured_);
228 :
229 131 : JAMI_DBG("[%p] Configuring video output", this);
230 :
231 131 : if (not loop_.isRunning()) {
232 0 : JAMI_WARN("[%p] Unable to configure video output, the loop is not running!", this);
233 0 : return false;
234 : }
235 :
236 131 : if (videoDecoder_->setupVideo() < 0) {
237 0 : JAMI_ERR("Decoder IO startup failed");
238 0 : stopLoop();
239 0 : return false;
240 : }
241 :
242 : // Default size from input video
243 131 : if (dstWidth_ == 0 and dstHeight_ == 0) {
244 131 : dstWidth_ = videoDecoder_->getWidth();
245 131 : dstHeight_ = videoDecoder_->getHeight();
246 : }
247 :
248 131 : if (not sink_->start()) {
249 0 : JAMI_ERR("RX: sink startup failed");
250 0 : stopLoop();
251 0 : return false;
252 : }
253 :
254 131 : if (useSink_)
255 100 : startSink();
256 :
257 131 : if (onSuccessfulSetup_)
258 131 : onSuccessfulSetup_(MEDIA_VIDEO, 1);
259 :
260 131 : return isVideoConfigured_ = true;
261 : }
262 :
263 : void
264 314 : VideoReceiveThread::stopSink()
265 : {
266 314 : JAMI_DBG("[%p] Stopping sink", this);
267 :
268 314 : if (!loop_.isRunning())
269 264 : return;
270 :
271 50 : detach(sink_.get());
272 50 : sink_->setFrameSize(0, 0);
273 : }
274 :
275 : void
276 149 : VideoReceiveThread::startSink()
277 : {
278 149 : JAMI_DBG("[%p] Starting sink", this);
279 :
280 149 : if (!loop_.isRunning())
281 57 : return;
282 :
283 92 : if (dstWidth_ > 0 and dstHeight_ > 0 and attach(sink_.get()))
284 45 : sink_->setFrameSize(dstWidth_, dstHeight_);
285 : }
286 :
287 : int
288 0 : VideoReceiveThread::getWidth() const
289 : {
290 0 : return dstWidth_;
291 : }
292 :
293 : int
294 0 : VideoReceiveThread::getHeight() const
295 : {
296 0 : return dstHeight_;
297 : }
298 :
299 : AVPixelFormat
300 0 : VideoReceiveThread::getPixelFormat() const
301 : {
302 0 : if (videoDecoder_)
303 0 : return videoDecoder_->getPixelFormat();
304 0 : return {};
305 : }
306 :
307 : MediaStream
308 414 : VideoReceiveThread::getInfo() const
309 : {
310 414 : if (videoDecoder_)
311 164 : return videoDecoder_->getStream("v:remote");
312 250 : return {};
313 : }
314 :
315 : void
316 178 : VideoReceiveThread::setRotation(int angle)
317 : {
318 178 : libav_utils::AVBufferPtr displayMatrix(av_buffer_alloc(sizeof(int32_t) * 9));
319 178 : av_display_rotation_set(reinterpret_cast<int32_t*>(displayMatrix->data), angle);
320 178 : std::lock_guard l(rotationMtx_);
321 178 : displayMatrix_ = std::move(displayMatrix);
322 178 : }
323 :
324 : } // namespace video
325 : } // namespace jami
|