Line data Source code
1 : /*
2 : * Copyright (C) 2004-2025 Savoir-faire Linux Inc.
3 : *
4 : * This program is free software: you can redistribute it and/or modify
5 : * it under the terms of the GNU General Public License as published by
6 : * the Free Software Foundation, either version 3 of the License, or
7 : * (at your option) any later version.
8 : *
9 : * This program is distributed in the hope that it will be useful,
10 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 : * GNU General Public License for more details.
13 : *
14 : * You should have received a copy of the GNU General Public License
15 : * along with this program. If not, see <https://www.gnu.org/licenses/>.
16 : */
17 :
18 : #include "ringbufferpool.h"
19 : #include "ringbuffer.h"
20 : #include "logger.h"
21 :
22 : #include <limits>
23 : #include <utility> // for std::pair
24 : #include <cstring>
25 : #include <algorithm>
26 :
27 : namespace jami {
28 :
29 : const char* const RingBufferPool::DEFAULT_ID = "audiolayer_id";
30 :
31 38 : RingBufferPool::RingBufferPool()
32 38 : : defaultRingBuffer_(createRingBuffer(DEFAULT_ID))
33 38 : {}
34 :
35 38 : RingBufferPool::~RingBufferPool()
36 : {
37 38 : readBindingsMap_.clear();
38 38 : defaultRingBuffer_.reset();
39 :
40 : // Verify ringbuffer not removed yet
41 : // XXXX: With a good design this should never happen! :-P
42 189 : for (const auto& item : ringBufferMap_) {
43 151 : const auto& weak = item.second;
44 151 : if (not weak.expired())
45 0 : JAMI_WARNING("Leaking RingBuffer '{}'", item.first);
46 : }
47 38 : }
48 :
49 : void
50 0 : RingBufferPool::setInternalSamplingRate(unsigned sr)
51 : {
52 0 : std::lock_guard lk(stateLock_);
53 :
54 0 : if (sr != internalAudioFormat_.sample_rate) {
55 0 : flushAllBuffers();
56 0 : internalAudioFormat_.sample_rate = sr;
57 : }
58 0 : }
59 :
60 : void
61 0 : RingBufferPool::setInternalAudioFormat(AudioFormat format)
62 : {
63 0 : std::lock_guard lk(stateLock_);
64 :
65 0 : if (format != internalAudioFormat_) {
66 0 : flushAllBuffers();
67 0 : internalAudioFormat_ = format;
68 0 : for (auto& wrb : ringBufferMap_)
69 0 : if (auto rb = wrb.second.lock())
70 0 : rb->setFormat(internalAudioFormat_);
71 : }
72 0 : }
73 :
74 : std::shared_ptr<RingBuffer>
75 2317 : RingBufferPool::getRingBuffer(const std::string& id)
76 : {
77 2317 : std::lock_guard lk(stateLock_);
78 :
79 2317 : const auto& it = ringBufferMap_.find(id);
80 2317 : if (it != ringBufferMap_.cend()) {
81 1846 : if (const auto& sptr = it->second.lock())
82 1846 : return sptr;
83 10 : ringBufferMap_.erase(it);
84 : }
85 :
86 481 : return nullptr;
87 2317 : }
88 :
89 : std::shared_ptr<RingBuffer>
90 0 : RingBufferPool::getRingBuffer(const std::string& id) const
91 : {
92 0 : std::lock_guard lk(stateLock_);
93 :
94 0 : const auto& it = ringBufferMap_.find(id);
95 0 : if (it != ringBufferMap_.cend())
96 0 : return it->second.lock();
97 :
98 0 : return nullptr;
99 0 : }
100 :
101 : std::shared_ptr<RingBuffer>
102 842 : RingBufferPool::createRingBuffer(const std::string& id)
103 : {
104 842 : std::lock_guard lk(stateLock_);
105 :
106 842 : auto rbuf = getRingBuffer(id);
107 842 : if (rbuf) {
108 1444 : JAMI_DEBUG("Ringbuffer already exists for id '{}'", id);
109 361 : return rbuf;
110 : }
111 :
112 481 : rbuf.reset(new RingBuffer(id, internalAudioFormat_));
113 481 : ringBufferMap_.emplace(id, std::weak_ptr<RingBuffer>(rbuf));
114 481 : return rbuf;
115 842 : }
116 :
117 : const RingBufferPool::ReadBindings*
118 0 : RingBufferPool::getReadBindings(const std::string& ringbufferId) const
119 : {
120 0 : const auto& iter = readBindingsMap_.find(ringbufferId);
121 0 : return iter != readBindingsMap_.cend() ? &iter->second : nullptr;
122 : }
123 :
124 : RingBufferPool::ReadBindings*
125 39423 : RingBufferPool::getReadBindings(const std::string& ringbufferId)
126 : {
127 39423 : const auto& iter = readBindingsMap_.find(ringbufferId);
128 39423 : return iter != readBindingsMap_.cend() ? &iter->second : nullptr;
129 : }
130 :
131 : void
132 327 : RingBufferPool::removeReadBindings(const std::string& ringbufferId)
133 : {
134 327 : if (not readBindingsMap_.erase(ringbufferId))
135 0 : JAMI_ERROR("Ringbuffer {} does not exist!", ringbufferId);
136 327 : }
137 :
138 : void
139 753 : RingBufferPool::addReaderToRingBuffer(const std::shared_ptr<RingBuffer>& sourceBuffer, const std::string& readerBufferId)
140 : {
141 753 : if (readerBufferId != DEFAULT_ID and sourceBuffer->getId() == readerBufferId)
142 20 : JAMI_WARNING("RingBuffer has a readoffset on itself");
143 :
144 753 : sourceBuffer->createReadOffset(readerBufferId);
145 753 : readBindingsMap_[readerBufferId].insert(sourceBuffer);
146 753 : }
147 :
148 : void
149 788 : RingBufferPool::removeReaderFromRingBuffer(const std::shared_ptr<RingBuffer>& sourceBuffer,
150 : const std::string& readerBufferId)
151 : {
152 788 : if (auto bindings = getReadBindings(readerBufferId)) {
153 728 : bindings->erase(sourceBuffer);
154 728 : if (bindings->empty())
155 327 : removeReadBindings(readerBufferId);
156 : }
157 :
158 788 : sourceBuffer->removeReadOffset(readerBufferId);
159 788 : }
160 :
161 : void
162 369 : RingBufferPool::bindRingBuffers(const std::string& ringbufferId1, const std::string& ringbufferId2)
163 : {
164 1476 : JAMI_LOG("Bind ringbuffer {} to ringbuffer {}", ringbufferId1, ringbufferId2);
165 :
166 369 : const auto& rb1 = getRingBuffer(ringbufferId1);
167 369 : if (not rb1) {
168 0 : JAMI_ERROR("No ringbuffer associated with id '{}'", ringbufferId1);
169 0 : return;
170 : }
171 :
172 369 : const auto& rb2 = getRingBuffer(ringbufferId2);
173 369 : if (not rb2) {
174 0 : JAMI_ERROR("No ringbuffer associated to id '{}'", ringbufferId2);
175 0 : return;
176 : }
177 :
178 369 : std::lock_guard lk(stateLock_);
179 :
180 369 : addReaderToRingBuffer(rb1, ringbufferId2);
181 369 : addReaderToRingBuffer(rb2, ringbufferId1);
182 369 : }
183 :
184 : void
185 15 : RingBufferPool::bindHalfDuplexOut(const std::string& readerBufferId, const std::string& sourceBufferId)
186 : {
187 : /* This method is used only for active ringbuffers, if this ringbuffer does not exist,
188 : * do nothing */
189 15 : if (const auto& rb = getRingBuffer(sourceBufferId)) {
190 15 : std::lock_guard lk(stateLock_);
191 :
192 : // p1 est le binding de p2 (p2 lit le stream de p1)
193 15 : addReaderToRingBuffer(rb, readerBufferId);
194 30 : }
195 15 : }
196 :
197 : void
198 70 : RingBufferPool::unbindRingBuffers(const std::string& ringbufferId1, const std::string& ringbufferId2)
199 : {
200 280 : JAMI_LOG("Unbind ringbuffers {} and {}", ringbufferId1, ringbufferId2);
201 :
202 70 : const auto& rb1 = getRingBuffer(ringbufferId1);
203 70 : if (not rb1) {
204 0 : JAMI_ERROR("No ringbuffer associated to id '{}'", ringbufferId1);
205 0 : return;
206 : }
207 :
208 70 : const auto& rb2 = getRingBuffer(ringbufferId2);
209 70 : if (not rb2) {
210 0 : JAMI_ERROR("No ringbuffer associated to id '{}'", ringbufferId2);
211 0 : return;
212 : }
213 :
214 70 : std::lock_guard lk(stateLock_);
215 :
216 70 : removeReaderFromRingBuffer(rb1, ringbufferId2);
217 70 : removeReaderFromRingBuffer(rb2, ringbufferId1);
218 70 : }
219 :
220 : void
221 18 : RingBufferPool::unBindHalfDuplexOut(const std::string& readerBufferId, const std::string& sourceBufferId)
222 : {
223 18 : std::lock_guard lk(stateLock_);
224 :
225 18 : if (const auto& rb = getRingBuffer(sourceBufferId))
226 18 : removeReaderFromRingBuffer(rb, readerBufferId);
227 18 : }
228 :
229 : void
230 0 : RingBufferPool::unBindAllHalfDuplexOut(const std::string& ringbufferId)
231 : {
232 0 : const auto& rb = getRingBuffer(ringbufferId);
233 0 : if (not rb) {
234 0 : JAMI_ERROR("No ringbuffer associated to id '{}'", ringbufferId);
235 0 : return;
236 : }
237 0 : std::lock_guard lk(stateLock_);
238 0 : auto bindings = getReadBindings(ringbufferId);
239 0 : if (not bindings)
240 0 : return;
241 0 : const auto bindings_copy = *bindings; // temporary copy
242 0 : for (const auto& rbuf : bindings_copy) {
243 0 : removeReaderFromRingBuffer(rb, rbuf->getId());
244 : }
245 0 : }
246 :
247 : void
248 93 : RingBufferPool::unBindAllHalfDuplexIn(const std::string& sourceBufferId)
249 : {
250 93 : std::lock_guard lk(stateLock_);
251 93 : const std::shared_ptr<RingBuffer>& ringBuffer = getRingBuffer(sourceBufferId);
252 93 : const std::vector<std::string>& subscribers = ringBuffer->getSubscribers();
253 255 : for (const auto& subscriber : subscribers) {
254 162 : removeReaderFromRingBuffer(ringBuffer, subscriber);
255 : }
256 93 : }
257 :
258 : void
259 436 : RingBufferPool::unBindAll(const std::string& ringbufferId)
260 : {
261 1744 : JAMI_LOG("Unbind ringbuffer {} from all bound ringbuffers", ringbufferId);
262 :
263 436 : const auto& rb = getRingBuffer(ringbufferId);
264 436 : if (not rb) {
265 0 : JAMI_ERROR("No ringbuffer associated to id '{}'", ringbufferId);
266 0 : return;
267 : }
268 :
269 436 : std::lock_guard lk(stateLock_);
270 :
271 436 : auto bindings = getReadBindings(ringbufferId);
272 436 : if (not bindings)
273 225 : return;
274 :
275 211 : const auto bindings_copy = *bindings; // temporary copy
276 445 : for (const auto& rbuf : bindings_copy) {
277 234 : removeReaderFromRingBuffer(rbuf, ringbufferId);
278 234 : removeReaderFromRingBuffer(rb, rbuf->getId());
279 : }
280 661 : }
281 :
282 : std::shared_ptr<AudioFrame>
283 37523 : RingBufferPool::getData(const std::string& ringbufferId)
284 : {
285 37523 : std::lock_guard lk(stateLock_);
286 :
287 37523 : const auto bindings = getReadBindings(ringbufferId);
288 37523 : if (not bindings)
289 471 : return {};
290 :
291 : // No mixing
292 37052 : if (bindings->size() == 1)
293 54238 : return (*bindings->cbegin())->get(ringbufferId);
294 :
295 9933 : auto mixBuffer = std::make_shared<AudioFrame>(internalAudioFormat_);
296 9933 : auto mixed = false;
297 33666 : for (const auto& rbuf : *bindings) {
298 23733 : if (auto b = rbuf->get(ringbufferId)) {
299 0 : mixed = true;
300 0 : mixBuffer->mix(*b);
301 :
302 : // voice is true if any of mixed frames has voice
303 0 : mixBuffer->has_voice |= b->has_voice;
304 23733 : }
305 : }
306 :
307 9933 : return mixed ? mixBuffer : nullptr;
308 37523 : }
309 :
310 : bool
311 0 : RingBufferPool::waitForDataAvailable(const std::string& ringbufferId, const std::chrono::microseconds& max_wait) const
312 : {
313 0 : std::unique_lock<std::recursive_mutex> lk(stateLock_);
314 :
315 : // convert to absolute time
316 0 : const auto deadline = std::chrono::high_resolution_clock::now() + max_wait;
317 :
318 0 : auto bindings = getReadBindings(ringbufferId);
319 0 : if (not bindings)
320 0 : return 0;
321 :
322 0 : const auto bindings_copy = *bindings; // temporary copy
323 0 : for (const auto& rbuf : bindings_copy) {
324 0 : lk.unlock();
325 0 : if (rbuf->waitForDataAvailable(ringbufferId, deadline) == 0)
326 0 : return false;
327 0 : lk.lock();
328 : }
329 0 : return true;
330 0 : }
331 :
332 : std::shared_ptr<AudioFrame>
333 0 : RingBufferPool::getAvailableData(const std::string& ringbufferId)
334 : {
335 0 : std::lock_guard lk(stateLock_);
336 :
337 0 : auto bindings = getReadBindings(ringbufferId);
338 0 : if (not bindings)
339 0 : return 0;
340 :
341 : // No mixing
342 0 : if (bindings->size() == 1) {
343 0 : return (*bindings->cbegin())->get(ringbufferId);
344 : }
345 :
346 0 : size_t availableFrames = 0;
347 :
348 0 : for (const auto& rbuf : *bindings)
349 0 : availableFrames = std::min(availableFrames, rbuf->availableForGet(ringbufferId));
350 :
351 0 : if (availableFrames == 0)
352 0 : return {};
353 :
354 0 : auto buf = std::make_shared<AudioFrame>(internalAudioFormat_);
355 0 : for (const auto& rbuf : *bindings) {
356 0 : if (auto b = rbuf->get(ringbufferId)) {
357 0 : buf->mix(*b);
358 :
359 : // voice is true if any of mixed frames has voice
360 0 : buf->has_voice |= b->has_voice;
361 0 : }
362 : }
363 :
364 0 : return buf;
365 0 : }
366 :
367 : size_t
368 0 : RingBufferPool::availableForGet(const std::string& ringbufferId) const
369 : {
370 0 : std::lock_guard lk(stateLock_);
371 :
372 0 : const auto bindings = getReadBindings(ringbufferId);
373 0 : if (not bindings)
374 0 : return 0;
375 :
376 : // No mixing
377 0 : if (bindings->size() == 1) {
378 0 : return (*bindings->begin())->availableForGet(ringbufferId);
379 : }
380 :
381 0 : size_t availableSamples = std::numeric_limits<size_t>::max();
382 :
383 0 : for (const auto& rbuf : *bindings) {
384 0 : const size_t nbSamples = rbuf->availableForGet(ringbufferId);
385 0 : if (nbSamples != 0)
386 0 : availableSamples = std::min(availableSamples, nbSamples);
387 : }
388 :
389 0 : return availableSamples != std::numeric_limits<size_t>::max() ? availableSamples : 0;
390 0 : }
391 :
392 : size_t
393 0 : RingBufferPool::discard(size_t toDiscard, const std::string& ringbufferId)
394 : {
395 0 : std::lock_guard lk(stateLock_);
396 :
397 0 : const auto bindings = getReadBindings(ringbufferId);
398 0 : if (not bindings)
399 0 : return 0;
400 :
401 0 : for (const auto& rbuf : *bindings)
402 0 : rbuf->discard(toDiscard, ringbufferId);
403 :
404 0 : return toDiscard;
405 0 : }
406 :
407 : void
408 676 : RingBufferPool::flush(const std::string& ringbufferId)
409 : {
410 676 : std::lock_guard lk(stateLock_);
411 :
412 676 : const auto bindings = getReadBindings(ringbufferId);
413 676 : if (not bindings)
414 205 : return;
415 :
416 6408 : for (const auto& rbuf : *bindings)
417 5937 : rbuf->flush(ringbufferId);
418 676 : }
419 :
420 : void
421 283 : RingBufferPool::flushAllBuffers()
422 : {
423 283 : std::lock_guard lk(stateLock_);
424 :
425 6416 : for (auto item = ringBufferMap_.begin(); item != ringBufferMap_.end();) {
426 6133 : if (const auto rb = item->second.lock()) {
427 5813 : rb->flushAll();
428 5813 : ++item;
429 : } else {
430 : // Use this version of erase to avoid using invalidated iterator
431 320 : item = ringBufferMap_.erase(item);
432 6133 : }
433 : }
434 283 : }
435 :
436 : bool
437 0 : RingBufferPool::isAudioMeterActive(const std::string& id)
438 : {
439 0 : std::lock_guard lk(stateLock_);
440 0 : if (!id.empty()) {
441 0 : if (auto rb = getRingBuffer(id)) {
442 0 : return rb->isAudioMeterActive();
443 0 : }
444 : } else {
445 0 : for (auto item = ringBufferMap_.begin(); item != ringBufferMap_.end(); ++item) {
446 0 : if (const auto rb = item->second.lock()) {
447 0 : if (rb->isAudioMeterActive()) {
448 0 : return true;
449 : }
450 0 : }
451 : }
452 : }
453 0 : return false;
454 0 : }
455 :
456 : void
457 0 : RingBufferPool::setAudioMeterState(const std::string& id, bool state)
458 : {
459 0 : std::lock_guard lk(stateLock_);
460 0 : if (!id.empty()) {
461 0 : if (auto rb = getRingBuffer(id)) {
462 0 : rb->setAudioMeterState(state);
463 0 : }
464 : } else {
465 0 : for (auto item = ringBufferMap_.begin(); item != ringBufferMap_.end(); ++item) {
466 0 : if (const auto rb = item->second.lock()) {
467 0 : rb->setAudioMeterState(state);
468 0 : }
469 : }
470 : }
471 0 : }
472 :
473 : } // namespace jami
|