Line data Source code
1 : /*
2 : * Copyright (C) 2004-2026 Savoir-faire Linux Inc.
3 : *
4 : * This program is free software: you can redistribute it and/or modify
5 : * it under the terms of the GNU General Public License as published by
6 : * the Free Software Foundation, either version 3 of the License, or
7 : * (at your option) any later version.
8 : *
9 : * This program is distributed in the hope that it will be useful,
10 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 : * GNU General Public License for more details.
13 : *
14 : * You should have received a copy of the GNU General Public License
15 : * along with this program. If not, see <https://www.gnu.org/licenses/>.
16 : */
17 :
18 : #include "libav_deps.h" // MUST BE INCLUDED FIRST
19 :
20 : #include "audio_rtp_session.h"
21 :
22 : #include "logger.h"
23 :
24 : #include "audio_receive_thread.h"
25 : #include "audio_sender.h"
26 : #include "socket_pair.h"
27 : #include "media_recorder.h"
28 : #include "media_encoder.h"
29 : #include "media_device.h"
30 : #include "media_const.h"
31 :
32 : #include "audio/audio_input.h"
33 : #include "audio/ringbufferpool.h"
34 : #include "client/videomanager.h"
35 : #include "manager.h"
36 :
37 : #include <asio/io_context.hpp>
38 : #include <asio/post.hpp>
39 :
40 : namespace jami {
41 :
42 363 : AudioRtpSession::AudioRtpSession(const std::string& callId,
43 : const std::string& streamId,
44 363 : const std::shared_ptr<MediaRecorder>& rec)
45 : : RtpSession(callId, streamId, MediaType::MEDIA_AUDIO)
46 970 : , rtcpCheckerThread_([] { return true; }, [this] { processRtcpChecker(); }, [] {})
47 :
48 : {
49 363 : recorder_ = rec;
50 1452 : JAMI_DEBUG("Created Audio RTP session: {} - stream id {}", fmt::ptr(this), streamId_);
51 :
52 : // don't move this into the initializer list or Cthulus will emerge
53 363 : ringbuffer_ = Manager::instance().getRingBufferPool().createRingBuffer(streamId_);
54 363 : }
55 :
56 363 : AudioRtpSession::~AudioRtpSession()
57 : {
58 363 : deinitRecorder();
59 363 : stop();
60 1452 : JAMI_DEBUG("Destroyed Audio RTP session: {} - stream id {}", fmt::ptr(this), streamId_);
61 363 : }
62 :
63 : void
64 166 : AudioRtpSession::startSender()
65 : {
66 166 : std::lock_guard lock(mutex_);
67 664 : JAMI_DEBUG("Start audio RTP sender: input [{}] - muted [{}]", input_, muteState_ ? "YES" : "NO");
68 :
69 166 : if (not send_.enabled or send_.hold) {
70 0 : JAMI_WARNING("Audio sending disabled");
71 0 : if (sender_) {
72 0 : if (socketPair_)
73 0 : socketPair_->interrupt();
74 0 : if (audioInput_)
75 0 : audioInput_->detach(sender_.get());
76 0 : sender_.reset();
77 : }
78 0 : return;
79 : }
80 :
81 166 : if (sender_)
82 0 : JAMI_WARNING("Restarting audio sender");
83 166 : if (audioInput_)
84 0 : audioInput_->detach(sender_.get());
85 :
86 166 : bool fileAudio = !input_.empty() && input_.find("file://") != std::string::npos;
87 166 : auto audioInputId = streamId_;
88 166 : if (fileAudio) {
89 0 : auto suffix = input_;
90 0 : static const std::string& sep = libjami::Media::VideoProtocolPrefix::SEPARATOR;
91 0 : const auto pos = input_.find(sep);
92 0 : if (pos != std::string::npos) {
93 0 : suffix = input_.substr(pos + sep.size());
94 : }
95 0 : audioInputId = suffix;
96 0 : }
97 :
98 : // sender sets up input correctly, we just keep a reference in case startSender is called
99 166 : audioInput_ = jami::getAudioInput(audioInputId);
100 166 : audioInput_->setRecorderCallback([w = weak_from_this()](const MediaStream& ms) {
101 0 : asio::post(*Manager::instance().ioContext(), [w = std::move(w), ms]() {
102 0 : if (auto shared = w.lock())
103 0 : shared->attachLocalRecorder(ms);
104 0 : });
105 0 : });
106 166 : audioInput_->setMuted(muteState_);
107 166 : audioInput_->setSuccessfulSetupCb(onSuccessfulSetup_);
108 166 : if (!fileAudio) {
109 166 : auto newParams = audioInput_->switchInput(input_);
110 : try {
111 166 : if (newParams.valid() && newParams.wait_for(NEWPARAMS_TIMEOUT) == std::future_status::ready) {
112 166 : localAudioParams_ = newParams.get();
113 : } else {
114 0 : JAMI_ERROR("No valid new audio parameters");
115 0 : return;
116 : }
117 0 : } catch (const std::exception& e) {
118 0 : JAMI_ERROR("Exception while retrieving audio parameters: {}", e.what());
119 0 : return;
120 0 : }
121 166 : }
122 166 : if (streamId_ != audioInput_->getId())
123 0 : Manager::instance().getRingBufferPool().bindHalfDuplexOut(streamId_, audioInput_->getId());
124 :
125 166 : send_.fecEnabled = true;
126 :
127 : // be sure to not send any packets before saving last RTP seq value
128 166 : socketPair_->stopSendOp();
129 166 : if (sender_)
130 0 : initSeqVal_ = sender_->getLastSeqValue() + 1;
131 : try {
132 166 : sender_.reset();
133 166 : socketPair_->stopSendOp(false);
134 166 : sender_.reset(new AudioSender(getRemoteRtpUri(), send_, *socketPair_, initSeqVal_, mtu_));
135 0 : } catch (const MediaEncoderException& e) {
136 0 : JAMI_ERROR("{}", e.what());
137 0 : send_.enabled = false;
138 0 : }
139 :
140 166 : if (voiceCallback_)
141 166 : sender_->setVoiceCallback(voiceCallback_);
142 :
143 : // NOTE do after sender/encoder are ready
144 166 : auto codec = std::static_pointer_cast<SystemAudioCodecInfo>(send_.codec);
145 166 : audioInput_->setFormat(codec->audioformat);
146 166 : audioInput_->attach(sender_.get());
147 :
148 166 : if (not rtcpCheckerThread_.isRunning())
149 166 : rtcpCheckerThread_.start();
150 166 : }
151 :
152 : void
153 0 : AudioRtpSession::restartSender()
154 : {
155 0 : std::lock_guard lock(mutex_);
156 : // ensure that start has been called before restart
157 0 : if (not socketPair_) {
158 0 : return;
159 : }
160 :
161 0 : startSender();
162 0 : }
163 :
164 : void
165 166 : AudioRtpSession::startReceiver()
166 : {
167 166 : if (socketPair_)
168 166 : socketPair_->setReadBlockingMode(true);
169 :
170 166 : if ((not receive_.enabled) or receive_.hold) {
171 0 : JAMI_WARNING("Audio receiving disabled");
172 0 : receiveThread_.reset();
173 0 : return;
174 : }
175 :
176 166 : if (receiveThread_)
177 0 : JAMI_WARNING("Restarting audio receiver");
178 :
179 166 : auto accountAudioCodec = std::static_pointer_cast<SystemAudioCodecInfo>(receive_.codec);
180 166 : receiveThread_.reset(
181 166 : new AudioReceiveThread(streamId_, accountAudioCodec->audioformat, receive_.receiving_sdp, mtu_));
182 :
183 166 : receiveThread_->setRecorderCallback([w = weak_from_this()](const MediaStream& ms) {
184 0 : asio::post(*Manager::instance().ioContext(), [w = std::move(w), ms]() {
185 0 : if (auto shared = w.lock())
186 0 : shared->attachRemoteRecorder(ms);
187 0 : });
188 0 : });
189 166 : receiveThread_->addIOContext(*socketPair_);
190 166 : receiveThread_->setSuccessfulSetupCb(onSuccessfulSetup_);
191 166 : receiveThread_->startReceiver();
192 :
193 : // Make the default ring buffer read the audio from the stream
194 166 : Manager::instance().getRingBufferPool().bindHalfDuplexOut(RingBufferPool::DEFAULT_ID, streamId_);
195 166 : }
196 :
197 : void
198 166 : AudioRtpSession::start(std::unique_ptr<dhtnet::IceSocket> rtp_sock, std::unique_ptr<dhtnet::IceSocket> rtcp_sock)
199 : {
200 166 : std::lock_guard lock(mutex_);
201 :
202 166 : if (not send_.enabled and not receive_.enabled) {
203 0 : stop();
204 0 : return;
205 : }
206 :
207 : try {
208 166 : if (rtp_sock and rtcp_sock) {
209 166 : if (send_.addr) {
210 166 : rtp_sock->setDefaultRemoteAddress(send_.addr);
211 : }
212 :
213 166 : auto& rtcpAddr = send_.rtcp_addr ? send_.rtcp_addr : send_.addr;
214 166 : if (rtcpAddr) {
215 166 : rtcp_sock->setDefaultRemoteAddress(rtcpAddr);
216 : }
217 :
218 166 : socketPair_.reset(new SocketPair(std::move(rtp_sock), std::move(rtcp_sock)));
219 : } else {
220 0 : socketPair_.reset(new SocketPair(getRemoteRtpUri().c_str(), receive_.addr.getPort()));
221 : }
222 :
223 166 : if (send_.crypto and receive_.crypto) {
224 664 : socketPair_->createSRTP(receive_.crypto.getCryptoSuite().c_str(),
225 332 : receive_.crypto.getSrtpKeyInfo().c_str(),
226 332 : send_.crypto.getCryptoSuite().c_str(),
227 332 : send_.crypto.getSrtpKeyInfo().c_str());
228 : }
229 0 : } catch (const std::runtime_error& e) {
230 0 : JAMI_ERROR("Socket creation failed: {}", e.what());
231 0 : return;
232 0 : }
233 :
234 166 : startSender();
235 166 : startReceiver();
236 166 : }
237 :
238 : void
239 761 : AudioRtpSession::stop()
240 : {
241 761 : std::lock_guard lock(mutex_);
242 :
243 3044 : JAMI_DEBUG("[{}] Stopping receiver", fmt::ptr(this));
244 :
245 760 : if (not receiveThread_)
246 595 : return;
247 :
248 166 : if (socketPair_)
249 166 : socketPair_->setReadBlockingMode(false);
250 :
251 166 : receiveThread_->stopReceiver();
252 :
253 : // Unbind the default ring buffer from this audio stream
254 166 : Manager::instance().getRingBufferPool().unBindHalfDuplexOut(RingBufferPool::DEFAULT_ID, streamId_);
255 :
256 166 : if (audioInput_)
257 166 : audioInput_->detach(sender_.get());
258 :
259 166 : if (socketPair_)
260 166 : socketPair_->interrupt();
261 :
262 166 : rtcpCheckerThread_.join();
263 :
264 166 : receiveThread_.reset();
265 166 : sender_.reset();
266 166 : socketPair_.reset();
267 166 : audioInput_.reset();
268 761 : }
269 :
270 : void
271 176 : AudioRtpSession::setMuted(bool muted, Direction dir)
272 : {
273 176 : asio::post(*Manager::instance().ioContext(), [w = weak_from_this(), muted, dir]() {
274 176 : if (auto shared = w.lock()) {
275 176 : std::lock_guard lock(shared->mutex_);
276 176 : if (dir == Direction::SEND) {
277 171 : shared->muteState_ = muted;
278 171 : if (shared->audioInput_) {
279 167 : shared->audioInput_->setMuted(muted);
280 : }
281 : } else {
282 5 : if (shared->receiveThread_) {
283 2 : auto ms = shared->receiveThread_->getInfo();
284 2 : ms.name = shared->streamId_ + ":remote";
285 2 : if (muted) {
286 2 : if (auto* ob = shared->recorder_->getStream(ms.name)) {
287 0 : shared->receiveThread_->detach(ob);
288 0 : shared->recorder_->removeStream(ms);
289 : }
290 : } else {
291 0 : if (auto* ob = shared->recorder_->addStream(ms)) {
292 0 : shared->receiveThread_->attach(ob);
293 : }
294 : }
295 2 : }
296 : }
297 352 : }
298 176 : });
299 176 : }
300 :
301 : void
302 166 : AudioRtpSession::setVoiceCallback(std::function<void(bool)> cb)
303 : {
304 166 : std::lock_guard lock(mutex_);
305 166 : voiceCallback_ = std::move(cb);
306 166 : if (sender_) {
307 20 : sender_->setVoiceCallback(voiceCallback_);
308 : }
309 166 : }
310 :
311 : bool
312 276 : AudioRtpSession::check_RCTP_Info_RR(RTCPInfo& rtcpi)
313 : {
314 276 : auto rtcpInfoVect = socketPair_->getRtcpRR();
315 275 : unsigned totalLost = 0;
316 275 : unsigned totalJitter = 0;
317 275 : unsigned nbDropNotNull = 0;
318 275 : auto vectSize = rtcpInfoVect.size();
319 :
320 275 : if (vectSize != 0) {
321 0 : for (const auto& it : rtcpInfoVect) {
322 0 : if (it.fraction_lost != 0) // Exclude null drop
323 0 : nbDropNotNull++;
324 0 : totalLost += it.fraction_lost;
325 0 : totalJitter += ntohl(it.jitter);
326 : }
327 0 : rtcpi.packetLoss = nbDropNotNull ? static_cast<float>((100 * totalLost) / (256.0 * nbDropNotNull)) : 0;
328 : // Jitter is expressed in timestamp unit -> convert to milliseconds
329 : // https://stackoverflow.com/questions/51956520/convert-jitter-from-rtp-timestamp-unit-to-millisseconds
330 0 : rtcpi.jitter = static_cast<unsigned int>(
331 0 : (static_cast<float>(totalJitter) / static_cast<float>(vectSize) / 90000.0f) * 1000.0f);
332 0 : rtcpi.nb_sample = vectSize;
333 0 : rtcpi.latency = static_cast<float>(socketPair_->getLastLatency());
334 0 : return true;
335 : }
336 275 : return false;
337 275 : }
338 :
339 : void
340 275 : AudioRtpSession::adaptQualityAndBitrate()
341 : {
342 275 : RTCPInfo rtcpi {};
343 275 : if (check_RCTP_Info_RR(rtcpi)) {
344 0 : dropProcessing(&rtcpi);
345 : }
346 275 : }
347 :
348 : void
349 0 : AudioRtpSession::dropProcessing(RTCPInfo* rtcpi)
350 : {
351 0 : auto pondLoss = getPonderateLoss(rtcpi->packetLoss);
352 0 : setNewPacketLoss(static_cast<unsigned int>(pondLoss));
353 0 : }
354 :
355 : void
356 0 : AudioRtpSession::setNewPacketLoss(unsigned int newPL)
357 : {
358 0 : newPL = std::clamp((int) newPL, 0, 100);
359 0 : if (newPL != packetLoss_) {
360 0 : if (sender_) {
361 0 : auto ret = sender_->setPacketLoss(newPL);
362 0 : packetLoss_ = newPL;
363 0 : if (ret == -1)
364 0 : JAMI_ERROR("Fail to access the encoder");
365 : } else {
366 0 : JAMI_ERROR("Fail to access the sender");
367 : }
368 : }
369 0 : }
370 :
371 : float
372 0 : AudioRtpSession::getPonderateLoss(float lastLoss)
373 : {
374 : static float pond = 10.0f;
375 :
376 0 : pond = floor(0.5 * lastLoss + 0.5 * pond);
377 0 : if (lastLoss > pond) {
378 0 : return lastLoss;
379 : } else {
380 0 : return pond;
381 : }
382 : }
383 :
384 : void
385 275 : AudioRtpSession::processRtcpChecker()
386 : {
387 275 : adaptQualityAndBitrate();
388 275 : socketPair_->waitForRTCP(std::chrono::seconds(rtcp_checking_interval));
389 276 : }
390 :
391 : void
392 0 : AudioRtpSession::attachRemoteRecorder(const MediaStream& ms)
393 : {
394 0 : std::lock_guard lock(mutex_);
395 0 : if (!recorder_ || !receiveThread_)
396 0 : return;
397 0 : MediaStream remoteMS = ms;
398 0 : remoteMS.name = streamId_ + ":remote";
399 0 : if (auto* ob = recorder_->addStream(remoteMS)) {
400 0 : receiveThread_->attach(ob);
401 : }
402 0 : }
403 :
404 : void
405 0 : AudioRtpSession::attachLocalRecorder(const MediaStream& ms)
406 : {
407 0 : std::lock_guard lock(mutex_);
408 0 : if (!recorder_ || !audioInput_)
409 0 : return;
410 0 : MediaStream localMS = ms;
411 0 : localMS.name = streamId_ + ":local";
412 0 : if (auto* ob = recorder_->addStream(localMS)) {
413 0 : audioInput_->attach(ob);
414 : }
415 0 : }
416 :
417 : void
418 1 : AudioRtpSession::initRecorder()
419 : {
420 1 : if (!recorder_)
421 0 : return;
422 1 : if (receiveThread_)
423 1 : receiveThread_->setRecorderCallback([w = weak_from_this()](const MediaStream& ms) {
424 0 : asio::post(*Manager::instance().ioContext(), [w = std::move(w), ms]() {
425 0 : if (auto shared = w.lock())
426 0 : shared->attachRemoteRecorder(ms);
427 0 : });
428 0 : });
429 1 : if (audioInput_)
430 1 : audioInput_->setRecorderCallback([w = weak_from_this()](const MediaStream& ms) {
431 0 : asio::post(*Manager::instance().ioContext(), [w = std::move(w), ms]() {
432 0 : if (auto shared = w.lock())
433 0 : shared->attachLocalRecorder(ms);
434 0 : });
435 0 : });
436 : }
437 :
438 : void
439 364 : AudioRtpSession::deinitRecorder()
440 : {
441 364 : if (!recorder_)
442 0 : return;
443 364 : if (receiveThread_) {
444 0 : auto ms = receiveThread_->getInfo();
445 0 : ms.name = streamId_ + ":remote";
446 0 : if (auto* ob = recorder_->getStream(ms.name)) {
447 0 : receiveThread_->detach(ob);
448 0 : recorder_->removeStream(ms);
449 : }
450 0 : }
451 364 : if (audioInput_) {
452 0 : auto ms = audioInput_->getInfo();
453 0 : ms.name = streamId_ + ":local";
454 0 : if (auto* ob = recorder_->getStream(ms.name)) {
455 0 : audioInput_->detach(ob);
456 0 : recorder_->removeStream(ms);
457 : }
458 0 : }
459 : }
460 :
461 : } // namespace jami
|