Line data Source code
1 : /*
2 : * Copyright (C) 2004-2025 Savoir-faire Linux Inc.
3 : *
4 : * This program is free software: you can redistribute it and/or modify
5 : * it under the terms of the GNU General Public License as published by
6 : * the Free Software Foundation, either version 3 of the License, or
7 : * (at your option) any later version.
8 : *
9 : * This program is distributed in the hope that it will be useful,
10 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 : * GNU General Public License for more details.
13 : *
14 : * You should have received a copy of the GNU General Public License
15 : * along with this program. If not, see <https://www.gnu.org/licenses/>.
16 : */
17 :
18 : #include "libav_deps.h" // MUST BE INCLUDED FIRST
19 : #include "video_receive_thread.h"
20 : #include "media/media_decoder.h"
21 : #include "socket_pair.h"
22 : #include "manager.h"
23 : #include "client/videomanager.h"
24 : #include "sinkclient.h"
25 : #include "logger.h"
26 :
27 : extern "C" {
28 : #include <libavutil/display.h>
29 : }
30 :
31 : #include <unistd.h>
32 : #include <map>
33 :
34 : namespace jami {
35 : namespace video {
36 :
37 : using std::string;
38 :
39 139 : VideoReceiveThread::VideoReceiveThread(const std::string& id, bool useSink, const std::string& sdp, uint16_t mtu)
40 : : VideoGenerator::VideoGenerator()
41 139 : , args_()
42 139 : , id_(id)
43 139 : , useSink_(useSink)
44 139 : , stream_(sdp)
45 139 : , sdpContext_(stream_.str().size(), false, &readFunction, 0, 0, this)
46 139 : , sink_ {Manager::instance().createSinkClient(id)}
47 139 : , mtu_(mtu)
48 278 : , loop_(std::bind(&VideoReceiveThread::setup, this),
49 278 : std::bind(&VideoReceiveThread::decodeFrame, this),
50 556 : std::bind(&VideoReceiveThread::cleanup, this))
51 : {
52 139 : JAMI_DBG("[%p] Instance created", this);
53 139 : }
54 :
55 417 : VideoReceiveThread::~VideoReceiveThread()
56 : {
57 139 : loop_.join();
58 139 : JAMI_DBG("[%p] Instance destroyed", this);
59 278 : }
60 :
61 : void
62 139 : VideoReceiveThread::startLoop()
63 : {
64 139 : JAMI_DBG("[%p] Starting receiver’s loop", this);
65 139 : loop_.start();
66 139 : }
67 :
68 : void
69 282 : VideoReceiveThread::stopLoop()
70 : {
71 282 : if (loop_.isStopping())
72 152 : return;
73 130 : JAMI_DBG("[%p] Stopping receiver’s loop and waiting for the thread to exit…", this);
74 130 : loop_.stop();
75 130 : loop_.join();
76 130 : JAMI_DBG("[%p] Receiver’s thread exited", this);
77 : }
78 :
79 : // We do this setup here instead of the constructor because we don't want the
80 : // main thread to block while this executes, so it happens in the video thread.
81 : bool
82 139 : VideoReceiveThread::setup()
83 : {
84 139 : JAMI_DBG("[%p] Setting up video receiver", this);
85 :
86 278 : videoDecoder_.reset(new MediaDecoder([this](const std::shared_ptr<MediaFrame>& frame) mutable {
87 5475 : libav_utils::AVBufferPtr displayMatrix;
88 : {
89 5475 : std::lock_guard l(rotationMtx_);
90 5475 : if (displayMatrix_)
91 5475 : displayMatrix.reset(av_buffer_ref(displayMatrix_.get()));
92 5475 : }
93 5475 : if (displayMatrix)
94 5475 : av_frame_new_side_data_from_buf(frame->pointer(), AV_FRAME_DATA_DISPLAYMATRIX, displayMatrix.release());
95 5475 : publishFrame(std::static_pointer_cast<VideoFrame>(frame));
96 5614 : }));
97 139 : videoDecoder_->setContextCallback([this]() {
98 31 : if (recorderCallback_)
99 31 : recorderCallback_(getInfo());
100 31 : });
101 139 : videoDecoder_->setResolutionChangedCallback([this](int width, int height) {
102 0 : dstWidth_ = width;
103 0 : dstHeight_ = height;
104 0 : sink_->setFrameSize(dstWidth_, dstHeight_);
105 0 : });
106 :
107 139 : dstWidth_ = args_.width;
108 139 : dstHeight_ = args_.height;
109 :
110 139 : static const std::string SDP_FILENAME = "dummyFilename";
111 139 : if (args_.input.empty()) {
112 139 : args_.format = "sdp";
113 139 : args_.input = SDP_FILENAME;
114 0 : } else if (args_.input.substr(0, strlen("/dev/video")) == "/dev/video") {
115 : // it's a v4l device if starting with /dev/video
116 : // FIXME: This is not a robust way of checking if we mean to use a
117 : // v4l2 device
118 0 : args_.format = "video4linux2";
119 : }
120 :
121 139 : videoDecoder_->setInterruptCallback(interruptCb, this);
122 :
123 139 : if (args_.input == SDP_FILENAME) {
124 : // Force custom_io so the SDP demuxer will not open any UDP connections
125 : // We need it to use ICE transport.
126 139 : args_.sdp_flags = "custom_io";
127 :
128 139 : if (stream_.str().empty()) {
129 0 : JAMI_ERR("No SDP loaded");
130 0 : return false;
131 : }
132 :
133 139 : videoDecoder_->setIOContext(&sdpContext_);
134 : }
135 :
136 139 : args_.disable_dts_probe_delay = true;
137 :
138 139 : if (videoDecoder_->openInput(args_)) {
139 0 : JAMI_ERR("Unable to open input \"%s\"", args_.input.c_str());
140 0 : return false;
141 : }
142 :
143 139 : if (args_.input == SDP_FILENAME) {
144 : // Now replace our custom AVIOContext with one that will read packets
145 139 : videoDecoder_->setIOContext(demuxContext_.get());
146 : }
147 139 : return true;
148 : }
149 :
150 : void
151 139 : VideoReceiveThread::cleanup()
152 : {
153 139 : JAMI_DBG("[%p] Stopping receiver", this);
154 :
155 139 : detach(sink_.get());
156 139 : sink_->stop();
157 :
158 139 : videoDecoder_.reset();
159 139 : }
160 :
161 : // This callback is used by libav internally to break out of blocking calls
162 : int
163 186 : VideoReceiveThread::interruptCb(void* data)
164 : {
165 186 : const auto context = static_cast<VideoReceiveThread*>(data);
166 186 : return not context->loop_.isRunning();
167 : }
168 :
169 : int
170 556 : VideoReceiveThread::readFunction(void* opaque, uint8_t* buf, int buf_size)
171 : {
172 556 : std::istream& is = static_cast<VideoReceiveThread*>(opaque)->stream_;
173 556 : is.read(reinterpret_cast<char*>(buf), buf_size);
174 :
175 556 : auto count = is.gcount();
176 556 : if (count != 0)
177 139 : return count;
178 : else
179 417 : return AVERROR_EOF;
180 : }
181 :
182 : void
183 139 : VideoReceiveThread::addIOContext(SocketPair& socketPair)
184 : {
185 139 : demuxContext_.reset(socketPair.createIOContext(mtu_));
186 139 : }
187 :
188 : void
189 140 : VideoReceiveThread::setRecorderCallback(const std::function<void(const MediaStream& ms)>& cb)
190 : {
191 140 : recorderCallback_ = cb;
192 140 : if (videoDecoder_)
193 1 : videoDecoder_->setContextCallback([this]() {
194 1 : if (recorderCallback_)
195 1 : recorderCallback_(getInfo());
196 1 : });
197 140 : }
198 :
199 : void
200 5857 : VideoReceiveThread::decodeFrame()
201 : {
202 5857 : if (not loop_.isRunning())
203 1 : return;
204 :
205 5856 : if (not isVideoConfigured_) {
206 139 : if (!configureVideoOutput()) {
207 0 : JAMI_ERROR("[{:p}] Failed to configure video output", fmt::ptr(this));
208 0 : return;
209 : } else {
210 556 : JAMI_LOG("[{:p}] Decoder configured, starting decoding", fmt::ptr(this));
211 : }
212 : }
213 5856 : auto status = videoDecoder_->decode();
214 5856 : if (status == MediaDemuxer::Status::EndOfFile) {
215 464 : JAMI_LOG("[{:p}] End of file", fmt::ptr(this));
216 116 : loop_.stop();
217 5740 : } else if (status == MediaDemuxer::Status::ReadError) {
218 0 : JAMI_ERROR("[{:p}] Decoding error: %s", fmt::ptr(this), MediaDemuxer::getStatusStr(status));
219 5740 : } else if (status == MediaDemuxer::Status::FallBack) {
220 0 : if (keyFrameRequestCallback_)
221 0 : keyFrameRequestCallback_();
222 : }
223 : }
224 :
225 : bool
226 139 : VideoReceiveThread::configureVideoOutput()
227 : {
228 139 : assert(not isVideoConfigured_);
229 :
230 139 : JAMI_DBG("[%p] Configuring video output", this);
231 :
232 139 : if (not loop_.isRunning()) {
233 0 : JAMI_WARN("[%p] Unable to configure video output, the loop is not running!", this);
234 0 : return false;
235 : }
236 :
237 139 : if (videoDecoder_->setupVideo() < 0) {
238 0 : JAMI_ERR("Decoder IO startup failed");
239 0 : stopLoop();
240 0 : return false;
241 : }
242 :
243 : // Default size from input video
244 139 : if (dstWidth_ == 0 and dstHeight_ == 0) {
245 139 : dstWidth_ = videoDecoder_->getWidth();
246 139 : dstHeight_ = videoDecoder_->getHeight();
247 : }
248 :
249 139 : if (not sink_->start()) {
250 0 : JAMI_ERR("RX: sink startup failed");
251 0 : stopLoop();
252 0 : return false;
253 : }
254 :
255 139 : if (useSink_)
256 109 : startSink();
257 :
258 139 : if (onSuccessfulSetup_)
259 139 : onSuccessfulSetup_(MEDIA_VIDEO, 1);
260 :
261 139 : return isVideoConfigured_ = true;
262 : }
263 :
264 : void
265 332 : VideoReceiveThread::stopSink()
266 : {
267 332 : JAMI_DBG("[%p] Stopping sink", this);
268 :
269 332 : if (!loop_.isRunning())
270 282 : return;
271 :
272 50 : detach(sink_.get());
273 50 : sink_->setFrameSize(0, 0);
274 : }
275 :
276 : void
277 158 : VideoReceiveThread::startSink()
278 : {
279 158 : JAMI_DBG("[%p] Starting sink", this);
280 :
281 158 : if (!loop_.isRunning())
282 65 : return;
283 :
284 93 : if (dstWidth_ > 0 and dstHeight_ > 0 and attach(sink_.get()))
285 46 : sink_->setFrameSize(dstWidth_, dstHeight_);
286 : }
287 :
288 : int
289 0 : VideoReceiveThread::getWidth() const
290 : {
291 0 : return dstWidth_;
292 : }
293 :
294 : int
295 0 : VideoReceiveThread::getHeight() const
296 : {
297 0 : return dstHeight_;
298 : }
299 :
300 : AVPixelFormat
301 0 : VideoReceiveThread::getPixelFormat() const
302 : {
303 0 : if (videoDecoder_)
304 0 : return videoDecoder_->getPixelFormat();
305 0 : return {};
306 : }
307 :
308 : MediaStream
309 439 : VideoReceiveThread::getInfo() const
310 : {
311 439 : if (videoDecoder_)
312 173 : return videoDecoder_->getStream("v:remote");
313 266 : return {};
314 : }
315 :
316 : void
317 188 : VideoReceiveThread::setRotation(int angle)
318 : {
319 188 : libav_utils::AVBufferPtr displayMatrix(av_buffer_alloc(sizeof(int32_t) * 9));
320 188 : av_display_rotation_set(reinterpret_cast<int32_t*>(displayMatrix->data), angle);
321 188 : std::lock_guard l(rotationMtx_);
322 188 : displayMatrix_ = std::move(displayMatrix);
323 188 : }
324 :
325 : } // namespace video
326 : } // namespace jami
|