Line data Source code
1 : /*
2 : * Copyright (C) 2004-2025 Savoir-faire Linux Inc.
3 : *
4 : * This program is free software: you can redistribute it and/or modify
5 : * it under the terms of the GNU General Public License as published by
6 : * the Free Software Foundation, either version 3 of the License, or
7 : * (at your option) any later version.
8 : *
9 : * This program is distributed in the hope that it will be useful,
10 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 : * GNU General Public License for more details.
13 : *
14 : * You should have received a copy of the GNU General Public License
15 : * along with this program. If not, see <https://www.gnu.org/licenses/>.
16 : */
17 :
18 : #include "libav_deps.h" // MUST BE INCLUDED FIRST
19 :
20 : #include "audio_rtp_session.h"
21 :
22 : #include "logger.h"
23 : #include "noncopyable.h"
24 : #include "sip/sdp.h"
25 :
26 : #include "audio_receive_thread.h"
27 : #include "audio_sender.h"
28 : #include "socket_pair.h"
29 : #include "media_recorder.h"
30 : #include "media_encoder.h"
31 : #include "media_decoder.h"
32 : #include "media_io_handle.h"
33 : #include "media_device.h"
34 : #include "media_const.h"
35 :
36 : #include "audio/audio_input.h"
37 : #include "audio/ringbufferpool.h"
38 : #include "audio/resampler.h"
39 : #include "client/videomanager.h"
40 : #include "manager.h"
41 : #include "observer.h"
42 :
43 : #include <asio/io_context.hpp>
44 : #include <asio/post.hpp>
45 : #include <sstream>
46 :
47 : namespace jami {
48 :
49 397 : AudioRtpSession::AudioRtpSession(const std::string& callId,
50 : const std::string& streamId,
51 397 : const std::shared_ptr<MediaRecorder>& rec)
52 : : RtpSession(callId, streamId, MediaType::MEDIA_AUDIO)
53 1120 : , rtcpCheckerThread_([] { return true; }, [this] { processRtcpChecker(); }, [] {})
54 :
55 : {
56 397 : recorder_ = rec;
57 1191 : JAMI_DEBUG("Created Audio RTP session: {} - stream id {}", fmt::ptr(this), streamId_);
58 :
59 : // don't move this into the initializer list or Cthulus will emerge
60 397 : ringbuffer_ = Manager::instance().getRingBufferPool().createRingBuffer(streamId_);
61 397 : }
62 :
63 397 : AudioRtpSession::~AudioRtpSession()
64 : {
65 397 : deinitRecorder();
66 397 : stop();
67 1191 : JAMI_DEBUG("Destroyed Audio RTP session: {} - stream id {}", fmt::ptr(this), streamId_);
68 397 : }
69 :
70 : void
71 190 : AudioRtpSession::startSender()
72 : {
73 190 : std::lock_guard lock(mutex_);
74 570 : JAMI_DEBUG("Start audio RTP sender: input [{}] - muted [{}]",
75 : input_,
76 : muteState_ ? "YES" : "NO");
77 :
78 190 : if (not send_.enabled or send_.onHold) {
79 0 : JAMI_WARNING("Audio sending disabled");
80 0 : if (sender_) {
81 0 : if (socketPair_)
82 0 : socketPair_->interrupt();
83 0 : if (audioInput_)
84 0 : audioInput_->detach(sender_.get());
85 0 : sender_.reset();
86 : }
87 0 : return;
88 : }
89 :
90 190 : if (sender_)
91 0 : JAMI_WARNING("Restarting audio sender");
92 190 : if (audioInput_)
93 0 : audioInput_->detach(sender_.get());
94 :
95 190 : bool fileAudio = !input_.empty() && input_.find("file://") != std::string::npos;
96 190 : auto audioInputId = streamId_;
97 190 : if (fileAudio) {
98 0 : auto suffix = input_;
99 0 : static const std::string& sep = libjami::Media::VideoProtocolPrefix::SEPARATOR;
100 0 : const auto pos = input_.find(sep);
101 0 : if (pos != std::string::npos) {
102 0 : suffix = input_.substr(pos + sep.size());
103 : }
104 0 : audioInputId = suffix;
105 0 : }
106 :
107 : // sender sets up input correctly, we just keep a reference in case startSender is called
108 190 : audioInput_ = jami::getAudioInput(audioInputId);
109 190 : audioInput_->setRecorderCallback(
110 0 : [w=weak_from_this()](const MediaStream& ms) {
111 0 : asio::post(*Manager::instance().ioContext(), [w=std::move(w), ms]() {
112 0 : if (auto shared = w.lock())
113 0 : shared->attachLocalRecorder(ms);
114 0 : });
115 0 : });
116 190 : audioInput_->setMuted(muteState_);
117 190 : audioInput_->setSuccessfulSetupCb(onSuccessfulSetup_);
118 190 : if (!fileAudio) {
119 190 : auto newParams = audioInput_->switchInput(input_);
120 : try {
121 190 : if (newParams.valid()
122 190 : && newParams.wait_for(NEWPARAMS_TIMEOUT) == std::future_status::ready) {
123 190 : localAudioParams_ = newParams.get();
124 : } else {
125 0 : JAMI_ERROR("No valid new audio parameters");
126 0 : return;
127 : }
128 0 : } catch (const std::exception& e) {
129 0 : JAMI_ERROR("Exception while retrieving audio parameters: {}", e.what());
130 0 : return;
131 0 : }
132 190 : }
133 190 : if (streamId_ != audioInput_->getId())
134 0 : Manager::instance().getRingBufferPool().bindHalfDuplexOut(streamId_, audioInput_->getId());
135 :
136 190 : send_.fecEnabled = true;
137 :
138 : // be sure to not send any packets before saving last RTP seq value
139 190 : socketPair_->stopSendOp();
140 190 : if (sender_)
141 0 : initSeqVal_ = sender_->getLastSeqValue() + 1;
142 : try {
143 190 : sender_.reset();
144 190 : socketPair_->stopSendOp(false);
145 190 : sender_.reset(new AudioSender(getRemoteRtpUri(), send_, *socketPair_, initSeqVal_, mtu_));
146 0 : } catch (const MediaEncoderException& e) {
147 0 : JAMI_ERROR("{}", e.what());
148 0 : send_.enabled = false;
149 0 : }
150 :
151 190 : if (voiceCallback_)
152 190 : sender_->setVoiceCallback(voiceCallback_);
153 :
154 : // NOTE do after sender/encoder are ready
155 190 : auto codec = std::static_pointer_cast<SystemAudioCodecInfo>(send_.codec);
156 190 : audioInput_->setFormat(codec->audioformat);
157 190 : audioInput_->attach(sender_.get());
158 :
159 190 : if (not rtcpCheckerThread_.isRunning())
160 190 : rtcpCheckerThread_.start();
161 190 : }
162 :
163 : void
164 0 : AudioRtpSession::restartSender()
165 : {
166 0 : std::lock_guard lock(mutex_);
167 : // ensure that start has been called before restart
168 0 : if (not socketPair_) {
169 0 : return;
170 : }
171 :
172 0 : startSender();
173 0 : }
174 :
175 : void
176 190 : AudioRtpSession::startReceiver()
177 : {
178 190 : if (socketPair_)
179 190 : socketPair_->setReadBlockingMode(true);
180 :
181 190 : if (not receive_.enabled or receive_.onHold) {
182 0 : JAMI_WARNING("Audio receiving disabled");
183 0 : receiveThread_.reset();
184 0 : return;
185 : }
186 :
187 190 : if (receiveThread_)
188 0 : JAMI_WARNING("Restarting audio receiver");
189 :
190 190 : auto accountAudioCodec = std::static_pointer_cast<SystemAudioCodecInfo>(receive_.codec);
191 190 : receiveThread_.reset(new AudioReceiveThread(streamId_,
192 190 : accountAudioCodec->audioformat,
193 190 : receive_.receiving_sdp,
194 190 : mtu_));
195 :
196 190 : receiveThread_->setRecorderCallback([w=weak_from_this()](const MediaStream& ms) {
197 0 : asio::post(*Manager::instance().ioContext(), [w=std::move(w), ms]() {
198 0 : if (auto shared = w.lock())
199 0 : shared->attachRemoteRecorder(ms);
200 0 : });
201 0 : });
202 190 : receiveThread_->addIOContext(*socketPair_);
203 190 : receiveThread_->setSuccessfulSetupCb(onSuccessfulSetup_);
204 190 : receiveThread_->startReceiver();
205 190 : }
206 :
207 : void
208 190 : AudioRtpSession::start(std::unique_ptr<dhtnet::IceSocket> rtp_sock, std::unique_ptr<dhtnet::IceSocket> rtcp_sock)
209 : {
210 190 : std::lock_guard lock(mutex_);
211 :
212 190 : if (not send_.enabled and not receive_.enabled) {
213 0 : stop();
214 0 : return;
215 : }
216 :
217 : try {
218 190 : if (rtp_sock and rtcp_sock) {
219 182 : if (send_.addr) {
220 182 : rtp_sock->setDefaultRemoteAddress(send_.addr);
221 : }
222 :
223 182 : auto& rtcpAddr = send_.rtcp_addr ? send_.rtcp_addr : send_.addr;
224 182 : if (rtcpAddr) {
225 182 : rtcp_sock->setDefaultRemoteAddress(rtcpAddr);
226 : }
227 :
228 182 : socketPair_.reset(new SocketPair(std::move(rtp_sock), std::move(rtcp_sock)));
229 : } else {
230 8 : socketPair_.reset(new SocketPair(getRemoteRtpUri().c_str(), receive_.addr.getPort()));
231 : }
232 :
233 190 : if (send_.crypto and receive_.crypto) {
234 760 : socketPair_->createSRTP(receive_.crypto.getCryptoSuite().c_str(),
235 380 : receive_.crypto.getSrtpKeyInfo().c_str(),
236 380 : send_.crypto.getCryptoSuite().c_str(),
237 380 : send_.crypto.getSrtpKeyInfo().c_str());
238 : }
239 0 : } catch (const std::runtime_error& e) {
240 0 : JAMI_ERROR("Socket creation failed: {}", e.what());
241 0 : return;
242 0 : }
243 :
244 190 : startSender();
245 190 : startReceiver();
246 190 : }
247 :
248 : void
249 847 : AudioRtpSession::stop()
250 : {
251 847 : std::lock_guard lock(mutex_);
252 :
253 2541 : JAMI_DEBUG("[{}] Stopping receiver", fmt::ptr(this));
254 :
255 847 : if (not receiveThread_)
256 657 : return;
257 :
258 190 : if (socketPair_)
259 190 : socketPair_->setReadBlockingMode(false);
260 :
261 190 : receiveThread_->stopReceiver();
262 :
263 190 : if (audioInput_)
264 190 : audioInput_->detach(sender_.get());
265 :
266 190 : if (socketPair_)
267 190 : socketPair_->interrupt();
268 :
269 190 : rtcpCheckerThread_.join();
270 :
271 190 : receiveThread_.reset();
272 190 : sender_.reset();
273 190 : socketPair_.reset();
274 190 : audioInput_.reset();
275 847 : }
276 :
277 : void
278 200 : AudioRtpSession::setMuted(bool muted, Direction dir)
279 : {
280 200 : asio::post(*Manager::instance().ioContext(), [w=weak_from_this(), muted, dir]() {
281 200 : if (auto shared = w.lock()) {
282 200 : std::lock_guard lock(shared->mutex_);
283 200 : if (dir == Direction::SEND) {
284 195 : shared->muteState_ = muted;
285 195 : if (shared->audioInput_) {
286 21 : shared->audioInput_->setMuted(muted);
287 : }
288 : } else {
289 5 : if (shared->receiveThread_) {
290 2 : auto ms = shared->receiveThread_->getInfo();
291 2 : ms.name = shared->streamId_ + ":remote";
292 2 : if (muted) {
293 2 : if (auto ob = shared->recorder_->getStream(ms.name)) {
294 0 : shared->receiveThread_->detach(ob);
295 0 : shared->recorder_->removeStream(ms);
296 : }
297 : } else {
298 0 : if (auto ob = shared->recorder_->addStream(ms)) {
299 0 : shared->receiveThread_->attach(ob);
300 : }
301 : }
302 2 : }
303 : }
304 400 : }
305 200 : });
306 200 : }
307 :
308 : void
309 190 : AudioRtpSession::setVoiceCallback(std::function<void(bool)> cb)
310 : {
311 190 : std::lock_guard lock(mutex_);
312 190 : voiceCallback_ = std::move(cb);
313 190 : if (sender_) {
314 19 : sender_->setVoiceCallback(voiceCallback_);
315 : }
316 190 : }
317 :
318 : bool
319 343 : AudioRtpSession::check_RCTP_Info_RR(RTCPInfo& rtcpi)
320 : {
321 343 : auto rtcpInfoVect = socketPair_->getRtcpRR();
322 343 : unsigned totalLost = 0;
323 343 : unsigned totalJitter = 0;
324 343 : unsigned nbDropNotNull = 0;
325 343 : auto vectSize = rtcpInfoVect.size();
326 :
327 343 : if (vectSize != 0) {
328 0 : for (const auto& it : rtcpInfoVect) {
329 0 : if (it.fraction_lost != 0) // Exclude null drop
330 0 : nbDropNotNull++;
331 0 : totalLost += it.fraction_lost;
332 0 : totalJitter += ntohl(it.jitter);
333 : }
334 0 : rtcpi.packetLoss = nbDropNotNull ? (float) (100 * totalLost) / (256.0 * nbDropNotNull) : 0;
335 : // Jitter is expressed in timestamp unit -> convert to milliseconds
336 : // https://stackoverflow.com/questions/51956520/convert-jitter-from-rtp-timestamp-unit-to-millisseconds
337 0 : rtcpi.jitter = (totalJitter / vectSize / 90000.0f) * 1000;
338 0 : rtcpi.nb_sample = vectSize;
339 0 : rtcpi.latency = socketPair_->getLastLatency();
340 0 : return true;
341 : }
342 343 : return false;
343 343 : }
344 :
345 : void
346 343 : AudioRtpSession::adaptQualityAndBitrate()
347 : {
348 343 : RTCPInfo rtcpi {};
349 343 : if (check_RCTP_Info_RR(rtcpi)) {
350 0 : dropProcessing(&rtcpi);
351 : }
352 343 : }
353 :
354 : void
355 0 : AudioRtpSession::dropProcessing(RTCPInfo* rtcpi)
356 : {
357 0 : auto pondLoss = getPonderateLoss(rtcpi->packetLoss);
358 0 : setNewPacketLoss(pondLoss);
359 0 : }
360 :
361 : void
362 0 : AudioRtpSession::setNewPacketLoss(unsigned int newPL)
363 : {
364 0 : newPL = std::clamp((int) newPL, 0, 100);
365 0 : if (newPL != packetLoss_) {
366 0 : if (sender_) {
367 0 : auto ret = sender_->setPacketLoss(newPL);
368 0 : packetLoss_ = newPL;
369 0 : if (ret == -1)
370 0 : JAMI_ERROR("Fail to access the encoder");
371 : } else {
372 0 : JAMI_ERROR("Fail to access the sender");
373 : }
374 : }
375 0 : }
376 :
377 : float
378 0 : AudioRtpSession::getPonderateLoss(float lastLoss)
379 : {
380 : static float pond = 10.0f;
381 :
382 0 : pond = floor(0.5 * lastLoss + 0.5 * pond);
383 0 : if (lastLoss > pond) {
384 0 : return lastLoss;
385 : } else {
386 0 : return pond;
387 : }
388 : }
389 :
390 : void
391 343 : AudioRtpSession::processRtcpChecker()
392 : {
393 343 : adaptQualityAndBitrate();
394 343 : socketPair_->waitForRTCP(std::chrono::seconds(rtcp_checking_interval));
395 343 : }
396 :
397 : void
398 0 : AudioRtpSession::attachRemoteRecorder(const MediaStream& ms)
399 : {
400 0 : std::lock_guard lock(mutex_);
401 0 : if (!recorder_ || !receiveThread_)
402 0 : return;
403 0 : MediaStream remoteMS = ms;
404 0 : remoteMS.name = streamId_ + ":remote";
405 0 : if (auto ob = recorder_->addStream(remoteMS)) {
406 0 : receiveThread_->attach(ob);
407 : }
408 0 : }
409 :
410 : void
411 0 : AudioRtpSession::attachLocalRecorder(const MediaStream& ms)
412 : {
413 0 : std::lock_guard lock(mutex_);
414 0 : if (!recorder_ || !audioInput_)
415 0 : return;
416 0 : MediaStream localMS = ms;
417 0 : localMS.name = streamId_ + ":local";
418 0 : if (auto ob = recorder_->addStream(localMS)) {
419 0 : audioInput_->attach(ob);
420 : }
421 0 : }
422 :
423 : void
424 8 : AudioRtpSession::initRecorder()
425 : {
426 8 : if (!recorder_)
427 0 : return;
428 8 : if (receiveThread_)
429 3 : receiveThread_->setRecorderCallback(
430 0 : [w=weak_from_this()](const MediaStream& ms) {
431 0 : asio::post(*Manager::instance().ioContext(), [w=std::move(w), ms]() {
432 0 : if (auto shared = w.lock())
433 0 : shared->attachRemoteRecorder(ms);
434 0 : });
435 0 : });
436 8 : if (audioInput_)
437 8 : audioInput_->setRecorderCallback(
438 0 : [w=weak_from_this()](const MediaStream& ms) {
439 0 : asio::post(*Manager::instance().ioContext(), [w=std::move(w), ms]() {
440 0 : if (auto shared = w.lock())
441 0 : shared->attachLocalRecorder(ms);
442 0 : });
443 0 : });
444 : }
445 :
446 : void
447 401 : AudioRtpSession::deinitRecorder()
448 : {
449 401 : if (!recorder_)
450 0 : return;
451 401 : if (receiveThread_) {
452 1 : auto ms = receiveThread_->getInfo();
453 1 : ms.name = streamId_ + ":remote";
454 1 : if (auto ob = recorder_->getStream(ms.name)) {
455 0 : receiveThread_->detach(ob);
456 0 : recorder_->removeStream(ms);
457 : }
458 1 : }
459 401 : if (audioInput_) {
460 2 : auto ms = audioInput_->getInfo();
461 2 : ms.name = streamId_ + ":local";
462 2 : if (auto ob = recorder_->getStream(ms.name)) {
463 0 : audioInput_->detach(ob);
464 0 : recorder_->removeStream(ms);
465 : }
466 2 : }
467 : }
468 :
469 : } // namespace jami
|