Line data Source code
1 : /*
2 : * Copyright (C) 2004-2024 Savoir-faire Linux Inc.
3 : *
4 : * This program is free software: you can redistribute it and/or modify
5 : * it under the terms of the GNU General Public License as published by
6 : * the Free Software Foundation, either version 3 of the License, or
7 : * (at your option) any later version.
8 : *
9 : * This program is distributed in the hope that it will be useful,
10 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 : * GNU General Public License for more details.
13 : *
14 : * You should have received a copy of the GNU General Public License
15 : * along with this program. If not, see <https://www.gnu.org/licenses/>.
16 : */
17 :
18 : #include "libav_deps.h" // MUST BE INCLUDED FIRST
19 :
20 : #include "audio_rtp_session.h"
21 :
22 : #include "logger.h"
23 : #include "noncopyable.h"
24 : #include "sip/sdp.h"
25 :
26 : #include "audio_receive_thread.h"
27 : #include "audio_sender.h"
28 : #include "socket_pair.h"
29 : #include "media_recorder.h"
30 : #include "media_encoder.h"
31 : #include "media_decoder.h"
32 : #include "media_io_handle.h"
33 : #include "media_device.h"
34 : #include "media_const.h"
35 :
36 : #include "audio/audio_input.h"
37 : #include "audio/ringbufferpool.h"
38 : #include "audio/resampler.h"
39 : #include "client/videomanager.h"
40 : #include "manager.h"
41 : #include "observer.h"
42 :
43 : #include <asio/io_context.hpp>
44 : #include <sstream>
45 :
46 : namespace jami {
47 :
48 397 : AudioRtpSession::AudioRtpSession(const std::string& callId,
49 : const std::string& streamId,
50 397 : const std::shared_ptr<MediaRecorder>& rec)
51 : : RtpSession(callId, streamId, MediaType::MEDIA_AUDIO)
52 1125 : , rtcpCheckerThread_([] { return true; }, [this] { processRtcpChecker(); }, [] {})
53 :
54 : {
55 397 : recorder_ = rec;
56 1191 : JAMI_DEBUG("Created Audio RTP session: {} - stream id {}", fmt::ptr(this), streamId_);
57 :
58 : // don't move this into the initializer list or Cthulus will emerge
59 397 : ringbuffer_ = Manager::instance().getRingBufferPool().createRingBuffer(streamId_);
60 397 : }
61 :
62 397 : AudioRtpSession::~AudioRtpSession()
63 : {
64 397 : deinitRecorder();
65 397 : stop();
66 1191 : JAMI_DEBUG("Destroyed Audio RTP session: {} - stream id {}", fmt::ptr(this), streamId_);
67 397 : }
68 :
69 : void
70 191 : AudioRtpSession::startSender()
71 : {
72 191 : std::lock_guard lock(mutex_);
73 573 : JAMI_DEBUG("Start audio RTP sender: input [{}] - muted [{}]",
74 : input_,
75 : muteState_ ? "YES" : "NO");
76 :
77 191 : if (not send_.enabled or send_.onHold) {
78 0 : JAMI_WARNING("Audio sending disabled");
79 0 : if (sender_) {
80 0 : if (socketPair_)
81 0 : socketPair_->interrupt();
82 0 : if (audioInput_)
83 0 : audioInput_->detach(sender_.get());
84 0 : sender_.reset();
85 : }
86 0 : return;
87 : }
88 :
89 191 : if (sender_)
90 0 : JAMI_WARNING("Restarting audio sender");
91 191 : if (audioInput_)
92 0 : audioInput_->detach(sender_.get());
93 :
94 191 : bool fileAudio = !input_.empty() && input_.find("file://") != std::string::npos;
95 191 : auto audioInputId = streamId_;
96 191 : if (fileAudio) {
97 0 : auto suffix = input_;
98 0 : static const std::string& sep = libjami::Media::VideoProtocolPrefix::SEPARATOR;
99 0 : const auto pos = input_.find(sep);
100 0 : if (pos != std::string::npos) {
101 0 : suffix = input_.substr(pos + sep.size());
102 : }
103 0 : audioInputId = suffix;
104 0 : }
105 :
106 : // sender sets up input correctly, we just keep a reference in case startSender is called
107 191 : audioInput_ = jami::getAudioInput(audioInputId);
108 191 : audioInput_->setRecorderCallback(
109 0 : [w=weak_from_this()](const MediaStream& ms) {
110 0 : Manager::instance().ioContext()->post([w=std::move(w), ms]() {
111 0 : if (auto shared = w.lock())
112 0 : shared->attachLocalRecorder(ms);
113 0 : });
114 0 : });
115 191 : audioInput_->setMuted(muteState_);
116 191 : audioInput_->setSuccessfulSetupCb(onSuccessfulSetup_);
117 191 : if (!fileAudio) {
118 191 : auto newParams = audioInput_->switchInput(input_);
119 : try {
120 191 : if (newParams.valid()
121 191 : && newParams.wait_for(NEWPARAMS_TIMEOUT) == std::future_status::ready) {
122 191 : localAudioParams_ = newParams.get();
123 : } else {
124 0 : JAMI_ERROR("No valid new audio parameters");
125 0 : return;
126 : }
127 0 : } catch (const std::exception& e) {
128 0 : JAMI_ERROR("Exception while retrieving audio parameters: {}", e.what());
129 0 : return;
130 0 : }
131 191 : }
132 191 : if (streamId_ != audioInput_->getId())
133 0 : Manager::instance().getRingBufferPool().bindHalfDuplexOut(streamId_, audioInput_->getId());
134 :
135 191 : send_.fecEnabled = true;
136 :
137 : // be sure to not send any packets before saving last RTP seq value
138 191 : socketPair_->stopSendOp();
139 191 : if (sender_)
140 0 : initSeqVal_ = sender_->getLastSeqValue() + 1;
141 : try {
142 191 : sender_.reset();
143 191 : socketPair_->stopSendOp(false);
144 191 : sender_.reset(new AudioSender(getRemoteRtpUri(), send_, *socketPair_, initSeqVal_, mtu_));
145 0 : } catch (const MediaEncoderException& e) {
146 0 : JAMI_ERROR("{}", e.what());
147 0 : send_.enabled = false;
148 0 : }
149 :
150 191 : if (voiceCallback_)
151 191 : sender_->setVoiceCallback(voiceCallback_);
152 :
153 : // NOTE do after sender/encoder are ready
154 191 : auto codec = std::static_pointer_cast<SystemAudioCodecInfo>(send_.codec);
155 191 : audioInput_->setFormat(codec->audioformat);
156 191 : audioInput_->attach(sender_.get());
157 :
158 191 : if (not rtcpCheckerThread_.isRunning())
159 191 : rtcpCheckerThread_.start();
160 191 : }
161 :
162 : void
163 0 : AudioRtpSession::restartSender()
164 : {
165 0 : std::lock_guard lock(mutex_);
166 : // ensure that start has been called before restart
167 0 : if (not socketPair_) {
168 0 : return;
169 : }
170 :
171 0 : startSender();
172 0 : }
173 :
174 : void
175 191 : AudioRtpSession::startReceiver()
176 : {
177 191 : if (socketPair_)
178 191 : socketPair_->setReadBlockingMode(true);
179 :
180 191 : if (not receive_.enabled or receive_.onHold) {
181 0 : JAMI_WARNING("Audio receiving disabled");
182 0 : receiveThread_.reset();
183 0 : return;
184 : }
185 :
186 191 : if (receiveThread_)
187 0 : JAMI_WARNING("Restarting audio receiver");
188 :
189 191 : auto accountAudioCodec = std::static_pointer_cast<SystemAudioCodecInfo>(receive_.codec);
190 191 : receiveThread_.reset(new AudioReceiveThread(streamId_,
191 191 : accountAudioCodec->audioformat,
192 191 : receive_.receiving_sdp,
193 191 : mtu_));
194 :
195 191 : receiveThread_->setRecorderCallback([w=weak_from_this()](const MediaStream& ms) {
196 0 : Manager::instance().ioContext()->post([w=std::move(w), ms]() {
197 0 : if (auto shared = w.lock())
198 0 : shared->attachRemoteRecorder(ms);
199 0 : });
200 0 : });
201 191 : receiveThread_->addIOContext(*socketPair_);
202 191 : receiveThread_->setSuccessfulSetupCb(onSuccessfulSetup_);
203 191 : receiveThread_->startReceiver();
204 191 : }
205 :
206 : void
207 191 : AudioRtpSession::start(std::unique_ptr<dhtnet::IceSocket> rtp_sock, std::unique_ptr<dhtnet::IceSocket> rtcp_sock)
208 : {
209 191 : std::lock_guard lock(mutex_);
210 :
211 191 : if (not send_.enabled and not receive_.enabled) {
212 0 : stop();
213 0 : return;
214 : }
215 :
216 : try {
217 191 : if (rtp_sock and rtcp_sock) {
218 182 : if (send_.addr) {
219 182 : rtp_sock->setDefaultRemoteAddress(send_.addr);
220 : }
221 :
222 182 : auto& rtcpAddr = send_.rtcp_addr ? send_.rtcp_addr : send_.addr;
223 182 : if (rtcpAddr) {
224 182 : rtcp_sock->setDefaultRemoteAddress(rtcpAddr);
225 : }
226 :
227 182 : socketPair_.reset(new SocketPair(std::move(rtp_sock), std::move(rtcp_sock)));
228 : } else {
229 9 : socketPair_.reset(new SocketPair(getRemoteRtpUri().c_str(), receive_.addr.getPort()));
230 : }
231 :
232 191 : if (send_.crypto and receive_.crypto) {
233 764 : socketPair_->createSRTP(receive_.crypto.getCryptoSuite().c_str(),
234 382 : receive_.crypto.getSrtpKeyInfo().c_str(),
235 382 : send_.crypto.getCryptoSuite().c_str(),
236 382 : send_.crypto.getSrtpKeyInfo().c_str());
237 : }
238 0 : } catch (const std::runtime_error& e) {
239 0 : JAMI_ERROR("Socket creation failed: {}", e.what());
240 0 : return;
241 0 : }
242 :
243 191 : startSender();
244 191 : startReceiver();
245 191 : }
246 :
247 : void
248 845 : AudioRtpSession::stop()
249 : {
250 845 : std::lock_guard lock(mutex_);
251 :
252 2535 : JAMI_DEBUG("[{}] Stopping receiver", fmt::ptr(this));
253 :
254 845 : if (not receiveThread_)
255 654 : return;
256 :
257 191 : if (socketPair_)
258 191 : socketPair_->setReadBlockingMode(false);
259 :
260 191 : receiveThread_->stopReceiver();
261 :
262 191 : if (audioInput_)
263 191 : audioInput_->detach(sender_.get());
264 :
265 191 : if (socketPair_)
266 191 : socketPair_->interrupt();
267 :
268 191 : rtcpCheckerThread_.join();
269 :
270 191 : receiveThread_.reset();
271 191 : sender_.reset();
272 191 : socketPair_.reset();
273 191 : audioInput_.reset();
274 845 : }
275 :
276 : void
277 201 : AudioRtpSession::setMuted(bool muted, Direction dir)
278 : {
279 201 : Manager::instance().ioContext()->post([w=weak_from_this(), muted, dir]() {
280 201 : if (auto shared = w.lock()) {
281 201 : std::lock_guard lock(shared->mutex_);
282 201 : if (dir == Direction::SEND) {
283 196 : shared->muteState_ = muted;
284 196 : if (shared->audioInput_) {
285 22 : shared->audioInput_->setMuted(muted);
286 : }
287 : } else {
288 5 : if (shared->receiveThread_) {
289 2 : auto ms = shared->receiveThread_->getInfo();
290 2 : ms.name = shared->streamId_ + ":remote";
291 2 : if (muted) {
292 2 : if (auto ob = shared->recorder_->getStream(ms.name)) {
293 0 : shared->receiveThread_->detach(ob);
294 0 : shared->recorder_->removeStream(ms);
295 : }
296 : } else {
297 0 : if (auto ob = shared->recorder_->addStream(ms)) {
298 0 : shared->receiveThread_->attach(ob);
299 : }
300 : }
301 2 : }
302 : }
303 402 : }
304 201 : });
305 201 : }
306 :
307 : void
308 191 : AudioRtpSession::setVoiceCallback(std::function<void(bool)> cb)
309 : {
310 191 : std::lock_guard lock(mutex_);
311 191 : voiceCallback_ = std::move(cb);
312 191 : if (sender_) {
313 19 : sender_->setVoiceCallback(voiceCallback_);
314 : }
315 191 : }
316 :
317 : bool
318 346 : AudioRtpSession::check_RCTP_Info_RR(RTCPInfo& rtcpi)
319 : {
320 346 : auto rtcpInfoVect = socketPair_->getRtcpRR();
321 346 : unsigned totalLost = 0;
322 346 : unsigned totalJitter = 0;
323 346 : unsigned nbDropNotNull = 0;
324 346 : auto vectSize = rtcpInfoVect.size();
325 :
326 346 : if (vectSize != 0) {
327 0 : for (const auto& it : rtcpInfoVect) {
328 0 : if (it.fraction_lost != 0) // Exclude null drop
329 0 : nbDropNotNull++;
330 0 : totalLost += it.fraction_lost;
331 0 : totalJitter += ntohl(it.jitter);
332 : }
333 0 : rtcpi.packetLoss = nbDropNotNull ? (float) (100 * totalLost) / (256.0 * nbDropNotNull) : 0;
334 : // Jitter is expressed in timestamp unit -> convert to milliseconds
335 : // https://stackoverflow.com/questions/51956520/convert-jitter-from-rtp-timestamp-unit-to-millisseconds
336 0 : rtcpi.jitter = (totalJitter / vectSize / 90000.0f) * 1000;
337 0 : rtcpi.nb_sample = vectSize;
338 0 : rtcpi.latency = socketPair_->getLastLatency();
339 0 : return true;
340 : }
341 346 : return false;
342 346 : }
343 :
344 : void
345 346 : AudioRtpSession::adaptQualityAndBitrate()
346 : {
347 346 : RTCPInfo rtcpi {};
348 346 : if (check_RCTP_Info_RR(rtcpi)) {
349 0 : dropProcessing(&rtcpi);
350 : }
351 346 : }
352 :
353 : void
354 0 : AudioRtpSession::dropProcessing(RTCPInfo* rtcpi)
355 : {
356 0 : auto pondLoss = getPonderateLoss(rtcpi->packetLoss);
357 0 : setNewPacketLoss(pondLoss);
358 0 : }
359 :
360 : void
361 0 : AudioRtpSession::setNewPacketLoss(unsigned int newPL)
362 : {
363 0 : newPL = std::clamp((int) newPL, 0, 100);
364 0 : if (newPL != packetLoss_) {
365 0 : if (sender_) {
366 0 : auto ret = sender_->setPacketLoss(newPL);
367 0 : packetLoss_ = newPL;
368 0 : if (ret == -1)
369 0 : JAMI_ERROR("Fail to access the encoder");
370 : } else {
371 0 : JAMI_ERROR("Fail to access the sender");
372 : }
373 : }
374 0 : }
375 :
376 : float
377 0 : AudioRtpSession::getPonderateLoss(float lastLoss)
378 : {
379 : static float pond = 10.0f;
380 :
381 0 : pond = floor(0.5 * lastLoss + 0.5 * pond);
382 0 : if (lastLoss > pond) {
383 0 : return lastLoss;
384 : } else {
385 0 : return pond;
386 : }
387 : }
388 :
389 : void
390 346 : AudioRtpSession::processRtcpChecker()
391 : {
392 346 : adaptQualityAndBitrate();
393 346 : socketPair_->waitForRTCP(std::chrono::seconds(rtcp_checking_interval));
394 346 : }
395 :
396 : void
397 0 : AudioRtpSession::attachRemoteRecorder(const MediaStream& ms)
398 : {
399 0 : std::lock_guard lock(mutex_);
400 0 : if (!recorder_ || !receiveThread_)
401 0 : return;
402 0 : MediaStream remoteMS = ms;
403 0 : remoteMS.name = streamId_ + ":remote";
404 0 : if (auto ob = recorder_->addStream(remoteMS)) {
405 0 : receiveThread_->attach(ob);
406 : }
407 0 : }
408 :
409 : void
410 0 : AudioRtpSession::attachLocalRecorder(const MediaStream& ms)
411 : {
412 0 : std::lock_guard lock(mutex_);
413 0 : if (!recorder_ || !audioInput_)
414 0 : return;
415 0 : MediaStream localMS = ms;
416 0 : localMS.name = streamId_ + ":local";
417 0 : if (auto ob = recorder_->addStream(localMS)) {
418 0 : audioInput_->attach(ob);
419 : }
420 0 : }
421 :
422 : void
423 10 : AudioRtpSession::initRecorder()
424 : {
425 10 : if (!recorder_)
426 0 : return;
427 10 : if (receiveThread_)
428 6 : receiveThread_->setRecorderCallback(
429 0 : [w=weak_from_this()](const MediaStream& ms) {
430 0 : Manager::instance().ioContext()->post([w=std::move(w), ms]() {
431 0 : if (auto shared = w.lock())
432 0 : shared->attachRemoteRecorder(ms);
433 0 : });
434 0 : });
435 10 : if (audioInput_)
436 10 : audioInput_->setRecorderCallback(
437 0 : [w=weak_from_this()](const MediaStream& ms) {
438 0 : Manager::instance().ioContext()->post([w=std::move(w), ms]() {
439 0 : if (auto shared = w.lock())
440 0 : shared->attachLocalRecorder(ms);
441 0 : });
442 0 : });
443 : }
444 :
445 : void
446 403 : AudioRtpSession::deinitRecorder()
447 : {
448 403 : if (!recorder_)
449 0 : return;
450 403 : if (receiveThread_) {
451 4 : auto ms = receiveThread_->getInfo();
452 4 : ms.name = streamId_ + ":remote";
453 4 : if (auto ob = recorder_->getStream(ms.name)) {
454 0 : receiveThread_->detach(ob);
455 0 : recorder_->removeStream(ms);
456 : }
457 4 : }
458 403 : if (audioInput_) {
459 5 : auto ms = audioInput_->getInfo();
460 5 : ms.name = streamId_ + ":local";
461 5 : if (auto ob = recorder_->getStream(ms.name)) {
462 0 : audioInput_->detach(ob);
463 0 : recorder_->removeStream(ms);
464 : }
465 5 : }
466 : }
467 :
468 : } // namespace jami
|