Line data Source code
1 : /*
2 : * Copyright (C) 2004-2024 Savoir-faire Linux Inc.
3 : *
4 : * Author: Tristan Matthews <tristan.matthews@savoirfairelinux.com>
5 : * Author: Philippe Gorley <philippe.gorley@savoirfairelinux.com>
6 : *
7 : * This program is free software; you can redistribute it and/or modify
8 : * it under the terms of the GNU General Public License as published by
9 : * the Free Software Foundation; either version 3 of the License, or
10 : * (at your option) any later version.
11 : *
12 : * This program is distributed in the hope that it will be useful,
13 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 : * GNU General Public License for more details.
16 : *
17 : * You should have received a copy of the GNU General Public License
18 : * along with this program; if not, write to the Free Software
19 : * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20 : */
21 :
22 : #include "libav_deps.h" // MUST BE INCLUDED FIRST
23 :
24 : #include "audio_rtp_session.h"
25 :
26 : #include "logger.h"
27 : #include "noncopyable.h"
28 : #include "sip/sdp.h"
29 :
30 : #include "audio_receive_thread.h"
31 : #include "audio_sender.h"
32 : #include "socket_pair.h"
33 : #include "media_recorder.h"
34 : #include "media_encoder.h"
35 : #include "media_decoder.h"
36 : #include "media_io_handle.h"
37 : #include "media_device.h"
38 : #include "media_const.h"
39 :
40 : #include "audio/audio_input.h"
41 : #include "audio/ringbufferpool.h"
42 : #include "audio/resampler.h"
43 : #include "client/videomanager.h"
44 : #include "manager.h"
45 : #include "observer.h"
46 :
47 : #include <asio/io_context.hpp>
48 : #include <sstream>
49 :
50 : namespace jami {
51 :
52 408 : AudioRtpSession::AudioRtpSession(const std::string& callId,
53 : const std::string& streamId,
54 408 : const std::shared_ptr<MediaRecorder>& rec)
55 : : RtpSession(callId, streamId, MediaType::MEDIA_AUDIO)
56 1113 : , rtcpCheckerThread_([] { return true; }, [this] { processRtcpChecker(); }, [] {})
57 :
58 : {
59 408 : recorder_ = rec;
60 1224 : JAMI_DEBUG("Created Audio RTP session: {} - stream id {}", fmt::ptr(this), streamId_);
61 :
62 : // don't move this into the initializer list or Cthulus will emerge
63 408 : ringbuffer_ = Manager::instance().getRingBufferPool().createRingBuffer(streamId_);
64 408 : }
65 :
66 407 : AudioRtpSession::~AudioRtpSession()
67 : {
68 407 : deinitRecorder();
69 408 : stop();
70 1224 : JAMI_DEBUG("Destroyed Audio RTP session: {} - stream id {}", fmt::ptr(this), streamId_);
71 408 : }
72 :
73 : void
74 187 : AudioRtpSession::startSender()
75 : {
76 187 : std::lock_guard lock(mutex_);
77 561 : JAMI_DEBUG("Start audio RTP sender: input [{}] - muted [{}]",
78 : input_,
79 : muteState_ ? "YES" : "NO");
80 :
81 187 : if (not send_.enabled or send_.onHold) {
82 0 : JAMI_WARNING("Audio sending disabled");
83 0 : if (sender_) {
84 0 : if (socketPair_)
85 0 : socketPair_->interrupt();
86 0 : if (audioInput_)
87 0 : audioInput_->detach(sender_.get());
88 0 : sender_.reset();
89 : }
90 0 : return;
91 : }
92 :
93 187 : if (sender_)
94 0 : JAMI_WARNING("Restarting audio sender");
95 187 : if (audioInput_)
96 0 : audioInput_->detach(sender_.get());
97 :
98 187 : bool fileAudio = !input_.empty() && input_.find("file://") != std::string::npos;
99 187 : auto audioInputId = streamId_;
100 187 : if (fileAudio) {
101 0 : auto suffix = input_;
102 0 : static const std::string& sep = libjami::Media::VideoProtocolPrefix::SEPARATOR;
103 0 : const auto pos = input_.find(sep);
104 0 : if (pos != std::string::npos) {
105 0 : suffix = input_.substr(pos + sep.size());
106 : }
107 0 : audioInputId = suffix;
108 0 : }
109 :
110 : // sender sets up input correctly, we just keep a reference in case startSender is called
111 187 : audioInput_ = jami::getAudioInput(audioInputId);
112 187 : audioInput_->setRecorderCallback(
113 0 : [w=weak_from_this()](const MediaStream& ms) {
114 0 : Manager::instance().ioContext()->post([w=std::move(w), ms]() {
115 0 : if (auto shared = w.lock())
116 0 : shared->attachLocalRecorder(ms);
117 0 : });
118 0 : });
119 187 : audioInput_->setMuted(muteState_);
120 187 : audioInput_->setSuccessfulSetupCb(onSuccessfulSetup_);
121 187 : if (!fileAudio) {
122 187 : auto newParams = audioInput_->switchInput(input_);
123 : try {
124 187 : if (newParams.valid()
125 187 : && newParams.wait_for(NEWPARAMS_TIMEOUT) == std::future_status::ready) {
126 187 : localAudioParams_ = newParams.get();
127 : } else {
128 0 : JAMI_ERROR("No valid new audio parameters");
129 0 : return;
130 : }
131 0 : } catch (const std::exception& e) {
132 0 : JAMI_ERROR("Exception while retrieving audio parameters: {}", e.what());
133 0 : return;
134 0 : }
135 187 : }
136 187 : if (streamId_ != audioInput_->getId())
137 0 : Manager::instance().getRingBufferPool().bindHalfDuplexOut(streamId_, audioInput_->getId());
138 :
139 187 : send_.fecEnabled = true;
140 :
141 : // be sure to not send any packets before saving last RTP seq value
142 187 : socketPair_->stopSendOp();
143 187 : if (sender_)
144 0 : initSeqVal_ = sender_->getLastSeqValue() + 1;
145 : try {
146 187 : sender_.reset();
147 187 : socketPair_->stopSendOp(false);
148 187 : sender_.reset(new AudioSender(getRemoteRtpUri(), send_, *socketPair_, initSeqVal_, mtu_));
149 0 : } catch (const MediaEncoderException& e) {
150 0 : JAMI_ERROR("{}", e.what());
151 0 : send_.enabled = false;
152 0 : }
153 :
154 187 : if (voiceCallback_)
155 187 : sender_->setVoiceCallback(voiceCallback_);
156 :
157 : // NOTE do after sender/encoder are ready
158 187 : auto codec = std::static_pointer_cast<SystemAudioCodecInfo>(send_.codec);
159 187 : audioInput_->setFormat(codec->audioformat);
160 187 : audioInput_->attach(sender_.get());
161 :
162 187 : if (not rtcpCheckerThread_.isRunning())
163 187 : rtcpCheckerThread_.start();
164 187 : }
165 :
166 : void
167 0 : AudioRtpSession::restartSender()
168 : {
169 0 : std::lock_guard lock(mutex_);
170 : // ensure that start has been called before restart
171 0 : if (not socketPair_) {
172 0 : return;
173 : }
174 :
175 0 : startSender();
176 0 : }
177 :
178 : void
179 187 : AudioRtpSession::startReceiver()
180 : {
181 187 : if (socketPair_)
182 187 : socketPair_->setReadBlockingMode(true);
183 :
184 187 : if (not receive_.enabled or receive_.onHold) {
185 0 : JAMI_WARNING("Audio receiving disabled");
186 0 : receiveThread_.reset();
187 0 : return;
188 : }
189 :
190 187 : if (receiveThread_)
191 0 : JAMI_WARNING("Restarting audio receiver");
192 :
193 187 : auto accountAudioCodec = std::static_pointer_cast<SystemAudioCodecInfo>(receive_.codec);
194 187 : receiveThread_.reset(new AudioReceiveThread(streamId_,
195 187 : accountAudioCodec->audioformat,
196 187 : receive_.receiving_sdp,
197 187 : mtu_));
198 :
199 187 : receiveThread_->setRecorderCallback([w=weak_from_this()](const MediaStream& ms) {
200 0 : Manager::instance().ioContext()->post([w=std::move(w), ms]() {
201 0 : if (auto shared = w.lock())
202 0 : shared->attachRemoteRecorder(ms);
203 0 : });
204 0 : });
205 187 : receiveThread_->addIOContext(*socketPair_);
206 187 : receiveThread_->setSuccessfulSetupCb(onSuccessfulSetup_);
207 187 : receiveThread_->startReceiver();
208 187 : }
209 :
210 : void
211 188 : AudioRtpSession::start(std::unique_ptr<dhtnet::IceSocket> rtp_sock, std::unique_ptr<dhtnet::IceSocket> rtcp_sock)
212 : {
213 188 : std::lock_guard lock(mutex_);
214 :
215 188 : if (not send_.enabled and not receive_.enabled) {
216 1 : stop();
217 1 : return;
218 : }
219 :
220 : try {
221 187 : if (rtp_sock and rtcp_sock) {
222 180 : if (send_.addr) {
223 180 : rtp_sock->setDefaultRemoteAddress(send_.addr);
224 : }
225 :
226 180 : auto& rtcpAddr = send_.rtcp_addr ? send_.rtcp_addr : send_.addr;
227 180 : if (rtcpAddr) {
228 180 : rtcp_sock->setDefaultRemoteAddress(rtcpAddr);
229 : }
230 :
231 180 : socketPair_.reset(new SocketPair(std::move(rtp_sock), std::move(rtcp_sock)));
232 : } else {
233 7 : socketPair_.reset(new SocketPair(getRemoteRtpUri().c_str(), receive_.addr.getPort()));
234 : }
235 :
236 187 : if (send_.crypto and receive_.crypto) {
237 748 : socketPair_->createSRTP(receive_.crypto.getCryptoSuite().c_str(),
238 374 : receive_.crypto.getSrtpKeyInfo().c_str(),
239 374 : send_.crypto.getCryptoSuite().c_str(),
240 374 : send_.crypto.getSrtpKeyInfo().c_str());
241 : }
242 0 : } catch (const std::runtime_error& e) {
243 0 : JAMI_ERROR("Socket creation failed: {}", e.what());
244 0 : return;
245 0 : }
246 :
247 187 : startSender();
248 187 : startReceiver();
249 188 : }
250 :
251 : void
252 864 : AudioRtpSession::stop()
253 : {
254 864 : std::lock_guard lock(mutex_);
255 :
256 2592 : JAMI_DEBUG("[{}] Stopping receiver", fmt::ptr(this));
257 :
258 864 : if (not receiveThread_)
259 677 : return;
260 :
261 187 : if (socketPair_)
262 187 : socketPair_->setReadBlockingMode(false);
263 :
264 187 : receiveThread_->stopReceiver();
265 :
266 187 : if (audioInput_)
267 187 : audioInput_->detach(sender_.get());
268 :
269 187 : if (socketPair_)
270 187 : socketPair_->interrupt();
271 :
272 187 : rtcpCheckerThread_.join();
273 :
274 187 : receiveThread_.reset();
275 187 : sender_.reset();
276 187 : socketPair_.reset();
277 187 : audioInput_.reset();
278 864 : }
279 :
280 : void
281 207 : AudioRtpSession::setMuted(bool muted, Direction dir)
282 : {
283 207 : Manager::instance().ioContext()->post([w=weak_from_this(), muted, dir]() {
284 207 : if (auto shared = w.lock()) {
285 207 : std::lock_guard lock(shared->mutex_);
286 207 : if (dir == Direction::SEND) {
287 197 : shared->muteState_ = muted;
288 197 : if (shared->audioInput_) {
289 24 : shared->audioInput_->setMuted(muted);
290 : }
291 : } else {
292 10 : if (shared->receiveThread_) {
293 5 : auto ms = shared->receiveThread_->getInfo();
294 5 : ms.name = shared->streamId_ + ":remote";
295 5 : if (muted) {
296 2 : if (auto ob = shared->recorder_->getStream(ms.name)) {
297 0 : shared->receiveThread_->detach(ob);
298 0 : shared->recorder_->removeStream(ms);
299 : }
300 : } else {
301 3 : if (auto ob = shared->recorder_->addStream(ms)) {
302 0 : shared->receiveThread_->attach(ob);
303 : }
304 : }
305 5 : }
306 : }
307 414 : }
308 207 : });
309 207 : }
310 :
311 : void
312 187 : AudioRtpSession::setVoiceCallback(std::function<void(bool)> cb)
313 : {
314 187 : std::lock_guard lock(mutex_);
315 187 : voiceCallback_ = std::move(cb);
316 187 : if (sender_) {
317 19 : sender_->setVoiceCallback(voiceCallback_);
318 : }
319 187 : }
320 :
321 : bool
322 331 : AudioRtpSession::check_RCTP_Info_RR(RTCPInfo& rtcpi)
323 : {
324 331 : auto rtcpInfoVect = socketPair_->getRtcpRR();
325 331 : unsigned totalLost = 0;
326 331 : unsigned totalJitter = 0;
327 331 : unsigned nbDropNotNull = 0;
328 331 : auto vectSize = rtcpInfoVect.size();
329 :
330 331 : if (vectSize != 0) {
331 0 : for (const auto& it : rtcpInfoVect) {
332 0 : if (it.fraction_lost != 0) // Exclude null drop
333 0 : nbDropNotNull++;
334 0 : totalLost += it.fraction_lost;
335 0 : totalJitter += ntohl(it.jitter);
336 : }
337 0 : rtcpi.packetLoss = nbDropNotNull ? (float) (100 * totalLost) / (256.0 * nbDropNotNull) : 0;
338 : // Jitter is expressed in timestamp unit -> convert to milliseconds
339 : // https://stackoverflow.com/questions/51956520/convert-jitter-from-rtp-timestamp-unit-to-millisseconds
340 0 : rtcpi.jitter = (totalJitter / vectSize / 90000.0f) * 1000;
341 0 : rtcpi.nb_sample = vectSize;
342 0 : rtcpi.latency = socketPair_->getLastLatency();
343 0 : return true;
344 : }
345 331 : return false;
346 331 : }
347 :
348 : void
349 331 : AudioRtpSession::adaptQualityAndBitrate()
350 : {
351 331 : RTCPInfo rtcpi {};
352 331 : if (check_RCTP_Info_RR(rtcpi)) {
353 0 : dropProcessing(&rtcpi);
354 : }
355 331 : }
356 :
357 : void
358 0 : AudioRtpSession::dropProcessing(RTCPInfo* rtcpi)
359 : {
360 0 : auto pondLoss = getPonderateLoss(rtcpi->packetLoss);
361 0 : setNewPacketLoss(pondLoss);
362 0 : }
363 :
364 : void
365 0 : AudioRtpSession::setNewPacketLoss(unsigned int newPL)
366 : {
367 0 : newPL = std::clamp((int) newPL, 0, 100);
368 0 : if (newPL != packetLoss_) {
369 0 : if (sender_) {
370 0 : auto ret = sender_->setPacketLoss(newPL);
371 0 : packetLoss_ = newPL;
372 0 : if (ret == -1)
373 0 : JAMI_ERROR("Fail to access the encoder");
374 : } else {
375 0 : JAMI_ERROR("Fail to access the sender");
376 : }
377 : }
378 0 : }
379 :
380 : float
381 0 : AudioRtpSession::getPonderateLoss(float lastLoss)
382 : {
383 : static float pond = 10.0f;
384 :
385 0 : pond = floor(0.5 * lastLoss + 0.5 * pond);
386 0 : if (lastLoss > pond) {
387 0 : return lastLoss;
388 : } else {
389 0 : return pond;
390 : }
391 : }
392 :
393 : void
394 331 : AudioRtpSession::processRtcpChecker()
395 : {
396 331 : adaptQualityAndBitrate();
397 331 : socketPair_->waitForRTCP(std::chrono::seconds(rtcp_checking_interval));
398 331 : }
399 :
400 : void
401 0 : AudioRtpSession::attachRemoteRecorder(const MediaStream& ms)
402 : {
403 0 : std::lock_guard lock(mutex_);
404 0 : if (!recorder_ || !receiveThread_)
405 0 : return;
406 0 : MediaStream remoteMS = ms;
407 0 : remoteMS.name = streamId_ + ":remote";
408 0 : if (auto ob = recorder_->addStream(remoteMS)) {
409 0 : receiveThread_->attach(ob);
410 : }
411 0 : }
412 :
413 : void
414 0 : AudioRtpSession::attachLocalRecorder(const MediaStream& ms)
415 : {
416 0 : std::lock_guard lock(mutex_);
417 0 : if (!recorder_ || !audioInput_)
418 0 : return;
419 0 : MediaStream localMS = ms;
420 0 : localMS.name = streamId_ + ":local";
421 0 : if (auto ob = recorder_->addStream(localMS)) {
422 0 : audioInput_->attach(ob);
423 : }
424 0 : }
425 :
426 : void
427 10 : AudioRtpSession::initRecorder()
428 : {
429 10 : if (!recorder_)
430 0 : return;
431 10 : if (receiveThread_)
432 6 : receiveThread_->setRecorderCallback(
433 0 : [w=weak_from_this()](const MediaStream& ms) {
434 0 : Manager::instance().ioContext()->post([w=std::move(w), ms]() {
435 0 : if (auto shared = w.lock())
436 0 : shared->attachRemoteRecorder(ms);
437 0 : });
438 0 : });
439 10 : if (audioInput_)
440 10 : audioInput_->setRecorderCallback(
441 0 : [w=weak_from_this()](const MediaStream& ms) {
442 0 : Manager::instance().ioContext()->post([w=std::move(w), ms]() {
443 0 : if (auto shared = w.lock())
444 0 : shared->attachLocalRecorder(ms);
445 0 : });
446 0 : });
447 : }
448 :
449 : void
450 414 : AudioRtpSession::deinitRecorder()
451 : {
452 414 : if (!recorder_)
453 0 : return;
454 414 : if (receiveThread_) {
455 4 : auto ms = receiveThread_->getInfo();
456 4 : ms.name = streamId_ + ":remote";
457 4 : if (auto ob = recorder_->getStream(ms.name)) {
458 0 : receiveThread_->detach(ob);
459 0 : recorder_->removeStream(ms);
460 : }
461 4 : }
462 414 : if (audioInput_) {
463 5 : auto ms = audioInput_->getInfo();
464 5 : ms.name = streamId_ + ":local";
465 5 : if (auto ob = recorder_->getStream(ms.name)) {
466 0 : audioInput_->detach(ob);
467 0 : recorder_->removeStream(ms);
468 : }
469 5 : }
470 : }
471 :
472 : } // namespace jami
|