Line data Source code
1 : /*
2 : * Copyright (C) 2004-2024 Savoir-faire Linux Inc.
3 : *
4 : * Author: Tristan Matthews <tristan.matthews@savoirfairelinux.com>
5 : * Author: Guillaume Roguez <Guillaume.Roguez@savoirfairelinux.com>
6 : * Author: Philippe Gorley <philippe.gorley@savoirfairelinux.com>
7 : *
8 : * This program is free software; you can redistribute it and/or modify
9 : * it under the terms of the GNU General Public License as published by
10 : * the Free Software Foundation; either version 3 of the License, or
11 : * (at your option) any later version.
12 : *
13 : * This program is distributed in the hope that it will be useful,
14 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 : * GNU General Public License for more details.
17 : *
18 : * You should have received a copy of the GNU General Public License
19 : * along with this program; if not, write to the Free Software
20 : * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
21 : */
22 :
23 : #include "client/videomanager.h"
24 : #include "video_rtp_session.h"
25 : #include "video_sender.h"
26 : #include "video_receive_thread.h"
27 : #include "video_mixer.h"
28 : #include "socket_pair.h"
29 : #include "sip/sipvoiplink.h" // for enqueueKeyframeRequest
30 : #include "manager.h"
31 : #include "media_const.h"
32 : #ifdef ENABLE_PLUGIN
33 : #include "plugin/streamdata.h"
34 : #include "plugin/jamipluginmanager.h"
35 : #endif
36 : #include "logger.h"
37 : #include "string_utils.h"
38 : #include "call.h"
39 : #include "conference.h"
40 : #include "congestion_control.h"
41 :
42 : #include "account_const.h"
43 :
44 : #include <dhtnet/ice_socket.h>
45 :
46 : #include <asio/io_context.hpp>
47 : #include <sstream>
48 : #include <map>
49 : #include <string>
50 : #include <thread>
51 : #include <chrono>
52 :
53 : namespace jami {
54 : namespace video {
55 :
56 : using std::string;
57 :
58 : static constexpr unsigned MAX_REMB_DEC {1};
59 :
60 : constexpr auto DELAY_AFTER_RESTART = std::chrono::milliseconds(1000);
61 : constexpr auto EXPIRY_TIME_RTCP = std::chrono::seconds(2);
62 : constexpr auto DELAY_AFTER_REMB_INC = std::chrono::seconds(1);
63 : constexpr auto DELAY_AFTER_REMB_DEC = std::chrono::milliseconds(500);
64 :
65 324 : VideoRtpSession::VideoRtpSession(const string& callId,
66 : const string& streamId,
67 : const DeviceParams& localVideoParams,
68 324 : const std::shared_ptr<MediaRecorder>& rec)
69 : : RtpSession(callId, streamId, MediaType::MEDIA_VIDEO)
70 324 : , localVideoParams_(localVideoParams)
71 324 : , videoBitrateInfo_ {}
72 996 : , rtcpCheckerThread_([] { return true; }, [this] { processRtcpChecker(); }, [] {})
73 648 : , cc(std::make_unique<CongestionControl>())
74 : {
75 324 : recorder_ = rec;
76 324 : setupVideoBitrateInfo(); // reset bitrate
77 972 : JAMI_LOG("[{:p}] Video RTP session created for call {} (recorder {:p})", fmt::ptr(this), callId_, fmt::ptr(recorder_));
78 324 : }
79 :
80 324 : VideoRtpSession::~VideoRtpSession()
81 : {
82 324 : deinitRecorder();
83 324 : stop();
84 972 : JAMI_LOG("[{:p}] Video RTP session destroyed", fmt::ptr(this));
85 324 : }
86 :
87 : const VideoBitrateInfo&
88 89 : VideoRtpSession::getVideoBitrateInfo()
89 : {
90 89 : return videoBitrateInfo_;
91 : }
92 :
93 : /// Setup internal VideoBitrateInfo structure from media descriptors.
94 : ///
95 : void
96 147 : VideoRtpSession::updateMedia(const MediaDescription& send, const MediaDescription& receive)
97 : {
98 147 : BaseType::updateMedia(send, receive);
99 : // adjust send->codec bitrate info for higher video resolutions
100 147 : auto codecVideo = std::static_pointer_cast<jami::SystemVideoCodecInfo>(send_.codec);
101 147 : if (codecVideo) {
102 147 : auto const pixels = localVideoParams_.height * localVideoParams_.width;
103 147 : codecVideo->bitrate = std::max((unsigned int)(pixels * 0.001), SystemCodecInfo::DEFAULT_VIDEO_BITRATE);
104 147 : codecVideo->maxBitrate = std::max((unsigned int)(pixels * 0.0015), SystemCodecInfo::DEFAULT_MAX_BITRATE);
105 : }
106 147 : setupVideoBitrateInfo();
107 147 : }
108 :
109 : void
110 147 : VideoRtpSession::setRequestKeyFrameCallback(std::function<void(void)> cb)
111 : {
112 147 : cbKeyFrameRequest_ = std::move(cb);
113 147 : }
114 :
115 : void
116 169 : VideoRtpSession::startSender()
117 : {
118 169 : std::lock_guard lock(mutex_);
119 :
120 169 : JAMI_DBG("[%p] Start video RTP sender: input [%s] - muted [%s]",
121 : this,
122 : conference_ ? "Video Mixer" : input_.c_str(),
123 : send_.onHold ? "YES" : "NO");
124 :
125 169 : if (not socketPair_) {
126 : // Ignore if the transport is not set yet
127 0 : JAMI_WARN("[%p] Transport not set yet", this);
128 0 : return;
129 : }
130 :
131 169 : if (send_.enabled and not send_.onHold) {
132 164 : if (sender_) {
133 21 : if (videoLocal_)
134 20 : videoLocal_->detach(sender_.get());
135 21 : if (videoMixer_)
136 19 : videoMixer_->detach(sender_.get());
137 21 : JAMI_WARN("[%p] Restarting video sender", this);
138 : }
139 :
140 164 : if (not conference_) {
141 115 : videoLocal_ = getVideoInput(input_);
142 115 : if (videoLocal_) {
143 115 : videoLocal_->setRecorderCallback(
144 0 : [w=weak_from_this()](const MediaStream& ms) {
145 0 : Manager::instance().ioContext()->post([w=std::move(w), ms]() {
146 0 : if (auto shared = w.lock())
147 0 : shared->attachLocalRecorder(ms);
148 0 : });
149 0 : });
150 115 : auto newParams = videoLocal_->getParams();
151 : try {
152 115 : if (newParams.valid()
153 115 : && newParams.wait_for(NEWPARAMS_TIMEOUT) == std::future_status::ready) {
154 105 : localVideoParams_ = newParams.get();
155 : } else {
156 10 : JAMI_ERR("[%p] No valid new video parameters", this);
157 10 : return;
158 : }
159 0 : } catch (const std::exception& e) {
160 0 : JAMI_ERR("Exception during retrieving video parameters: %s", e.what());
161 0 : return;
162 0 : }
163 115 : } else {
164 0 : JAMI_WARN("Can't lock video input");
165 0 : return;
166 : }
167 :
168 : #if (defined(__ANDROID__) || (defined(TARGET_OS_IOS) && TARGET_OS_IOS))
169 : videoLocal_->setupSink(localVideoParams_.width, localVideoParams_.height);
170 : #endif
171 : }
172 :
173 : // be sure to not send any packets before saving last RTP seq value
174 154 : socketPair_->stopSendOp();
175 :
176 154 : auto codecVideo = std::static_pointer_cast<jami::SystemVideoCodecInfo>(send_.codec);
177 154 : auto autoQuality = codecVideo->isAutoQualityEnabled;
178 :
179 154 : send_.linkableHW = conference_ == nullptr;
180 154 : send_.bitrate = videoBitrateInfo_.videoBitrateCurrent;
181 : // NOTE:
182 : // Current implementation does not handle resolution change
183 : // (needed by window sharing feature) with HW codecs, so HW
184 : // codecs will be disabled for now.
185 154 : bool allowHwAccel = (localVideoParams_.format != "x11grab" && localVideoParams_.format != "dxgigrab" && localVideoParams_.format != "lavfi");
186 :
187 154 : if (socketPair_)
188 154 : initSeqVal_ = socketPair_->lastSeqValOut();
189 :
190 : try {
191 154 : sender_.reset();
192 154 : socketPair_->stopSendOp(false);
193 : MediaStream ms
194 154 : = !videoMixer_
195 : ? MediaStream("video sender",
196 : AV_PIX_FMT_YUV420P,
197 259 : 1 / static_cast<rational<int>>(localVideoParams_.framerate),
198 111 : localVideoParams_.width == 0 ? 1080u : localVideoParams_.width,
199 111 : localVideoParams_.height == 0 ? 720u : localVideoParams_.height,
200 105 : send_.bitrate,
201 : static_cast<rational<int>>(localVideoParams_.framerate))
202 567 : : videoMixer_->getStream("Video Sender");
203 154 : sender_.reset(new VideoSender(
204 308 : getRemoteRtpUri(), ms, send_, *socketPair_, initSeqVal_ + 1, mtu_, allowHwAccel));
205 154 : if (changeOrientationCallback_)
206 154 : sender_->setChangeOrientationCallback(changeOrientationCallback_);
207 154 : if (socketPair_)
208 154 : socketPair_->setPacketLossCallback([this]() { cbKeyFrameRequest_(); });
209 :
210 154 : } catch (const MediaEncoderException& e) {
211 0 : JAMI_ERR("%s", e.what());
212 0 : send_.enabled = false;
213 0 : }
214 154 : lastMediaRestart_ = clock::now();
215 154 : last_REMB_inc_ = clock::now();
216 154 : last_REMB_dec_ = clock::now();
217 154 : if (autoQuality and not rtcpCheckerThread_.isRunning())
218 133 : rtcpCheckerThread_.start();
219 21 : else if (not autoQuality and rtcpCheckerThread_.isRunning())
220 0 : rtcpCheckerThread_.join();
221 : // Block reads to received feedback packets
222 154 : if(socketPair_)
223 154 : socketPair_->setReadBlockingMode(true);
224 154 : }
225 169 : }
226 :
227 : void
228 23 : VideoRtpSession::restartSender()
229 : {
230 23 : std::lock_guard lock(mutex_);
231 :
232 : // ensure that start has been called before restart
233 23 : if (not socketPair_)
234 1 : return;
235 :
236 22 : startSender();
237 :
238 22 : if (conference_)
239 19 : setupConferenceVideoPipeline(*conference_, Direction::SEND);
240 : else
241 3 : setupVideoPipeline();
242 23 : }
243 :
244 : void
245 705 : VideoRtpSession::stopSender(bool forceStopSocket)
246 : {
247 : // Concurrency protection must be done by caller.
248 :
249 705 : JAMI_DBG("[%p] Stop video RTP sender: input [%s] - muted [%s]",
250 : this,
251 : conference_ ? "Video Mixer" : input_.c_str(),
252 : send_.onHold ? "YES" : "NO");
253 :
254 705 : if (sender_) {
255 133 : if (videoLocal_)
256 103 : videoLocal_->detach(sender_.get());
257 133 : if (videoMixer_)
258 4 : videoMixer_->detach(sender_.get());
259 133 : sender_.reset();
260 : }
261 :
262 705 : if (socketPair_) {
263 152 : bool const isReceivingVideo = receive_.enabled && !receive_.onHold;
264 152 : if(forceStopSocket || !isReceivingVideo) {
265 149 : socketPair_->stopSendOp();
266 149 : socketPair_->setReadBlockingMode(false);
267 : }
268 : }
269 705 : }
270 :
271 : void
272 148 : VideoRtpSession::startReceiver()
273 : {
274 : // Concurrency protection must be done by caller.
275 :
276 148 : JAMI_DBG("[%p] Starting receiver", this);
277 :
278 148 : if (receive_.enabled and not receive_.onHold) {
279 143 : if (receiveThread_)
280 13 : JAMI_WARN("[%p] Already has a receiver, restarting", this);
281 143 : receiveThread_.reset(
282 143 : new VideoReceiveThread(callId_, !conference_, receive_.receiving_sdp, mtu_));
283 :
284 : // ensure that start has been called
285 143 : if (not socketPair_)
286 0 : return;
287 :
288 : // XXX keyframe requests can timeout if unanswered
289 143 : receiveThread_->addIOContext(*socketPair_);
290 143 : receiveThread_->setSuccessfulSetupCb(onSuccessfulSetup_);
291 143 : receiveThread_->startLoop();
292 143 : receiveThread_->setRequestKeyFrameCallback([this]() { cbKeyFrameRequest_(); });
293 286 : receiveThread_->setRotation(rotation_.load());
294 143 : if (videoMixer_ and conference_) {
295 : // Note, this should be managed differently, this is a bit hacky
296 30 : auto audioId = streamId_;
297 30 : string_replace(audioId, "video", "audio");
298 30 : auto activeStream = videoMixer_->verifyActive(audioId);
299 30 : videoMixer_->removeAudioOnlySource(callId_, audioId);
300 30 : if (activeStream)
301 0 : videoMixer_->setActiveStream(streamId_);
302 30 : }
303 143 : receiveThread_->setRecorderCallback([w=weak_from_this()](const MediaStream& ms) {
304 30 : Manager::instance().ioContext()->post([w=std::move(w), ms]() {
305 30 : if (auto shared = w.lock())
306 30 : shared->attachRemoteRecorder(ms);
307 30 : });
308 30 : });
309 143 : } else {
310 5 : JAMI_DBG("[%p] Video receiver disabled", this);
311 5 : if (receiveThread_ and videoMixer_ and conference_) {
312 : // Note, this should be managed differently, this is a bit hacky
313 0 : auto audioId_ = streamId_;
314 0 : string_replace(audioId_, "video", "audio");
315 0 : auto activeStream = videoMixer_->verifyActive(streamId_);
316 0 : videoMixer_->addAudioOnlySource(callId_, audioId_);
317 0 : receiveThread_->detach(videoMixer_.get());
318 0 : if (activeStream)
319 0 : videoMixer_->setActiveStream(audioId_);
320 0 : }
321 : }
322 148 : if (socketPair_)
323 148 : socketPair_->setReadBlockingMode(true);
324 : }
325 :
326 : void
327 704 : VideoRtpSession::stopReceiver(bool forceStopSocket)
328 : {
329 : // Concurrency protection must be done by caller.
330 :
331 704 : JAMI_DBG("[%p] Stopping receiver", this);
332 :
333 704 : if (not receiveThread_)
334 409 : return;
335 :
336 295 : if (videoMixer_) {
337 5 : auto activeStream = videoMixer_->verifyActive(streamId_);
338 5 : auto audioId = streamId_;
339 5 : string_replace(audioId, "video", "audio");
340 5 : videoMixer_->addAudioOnlySource(callId_, audioId);
341 5 : receiveThread_->detach(videoMixer_.get());
342 5 : if (activeStream)
343 0 : videoMixer_->setActiveStream(audioId);
344 5 : }
345 :
346 : // We need to disable the read operation, otherwise the
347 : // receiver thread will block since the peer stopped sending
348 : // RTP packets.
349 295 : bool const isSendingVideo = send_.enabled && !send_.onHold;
350 295 : if (socketPair_) {
351 148 : if (forceStopSocket || !isSendingVideo) {
352 145 : socketPair_->setReadBlockingMode(false);
353 145 : socketPair_->stopSendOp();
354 : }
355 : }
356 :
357 295 : auto ms = receiveThread_->getInfo();
358 295 : if (auto ob = recorder_->getStream(ms.name)) {
359 30 : receiveThread_->detach(ob);
360 30 : recorder_->removeStream(ms);
361 : }
362 :
363 295 : if(forceStopSocket || !isSendingVideo)
364 289 : receiveThread_->stopLoop();
365 295 : receiveThread_->stopSink();
366 295 : }
367 :
368 : void
369 152 : VideoRtpSession::start(std::unique_ptr<dhtnet::IceSocket> rtp_sock, std::unique_ptr<dhtnet::IceSocket> rtcp_sock)
370 : {
371 152 : std::lock_guard lock(mutex_);
372 :
373 152 : if (not send_.enabled and not receive_.enabled) {
374 5 : stop();
375 5 : return;
376 : }
377 :
378 : try {
379 147 : if (rtp_sock and rtcp_sock) {
380 144 : if (send_.addr) {
381 144 : rtp_sock->setDefaultRemoteAddress(send_.addr);
382 : }
383 :
384 144 : auto& rtcpAddr = send_.rtcp_addr ? send_.rtcp_addr : send_.addr;
385 144 : if (rtcpAddr) {
386 144 : rtcp_sock->setDefaultRemoteAddress(rtcpAddr);
387 : }
388 144 : socketPair_.reset(new SocketPair(std::move(rtp_sock), std::move(rtcp_sock)));
389 : } else {
390 3 : socketPair_.reset(new SocketPair(getRemoteRtpUri().c_str(), receive_.addr.getPort()));
391 : }
392 :
393 147 : last_REMB_inc_ = clock::now();
394 147 : last_REMB_dec_ = clock::now();
395 :
396 147 : socketPair_->setRtpDelayCallback(
397 5495 : [&](int gradient, int deltaT) { delayMonitor(gradient, deltaT); });
398 :
399 147 : if (send_.crypto and receive_.crypto) {
400 588 : socketPair_->createSRTP(receive_.crypto.getCryptoSuite().c_str(),
401 294 : receive_.crypto.getSrtpKeyInfo().c_str(),
402 294 : send_.crypto.getCryptoSuite().c_str(),
403 294 : send_.crypto.getSrtpKeyInfo().c_str());
404 : }
405 0 : } catch (const std::runtime_error& e) {
406 0 : JAMI_ERR("[%p] Socket creation failed: %s", this, e.what());
407 0 : return;
408 0 : }
409 :
410 147 : startSender();
411 147 : startReceiver();
412 :
413 147 : if (conference_) {
414 30 : if (send_.enabled and not send_.onHold) {
415 30 : setupConferenceVideoPipeline(*conference_, Direction::SEND);
416 : }
417 30 : if (receive_.enabled and not receive_.onHold) {
418 30 : setupConferenceVideoPipeline(*conference_, Direction::RECV);
419 : }
420 : } else {
421 117 : setupVideoPipeline();
422 : }
423 152 : }
424 :
425 : void
426 694 : VideoRtpSession::stop()
427 : {
428 694 : std::lock_guard lock(mutex_);
429 :
430 694 : stopSender(true);
431 694 : stopReceiver(true);
432 :
433 694 : if (socketPair_)
434 147 : socketPair_->interrupt();
435 :
436 694 : rtcpCheckerThread_.join();
437 :
438 : // reset default video quality if exist
439 694 : if (videoBitrateInfo_.videoQualityCurrent != SystemCodecInfo::DEFAULT_NO_QUALITY)
440 81 : videoBitrateInfo_.videoQualityCurrent = SystemCodecInfo::DEFAULT_CODEC_QUALITY;
441 :
442 694 : videoBitrateInfo_.videoBitrateCurrent = SystemCodecInfo::DEFAULT_VIDEO_BITRATE;
443 694 : storeVideoBitrateInfo();
444 :
445 694 : socketPair_.reset();
446 694 : videoLocal_.reset();
447 694 : }
448 :
449 : void
450 324 : VideoRtpSession::setMuted(bool mute, Direction dir)
451 : {
452 324 : std::lock_guard lock(mutex_);
453 :
454 : // Sender
455 324 : if (dir == Direction::SEND) {
456 157 : if (send_.onHold == mute) {
457 144 : JAMI_DBG("[%p] Local already %s", this, mute ? "muted" : "un-muted");
458 144 : return;
459 : }
460 :
461 13 : if ((send_.onHold = mute)) {
462 11 : if (videoLocal_) {
463 5 : auto ms = videoLocal_->getInfo();
464 5 : if (auto ob = recorder_->getStream(ms.name)) {
465 0 : videoLocal_->detach(ob);
466 0 : recorder_->removeStream(ms);
467 : }
468 5 : }
469 11 : stopSender();
470 : } else {
471 2 : restartSender();
472 : }
473 13 : return;
474 : }
475 :
476 : // Receiver
477 167 : if (receive_.onHold == mute) {
478 156 : JAMI_DBG("[%p] Remote already %s", this, mute ? "muted" : "un-muted");
479 156 : return;
480 : }
481 :
482 11 : if ((receive_.onHold = mute)) {
483 10 : if (receiveThread_) {
484 8 : auto ms = receiveThread_->getInfo();
485 8 : if (auto ob = recorder_->getStream(ms.name)) {
486 0 : receiveThread_->detach(ob);
487 0 : recorder_->removeStream(ms);
488 : }
489 8 : }
490 10 : stopReceiver();
491 : } else {
492 1 : startReceiver();
493 1 : if (conference_ and not receive_.onHold) {
494 0 : setupConferenceVideoPipeline(*conference_, Direction::RECV);
495 : }
496 : }
497 324 : }
498 :
499 : void
500 146 : VideoRtpSession::forceKeyFrame()
501 : {
502 146 : std::lock_guard lock(mutex_);
503 : #if __ANDROID__
504 : if (videoLocal_)
505 : emitSignal<libjami::VideoSignal::RequestKeyFrame>(videoLocal_->getName());
506 : #else
507 146 : if (sender_)
508 73 : sender_->forceKeyFrame();
509 : #endif
510 146 : }
511 :
512 : void
513 372 : VideoRtpSession::setRotation(int rotation)
514 : {
515 372 : rotation_.store(rotation);
516 372 : if (receiveThread_)
517 48 : receiveThread_->setRotation(rotation);
518 372 : }
519 :
520 : void
521 120 : VideoRtpSession::setupVideoPipeline()
522 : {
523 120 : if (sender_) {
524 105 : if (videoLocal_) {
525 105 : JAMI_DBG("[%p] Setup video pipeline on local capture device", this);
526 105 : videoLocal_->attach(sender_.get());
527 : }
528 : } else {
529 15 : videoLocal_.reset();
530 : }
531 120 : }
532 :
533 : void
534 98 : VideoRtpSession::setupConferenceVideoPipeline(Conference& conference, Direction dir)
535 : {
536 98 : if (dir == Direction::SEND) {
537 49 : JAMI_DBG("[%p] Setup video sender pipeline on conference %s for call %s",
538 : this,
539 : conference.getConfId().c_str(),
540 : callId_.c_str());
541 49 : videoMixer_ = conference.getVideoMixer();
542 49 : if (sender_) {
543 : // Swap sender from local video to conference video mixer
544 49 : if (videoLocal_)
545 18 : videoLocal_->detach(sender_.get());
546 49 : if (videoMixer_)
547 49 : videoMixer_->attach(sender_.get());
548 : } else {
549 0 : JAMI_WARN("[%p] no sender", this);
550 : }
551 : } else {
552 49 : JAMI_DBG("[%p] Setup video receiver pipeline on conference %s for call %s",
553 : this,
554 : conference.getConfId().c_str(),
555 : callId_.c_str());
556 49 : if (receiveThread_) {
557 49 : receiveThread_->stopSink();
558 49 : if (videoMixer_)
559 49 : videoMixer_->attachVideo(receiveThread_.get(), callId_, streamId_);
560 : } else {
561 0 : JAMI_WARN("[%p] no receiver", this);
562 : }
563 : }
564 98 : }
565 :
566 : void
567 68 : VideoRtpSession::enterConference(Conference& conference)
568 : {
569 68 : std::lock_guard lock(mutex_);
570 :
571 68 : exitConference();
572 :
573 68 : conference_ = &conference;
574 68 : videoMixer_ = conference.getVideoMixer();
575 68 : JAMI_DBG("[%p] enterConference (conf: %s)", this, conference.getConfId().c_str());
576 :
577 68 : if (send_.enabled or receiveThread_) {
578 : // Restart encoder with conference parameter ON in order to unlink HW encoder
579 : // from HW decoder.
580 19 : restartSender();
581 19 : if (conference_) {
582 19 : setupConferenceVideoPipeline(conference, Direction::RECV);
583 : }
584 : }
585 68 : }
586 :
587 : void
588 135 : VideoRtpSession::exitConference()
589 : {
590 135 : std::lock_guard lock(mutex_);
591 :
592 135 : if (!conference_)
593 67 : return;
594 :
595 68 : JAMI_DBG("[%p] exitConference (conf: %s)", this, conference_->getConfId().c_str());
596 :
597 68 : if (videoMixer_) {
598 68 : if (sender_)
599 45 : videoMixer_->detach(sender_.get());
600 :
601 68 : if (receiveThread_) {
602 48 : auto activeStream = videoMixer_->verifyActive(streamId_);
603 48 : videoMixer_->detachVideo(receiveThread_.get());
604 48 : receiveThread_->startSink();
605 48 : if (activeStream)
606 0 : videoMixer_->setActiveStream(streamId_);
607 : }
608 :
609 68 : videoMixer_.reset();
610 : }
611 :
612 68 : conference_ = nullptr;
613 135 : }
614 :
615 : bool
616 406 : VideoRtpSession::check_RCTP_Info_RR(RTCPInfo& rtcpi)
617 : {
618 406 : auto rtcpInfoVect = socketPair_->getRtcpRR();
619 406 : unsigned totalLost = 0;
620 406 : unsigned totalJitter = 0;
621 406 : unsigned nbDropNotNull = 0;
622 406 : auto vectSize = rtcpInfoVect.size();
623 :
624 406 : if (vectSize != 0) {
625 20 : for (const auto& it : rtcpInfoVect) {
626 10 : if (it.fraction_lost != 0) // Exclude null drop
627 0 : nbDropNotNull++;
628 10 : totalLost += it.fraction_lost;
629 10 : totalJitter += ntohl(it.jitter);
630 : }
631 10 : rtcpi.packetLoss = nbDropNotNull ? (float) (100 * totalLost) / (256.0 * nbDropNotNull) : 0;
632 : // Jitter is expressed in timestamp unit -> convert to milliseconds
633 : // https://stackoverflow.com/questions/51956520/convert-jitter-from-rtp-timestamp-unit-to-millisseconds
634 10 : rtcpi.jitter = (totalJitter / vectSize / 90000.0f) * 1000;
635 10 : rtcpi.nb_sample = vectSize;
636 10 : rtcpi.latency = socketPair_->getLastLatency();
637 10 : return true;
638 : }
639 396 : return false;
640 406 : }
641 :
642 : bool
643 406 : VideoRtpSession::check_RCTP_Info_REMB(uint64_t* br)
644 : {
645 406 : auto rtcpInfoVect = socketPair_->getRtcpREMB();
646 :
647 406 : if (!rtcpInfoVect.empty()) {
648 173 : auto pkt = rtcpInfoVect.back();
649 173 : auto temp = cc->parseREMB(pkt);
650 173 : *br = (temp >> 10) | ((temp << 6) & 0xff00) | ((temp << 16) & 0x30000);
651 173 : return true;
652 : }
653 233 : return false;
654 406 : }
655 :
656 : void
657 406 : VideoRtpSession::adaptQualityAndBitrate()
658 : {
659 406 : setupVideoBitrateInfo();
660 :
661 : uint64_t br;
662 406 : if (check_RCTP_Info_REMB(&br)) {
663 173 : delayProcessing(br);
664 : }
665 :
666 406 : RTCPInfo rtcpi {};
667 406 : if (check_RCTP_Info_RR(rtcpi)) {
668 10 : dropProcessing(&rtcpi);
669 : }
670 406 : }
671 :
672 : void
673 10 : VideoRtpSession::dropProcessing(RTCPInfo* rtcpi)
674 : {
675 : // If bitrate has changed, let time to receive fresh RTCP packets
676 10 : auto now = clock::now();
677 10 : auto restartTimer = now - lastMediaRestart_;
678 10 : if (restartTimer < DELAY_AFTER_RESTART) {
679 0 : return;
680 : }
681 : #ifndef __ANDROID__
682 : // Do nothing if jitter is more than 1 second
683 10 : if (rtcpi->jitter > 1000) {
684 0 : return;
685 : }
686 : #endif
687 10 : auto pondLoss = getPonderateLoss(rtcpi->packetLoss);
688 10 : auto oldBitrate = videoBitrateInfo_.videoBitrateCurrent;
689 10 : int newBitrate = oldBitrate;
690 :
691 : // Fill histoLoss and histoJitter_ with samples
692 10 : if (restartTimer < DELAY_AFTER_RESTART + std::chrono::seconds(1)) {
693 0 : return;
694 : } else {
695 : // If ponderate drops are inferior to 10% that mean drop are not from congestion but from
696 : // network...
697 : // ... we can increase
698 10 : if (pondLoss >= 5.0f && rtcpi->packetLoss > 0.0f) {
699 0 : newBitrate *= 1.0f - rtcpi->packetLoss / 150.0f;
700 0 : histoLoss_.clear();
701 0 : lastMediaRestart_ = now;
702 0 : JAMI_DBG(
703 : "[BandwidthAdapt] Detected transmission bandwidth overuse, decrease bitrate from "
704 : "%u Kbps to %d Kbps, ratio %f (ponderate loss: %f%%, packet loss rate: %f%%)",
705 : oldBitrate,
706 : newBitrate,
707 : (float) newBitrate / oldBitrate,
708 : pondLoss,
709 : rtcpi->packetLoss);
710 : }
711 : }
712 :
713 10 : setNewBitrate(newBitrate);
714 : }
715 :
716 : void
717 173 : VideoRtpSession::delayProcessing(int br)
718 : {
719 173 : int newBitrate = videoBitrateInfo_.videoBitrateCurrent;
720 173 : if (br == 0x6803)
721 0 : newBitrate *= 0.85f;
722 173 : else if (br == 0x7378) {
723 173 : auto now = clock::now();
724 173 : auto msSinceLastDecrease = std::chrono::duration_cast<std::chrono::milliseconds>(
725 173 : now - lastBitrateDecrease);
726 173 : auto increaseCoefficient = std::min(msSinceLastDecrease.count() / 600000.0f + 1.0f, 1.05f);
727 173 : newBitrate *= increaseCoefficient;
728 : } else
729 0 : return;
730 :
731 173 : setNewBitrate(newBitrate);
732 : }
733 :
734 : void
735 183 : VideoRtpSession::setNewBitrate(unsigned int newBR)
736 : {
737 183 : newBR = std::max(newBR, videoBitrateInfo_.videoBitrateMin);
738 183 : newBR = std::min(newBR, videoBitrateInfo_.videoBitrateMax);
739 :
740 183 : if (newBR < videoBitrateInfo_.videoBitrateCurrent)
741 0 : lastBitrateDecrease = clock::now();
742 :
743 183 : if (videoBitrateInfo_.videoBitrateCurrent != newBR) {
744 36 : videoBitrateInfo_.videoBitrateCurrent = newBR;
745 36 : storeVideoBitrateInfo();
746 :
747 : #if __ANDROID__
748 : if (auto input_device = std::dynamic_pointer_cast<VideoInput>(videoLocal_))
749 : emitSignal<libjami::VideoSignal::SetBitrate>(input_device->getConfig().name, (int) newBR);
750 : #endif
751 :
752 36 : if (sender_) {
753 36 : auto ret = sender_->setBitrate(newBR);
754 36 : if (ret == -1)
755 0 : JAMI_ERR("Fail to access the encoder");
756 36 : else if (ret == 0)
757 0 : restartSender();
758 : } else {
759 0 : JAMI_ERR("Fail to access the sender");
760 : }
761 : }
762 183 : }
763 :
764 : void
765 877 : VideoRtpSession::setupVideoBitrateInfo()
766 : {
767 877 : auto codecVideo = std::static_pointer_cast<jami::SystemVideoCodecInfo>(send_.codec);
768 877 : if (codecVideo) {
769 553 : videoBitrateInfo_ = {
770 553 : codecVideo->bitrate,
771 553 : codecVideo->minBitrate,
772 553 : codecVideo->maxBitrate,
773 553 : codecVideo->quality,
774 553 : codecVideo->minQuality,
775 553 : codecVideo->maxQuality,
776 553 : videoBitrateInfo_.cptBitrateChecking,
777 553 : videoBitrateInfo_.maxBitrateChecking,
778 553 : videoBitrateInfo_.packetLostThreshold,
779 : };
780 : } else {
781 : videoBitrateInfo_
782 324 : = {0, 0, 0, 0, 0, 0, 0, MAX_ADAPTATIVE_BITRATE_ITERATION, PACKET_LOSS_THRESHOLD};
783 : }
784 877 : }
785 :
786 : void
787 730 : VideoRtpSession::storeVideoBitrateInfo()
788 : {
789 730 : if (auto codecVideo = std::static_pointer_cast<jami::SystemVideoCodecInfo>(send_.codec)) {
790 464 : codecVideo->bitrate = videoBitrateInfo_.videoBitrateCurrent;
791 464 : codecVideo->quality = videoBitrateInfo_.videoQualityCurrent;
792 730 : }
793 730 : }
794 :
795 : void
796 406 : VideoRtpSession::processRtcpChecker()
797 : {
798 406 : adaptQualityAndBitrate();
799 406 : socketPair_->waitForRTCP(std::chrono::seconds(rtcp_checking_interval));
800 406 : }
801 :
802 : void
803 31 : VideoRtpSession::attachRemoteRecorder(const MediaStream& ms)
804 : {
805 31 : std::lock_guard lock(mutex_);
806 31 : if (!recorder_ || !receiveThread_)
807 0 : return;
808 31 : if (auto ob = recorder_->addStream(ms)) {
809 31 : receiveThread_->attach(ob);
810 : }
811 31 : }
812 :
813 : void
814 0 : VideoRtpSession::attachLocalRecorder(const MediaStream& ms)
815 : {
816 0 : std::lock_guard lock(mutex_);
817 0 : if (!recorder_ || !videoLocal_ || !Manager::instance().videoPreferences.getRecordPreview())
818 0 : return;
819 0 : if (auto ob = recorder_->addStream(ms)) {
820 0 : videoLocal_->attach(ob);
821 : }
822 0 : }
823 :
824 : void
825 7 : VideoRtpSession::initRecorder()
826 : {
827 7 : if (!recorder_)
828 0 : return;
829 7 : if (receiveThread_) {
830 5 : receiveThread_->setRecorderCallback([w=weak_from_this()](const MediaStream& ms) {
831 1 : Manager::instance().ioContext()->post([w=std::move(w), ms]() {
832 1 : if (auto shared = w.lock())
833 1 : shared->attachRemoteRecorder(ms);
834 1 : });
835 1 : });
836 : }
837 7 : if (videoLocal_ && !send_.onHold) {
838 3 : videoLocal_->setRecorderCallback([w=weak_from_this()](const MediaStream& ms) {
839 0 : Manager::instance().ioContext()->post([w=std::move(w), ms]() {
840 0 : if (auto shared = w.lock())
841 0 : shared->attachLocalRecorder(ms);
842 0 : });
843 0 : });
844 : }
845 : }
846 :
847 : void
848 329 : VideoRtpSession::deinitRecorder()
849 : {
850 329 : if (!recorder_)
851 0 : return;
852 329 : if (receiveThread_) {
853 133 : auto ms = receiveThread_->getInfo();
854 133 : if (auto ob = recorder_->getStream(ms.name)) {
855 0 : receiveThread_->detach(ob);
856 0 : recorder_->removeStream(ms);
857 : }
858 133 : }
859 329 : if (videoLocal_) {
860 3 : auto ms = videoLocal_->getInfo();
861 3 : if (auto ob = recorder_->getStream(ms.name)) {
862 0 : videoLocal_->detach(ob);
863 0 : recorder_->removeStream(ms);
864 : }
865 3 : }
866 : }
867 :
868 : void
869 147 : VideoRtpSession::setChangeOrientationCallback(std::function<void(int)> cb)
870 : {
871 147 : changeOrientationCallback_ = std::move(cb);
872 147 : if (sender_)
873 9 : sender_->setChangeOrientationCallback(changeOrientationCallback_);
874 147 : }
875 :
876 : float
877 10 : VideoRtpSession::getPonderateLoss(float lastLoss)
878 : {
879 10 : float pond = 0.0f, pondLoss = 0.0f, totalPond = 0.0f;
880 10 : constexpr float coefficient_a = -1 / 100.0f;
881 10 : constexpr float coefficient_b = 100.0f;
882 :
883 10 : auto now = clock::now();
884 :
885 10 : histoLoss_.emplace_back(now, lastLoss);
886 :
887 20 : for (auto it = histoLoss_.begin(); it != histoLoss_.end();) {
888 10 : auto delay = std::chrono::duration_cast<std::chrono::milliseconds>(now - it->first);
889 :
890 : // 1ms -> 100%
891 : // 2000ms -> 80%
892 10 : if (delay <= EXPIRY_TIME_RTCP) {
893 10 : if (it->second == 0.0f)
894 10 : pond = 20.0f; // Reduce weight of null drop
895 : else
896 0 : pond = std::min(delay.count() * coefficient_a + coefficient_b, 100.0f);
897 10 : totalPond += pond;
898 10 : pondLoss += it->second * pond;
899 10 : ++it;
900 : } else
901 0 : it = histoLoss_.erase(it);
902 : }
903 10 : if (totalPond == 0)
904 0 : return 0.0f;
905 :
906 10 : return pondLoss / totalPond;
907 : }
908 :
909 : void
910 5495 : VideoRtpSession::delayMonitor(int gradient, int deltaT)
911 : {
912 5495 : float estimation = cc->kalmanFilter(gradient);
913 5495 : float thresh = cc->get_thresh();
914 :
915 5495 : cc->update_thresh(estimation, deltaT);
916 :
917 5495 : BandwidthUsage bwState = cc->get_bw_state(estimation, thresh);
918 5495 : auto now = clock::now();
919 :
920 5495 : if (bwState == BandwidthUsage::bwOverusing) {
921 0 : auto remb_timer_dec = now - last_REMB_dec_;
922 0 : if ((not remb_dec_cnt_) or (remb_timer_dec > DELAY_AFTER_REMB_DEC)) {
923 0 : last_REMB_dec_ = now;
924 0 : remb_dec_cnt_ = 0;
925 : }
926 :
927 : // Limit REMB decrease to MAX_REMB_DEC every DELAY_AFTER_REMB_DEC ms
928 0 : if (remb_dec_cnt_ < MAX_REMB_DEC && remb_timer_dec < DELAY_AFTER_REMB_DEC) {
929 0 : remb_dec_cnt_++;
930 0 : JAMI_WARN("[BandwidthAdapt] Detected reception bandwidth overuse");
931 0 : uint8_t* buf = nullptr;
932 0 : uint64_t br = 0x6803; // Decrease 3
933 0 : auto v = cc->createREMB(br);
934 0 : buf = &v[0];
935 0 : socketPair_->writeData(buf, v.size());
936 0 : last_REMB_inc_ = clock::now();
937 0 : }
938 5495 : } else if (bwState == BandwidthUsage::bwNormal) {
939 4160 : auto remb_timer_inc = now - last_REMB_inc_;
940 4160 : if (remb_timer_inc > DELAY_AFTER_REMB_INC) {
941 173 : uint8_t* buf = nullptr;
942 173 : uint64_t br = 0x7378; // INcrease
943 173 : auto v = cc->createREMB(br);
944 173 : buf = &v[0];
945 173 : socketPair_->writeData(buf, v.size());
946 173 : last_REMB_inc_ = clock::now();
947 173 : }
948 : }
949 5495 : }
950 : } // namespace video
951 : } // namespace jami
|