Line data Source code
1 : /*
2 : * Copyright (C) 2004-2024 Savoir-faire Linux Inc.
3 : *
4 : * This program is free software: you can redistribute it and/or modify
5 : * it under the terms of the GNU General Public License as published by
6 : * the Free Software Foundation, either version 3 of the License, or
7 : * (at your option) any later version.
8 : *
9 : * This program is distributed in the hope that it will be useful,
10 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 : * GNU General Public License for more details.
13 : *
14 : * You should have received a copy of the GNU General Public License
15 : * along with this program. If not, see <https://www.gnu.org/licenses/>.
16 : */
17 :
18 : #include "libav_deps.h" // MUST BE INCLUDED FIRST
19 : #include "video_receive_thread.h"
20 : #include "media/media_decoder.h"
21 : #include "socket_pair.h"
22 : #include "manager.h"
23 : #include "client/videomanager.h"
24 : #include "sinkclient.h"
25 : #include "logger.h"
26 :
27 : extern "C" {
28 : #include <libavutil/display.h>
29 : }
30 :
31 : #include <unistd.h>
32 : #include <map>
33 :
34 : namespace jami {
35 : namespace video {
36 :
37 : using std::string;
38 :
39 145 : VideoReceiveThread::VideoReceiveThread(const std::string& id,
40 : bool useSink,
41 : const std::string& sdp,
42 145 : uint16_t mtu)
43 : : VideoGenerator::VideoGenerator()
44 145 : , args_()
45 145 : , id_(id)
46 145 : , useSink_(useSink)
47 145 : , stream_(sdp)
48 145 : , sdpContext_(stream_.str().size(), false, &readFunction, 0, 0, this)
49 145 : , sink_ {Manager::instance().createSinkClient(id)}
50 145 : , mtu_(mtu)
51 290 : , loop_(std::bind(&VideoReceiveThread::setup, this),
52 290 : std::bind(&VideoReceiveThread::decodeFrame, this),
53 580 : std::bind(&VideoReceiveThread::cleanup, this))
54 : {
55 145 : JAMI_DBG("[%p] Instance created", this);
56 145 : }
57 :
58 435 : VideoReceiveThread::~VideoReceiveThread()
59 : {
60 145 : loop_.join();
61 145 : JAMI_DBG("[%p] Instance destroyed", this);
62 290 : }
63 :
64 : void
65 145 : VideoReceiveThread::startLoop()
66 : {
67 145 : JAMI_DBG("[%p] Starting receiver’s loop", this);
68 145 : loop_.start();
69 145 : }
70 :
71 : void
72 294 : VideoReceiveThread::stopLoop()
73 : {
74 294 : if (loop_.isStopping())
75 163 : return;
76 131 : JAMI_DBG("[%p] Stopping receiver’s loop and waiting for the thread to exit…", this);
77 131 : loop_.stop();
78 131 : loop_.join();
79 131 : JAMI_DBG("[%p] Receiver’s thread exited", this);
80 : }
81 :
82 : // We do this setup here instead of the constructor because we don't want the
83 : // main thread to block while this executes, so it happens in the video thread.
84 : bool
85 145 : VideoReceiveThread::setup()
86 : {
87 145 : JAMI_DBG("[%p] Setting up video receiver", this);
88 :
89 290 : videoDecoder_.reset(new MediaDecoder([this](const std::shared_ptr<MediaFrame>& frame) mutable {
90 5021 : libav_utils::AVBufferPtr displayMatrix;
91 : {
92 5021 : std::lock_guard l(rotationMtx_);
93 5021 : if (displayMatrix_)
94 5021 : displayMatrix.reset(av_buffer_ref(displayMatrix_.get()));
95 5021 : }
96 5021 : if (displayMatrix)
97 5021 : av_frame_new_side_data_from_buf(frame->pointer(),
98 : AV_FRAME_DATA_DISPLAYMATRIX,
99 : displayMatrix.release());
100 5021 : publishFrame(std::static_pointer_cast<VideoFrame>(frame));
101 5166 : }));
102 145 : videoDecoder_->setContextCallback([this]() {
103 30 : if (recorderCallback_)
104 30 : recorderCallback_(getInfo());
105 30 : });
106 145 : videoDecoder_->setResolutionChangedCallback([this](int width, int height) {
107 0 : dstWidth_ = width;
108 0 : dstHeight_ = height;
109 0 : sink_->setFrameSize(dstWidth_, dstHeight_);
110 0 : });
111 :
112 145 : dstWidth_ = args_.width;
113 145 : dstHeight_ = args_.height;
114 :
115 145 : static const std::string SDP_FILENAME = "dummyFilename";
116 145 : if (args_.input.empty()) {
117 145 : args_.format = "sdp";
118 145 : args_.input = SDP_FILENAME;
119 0 : } else if (args_.input.substr(0, strlen("/dev/video")) == "/dev/video") {
120 : // it's a v4l device if starting with /dev/video
121 : // FIXME: This is not a robust way of checking if we mean to use a
122 : // v4l2 device
123 0 : args_.format = "video4linux2";
124 : }
125 :
126 145 : videoDecoder_->setInterruptCallback(interruptCb, this);
127 :
128 145 : if (args_.input == SDP_FILENAME) {
129 : // Force custom_io so the SDP demuxer will not open any UDP connections
130 : // We need it to use ICE transport.
131 145 : args_.sdp_flags = "custom_io";
132 :
133 145 : if (stream_.str().empty()) {
134 0 : JAMI_ERR("No SDP loaded");
135 0 : return false;
136 : }
137 :
138 145 : videoDecoder_->setIOContext(&sdpContext_);
139 : }
140 :
141 145 : if (videoDecoder_->openInput(args_)) {
142 0 : JAMI_ERR("Unable to open input \"%s\"", args_.input.c_str());
143 0 : return false;
144 : }
145 :
146 145 : if (args_.input == SDP_FILENAME) {
147 : // Now replace our custom AVIOContext with one that will read packets
148 145 : videoDecoder_->setIOContext(demuxContext_.get());
149 : }
150 145 : return true;
151 : }
152 :
153 : void
154 145 : VideoReceiveThread::cleanup()
155 : {
156 145 : JAMI_DBG("[%p] Stopping receiver", this);
157 :
158 145 : detach(sink_.get());
159 145 : sink_->stop();
160 :
161 145 : videoDecoder_.reset();
162 145 : }
163 :
164 : // This callback is used by libav internally to break out of blocking calls
165 : int
166 471 : VideoReceiveThread::interruptCb(void* data)
167 : {
168 471 : const auto context = static_cast<VideoReceiveThread*>(data);
169 471 : return not context->loop_.isRunning();
170 : }
171 :
172 : int
173 580 : VideoReceiveThread::readFunction(void* opaque, uint8_t* buf, int buf_size)
174 : {
175 580 : std::istream& is = static_cast<VideoReceiveThread*>(opaque)->stream_;
176 580 : is.read(reinterpret_cast<char*>(buf), buf_size);
177 :
178 580 : auto count = is.gcount();
179 580 : if (count != 0)
180 145 : return count;
181 : else
182 435 : return AVERROR_EOF;
183 : }
184 :
185 : void
186 145 : VideoReceiveThread::addIOContext(SocketPair& socketPair)
187 : {
188 145 : demuxContext_.reset(socketPair.createIOContext(mtu_));
189 145 : }
190 :
191 : void
192 150 : VideoReceiveThread::setRecorderCallback(
193 : const std::function<void(const MediaStream& ms)>& cb)
194 : {
195 150 : recorderCallback_ = cb;
196 150 : if (videoDecoder_)
197 3 : videoDecoder_->setContextCallback([this]() {
198 1 : if (recorderCallback_)
199 1 : recorderCallback_(getInfo());
200 1 : });
201 150 : }
202 :
203 : void
204 5366 : VideoReceiveThread::decodeFrame()
205 : {
206 5366 : if (not loop_.isRunning())
207 0 : return;
208 :
209 5366 : if (not isVideoConfigured_) {
210 145 : if (!configureVideoOutput()) {
211 0 : JAMI_ERROR("[{:p}] Failed to configure video output", fmt::ptr(this));
212 0 : return;
213 : } else {
214 435 : JAMI_LOG("[{:p}] Decoder configured, starting decoding", fmt::ptr(this));
215 : }
216 : }
217 5366 : auto status = videoDecoder_->decode();
218 5366 : if (status == MediaDemuxer::Status::EndOfFile) {
219 354 : JAMI_LOG("[{:p}] End of file", fmt::ptr(this));
220 118 : loop_.stop();
221 : }
222 5248 : else if (status == MediaDemuxer::Status::ReadError) {
223 0 : JAMI_ERROR("[{:p}] Decoding error: %s", fmt::ptr(this), MediaDemuxer::getStatusStr(status));
224 : }
225 5248 : else if (status == MediaDemuxer::Status::FallBack) {
226 0 : if (keyFrameRequestCallback_)
227 0 : keyFrameRequestCallback_();
228 : }
229 : }
230 :
231 : bool
232 145 : VideoReceiveThread::configureVideoOutput()
233 : {
234 145 : assert(not isVideoConfigured_);
235 :
236 145 : JAMI_DBG("[%p] Configuring video output", this);
237 :
238 145 : if (not loop_.isRunning()) {
239 0 : JAMI_WARN("[%p] Unable to configure video output, the loop is not running!", this);
240 0 : return false;
241 : }
242 :
243 145 : if (videoDecoder_->setupVideo() < 0) {
244 0 : JAMI_ERR("Decoder IO startup failed");
245 0 : stopLoop();
246 0 : return false;
247 : }
248 :
249 : // Default size from input video
250 145 : if (dstWidth_ == 0 and dstHeight_ == 0) {
251 145 : dstWidth_ = videoDecoder_->getWidth();
252 145 : dstHeight_ = videoDecoder_->getHeight();
253 : }
254 :
255 145 : if (not sink_->start()) {
256 0 : JAMI_ERR("RX: sink startup failed");
257 0 : stopLoop();
258 0 : return false;
259 : }
260 :
261 145 : if (useSink_)
262 111 : startSink();
263 :
264 145 : if (onSuccessfulSetup_)
265 145 : onSuccessfulSetup_(MEDIA_VIDEO, 1);
266 :
267 145 : return isVideoConfigured_ = true;
268 : }
269 :
270 : void
271 352 : VideoReceiveThread::stopSink()
272 : {
273 352 : JAMI_DBG("[%p] Stopping sink", this);
274 :
275 352 : if (!loop_.isRunning())
276 298 : return;
277 :
278 54 : detach(sink_.get());
279 54 : sink_->setFrameSize(0, 0);
280 : }
281 :
282 : void
283 161 : VideoReceiveThread::startSink()
284 : {
285 161 : JAMI_DBG("[%p] Starting sink", this);
286 :
287 161 : if (!loop_.isRunning())
288 83 : return;
289 :
290 78 : if (dstWidth_ > 0 and dstHeight_ > 0 and attach(sink_.get()))
291 30 : sink_->setFrameSize(dstWidth_, dstHeight_);
292 : }
293 :
294 : int
295 0 : VideoReceiveThread::getWidth() const
296 : {
297 0 : return dstWidth_;
298 : }
299 :
300 : int
301 0 : VideoReceiveThread::getHeight() const
302 : {
303 0 : return dstHeight_;
304 : }
305 :
306 : AVPixelFormat
307 0 : VideoReceiveThread::getPixelFormat() const
308 : {
309 0 : if (videoDecoder_)
310 0 : return videoDecoder_->getPixelFormat();
311 0 : return {};
312 : }
313 :
314 : MediaStream
315 476 : VideoReceiveThread::getInfo() const
316 : {
317 476 : if (videoDecoder_)
318 187 : return videoDecoder_->getStream("v:remote");
319 289 : return {};
320 : }
321 :
322 : void
323 193 : VideoReceiveThread::setRotation(int angle)
324 : {
325 193 : libav_utils::AVBufferPtr displayMatrix(av_buffer_alloc(sizeof(int32_t) * 9));
326 193 : av_display_rotation_set(reinterpret_cast<int32_t*>(displayMatrix->data), angle);
327 193 : std::lock_guard l(rotationMtx_);
328 193 : displayMatrix_ = std::move(displayMatrix);
329 193 : }
330 :
331 : } // namespace video
332 : } // namespace jami
|