Line data Source code
1 : /*
2 : * Copyright (C) 2004-2025 Savoir-faire Linux Inc.
3 : *
4 : * This program is free software: you can redistribute it and/or modify
5 : * it under the terms of the GNU General Public License as published by
6 : * the Free Software Foundation, either version 3 of the License, or
7 : * (at your option) any later version.
8 : *
9 : * This program is distributed in the hope that it will be useful,
10 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 : * GNU General Public License for more details.
13 : *
14 : * You should have received a copy of the GNU General Public License
15 : * along with this program. If not, see <https://www.gnu.org/licenses/>.
16 : */
17 :
18 : #include "libav_deps.h" // MUST BE INCLUDED FIRST
19 : #include "video_receive_thread.h"
20 : #include "media/media_decoder.h"
21 : #include "socket_pair.h"
22 : #include "manager.h"
23 : #include "client/videomanager.h"
24 : #include "sinkclient.h"
25 : #include "logger.h"
26 :
27 : extern "C" {
28 : #include <libavutil/display.h>
29 : }
30 :
31 : #include <unistd.h>
32 : #include <map>
33 :
34 : namespace jami {
35 : namespace video {
36 :
37 : using std::string;
38 :
39 145 : VideoReceiveThread::VideoReceiveThread(const std::string& id,
40 : bool useSink,
41 : const std::string& sdp,
42 145 : uint16_t mtu)
43 : : VideoGenerator::VideoGenerator()
44 145 : , args_()
45 145 : , id_(id)
46 145 : , useSink_(useSink)
47 145 : , stream_(sdp)
48 145 : , sdpContext_(stream_.str().size(), false, &readFunction, 0, 0, this)
49 145 : , sink_ {Manager::instance().createSinkClient(id)}
50 145 : , mtu_(mtu)
51 290 : , loop_(std::bind(&VideoReceiveThread::setup, this),
52 290 : std::bind(&VideoReceiveThread::decodeFrame, this),
53 580 : std::bind(&VideoReceiveThread::cleanup, this))
54 : {
55 145 : JAMI_DBG("[%p] Instance created", this);
56 145 : }
57 :
58 435 : VideoReceiveThread::~VideoReceiveThread()
59 : {
60 145 : loop_.join();
61 145 : JAMI_DBG("[%p] Instance destroyed", this);
62 290 : }
63 :
64 : void
65 145 : VideoReceiveThread::startLoop()
66 : {
67 145 : JAMI_DBG("[%p] Starting receiver’s loop", this);
68 145 : loop_.start();
69 145 : }
70 :
71 : void
72 294 : VideoReceiveThread::stopLoop()
73 : {
74 294 : if (loop_.isStopping())
75 161 : return;
76 133 : JAMI_DBG("[%p] Stopping receiver’s loop and waiting for the thread to exit…", this);
77 133 : loop_.stop();
78 133 : loop_.join();
79 133 : JAMI_DBG("[%p] Receiver’s thread exited", this);
80 : }
81 :
82 : // We do this setup here instead of the constructor because we don't want the
83 : // main thread to block while this executes, so it happens in the video thread.
84 : bool
85 145 : VideoReceiveThread::setup()
86 : {
87 145 : JAMI_DBG("[%p] Setting up video receiver", this);
88 :
89 290 : videoDecoder_.reset(new MediaDecoder([this](const std::shared_ptr<MediaFrame>& frame) mutable {
90 5542 : libav_utils::AVBufferPtr displayMatrix;
91 : {
92 5542 : std::lock_guard l(rotationMtx_);
93 5542 : if (displayMatrix_)
94 5542 : displayMatrix.reset(av_buffer_ref(displayMatrix_.get()));
95 5542 : }
96 5542 : if (displayMatrix)
97 5542 : av_frame_new_side_data_from_buf(frame->pointer(),
98 : AV_FRAME_DATA_DISPLAYMATRIX,
99 : displayMatrix.release());
100 5542 : publishFrame(std::static_pointer_cast<VideoFrame>(frame));
101 5687 : }));
102 145 : videoDecoder_->setContextCallback([this]() {
103 31 : if (recorderCallback_)
104 31 : recorderCallback_(getInfo());
105 31 : });
106 145 : videoDecoder_->setResolutionChangedCallback([this](int width, int height) {
107 0 : dstWidth_ = width;
108 0 : dstHeight_ = height;
109 0 : sink_->setFrameSize(dstWidth_, dstHeight_);
110 0 : });
111 :
112 145 : dstWidth_ = args_.width;
113 145 : dstHeight_ = args_.height;
114 :
115 145 : static const std::string SDP_FILENAME = "dummyFilename";
116 145 : if (args_.input.empty()) {
117 145 : args_.format = "sdp";
118 145 : args_.input = SDP_FILENAME;
119 0 : } else if (args_.input.substr(0, strlen("/dev/video")) == "/dev/video") {
120 : // it's a v4l device if starting with /dev/video
121 : // FIXME: This is not a robust way of checking if we mean to use a
122 : // v4l2 device
123 0 : args_.format = "video4linux2";
124 : }
125 :
126 145 : videoDecoder_->setInterruptCallback(interruptCb, this);
127 :
128 145 : if (args_.input == SDP_FILENAME) {
129 : // Force custom_io so the SDP demuxer will not open any UDP connections
130 : // We need it to use ICE transport.
131 145 : args_.sdp_flags = "custom_io";
132 :
133 145 : if (stream_.str().empty()) {
134 0 : JAMI_ERR("No SDP loaded");
135 0 : return false;
136 : }
137 :
138 145 : videoDecoder_->setIOContext(&sdpContext_);
139 : }
140 :
141 145 : args_.disable_dts_probe_delay = true;
142 :
143 145 : if (videoDecoder_->openInput(args_)) {
144 0 : JAMI_ERR("Unable to open input \"%s\"", args_.input.c_str());
145 0 : return false;
146 : }
147 :
148 145 : if (args_.input == SDP_FILENAME) {
149 : // Now replace our custom AVIOContext with one that will read packets
150 145 : videoDecoder_->setIOContext(demuxContext_.get());
151 : }
152 145 : return true;
153 : }
154 :
155 : void
156 145 : VideoReceiveThread::cleanup()
157 : {
158 145 : JAMI_DBG("[%p] Stopping receiver", this);
159 :
160 145 : detach(sink_.get());
161 145 : sink_->stop();
162 :
163 145 : videoDecoder_.reset();
164 145 : }
165 :
166 : // This callback is used by libav internally to break out of blocking calls
167 : int
168 192 : VideoReceiveThread::interruptCb(void* data)
169 : {
170 192 : const auto context = static_cast<VideoReceiveThread*>(data);
171 192 : return not context->loop_.isRunning();
172 : }
173 :
174 : int
175 580 : VideoReceiveThread::readFunction(void* opaque, uint8_t* buf, int buf_size)
176 : {
177 580 : std::istream& is = static_cast<VideoReceiveThread*>(opaque)->stream_;
178 580 : is.read(reinterpret_cast<char*>(buf), buf_size);
179 :
180 580 : auto count = is.gcount();
181 580 : if (count != 0)
182 145 : return count;
183 : else
184 435 : return AVERROR_EOF;
185 : }
186 :
187 : void
188 145 : VideoReceiveThread::addIOContext(SocketPair& socketPair)
189 : {
190 145 : demuxContext_.reset(socketPair.createIOContext(mtu_));
191 145 : }
192 :
193 : void
194 150 : VideoReceiveThread::setRecorderCallback(
195 : const std::function<void(const MediaStream& ms)>& cb)
196 : {
197 150 : recorderCallback_ = cb;
198 150 : if (videoDecoder_)
199 3 : videoDecoder_->setContextCallback([this]() {
200 1 : if (recorderCallback_)
201 1 : recorderCallback_(getInfo());
202 1 : });
203 150 : }
204 :
205 : void
206 5927 : VideoReceiveThread::decodeFrame()
207 : {
208 5927 : if (not loop_.isRunning())
209 0 : return;
210 :
211 5927 : if (not isVideoConfigured_) {
212 145 : if (!configureVideoOutput()) {
213 0 : JAMI_ERROR("[{:p}] Failed to configure video output", fmt::ptr(this));
214 0 : return;
215 : } else {
216 435 : JAMI_LOG("[{:p}] Decoder configured, starting decoding", fmt::ptr(this));
217 : }
218 : }
219 5927 : auto status = videoDecoder_->decode();
220 5927 : if (status == MediaDemuxer::Status::EndOfFile) {
221 360 : JAMI_LOG("[{:p}] End of file", fmt::ptr(this));
222 120 : loop_.stop();
223 : }
224 5807 : else if (status == MediaDemuxer::Status::ReadError) {
225 0 : JAMI_ERROR("[{:p}] Decoding error: %s", fmt::ptr(this), MediaDemuxer::getStatusStr(status));
226 : }
227 5807 : else if (status == MediaDemuxer::Status::FallBack) {
228 0 : if (keyFrameRequestCallback_)
229 0 : keyFrameRequestCallback_();
230 : }
231 : }
232 :
233 : bool
234 145 : VideoReceiveThread::configureVideoOutput()
235 : {
236 145 : assert(not isVideoConfigured_);
237 :
238 145 : JAMI_DBG("[%p] Configuring video output", this);
239 :
240 145 : if (not loop_.isRunning()) {
241 0 : JAMI_WARN("[%p] Unable to configure video output, the loop is not running!", this);
242 0 : return false;
243 : }
244 :
245 145 : if (videoDecoder_->setupVideo() < 0) {
246 0 : JAMI_ERR("Decoder IO startup failed");
247 0 : stopLoop();
248 0 : return false;
249 : }
250 :
251 : // Default size from input video
252 145 : if (dstWidth_ == 0 and dstHeight_ == 0) {
253 145 : dstWidth_ = videoDecoder_->getWidth();
254 145 : dstHeight_ = videoDecoder_->getHeight();
255 : }
256 :
257 145 : if (not sink_->start()) {
258 0 : JAMI_ERR("RX: sink startup failed");
259 0 : stopLoop();
260 0 : return false;
261 : }
262 :
263 145 : if (useSink_)
264 113 : startSink();
265 :
266 145 : if (onSuccessfulSetup_)
267 145 : onSuccessfulSetup_(MEDIA_VIDEO, 1);
268 :
269 145 : return isVideoConfigured_ = true;
270 : }
271 :
272 : void
273 345 : VideoReceiveThread::stopSink()
274 : {
275 345 : JAMI_DBG("[%p] Stopping sink", this);
276 :
277 345 : if (!loop_.isRunning())
278 294 : return;
279 :
280 51 : detach(sink_.get());
281 51 : sink_->setFrameSize(0, 0);
282 : }
283 :
284 : void
285 163 : VideoReceiveThread::startSink()
286 : {
287 163 : JAMI_DBG("[%p] Starting sink", this);
288 :
289 163 : if (!loop_.isRunning())
290 70 : return;
291 :
292 93 : if (dstWidth_ > 0 and dstHeight_ > 0 and attach(sink_.get()))
293 47 : sink_->setFrameSize(dstWidth_, dstHeight_);
294 : }
295 :
296 : int
297 0 : VideoReceiveThread::getWidth() const
298 : {
299 0 : return dstWidth_;
300 : }
301 :
302 : int
303 0 : VideoReceiveThread::getHeight() const
304 : {
305 0 : return dstHeight_;
306 : }
307 :
308 : AVPixelFormat
309 0 : VideoReceiveThread::getPixelFormat() const
310 : {
311 0 : if (videoDecoder_)
312 0 : return videoDecoder_->getPixelFormat();
313 0 : return {};
314 : }
315 :
316 : MediaStream
317 458 : VideoReceiveThread::getInfo() const
318 : {
319 458 : if (videoDecoder_)
320 179 : return videoDecoder_->getStream("v:remote");
321 279 : return {};
322 : }
323 :
324 : void
325 194 : VideoReceiveThread::setRotation(int angle)
326 : {
327 194 : libav_utils::AVBufferPtr displayMatrix(av_buffer_alloc(sizeof(int32_t) * 9));
328 194 : av_display_rotation_set(reinterpret_cast<int32_t*>(displayMatrix->data), angle);
329 194 : std::lock_guard l(rotationMtx_);
330 194 : displayMatrix_ = std::move(displayMatrix);
331 194 : }
332 :
333 : } // namespace video
334 : } // namespace jami
|