Line data Source code
1 : /*
2 : * Copyright (C) 2004-2025 Savoir-faire Linux Inc.
3 : *
4 : * This program is free software: you can redistribute it and/or modify
5 : * it under the terms of the GNU General Public License as published by
6 : * the Free Software Foundation, either version 3 of the License, or
7 : * (at your option) any later version.
8 : *
9 : * This program is distributed in the hope that it will be useful,
10 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 : * GNU General Public License for more details.
13 : *
14 : * You should have received a copy of the GNU General Public License
15 : * along with this program. If not, see <https://www.gnu.org/licenses/>.
16 : */
17 :
18 : #include "client/videomanager.h"
19 : #include "video_rtp_session.h"
20 : #include "video_sender.h"
21 : #include "video_receive_thread.h"
22 : #include "video_mixer.h"
23 : #include "socket_pair.h"
24 : #include "sip/sipvoiplink.h" // for enqueueKeyframeRequest
25 : #include "manager.h"
26 : #include "media_const.h"
27 : #ifdef ENABLE_PLUGIN
28 : #include "plugin/streamdata.h"
29 : #include "plugin/jamipluginmanager.h"
30 : #endif
31 : #include "logger.h"
32 : #include "string_utils.h"
33 : #include "call.h"
34 : #include "conference.h"
35 : #include "congestion_control.h"
36 :
37 : #include "account_const.h"
38 :
39 : #include <dhtnet/ice_socket.h>
40 : #include <asio/post.hpp>
41 : #include <asio/io_context.hpp>
42 :
43 : #include <sstream>
44 : #include <map>
45 : #include <string>
46 : #include <thread>
47 : #include <chrono>
48 :
49 : namespace jami {
50 : namespace video {
51 :
52 : using std::string;
53 :
54 : static constexpr unsigned MAX_REMB_DEC {1};
55 :
56 : constexpr auto DELAY_AFTER_RESTART = std::chrono::milliseconds(1000);
57 : constexpr auto EXPIRY_TIME_RTCP = std::chrono::seconds(2);
58 : constexpr auto DELAY_AFTER_REMB_INC = std::chrono::seconds(1);
59 : constexpr auto DELAY_AFTER_REMB_DEC = std::chrono::milliseconds(500);
60 :
61 315 : VideoRtpSession::VideoRtpSession(const string& callId,
62 : const string& streamId,
63 : const DeviceParams& localVideoParams,
64 315 : const std::shared_ptr<MediaRecorder>& rec)
65 : : RtpSession(callId, streamId, MediaType::MEDIA_VIDEO)
66 315 : , localVideoParams_(localVideoParams)
67 315 : , videoBitrateInfo_ {}
68 997 : , rtcpCheckerThread_([] { return true; }, [this] { processRtcpChecker(); }, [] {})
69 630 : , cc(std::make_unique<CongestionControl>())
70 : {
71 315 : recorder_ = rec;
72 315 : setupVideoBitrateInfo(); // reset bitrate
73 945 : JAMI_LOG("[{:p}] Video RTP session created for call {} (recorder {:p})", fmt::ptr(this), callId_, fmt::ptr(recorder_));
74 315 : }
75 :
76 315 : VideoRtpSession::~VideoRtpSession()
77 : {
78 315 : deinitRecorder();
79 315 : stop();
80 945 : JAMI_LOG("[{:p}] Video RTP session destroyed", fmt::ptr(this));
81 315 : }
82 :
83 : const VideoBitrateInfo&
84 88 : VideoRtpSession::getVideoBitrateInfo()
85 : {
86 88 : return videoBitrateInfo_;
87 : }
88 :
89 : /// Setup internal VideoBitrateInfo structure from media descriptors.
90 : ///
91 : void
92 156 : VideoRtpSession::updateMedia(const MediaDescription& send, const MediaDescription& receive)
93 : {
94 156 : BaseType::updateMedia(send, receive);
95 : // adjust send->codec bitrate info for higher video resolutions
96 156 : auto codecVideo = std::static_pointer_cast<jami::SystemVideoCodecInfo>(send_.codec);
97 156 : if (codecVideo) {
98 156 : auto const pixels = localVideoParams_.height * localVideoParams_.width;
99 156 : codecVideo->bitrate = std::max((unsigned int)(pixels * 0.001), SystemCodecInfo::DEFAULT_VIDEO_BITRATE);
100 156 : codecVideo->maxBitrate = std::max((unsigned int)(pixels * 0.0015), SystemCodecInfo::DEFAULT_MAX_BITRATE);
101 : }
102 156 : setupVideoBitrateInfo();
103 156 : }
104 :
105 : void
106 156 : VideoRtpSession::setRequestKeyFrameCallback(std::function<void(void)> cb)
107 : {
108 156 : cbKeyFrameRequest_ = std::move(cb);
109 156 : }
110 :
111 : void
112 177 : VideoRtpSession::startSender()
113 : {
114 177 : std::lock_guard lock(mutex_);
115 :
116 177 : JAMI_DBG("[%p] Start video RTP sender: input [%s] - muted [%s]",
117 : this,
118 : conference_ ? "Video Mixer" : input_.c_str(),
119 : send_.onHold ? "YES" : "NO");
120 :
121 177 : if (not socketPair_) {
122 : // Ignore if the transport is not set yet
123 0 : JAMI_WARN("[%p] Transport not set yet", this);
124 0 : return;
125 : }
126 :
127 177 : if (send_.enabled and not send_.onHold) {
128 165 : if (sender_) {
129 20 : if (videoLocal_)
130 19 : videoLocal_->detach(sender_.get());
131 20 : if (videoMixer_)
132 18 : videoMixer_->detach(sender_.get());
133 20 : JAMI_WARN("[%p] Restarting video sender", this);
134 : }
135 :
136 165 : if (not conference_) {
137 116 : videoLocal_ = getVideoInput(input_);
138 116 : if (videoLocal_) {
139 116 : videoLocal_->setRecorderCallback(
140 0 : [w=weak_from_this()](const MediaStream& ms) {
141 0 : asio::post(*Manager::instance().ioContext(), [w=std::move(w), ms]() {
142 0 : if (auto shared = w.lock())
143 0 : shared->attachLocalRecorder(ms);
144 0 : });
145 0 : });
146 116 : auto newParams = videoLocal_->getParams();
147 : try {
148 116 : if (newParams.valid()
149 116 : && newParams.wait_for(NEWPARAMS_TIMEOUT) == std::future_status::ready) {
150 106 : localVideoParams_ = newParams.get();
151 : } else {
152 10 : JAMI_ERR("[%p] No valid new video parameters", this);
153 10 : return;
154 : }
155 0 : } catch (const std::exception& e) {
156 0 : JAMI_ERR("Exception during retrieving video parameters: %s", e.what());
157 0 : return;
158 0 : }
159 116 : } else {
160 0 : JAMI_WARN("Unable to lock video input");
161 0 : return;
162 : }
163 :
164 : #if (defined(__ANDROID__) || (defined(TARGET_OS_IOS) && TARGET_OS_IOS))
165 : videoLocal_->setupSink(localVideoParams_.width, localVideoParams_.height);
166 : #endif
167 : }
168 :
169 : // be sure to not send any packets before saving last RTP seq value
170 155 : socketPair_->stopSendOp();
171 :
172 155 : auto codecVideo = std::static_pointer_cast<jami::SystemVideoCodecInfo>(send_.codec);
173 155 : auto autoQuality = codecVideo->isAutoQualityEnabled;
174 :
175 155 : send_.linkableHW = conference_ == nullptr;
176 155 : send_.bitrate = videoBitrateInfo_.videoBitrateCurrent;
177 : // NOTE:
178 : // Current implementation does not handle resolution change
179 : // (needed by window sharing feature) with HW codecs, so HW
180 : // codecs will be disabled for now.
181 155 : bool allowHwAccel = (localVideoParams_.format != "x11grab" && localVideoParams_.format != "dxgigrab" && localVideoParams_.format != "lavfi");
182 :
183 155 : if (socketPair_)
184 155 : initSeqVal_ = socketPair_->lastSeqValOut();
185 :
186 : try {
187 155 : sender_.reset();
188 155 : socketPair_->stopSendOp(false);
189 : MediaStream ms
190 155 : = !videoMixer_
191 : ? MediaStream("video sender",
192 : AV_PIX_FMT_YUV420P,
193 261 : 1 / static_cast<rational<int>>(localVideoParams_.framerate),
194 113 : localVideoParams_.width == 0 ? 1080u : localVideoParams_.width,
195 113 : localVideoParams_.height == 0 ? 720u : localVideoParams_.height,
196 106 : send_.bitrate,
197 : static_cast<rational<int>>(localVideoParams_.framerate))
198 571 : : videoMixer_->getStream("Video Sender");
199 155 : sender_.reset(new VideoSender(
200 310 : getRemoteRtpUri(), ms, send_, *socketPair_, initSeqVal_ + 1, mtu_, allowHwAccel));
201 155 : if (changeOrientationCallback_)
202 155 : sender_->setChangeOrientationCallback(changeOrientationCallback_);
203 155 : if (socketPair_)
204 155 : socketPair_->setPacketLossCallback([this]() { cbKeyFrameRequest_(); });
205 :
206 155 : } catch (const MediaEncoderException& e) {
207 0 : JAMI_ERR("%s", e.what());
208 0 : send_.enabled = false;
209 0 : }
210 155 : lastMediaRestart_ = clock::now();
211 155 : last_REMB_inc_ = clock::now();
212 155 : last_REMB_dec_ = clock::now();
213 155 : if (autoQuality and not rtcpCheckerThread_.isRunning())
214 135 : rtcpCheckerThread_.start();
215 20 : else if (not autoQuality and rtcpCheckerThread_.isRunning())
216 0 : rtcpCheckerThread_.join();
217 : // Block reads to received feedback packets
218 155 : if(socketPair_)
219 155 : socketPair_->setReadBlockingMode(true);
220 155 : }
221 177 : }
222 :
223 : void
224 21 : VideoRtpSession::restartSender()
225 : {
226 21 : std::lock_guard lock(mutex_);
227 :
228 : // ensure that start has been called before restart
229 21 : if (not socketPair_)
230 0 : return;
231 :
232 21 : startSender();
233 :
234 21 : if (conference_)
235 19 : setupConferenceVideoPipeline(*conference_, Direction::SEND);
236 : else
237 2 : setupVideoPipeline();
238 21 : }
239 :
240 : void
241 699 : VideoRtpSession::stopSender(bool forceStopSocket)
242 : {
243 : // Concurrency protection must be done by caller.
244 :
245 699 : JAMI_DBG("[%p] Stop video RTP sender: input [%s] - muted [%s]",
246 : this,
247 : conference_ ? "Video Mixer" : input_.c_str(),
248 : send_.onHold ? "YES" : "NO");
249 :
250 699 : if (sender_) {
251 135 : if (videoLocal_)
252 104 : videoLocal_->detach(sender_.get());
253 135 : if (videoMixer_)
254 4 : videoMixer_->detach(sender_.get());
255 135 : sender_.reset();
256 : }
257 :
258 699 : if (socketPair_) {
259 163 : bool const isReceivingVideo = receive_.enabled && !receive_.onHold;
260 163 : if(forceStopSocket || !isReceivingVideo) {
261 160 : socketPair_->stopSendOp();
262 160 : socketPair_->setReadBlockingMode(false);
263 : }
264 : }
265 699 : }
266 :
267 : void
268 156 : VideoRtpSession::startReceiver()
269 : {
270 : // Concurrency protection must be done by caller.
271 :
272 156 : JAMI_DBG("[%p] Starting receiver", this);
273 :
274 156 : if (receive_.enabled and not receive_.onHold) {
275 146 : if (receiveThread_)
276 15 : JAMI_WARN("[%p] Already has a receiver, restarting", this);
277 146 : receiveThread_.reset(
278 146 : new VideoReceiveThread(callId_, !conference_, receive_.receiving_sdp, mtu_));
279 :
280 : // ensure that start has been called
281 146 : if (not socketPair_)
282 0 : return;
283 :
284 : // XXX keyframe requests can timeout if unanswered
285 146 : receiveThread_->addIOContext(*socketPair_);
286 146 : receiveThread_->setSuccessfulSetupCb(onSuccessfulSetup_);
287 146 : receiveThread_->startLoop();
288 146 : receiveThread_->setRequestKeyFrameCallback([this]() { cbKeyFrameRequest_(); });
289 292 : receiveThread_->setRotation(rotation_.load());
290 146 : if (videoMixer_ and conference_) {
291 : // Note, this should be managed differently, this is a bit hacky
292 31 : auto audioId = streamId_;
293 31 : string_replace(audioId, "video", "audio");
294 31 : auto activeStream = videoMixer_->verifyActive(audioId);
295 31 : videoMixer_->removeAudioOnlySource(callId_, audioId);
296 31 : if (activeStream)
297 0 : videoMixer_->setActiveStream(streamId_);
298 31 : }
299 146 : receiveThread_->setRecorderCallback([w=weak_from_this()](const MediaStream& ms) {
300 31 : asio::post(*Manager::instance().ioContext(), [w=std::move(w), ms]() {
301 31 : if (auto shared = w.lock())
302 31 : shared->attachRemoteRecorder(ms);
303 31 : });
304 31 : });
305 146 : } else {
306 10 : JAMI_DBG("[%p] Video receiver disabled", this);
307 10 : if (videoMixer_ and conference_) {
308 : // Note, this should be managed differently, this is a bit hacky
309 0 : auto audioId_ = streamId_;
310 0 : string_replace(audioId_, "video", "audio");
311 0 : if (receiveThread_) {
312 0 : auto activeStream = videoMixer_->verifyActive(streamId_);
313 0 : videoMixer_->addAudioOnlySource(callId_, audioId_);
314 0 : receiveThread_->detach(videoMixer_.get());
315 0 : if (activeStream)
316 0 : videoMixer_->setActiveStream(audioId_);
317 : } else {
318 : // Add audio-only source when video is disabled or muted.
319 : // Called after ICE negotiation, when peers can properly create video sinks.
320 0 : if (not receive_.enabled or receive_.onHold) {
321 0 : videoMixer_->addAudioOnlySource(callId_, audioId_);
322 : }
323 : }
324 0 : }
325 : }
326 156 : if (socketPair_)
327 156 : socketPair_->setReadBlockingMode(true);
328 : }
329 :
330 : void
331 688 : VideoRtpSession::stopReceiver(bool forceStopSocket)
332 : {
333 : // Concurrency protection must be done by caller.
334 :
335 688 : JAMI_DBG("[%p] Stopping receiver", this);
336 :
337 688 : if (not receiveThread_)
338 389 : return;
339 :
340 299 : if (videoMixer_) {
341 4 : auto activeStream = videoMixer_->verifyActive(streamId_);
342 4 : auto audioId = streamId_;
343 4 : string_replace(audioId, "video", "audio");
344 4 : videoMixer_->addAudioOnlySource(callId_, audioId);
345 4 : receiveThread_->detach(videoMixer_.get());
346 4 : if (activeStream)
347 0 : videoMixer_->setActiveStream(audioId);
348 4 : }
349 :
350 : // We need to disable the read operation, otherwise the
351 : // receiver thread will block since the peer stopped sending
352 : // RTP packets.
353 299 : bool const isSendingVideo = send_.enabled && !send_.onHold;
354 299 : if (socketPair_) {
355 152 : if (forceStopSocket || !isSendingVideo) {
356 152 : socketPair_->setReadBlockingMode(false);
357 152 : socketPair_->stopSendOp();
358 : }
359 : }
360 :
361 299 : auto ms = receiveThread_->getInfo();
362 299 : if (auto ob = recorder_->getStream(ms.name)) {
363 31 : receiveThread_->detach(ob);
364 31 : recorder_->removeStream(ms);
365 : }
366 :
367 299 : if(forceStopSocket || !isSendingVideo)
368 299 : receiveThread_->stopLoop();
369 299 : receiveThread_->stopSink();
370 299 : }
371 :
372 : void
373 160 : VideoRtpSession::start(std::unique_ptr<dhtnet::IceSocket> rtp_sock, std::unique_ptr<dhtnet::IceSocket> rtcp_sock)
374 : {
375 160 : std::lock_guard lock(mutex_);
376 :
377 160 : if (not send_.enabled and not receive_.enabled) {
378 4 : stop();
379 4 : return;
380 : }
381 :
382 : try {
383 156 : if (rtp_sock and rtcp_sock) {
384 156 : if (send_.addr) {
385 156 : rtp_sock->setDefaultRemoteAddress(send_.addr);
386 : }
387 :
388 156 : auto& rtcpAddr = send_.rtcp_addr ? send_.rtcp_addr : send_.addr;
389 156 : if (rtcpAddr) {
390 156 : rtcp_sock->setDefaultRemoteAddress(rtcpAddr);
391 : }
392 156 : socketPair_.reset(new SocketPair(std::move(rtp_sock), std::move(rtcp_sock)));
393 : } else {
394 0 : socketPair_.reset(new SocketPair(getRemoteRtpUri().c_str(), receive_.addr.getPort()));
395 : }
396 :
397 156 : last_REMB_inc_ = clock::now();
398 156 : last_REMB_dec_ = clock::now();
399 :
400 156 : socketPair_->setRtpDelayCallback(
401 5670 : [&](int gradient, int deltaT) { delayMonitor(gradient, deltaT); });
402 :
403 156 : if (send_.crypto and receive_.crypto) {
404 624 : socketPair_->createSRTP(receive_.crypto.getCryptoSuite().c_str(),
405 312 : receive_.crypto.getSrtpKeyInfo().c_str(),
406 312 : send_.crypto.getCryptoSuite().c_str(),
407 312 : send_.crypto.getSrtpKeyInfo().c_str());
408 : }
409 0 : } catch (const std::runtime_error& e) {
410 0 : JAMI_ERR("[%p] Socket creation failed: %s", this, e.what());
411 0 : return;
412 0 : }
413 :
414 156 : startReceiver();
415 156 : startSender();
416 :
417 156 : if (conference_) {
418 31 : if (send_.enabled and not send_.onHold) {
419 31 : setupConferenceVideoPipeline(*conference_, Direction::SEND);
420 : }
421 31 : if (receive_.enabled and not receive_.onHold) {
422 31 : setupConferenceVideoPipeline(*conference_, Direction::RECV);
423 : }
424 : } else {
425 125 : setupVideoPipeline();
426 : }
427 160 : }
428 :
429 : void
430 686 : VideoRtpSession::stop()
431 : {
432 686 : std::lock_guard lock(mutex_);
433 :
434 686 : stopSender(true);
435 686 : stopReceiver(true);
436 :
437 686 : if (socketPair_)
438 156 : socketPair_->interrupt();
439 :
440 686 : rtcpCheckerThread_.join();
441 :
442 : // reset default video quality if exist
443 686 : if (videoBitrateInfo_.videoQualityCurrent != SystemCodecInfo::DEFAULT_NO_QUALITY)
444 154 : videoBitrateInfo_.videoQualityCurrent = SystemCodecInfo::DEFAULT_CODEC_QUALITY;
445 :
446 686 : videoBitrateInfo_.videoBitrateCurrent = SystemCodecInfo::DEFAULT_VIDEO_BITRATE;
447 686 : storeVideoBitrateInfo();
448 :
449 686 : socketPair_.reset();
450 686 : videoLocal_.reset();
451 686 : }
452 :
453 : void
454 324 : VideoRtpSession::setMuted(bool mute, Direction dir)
455 : {
456 324 : std::lock_guard lock(mutex_);
457 :
458 : // Sender
459 324 : if (dir == Direction::SEND) {
460 164 : if (send_.onHold == mute) {
461 151 : JAMI_DBG("[%p] Local already %s", this, mute ? "muted" : "un-muted");
462 151 : return;
463 : }
464 :
465 13 : if ((send_.onHold = mute)) {
466 13 : if (videoLocal_) {
467 5 : auto ms = videoLocal_->getInfo();
468 5 : if (auto ob = recorder_->getStream(ms.name)) {
469 0 : videoLocal_->detach(ob);
470 0 : recorder_->removeStream(ms);
471 : }
472 5 : }
473 13 : stopSender();
474 : } else {
475 0 : restartSender();
476 : }
477 13 : return;
478 : }
479 :
480 : // Receiver
481 160 : if (receive_.onHold == mute) {
482 158 : JAMI_DBG("[%p] Remote already %s", this, mute ? "muted" : "un-muted");
483 158 : return;
484 : }
485 :
486 2 : if ((receive_.onHold = mute)) {
487 2 : if (receiveThread_) {
488 0 : auto ms = receiveThread_->getInfo();
489 0 : if (auto ob = recorder_->getStream(ms.name)) {
490 0 : receiveThread_->detach(ob);
491 0 : recorder_->removeStream(ms);
492 : }
493 0 : }
494 2 : stopReceiver();
495 : } else {
496 0 : startReceiver();
497 0 : if (conference_ and not receive_.onHold) {
498 0 : setupConferenceVideoPipeline(*conference_, Direction::RECV);
499 : }
500 : }
501 324 : }
502 :
503 : void
504 160 : VideoRtpSession::forceKeyFrame()
505 : {
506 160 : std::lock_guard lock(mutex_);
507 : #if __ANDROID__
508 : if (videoLocal_)
509 : emitSignal<libjami::VideoSignal::RequestKeyFrame>(videoLocal_->getName());
510 : #else
511 160 : if (sender_)
512 91 : sender_->forceKeyFrame();
513 : #endif
514 160 : }
515 :
516 : void
517 364 : VideoRtpSession::setRotation(int rotation)
518 : {
519 364 : rotation_.store(rotation);
520 364 : if (receiveThread_)
521 48 : receiveThread_->setRotation(rotation);
522 364 : }
523 :
524 : void
525 127 : VideoRtpSession::setupVideoPipeline()
526 : {
527 127 : if (sender_) {
528 106 : if (videoLocal_) {
529 106 : JAMI_DBG("[%p] Setup video pipeline on local capture device", this);
530 106 : videoLocal_->attach(sender_.get());
531 : }
532 : } else {
533 21 : videoLocal_.reset();
534 : }
535 127 : }
536 :
537 : void
538 100 : VideoRtpSession::setupConferenceVideoPipeline(Conference& conference, Direction dir)
539 : {
540 100 : if (dir == Direction::SEND) {
541 50 : JAMI_DBG("[%p] Setup video sender pipeline on conference %s for call %s",
542 : this,
543 : conference.getConfId().c_str(),
544 : callId_.c_str());
545 50 : videoMixer_ = conference.getVideoMixer();
546 50 : if (sender_) {
547 : // Swap sender from local video to conference video mixer
548 49 : if (videoLocal_)
549 17 : videoLocal_->detach(sender_.get());
550 49 : if (videoMixer_)
551 49 : videoMixer_->attach(sender_.get());
552 : } else {
553 1 : JAMI_WARN("[%p] no sender", this);
554 : }
555 : } else {
556 50 : JAMI_DBG("[%p] Setup video receiver pipeline on conference %s for call %s",
557 : this,
558 : conference.getConfId().c_str(),
559 : callId_.c_str());
560 50 : if (receiveThread_) {
561 50 : receiveThread_->stopSink();
562 50 : if (videoMixer_)
563 50 : videoMixer_->attachVideo(receiveThread_.get(), callId_, streamId_);
564 : } else {
565 0 : JAMI_WARN("[%p] no receiver", this);
566 : }
567 : }
568 100 : }
569 :
570 : void
571 59 : VideoRtpSession::enterConference(Conference& conference)
572 : {
573 59 : std::lock_guard lock(mutex_);
574 :
575 59 : exitConference();
576 :
577 59 : conference_ = &conference;
578 59 : videoMixer_ = conference.getVideoMixer();
579 59 : JAMI_DBG("[%p] enterConference (conf: %s)", this, conference.getConfId().c_str());
580 :
581 59 : if (send_.enabled or receiveThread_) {
582 : // Restart encoder with conference parameter ON in order to unlink HW encoder
583 : // from HW decoder.
584 19 : restartSender();
585 19 : if (conference_) {
586 19 : setupConferenceVideoPipeline(conference, Direction::RECV);
587 : }
588 : }
589 59 : }
590 :
591 : void
592 117 : VideoRtpSession::exitConference()
593 : {
594 117 : std::lock_guard lock(mutex_);
595 :
596 117 : if (!conference_)
597 58 : return;
598 :
599 59 : JAMI_DBG("[%p] exitConference (conf: %s)", this, conference_->getConfId().c_str());
600 :
601 59 : if (videoMixer_) {
602 59 : if (sender_)
603 45 : videoMixer_->detach(sender_.get());
604 :
605 59 : if (receiveThread_) {
606 49 : auto activeStream = videoMixer_->verifyActive(streamId_);
607 49 : videoMixer_->detachVideo(receiveThread_.get());
608 49 : receiveThread_->startSink();
609 49 : if (activeStream)
610 0 : videoMixer_->setActiveStream(streamId_);
611 : }
612 :
613 59 : videoMixer_.reset();
614 : }
615 :
616 59 : conference_ = nullptr;
617 117 : }
618 :
619 : bool
620 412 : VideoRtpSession::check_RCTP_Info_RR(RTCPInfo& rtcpi)
621 : {
622 412 : auto rtcpInfoVect = socketPair_->getRtcpRR();
623 412 : unsigned totalLost = 0;
624 412 : unsigned totalJitter = 0;
625 412 : unsigned nbDropNotNull = 0;
626 412 : auto vectSize = rtcpInfoVect.size();
627 :
628 412 : if (vectSize != 0) {
629 20 : for (const auto& it : rtcpInfoVect) {
630 10 : if (it.fraction_lost != 0) // Exclude null drop
631 0 : nbDropNotNull++;
632 10 : totalLost += it.fraction_lost;
633 10 : totalJitter += ntohl(it.jitter);
634 : }
635 10 : rtcpi.packetLoss = nbDropNotNull ? (float) (100 * totalLost) / (256.0 * nbDropNotNull) : 0;
636 : // Jitter is expressed in timestamp unit -> convert to milliseconds
637 : // https://stackoverflow.com/questions/51956520/convert-jitter-from-rtp-timestamp-unit-to-millisseconds
638 10 : rtcpi.jitter = (totalJitter / vectSize / 90000.0f) * 1000;
639 10 : rtcpi.nb_sample = vectSize;
640 10 : rtcpi.latency = socketPair_->getLastLatency();
641 10 : return true;
642 : }
643 402 : return false;
644 412 : }
645 :
646 : bool
647 412 : VideoRtpSession::check_RCTP_Info_REMB(uint64_t* br)
648 : {
649 412 : auto rtcpInfoVect = socketPair_->getRtcpREMB();
650 :
651 412 : if (!rtcpInfoVect.empty()) {
652 183 : auto pkt = rtcpInfoVect.back();
653 183 : auto temp = cc->parseREMB(pkt);
654 183 : *br = (temp >> 10) | ((temp << 6) & 0xff00) | ((temp << 16) & 0x30000);
655 183 : return true;
656 : }
657 229 : return false;
658 412 : }
659 :
660 : void
661 412 : VideoRtpSession::adaptQualityAndBitrate()
662 : {
663 412 : setupVideoBitrateInfo();
664 :
665 : uint64_t br;
666 412 : if (check_RCTP_Info_REMB(&br)) {
667 183 : delayProcessing(br);
668 : }
669 :
670 412 : RTCPInfo rtcpi {};
671 412 : if (check_RCTP_Info_RR(rtcpi)) {
672 10 : dropProcessing(&rtcpi);
673 : }
674 412 : }
675 :
676 : void
677 10 : VideoRtpSession::dropProcessing(RTCPInfo* rtcpi)
678 : {
679 : // If bitrate has changed, let time to receive fresh RTCP packets
680 10 : auto now = clock::now();
681 10 : auto restartTimer = now - lastMediaRestart_;
682 10 : if (restartTimer < DELAY_AFTER_RESTART) {
683 0 : return;
684 : }
685 : #ifndef __ANDROID__
686 : // Do nothing if jitter is more than 1 second
687 10 : if (rtcpi->jitter > 1000) {
688 0 : return;
689 : }
690 : #endif
691 10 : auto pondLoss = getPonderateLoss(rtcpi->packetLoss);
692 10 : auto oldBitrate = videoBitrateInfo_.videoBitrateCurrent;
693 10 : int newBitrate = oldBitrate;
694 :
695 : // Fill histoLoss and histoJitter_ with samples
696 10 : if (restartTimer < DELAY_AFTER_RESTART + std::chrono::seconds(1)) {
697 0 : return;
698 : } else {
699 : // If ponderate drops are inferior to 10% that mean drop are not from congestion but from
700 : // network...
701 : // ... we can increase
702 10 : if (pondLoss >= 5.0f && rtcpi->packetLoss > 0.0f) {
703 0 : newBitrate *= 1.0f - rtcpi->packetLoss / 150.0f;
704 0 : histoLoss_.clear();
705 0 : lastMediaRestart_ = now;
706 0 : JAMI_DBG(
707 : "[BandwidthAdapt] Detected transmission bandwidth overuse, decrease bitrate from "
708 : "%u Kbps to %d Kbps, ratio %f (ponderate loss: %f%%, packet loss rate: %f%%)",
709 : oldBitrate,
710 : newBitrate,
711 : (float) newBitrate / oldBitrate,
712 : pondLoss,
713 : rtcpi->packetLoss);
714 : }
715 : }
716 :
717 10 : setNewBitrate(newBitrate);
718 : }
719 :
720 : void
721 183 : VideoRtpSession::delayProcessing(int br)
722 : {
723 183 : int newBitrate = videoBitrateInfo_.videoBitrateCurrent;
724 183 : if (br == 0x6803)
725 0 : newBitrate *= 0.85f;
726 183 : else if (br == 0x7378) {
727 183 : auto now = clock::now();
728 183 : auto msSinceLastDecrease = std::chrono::duration_cast<std::chrono::milliseconds>(
729 183 : now - lastBitrateDecrease);
730 183 : auto increaseCoefficient = std::min(msSinceLastDecrease.count() / 600000.0f + 1.0f, 1.05f);
731 183 : newBitrate *= increaseCoefficient;
732 : } else
733 0 : return;
734 :
735 183 : setNewBitrate(newBitrate);
736 : }
737 :
738 : void
739 193 : VideoRtpSession::setNewBitrate(unsigned int newBR)
740 : {
741 193 : newBR = std::max(newBR, videoBitrateInfo_.videoBitrateMin);
742 193 : newBR = std::min(newBR, videoBitrateInfo_.videoBitrateMax);
743 :
744 193 : if (newBR < videoBitrateInfo_.videoBitrateCurrent)
745 0 : lastBitrateDecrease = clock::now();
746 :
747 193 : if (videoBitrateInfo_.videoBitrateCurrent != newBR) {
748 36 : videoBitrateInfo_.videoBitrateCurrent = newBR;
749 36 : storeVideoBitrateInfo();
750 :
751 : #if __ANDROID__
752 : if (auto input_device = std::dynamic_pointer_cast<VideoInput>(videoLocal_))
753 : emitSignal<libjami::VideoSignal::SetBitrate>(input_device->getConfig().name, (int) newBR);
754 : #endif
755 :
756 36 : if (sender_) {
757 36 : auto ret = sender_->setBitrate(newBR);
758 36 : if (ret == -1)
759 0 : JAMI_ERR("Fail to access the encoder");
760 36 : else if (ret == 0)
761 0 : restartSender();
762 : } else {
763 0 : JAMI_ERR("Fail to access the sender");
764 : }
765 : }
766 193 : }
767 :
768 : void
769 883 : VideoRtpSession::setupVideoBitrateInfo()
770 : {
771 883 : auto codecVideo = std::static_pointer_cast<jami::SystemVideoCodecInfo>(send_.codec);
772 883 : if (codecVideo) {
773 568 : videoBitrateInfo_ = {
774 568 : codecVideo->bitrate,
775 568 : codecVideo->minBitrate,
776 568 : codecVideo->maxBitrate,
777 568 : codecVideo->quality,
778 568 : codecVideo->minQuality,
779 568 : codecVideo->maxQuality,
780 568 : videoBitrateInfo_.cptBitrateChecking,
781 568 : videoBitrateInfo_.maxBitrateChecking,
782 568 : videoBitrateInfo_.packetLostThreshold,
783 : };
784 : } else {
785 : videoBitrateInfo_
786 315 : = {0, 0, 0, 0, 0, 0, 0, MAX_ADAPTATIVE_BITRATE_ITERATION, PACKET_LOSS_THRESHOLD};
787 : }
788 883 : }
789 :
790 : void
791 722 : VideoRtpSession::storeVideoBitrateInfo()
792 : {
793 722 : if (auto codecVideo = std::static_pointer_cast<jami::SystemVideoCodecInfo>(send_.codec)) {
794 477 : codecVideo->bitrate = videoBitrateInfo_.videoBitrateCurrent;
795 477 : codecVideo->quality = videoBitrateInfo_.videoQualityCurrent;
796 722 : }
797 722 : }
798 :
799 : void
800 412 : VideoRtpSession::processRtcpChecker()
801 : {
802 412 : adaptQualityAndBitrate();
803 412 : socketPair_->waitForRTCP(std::chrono::seconds(rtcp_checking_interval));
804 412 : }
805 :
806 : void
807 32 : VideoRtpSession::attachRemoteRecorder(const MediaStream& ms)
808 : {
809 32 : std::lock_guard lock(mutex_);
810 32 : if (!recorder_ || !receiveThread_)
811 0 : return;
812 32 : if (auto ob = recorder_->addStream(ms)) {
813 32 : receiveThread_->attach(ob);
814 : }
815 32 : }
816 :
817 : void
818 0 : VideoRtpSession::attachLocalRecorder(const MediaStream& ms)
819 : {
820 0 : std::lock_guard lock(mutex_);
821 0 : if (!recorder_ || !videoLocal_ || !Manager::instance().videoPreferences.getRecordPreview())
822 0 : return;
823 0 : if (auto ob = recorder_->addStream(ms)) {
824 0 : videoLocal_->attach(ob);
825 : }
826 0 : }
827 :
828 : void
829 6 : VideoRtpSession::initRecorder()
830 : {
831 6 : if (!recorder_)
832 0 : return;
833 6 : if (receiveThread_) {
834 6 : receiveThread_->setRecorderCallback([w=weak_from_this()](const MediaStream& ms) {
835 1 : asio::post(*Manager::instance().ioContext(), [w=std::move(w), ms]() {
836 1 : if (auto shared = w.lock())
837 1 : shared->attachRemoteRecorder(ms);
838 1 : });
839 1 : });
840 : }
841 6 : if (videoLocal_ && !send_.onHold) {
842 4 : videoLocal_->setRecorderCallback([w=weak_from_this()](const MediaStream& ms) {
843 0 : asio::post(*Manager::instance().ioContext(), [w=std::move(w), ms]() {
844 0 : if (auto shared = w.lock())
845 0 : shared->attachLocalRecorder(ms);
846 0 : });
847 0 : });
848 : }
849 : }
850 :
851 : void
852 319 : VideoRtpSession::deinitRecorder()
853 : {
854 319 : if (!recorder_)
855 0 : return;
856 319 : if (receiveThread_) {
857 132 : auto ms = receiveThread_->getInfo();
858 132 : if (auto ob = recorder_->getStream(ms.name)) {
859 0 : receiveThread_->detach(ob);
860 0 : recorder_->removeStream(ms);
861 : }
862 132 : }
863 319 : if (videoLocal_) {
864 0 : auto ms = videoLocal_->getInfo();
865 0 : if (auto ob = recorder_->getStream(ms.name)) {
866 0 : videoLocal_->detach(ob);
867 0 : recorder_->removeStream(ms);
868 : }
869 0 : }
870 : }
871 :
872 : void
873 156 : VideoRtpSession::setChangeOrientationCallback(std::function<void(int)> cb)
874 : {
875 156 : changeOrientationCallback_ = std::move(cb);
876 156 : if (sender_)
877 11 : sender_->setChangeOrientationCallback(changeOrientationCallback_);
878 156 : }
879 :
880 : float
881 10 : VideoRtpSession::getPonderateLoss(float lastLoss)
882 : {
883 10 : float pond = 0.0f, pondLoss = 0.0f, totalPond = 0.0f;
884 10 : constexpr float coefficient_a = -1 / 100.0f;
885 10 : constexpr float coefficient_b = 100.0f;
886 :
887 10 : auto now = clock::now();
888 :
889 10 : histoLoss_.emplace_back(now, lastLoss);
890 :
891 20 : for (auto it = histoLoss_.begin(); it != histoLoss_.end();) {
892 10 : auto delay = std::chrono::duration_cast<std::chrono::milliseconds>(now - it->first);
893 :
894 : // 1ms -> 100%
895 : // 2000ms -> 80%
896 10 : if (delay <= EXPIRY_TIME_RTCP) {
897 10 : if (it->second == 0.0f)
898 10 : pond = 20.0f; // Reduce weight of null drop
899 : else
900 0 : pond = std::min(delay.count() * coefficient_a + coefficient_b, 100.0f);
901 10 : totalPond += pond;
902 10 : pondLoss += it->second * pond;
903 10 : ++it;
904 : } else
905 0 : it = histoLoss_.erase(it);
906 : }
907 10 : if (totalPond == 0)
908 0 : return 0.0f;
909 :
910 10 : return pondLoss / totalPond;
911 : }
912 :
913 : void
914 5670 : VideoRtpSession::delayMonitor(int gradient, int deltaT)
915 : {
916 5670 : float estimation = cc->kalmanFilter(gradient);
917 5670 : float thresh = cc->get_thresh();
918 :
919 5670 : cc->update_thresh(estimation, deltaT);
920 :
921 5670 : BandwidthUsage bwState = cc->get_bw_state(estimation, thresh);
922 5670 : auto now = clock::now();
923 :
924 5670 : if (bwState == BandwidthUsage::bwOverusing) {
925 0 : auto remb_timer_dec = now - last_REMB_dec_;
926 0 : if ((not remb_dec_cnt_) or (remb_timer_dec > DELAY_AFTER_REMB_DEC)) {
927 0 : last_REMB_dec_ = now;
928 0 : remb_dec_cnt_ = 0;
929 : }
930 :
931 : // Limit REMB decrease to MAX_REMB_DEC every DELAY_AFTER_REMB_DEC ms
932 0 : if (remb_dec_cnt_ < MAX_REMB_DEC && remb_timer_dec < DELAY_AFTER_REMB_DEC) {
933 0 : remb_dec_cnt_++;
934 0 : JAMI_WARN("[BandwidthAdapt] Detected reception bandwidth overuse");
935 0 : uint8_t* buf = nullptr;
936 0 : uint64_t br = 0x6803; // Decrease 3
937 0 : auto v = cc->createREMB(br);
938 0 : buf = &v[0];
939 0 : socketPair_->writeData(buf, v.size());
940 0 : last_REMB_inc_ = clock::now();
941 0 : }
942 5670 : } else if (bwState == BandwidthUsage::bwNormal) {
943 4404 : auto remb_timer_inc = now - last_REMB_inc_;
944 4404 : if (remb_timer_inc > DELAY_AFTER_REMB_INC) {
945 183 : uint8_t* buf = nullptr;
946 183 : uint64_t br = 0x7378; // INcrease
947 183 : auto v = cc->createREMB(br);
948 183 : buf = &v[0];
949 183 : socketPair_->writeData(buf, v.size());
950 183 : last_REMB_inc_ = clock::now();
951 183 : }
952 : }
953 5670 : }
954 : } // namespace video
955 : } // namespace jami
|