Line data Source code
1 : /*
2 : * Copyright (C) 2004-2025 Savoir-faire Linux Inc.
3 : *
4 : * This program is free software: you can redistribute it and/or modify
5 : * it under the terms of the GNU General Public License as published by
6 : * the Free Software Foundation, either version 3 of the License, or
7 : * (at your option) any later version.
8 : *
9 : * This program is distributed in the hope that it will be useful,
10 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 : * GNU General Public License for more details.
13 : *
14 : * You should have received a copy of the GNU General Public License
15 : * along with this program. If not, see <https://www.gnu.org/licenses/>.
16 : */
17 :
18 : #include "libav_deps.h" // MUST BE INCLUDED FIRST
19 :
20 : #include "audio_rtp_session.h"
21 :
22 : #include "logger.h"
23 : #include "noncopyable.h"
24 : #include "sip/sdp.h"
25 :
26 : #include "audio_receive_thread.h"
27 : #include "audio_sender.h"
28 : #include "socket_pair.h"
29 : #include "media_recorder.h"
30 : #include "media_encoder.h"
31 : #include "media_decoder.h"
32 : #include "media_io_handle.h"
33 : #include "media_device.h"
34 : #include "media_const.h"
35 :
36 : #include "audio/audio_input.h"
37 : #include "audio/ringbufferpool.h"
38 : #include "audio/resampler.h"
39 : #include "client/videomanager.h"
40 : #include "manager.h"
41 : #include "observer.h"
42 :
43 : #include <asio/io_context.hpp>
44 : #include <asio/post.hpp>
45 : #include <sstream>
46 :
47 : namespace jami {
48 :
49 373 : AudioRtpSession::AudioRtpSession(const std::string& callId,
50 : const std::string& streamId,
51 373 : const std::shared_ptr<MediaRecorder>& rec)
52 : : RtpSession(callId, streamId, MediaType::MEDIA_AUDIO)
53 1095 : , rtcpCheckerThread_([] { return true; }, [this] { processRtcpChecker(); }, [] {})
54 :
55 : {
56 373 : recorder_ = rec;
57 1492 : JAMI_DEBUG("Created Audio RTP session: {} - stream id {}", fmt::ptr(this), streamId_);
58 :
59 : // don't move this into the initializer list or Cthulus will emerge
60 373 : ringbuffer_ = Manager::instance().getRingBufferPool().createRingBuffer(streamId_);
61 373 : }
62 :
63 373 : AudioRtpSession::~AudioRtpSession()
64 : {
65 373 : deinitRecorder();
66 373 : stop();
67 1492 : JAMI_DEBUG("Destroyed Audio RTP session: {} - stream id {}", fmt::ptr(this), streamId_);
68 373 : }
69 :
70 : void
71 180 : AudioRtpSession::startSender()
72 : {
73 180 : std::lock_guard lock(mutex_);
74 720 : JAMI_DEBUG("Start audio RTP sender: input [{}] - muted [{}]", input_, muteState_ ? "YES" : "NO");
75 :
76 180 : if (not send_.enabled or send_.onHold) {
77 0 : JAMI_WARNING("Audio sending disabled");
78 0 : if (sender_) {
79 0 : if (socketPair_)
80 0 : socketPair_->interrupt();
81 0 : if (audioInput_)
82 0 : audioInput_->detach(sender_.get());
83 0 : sender_.reset();
84 : }
85 0 : return;
86 : }
87 :
88 180 : if (sender_)
89 0 : JAMI_WARNING("Restarting audio sender");
90 180 : if (audioInput_)
91 0 : audioInput_->detach(sender_.get());
92 :
93 180 : bool fileAudio = !input_.empty() && input_.find("file://") != std::string::npos;
94 180 : auto audioInputId = streamId_;
95 180 : if (fileAudio) {
96 0 : auto suffix = input_;
97 0 : static const std::string& sep = libjami::Media::VideoProtocolPrefix::SEPARATOR;
98 0 : const auto pos = input_.find(sep);
99 0 : if (pos != std::string::npos) {
100 0 : suffix = input_.substr(pos + sep.size());
101 : }
102 0 : audioInputId = suffix;
103 0 : }
104 :
105 : // sender sets up input correctly, we just keep a reference in case startSender is called
106 180 : audioInput_ = jami::getAudioInput(audioInputId);
107 180 : audioInput_->setRecorderCallback([w = weak_from_this()](const MediaStream& ms) {
108 0 : asio::post(*Manager::instance().ioContext(), [w = std::move(w), ms]() {
109 0 : if (auto shared = w.lock())
110 0 : shared->attachLocalRecorder(ms);
111 0 : });
112 0 : });
113 180 : audioInput_->setMuted(muteState_);
114 180 : audioInput_->setSuccessfulSetupCb(onSuccessfulSetup_);
115 180 : if (!fileAudio) {
116 180 : auto newParams = audioInput_->switchInput(input_);
117 : try {
118 180 : if (newParams.valid() && newParams.wait_for(NEWPARAMS_TIMEOUT) == std::future_status::ready) {
119 180 : localAudioParams_ = newParams.get();
120 : } else {
121 0 : JAMI_ERROR("No valid new audio parameters");
122 0 : return;
123 : }
124 0 : } catch (const std::exception& e) {
125 0 : JAMI_ERROR("Exception while retrieving audio parameters: {}", e.what());
126 0 : return;
127 0 : }
128 180 : }
129 180 : if (streamId_ != audioInput_->getId())
130 0 : Manager::instance().getRingBufferPool().bindHalfDuplexOut(streamId_, audioInput_->getId());
131 :
132 180 : send_.fecEnabled = true;
133 :
134 : // be sure to not send any packets before saving last RTP seq value
135 180 : socketPair_->stopSendOp();
136 180 : if (sender_)
137 0 : initSeqVal_ = sender_->getLastSeqValue() + 1;
138 : try {
139 180 : sender_.reset();
140 180 : socketPair_->stopSendOp(false);
141 180 : sender_.reset(new AudioSender(getRemoteRtpUri(), send_, *socketPair_, initSeqVal_, mtu_));
142 0 : } catch (const MediaEncoderException& e) {
143 0 : JAMI_ERROR("{}", e.what());
144 0 : send_.enabled = false;
145 0 : }
146 :
147 180 : if (voiceCallback_)
148 180 : sender_->setVoiceCallback(voiceCallback_);
149 :
150 : // NOTE do after sender/encoder are ready
151 180 : auto codec = std::static_pointer_cast<SystemAudioCodecInfo>(send_.codec);
152 180 : audioInput_->setFormat(codec->audioformat);
153 180 : audioInput_->attach(sender_.get());
154 :
155 180 : if (not rtcpCheckerThread_.isRunning())
156 180 : rtcpCheckerThread_.start();
157 180 : }
158 :
159 : void
160 0 : AudioRtpSession::restartSender()
161 : {
162 0 : std::lock_guard lock(mutex_);
163 : // ensure that start has been called before restart
164 0 : if (not socketPair_) {
165 0 : return;
166 : }
167 :
168 0 : startSender();
169 0 : }
170 :
171 : void
172 180 : AudioRtpSession::startReceiver()
173 : {
174 180 : if (socketPair_)
175 180 : socketPair_->setReadBlockingMode(true);
176 :
177 180 : if (not receive_.enabled or receive_.onHold) {
178 0 : JAMI_WARNING("Audio receiving disabled");
179 0 : receiveThread_.reset();
180 0 : return;
181 : }
182 :
183 180 : if (receiveThread_)
184 0 : JAMI_WARNING("Restarting audio receiver");
185 :
186 180 : auto accountAudioCodec = std::static_pointer_cast<SystemAudioCodecInfo>(receive_.codec);
187 180 : receiveThread_.reset(
188 180 : new AudioReceiveThread(streamId_, accountAudioCodec->audioformat, receive_.receiving_sdp, mtu_));
189 :
190 180 : receiveThread_->setRecorderCallback([w = weak_from_this()](const MediaStream& ms) {
191 0 : asio::post(*Manager::instance().ioContext(), [w = std::move(w), ms]() {
192 0 : if (auto shared = w.lock())
193 0 : shared->attachRemoteRecorder(ms);
194 0 : });
195 0 : });
196 180 : receiveThread_->addIOContext(*socketPair_);
197 180 : receiveThread_->setSuccessfulSetupCb(onSuccessfulSetup_);
198 180 : receiveThread_->startReceiver();
199 180 : }
200 :
201 : void
202 180 : AudioRtpSession::start(std::unique_ptr<dhtnet::IceSocket> rtp_sock, std::unique_ptr<dhtnet::IceSocket> rtcp_sock)
203 : {
204 180 : std::lock_guard lock(mutex_);
205 :
206 180 : if (not send_.enabled and not receive_.enabled) {
207 0 : stop();
208 0 : return;
209 : }
210 :
211 : try {
212 180 : if (rtp_sock and rtcp_sock) {
213 173 : if (send_.addr) {
214 173 : rtp_sock->setDefaultRemoteAddress(send_.addr);
215 : }
216 :
217 174 : auto& rtcpAddr = send_.rtcp_addr ? send_.rtcp_addr : send_.addr;
218 174 : if (rtcpAddr) {
219 174 : rtcp_sock->setDefaultRemoteAddress(rtcpAddr);
220 : }
221 :
222 174 : socketPair_.reset(new SocketPair(std::move(rtp_sock), std::move(rtcp_sock)));
223 : } else {
224 6 : socketPair_.reset(new SocketPair(getRemoteRtpUri().c_str(), receive_.addr.getPort()));
225 : }
226 :
227 180 : if (send_.crypto and receive_.crypto) {
228 720 : socketPair_->createSRTP(receive_.crypto.getCryptoSuite().c_str(),
229 360 : receive_.crypto.getSrtpKeyInfo().c_str(),
230 360 : send_.crypto.getCryptoSuite().c_str(),
231 360 : send_.crypto.getSrtpKeyInfo().c_str());
232 : }
233 0 : } catch (const std::runtime_error& e) {
234 0 : JAMI_ERROR("Socket creation failed: {}", e.what());
235 0 : return;
236 0 : }
237 :
238 180 : startSender();
239 180 : startReceiver();
240 180 : }
241 :
242 : void
243 797 : AudioRtpSession::stop()
244 : {
245 797 : std::lock_guard lock(mutex_);
246 :
247 3188 : JAMI_DEBUG("[{}] Stopping receiver", fmt::ptr(this));
248 :
249 797 : if (not receiveThread_)
250 617 : return;
251 :
252 180 : if (socketPair_)
253 180 : socketPair_->setReadBlockingMode(false);
254 :
255 180 : receiveThread_->stopReceiver();
256 :
257 180 : if (audioInput_)
258 180 : audioInput_->detach(sender_.get());
259 :
260 180 : if (socketPair_)
261 180 : socketPair_->interrupt();
262 :
263 180 : rtcpCheckerThread_.join();
264 :
265 180 : receiveThread_.reset();
266 180 : sender_.reset();
267 180 : socketPair_.reset();
268 180 : audioInput_.reset();
269 797 : }
270 :
271 : void
272 190 : AudioRtpSession::setMuted(bool muted, Direction dir)
273 : {
274 190 : asio::post(*Manager::instance().ioContext(), [w = weak_from_this(), muted, dir]() {
275 190 : if (auto shared = w.lock()) {
276 190 : std::lock_guard lock(shared->mutex_);
277 190 : if (dir == Direction::SEND) {
278 185 : shared->muteState_ = muted;
279 185 : if (shared->audioInput_) {
280 23 : shared->audioInput_->setMuted(muted);
281 : }
282 : } else {
283 5 : if (shared->receiveThread_) {
284 2 : auto ms = shared->receiveThread_->getInfo();
285 2 : ms.name = shared->streamId_ + ":remote";
286 2 : if (muted) {
287 2 : if (auto ob = shared->recorder_->getStream(ms.name)) {
288 0 : shared->receiveThread_->detach(ob);
289 0 : shared->recorder_->removeStream(ms);
290 : }
291 : } else {
292 0 : if (auto ob = shared->recorder_->addStream(ms)) {
293 0 : shared->receiveThread_->attach(ob);
294 : }
295 : }
296 2 : }
297 : }
298 380 : }
299 190 : });
300 190 : }
301 :
302 : void
303 180 : AudioRtpSession::setVoiceCallback(std::function<void(bool)> cb)
304 : {
305 180 : std::lock_guard lock(mutex_);
306 180 : voiceCallback_ = std::move(cb);
307 180 : if (sender_) {
308 21 : sender_->setVoiceCallback(voiceCallback_);
309 : }
310 180 : }
311 :
312 : bool
313 363 : AudioRtpSession::check_RCTP_Info_RR(RTCPInfo& rtcpi)
314 : {
315 363 : auto rtcpInfoVect = socketPair_->getRtcpRR();
316 363 : unsigned totalLost = 0;
317 363 : unsigned totalJitter = 0;
318 363 : unsigned nbDropNotNull = 0;
319 363 : auto vectSize = rtcpInfoVect.size();
320 :
321 363 : if (vectSize != 0) {
322 0 : for (const auto& it : rtcpInfoVect) {
323 0 : if (it.fraction_lost != 0) // Exclude null drop
324 0 : nbDropNotNull++;
325 0 : totalLost += it.fraction_lost;
326 0 : totalJitter += ntohl(it.jitter);
327 : }
328 0 : rtcpi.packetLoss = nbDropNotNull ? (float) (100 * totalLost) / (256.0 * nbDropNotNull) : 0;
329 : // Jitter is expressed in timestamp unit -> convert to milliseconds
330 : // https://stackoverflow.com/questions/51956520/convert-jitter-from-rtp-timestamp-unit-to-millisseconds
331 0 : rtcpi.jitter = (totalJitter / vectSize / 90000.0f) * 1000;
332 0 : rtcpi.nb_sample = vectSize;
333 0 : rtcpi.latency = socketPair_->getLastLatency();
334 0 : return true;
335 : }
336 363 : return false;
337 363 : }
338 :
339 : void
340 363 : AudioRtpSession::adaptQualityAndBitrate()
341 : {
342 363 : RTCPInfo rtcpi {};
343 363 : if (check_RCTP_Info_RR(rtcpi)) {
344 0 : dropProcessing(&rtcpi);
345 : }
346 363 : }
347 :
348 : void
349 0 : AudioRtpSession::dropProcessing(RTCPInfo* rtcpi)
350 : {
351 0 : auto pondLoss = getPonderateLoss(rtcpi->packetLoss);
352 0 : setNewPacketLoss(pondLoss);
353 0 : }
354 :
355 : void
356 0 : AudioRtpSession::setNewPacketLoss(unsigned int newPL)
357 : {
358 0 : newPL = std::clamp((int) newPL, 0, 100);
359 0 : if (newPL != packetLoss_) {
360 0 : if (sender_) {
361 0 : auto ret = sender_->setPacketLoss(newPL);
362 0 : packetLoss_ = newPL;
363 0 : if (ret == -1)
364 0 : JAMI_ERROR("Fail to access the encoder");
365 : } else {
366 0 : JAMI_ERROR("Fail to access the sender");
367 : }
368 : }
369 0 : }
370 :
371 : float
372 0 : AudioRtpSession::getPonderateLoss(float lastLoss)
373 : {
374 : static float pond = 10.0f;
375 :
376 0 : pond = floor(0.5 * lastLoss + 0.5 * pond);
377 0 : if (lastLoss > pond) {
378 0 : return lastLoss;
379 : } else {
380 0 : return pond;
381 : }
382 : }
383 :
384 : void
385 363 : AudioRtpSession::processRtcpChecker()
386 : {
387 363 : adaptQualityAndBitrate();
388 363 : socketPair_->waitForRTCP(std::chrono::seconds(rtcp_checking_interval));
389 362 : }
390 :
391 : void
392 0 : AudioRtpSession::attachRemoteRecorder(const MediaStream& ms)
393 : {
394 0 : std::lock_guard lock(mutex_);
395 0 : if (!recorder_ || !receiveThread_)
396 0 : return;
397 0 : MediaStream remoteMS = ms;
398 0 : remoteMS.name = streamId_ + ":remote";
399 0 : if (auto ob = recorder_->addStream(remoteMS)) {
400 0 : receiveThread_->attach(ob);
401 : }
402 0 : }
403 :
404 : void
405 0 : AudioRtpSession::attachLocalRecorder(const MediaStream& ms)
406 : {
407 0 : std::lock_guard lock(mutex_);
408 0 : if (!recorder_ || !audioInput_)
409 0 : return;
410 0 : MediaStream localMS = ms;
411 0 : localMS.name = streamId_ + ":local";
412 0 : if (auto ob = recorder_->addStream(localMS)) {
413 0 : audioInput_->attach(ob);
414 : }
415 0 : }
416 :
417 : void
418 1 : AudioRtpSession::initRecorder()
419 : {
420 1 : if (!recorder_)
421 0 : return;
422 1 : if (receiveThread_)
423 1 : receiveThread_->setRecorderCallback([w = weak_from_this()](const MediaStream& ms) {
424 0 : asio::post(*Manager::instance().ioContext(), [w = std::move(w), ms]() {
425 0 : if (auto shared = w.lock())
426 0 : shared->attachRemoteRecorder(ms);
427 0 : });
428 0 : });
429 1 : if (audioInput_)
430 1 : audioInput_->setRecorderCallback([w = weak_from_this()](const MediaStream& ms) {
431 0 : asio::post(*Manager::instance().ioContext(), [w = std::move(w), ms]() {
432 0 : if (auto shared = w.lock())
433 0 : shared->attachLocalRecorder(ms);
434 0 : });
435 0 : });
436 : }
437 :
438 : void
439 375 : AudioRtpSession::deinitRecorder()
440 : {
441 375 : if (!recorder_)
442 0 : return;
443 375 : if (receiveThread_) {
444 1 : auto ms = receiveThread_->getInfo();
445 1 : ms.name = streamId_ + ":remote";
446 1 : if (auto ob = recorder_->getStream(ms.name)) {
447 0 : receiveThread_->detach(ob);
448 0 : recorder_->removeStream(ms);
449 : }
450 1 : }
451 375 : if (audioInput_) {
452 1 : auto ms = audioInput_->getInfo();
453 1 : ms.name = streamId_ + ":local";
454 1 : if (auto ob = recorder_->getStream(ms.name)) {
455 0 : audioInput_->detach(ob);
456 0 : recorder_->removeStream(ms);
457 : }
458 1 : }
459 : }
460 :
461 : } // namespace jami
|