Line data Source code
1 : /*
2 : * Copyright (C) 2004-2026 Savoir-faire Linux Inc.
3 : *
4 : * This program is free software: you can redistribute it and/or modify
5 : * it under the terms of the GNU General Public License as published by
6 : * the Free Software Foundation, either version 3 of the License, or
7 : * (at your option) any later version.
8 : *
9 : * This program is distributed in the hope that it will be useful,
10 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 : * GNU General Public License for more details.
13 : *
14 : * You should have received a copy of the GNU General Public License
15 : * along with this program. If not, see <https://www.gnu.org/licenses/>.
16 : */
17 :
18 : #include "ringbufferpool.h"
19 : #include "ringbuffer.h"
20 : #include "logger.h"
21 :
22 : #include <cstring>
23 :
24 : namespace jami {
25 :
26 : const char* const RingBufferPool::DEFAULT_ID = "audiolayer_id";
27 :
28 37 : RingBufferPool::RingBufferPool()
29 37 : : defaultRingBuffer_(createRingBuffer(DEFAULT_ID))
30 37 : {}
31 :
32 37 : RingBufferPool::~RingBufferPool()
33 : {
34 37 : readBindingsMap_.clear();
35 37 : defaultRingBuffer_.reset();
36 :
37 : // Verify ringbuffer not removed yet
38 : // XXXX: With a good design this should never happen! :-P
39 146 : for (const auto& item : ringBufferMap_) {
40 109 : const auto& weak = item.second;
41 109 : if (not weak.expired())
42 0 : JAMI_WARNING("Leaking RingBuffer '{}'", item.first);
43 : }
44 37 : }
45 :
46 : void
47 0 : RingBufferPool::setInternalSamplingRate(unsigned sr)
48 : {
49 0 : std::lock_guard lk(stateLock_);
50 :
51 0 : if (sr != internalAudioFormat_.sample_rate) {
52 0 : flushAllBuffersLocked();
53 0 : internalAudioFormat_.sample_rate = sr;
54 : }
55 0 : }
56 :
57 : void
58 0 : RingBufferPool::setInternalAudioFormat(AudioFormat format)
59 : {
60 0 : std::lock_guard lk(stateLock_);
61 :
62 0 : if (format != internalAudioFormat_) {
63 0 : flushAllBuffersLocked();
64 0 : internalAudioFormat_ = format;
65 0 : for (auto& wrb : ringBufferMap_)
66 0 : if (auto rb = wrb.second.lock())
67 0 : rb->setFormat(internalAudioFormat_);
68 : }
69 0 : }
70 :
71 : std::shared_ptr<RingBuffer>
72 2656 : RingBufferPool::getRingBufferLocked(const std::string& id)
73 : {
74 2656 : const auto& it = ringBufferMap_.find(id);
75 2656 : if (it != ringBufferMap_.cend()) {
76 2196 : if (const auto& sptr = it->second.lock())
77 2196 : return sptr;
78 10 : ringBufferMap_.erase(it);
79 : }
80 :
81 470 : return nullptr;
82 : }
83 :
84 : std::shared_ptr<RingBuffer>
85 0 : RingBufferPool::getRingBufferLocked(const std::string& id) const
86 : {
87 0 : const auto& it = ringBufferMap_.find(id);
88 0 : if (it != ringBufferMap_.cend())
89 0 : return it->second.lock();
90 :
91 0 : return nullptr;
92 : }
93 :
94 : std::shared_ptr<RingBuffer>
95 34 : RingBufferPool::getRingBuffer(const std::string& id)
96 : {
97 34 : std::lock_guard lk(stateLock_);
98 68 : return getRingBufferLocked(id);
99 34 : }
100 :
101 : std::shared_ptr<RingBuffer>
102 0 : RingBufferPool::getRingBuffer(const std::string& id) const
103 : {
104 0 : std::lock_guard lk(stateLock_);
105 0 : return getRingBufferLocked(id);
106 0 : }
107 :
108 : std::shared_ptr<RingBuffer>
109 803 : RingBufferPool::createRingBuffer(const std::string& id)
110 : {
111 803 : std::lock_guard lk(stateLock_);
112 :
113 803 : auto rbuf = getRingBufferLocked(id);
114 803 : if (rbuf) {
115 1332 : JAMI_DEBUG("Ringbuffer already exists for id '{}'", id);
116 333 : return rbuf;
117 : }
118 :
119 470 : rbuf.reset(new RingBuffer(id, internalAudioFormat_));
120 470 : ringBufferMap_.emplace(id, std::weak_ptr<RingBuffer>(rbuf));
121 470 : return rbuf;
122 803 : }
123 :
124 : const RingBufferPool::ReadBindings*
125 41593 : RingBufferPool::getReadBindings(const std::string& ringbufferId) const
126 : {
127 41593 : const auto& iter = readBindingsMap_.find(ringbufferId);
128 41593 : return iter != readBindingsMap_.cend() ? &iter->second : nullptr;
129 : }
130 :
131 : RingBufferPool::ReadBindings*
132 43743 : RingBufferPool::getReadBindings(const std::string& ringbufferId)
133 : {
134 43743 : const auto& iter = readBindingsMap_.find(ringbufferId);
135 43743 : return iter != readBindingsMap_.cend() ? &iter->second : nullptr;
136 : }
137 :
138 : void
139 316 : RingBufferPool::removeReadBindings(const std::string& ringbufferId)
140 : {
141 316 : if (not readBindingsMap_.erase(ringbufferId))
142 0 : JAMI_ERROR("Ringbuffer {} does not exist!", ringbufferId);
143 316 : }
144 :
145 : void
146 905 : RingBufferPool::addReaderToRingBuffer(const std::shared_ptr<RingBuffer>& sourceBuffer, const std::string& readerBufferId)
147 : {
148 905 : if (readerBufferId != DEFAULT_ID and sourceBuffer->getId() == readerBufferId)
149 20 : JAMI_WARNING("RingBuffer has a readoffset on itself");
150 :
151 905 : sourceBuffer->createReadOffset(readerBufferId);
152 905 : readBindingsMap_[readerBufferId].insert(sourceBuffer);
153 905 : }
154 :
155 : void
156 978 : RingBufferPool::removeReaderFromRingBuffer(const std::shared_ptr<RingBuffer>& sourceBuffer,
157 : const std::string& readerBufferId)
158 : {
159 978 : if (auto* bindings = getReadBindings(readerBufferId)) {
160 892 : bindings->erase(sourceBuffer);
161 892 : if (bindings->empty())
162 316 : removeReadBindings(readerBufferId);
163 : }
164 :
165 978 : sourceBuffer->removeReadOffset(readerBufferId);
166 978 : }
167 :
168 : void
169 361 : RingBufferPool::bindRingBuffers(const std::string& ringbufferId1, const std::string& ringbufferId2)
170 : {
171 1444 : JAMI_LOG("Bind ringbuffer {} to ringbuffer {}", ringbufferId1, ringbufferId2);
172 :
173 361 : std::lock_guard lk(stateLock_);
174 :
175 361 : const auto& rb1 = getRingBufferLocked(ringbufferId1);
176 361 : if (not rb1) {
177 0 : JAMI_ERROR("No ringbuffer associated with id '{}'", ringbufferId1);
178 0 : return;
179 : }
180 :
181 361 : const auto& rb2 = getRingBufferLocked(ringbufferId2);
182 361 : if (not rb2) {
183 0 : JAMI_ERROR("No ringbuffer associated to id '{}'", ringbufferId2);
184 0 : return;
185 : }
186 :
187 361 : addReaderToRingBuffer(rb1, ringbufferId2);
188 361 : addReaderToRingBuffer(rb2, ringbufferId1);
189 361 : }
190 :
191 : void
192 183 : RingBufferPool::bindHalfDuplexOut(const std::string& readerBufferId, const std::string& sourceBufferId)
193 : {
194 : /* This method is used only for active ringbuffers, if this ringbuffer does not exist,
195 : * do nothing */
196 183 : std::lock_guard lk(stateLock_);
197 :
198 183 : if (const auto& rb = getRingBufferLocked(sourceBufferId)) {
199 : // p1 est le binding de p2 (p2 lit le stream de p1)
200 183 : addReaderToRingBuffer(rb, readerBufferId);
201 183 : }
202 183 : }
203 :
204 : void
205 70 : RingBufferPool::unbindRingBuffers(const std::string& ringbufferId1, const std::string& ringbufferId2)
206 : {
207 280 : JAMI_LOG("Unbind ringbuffers {} and {}", ringbufferId1, ringbufferId2);
208 :
209 70 : std::lock_guard lk(stateLock_);
210 :
211 70 : const auto& rb1 = getRingBufferLocked(ringbufferId1);
212 70 : if (not rb1) {
213 0 : JAMI_ERROR("No ringbuffer associated to id '{}'", ringbufferId1);
214 0 : return;
215 : }
216 :
217 70 : const auto& rb2 = getRingBufferLocked(ringbufferId2);
218 70 : if (not rb2) {
219 0 : JAMI_ERROR("No ringbuffer associated to id '{}'", ringbufferId2);
220 0 : return;
221 : }
222 :
223 70 : removeReaderFromRingBuffer(rb1, ringbufferId2);
224 70 : removeReaderFromRingBuffer(rb2, ringbufferId1);
225 70 : }
226 :
227 : void
228 184 : RingBufferPool::unBindHalfDuplexOut(const std::string& readerBufferId, const std::string& sourceBufferId)
229 : {
230 184 : std::lock_guard lk(stateLock_);
231 :
232 184 : if (const auto& rb = getRingBufferLocked(sourceBufferId))
233 184 : removeReaderFromRingBuffer(rb, readerBufferId);
234 184 : }
235 :
236 : void
237 68 : RingBufferPool::unBindAllHalfDuplexOut(const std::string& ringbufferId)
238 : {
239 68 : std::lock_guard lk(stateLock_);
240 :
241 68 : const auto& rb = getRingBufferLocked(ringbufferId);
242 68 : if (not rb) {
243 0 : JAMI_ERROR("No ringbuffer associated to id '{}'", ringbufferId);
244 0 : return;
245 : }
246 :
247 68 : auto* bindings = getReadBindings(ringbufferId);
248 68 : if (not bindings)
249 52 : return;
250 16 : const auto bindings_copy = *bindings; // temporary copy
251 52 : for (const auto& rbuf : bindings_copy) {
252 36 : removeReaderFromRingBuffer(rb, rbuf->getId());
253 : }
254 120 : }
255 :
256 : void
257 95 : RingBufferPool::unBindAllHalfDuplexIn(const std::string& sourceBufferId)
258 : {
259 95 : std::lock_guard lk(stateLock_);
260 :
261 95 : auto ringBuffer = getRingBufferLocked(sourceBufferId);
262 95 : if (not ringBuffer) {
263 0 : JAMI_ERROR("No ringbuffer associated to id '{}'", sourceBufferId);
264 0 : return;
265 : }
266 :
267 95 : const std::vector<std::string>& subscribers = ringBuffer->getSubscribers();
268 263 : for (const auto& subscriber : subscribers) {
269 168 : removeReaderFromRingBuffer(ringBuffer, subscriber);
270 : }
271 95 : }
272 :
273 : void
274 427 : RingBufferPool::unBindAll(const std::string& ringbufferId)
275 : {
276 1708 : JAMI_LOG("Unbind ringbuffer {} from all bound ringbuffers", ringbufferId);
277 :
278 427 : std::lock_guard lk(stateLock_);
279 :
280 427 : const auto& rb = getRingBufferLocked(ringbufferId);
281 427 : if (not rb) {
282 0 : JAMI_ERROR("No ringbuffer associated to id '{}'", ringbufferId);
283 0 : return;
284 : }
285 :
286 427 : auto* bindings = getReadBindings(ringbufferId);
287 427 : if (not bindings)
288 223 : return;
289 :
290 204 : const auto bindings_copy = *bindings; // temporary copy
291 429 : for (const auto& rbuf : bindings_copy) {
292 225 : removeReaderFromRingBuffer(rbuf, ringbufferId);
293 225 : removeReaderFromRingBuffer(rb, rbuf->getId());
294 : }
295 650 : }
296 :
297 : std::shared_ptr<AudioFrame>
298 41593 : RingBufferPool::getData(const std::string& ringbufferId)
299 : {
300 41593 : std::lock_guard lk(stateLock_);
301 :
302 41593 : auto* const bindings = getReadBindings(ringbufferId);
303 41593 : if (not bindings)
304 6012 : return {};
305 :
306 : // No mixing
307 35581 : if (bindings->size() == 1)
308 51264 : return (*bindings->cbegin())->get(ringbufferId);
309 :
310 9949 : auto mixBuffer = std::make_shared<AudioFrame>(internalAudioFormat_);
311 9949 : auto mixed = false;
312 33839 : for (const auto& rbuf : *bindings) {
313 23890 : if (auto b = rbuf->get(ringbufferId)) {
314 0 : mixed = true;
315 0 : mixBuffer->mix(*b);
316 :
317 : // voice is true if any of mixed frames has voice
318 0 : mixBuffer->has_voice |= b->has_voice;
319 23890 : }
320 : }
321 :
322 9949 : return mixed ? mixBuffer : nullptr;
323 41593 : }
324 :
325 : bool
326 0 : RingBufferPool::waitForDataAvailable(const std::string& ringbufferId, const duration& max_wait) const
327 : {
328 0 : return waitForDataAvailable(ringbufferId, clock::now() + max_wait);
329 : }
330 :
331 : bool
332 41593 : RingBufferPool::waitForDataAvailable(const std::string& ringbufferId, const time_point& deadline) const
333 : {
334 41593 : std::unique_lock lk(stateLock_);
335 41593 : const auto* bindings = getReadBindings(ringbufferId);
336 41593 : if (not bindings)
337 5896 : return false;
338 35697 : const auto bindings_copy = *bindings; // temporary copy
339 :
340 35697 : lk.unlock();
341 35697 : for (const auto& rbuf : bindings_copy) {
342 35697 : if (rbuf->waitForDataAvailable(ringbufferId, deadline) == 0)
343 35697 : return false;
344 : }
345 0 : return true;
346 41593 : }
347 :
348 : std::shared_ptr<AudioFrame>
349 0 : RingBufferPool::getAvailableData(const std::string& ringbufferId)
350 : {
351 0 : std::lock_guard lk(stateLock_);
352 :
353 0 : auto* bindings = getReadBindings(ringbufferId);
354 0 : if (not bindings)
355 0 : return {};
356 :
357 : // No mixing
358 0 : if (bindings->size() == 1) {
359 0 : return (*bindings->cbegin())->get(ringbufferId);
360 : }
361 :
362 0 : size_t availableFrames = 0;
363 :
364 0 : for (const auto& rbuf : *bindings)
365 0 : availableFrames = std::min(availableFrames, rbuf->availableForGet(ringbufferId));
366 :
367 0 : if (availableFrames == 0)
368 0 : return {};
369 :
370 0 : auto buf = std::make_shared<AudioFrame>(internalAudioFormat_);
371 0 : for (const auto& rbuf : *bindings) {
372 0 : if (auto b = rbuf->get(ringbufferId)) {
373 0 : buf->mix(*b);
374 :
375 : // voice is true if any of mixed frames has voice
376 0 : buf->has_voice |= b->has_voice;
377 0 : }
378 : }
379 :
380 0 : return buf;
381 0 : }
382 :
383 : size_t
384 0 : RingBufferPool::availableForGet(const std::string& ringbufferId) const
385 : {
386 0 : std::lock_guard lk(stateLock_);
387 :
388 0 : const auto* const bindings = getReadBindings(ringbufferId);
389 0 : if (not bindings)
390 0 : return 0;
391 :
392 : // No mixing
393 0 : if (bindings->size() == 1) {
394 0 : return (*bindings->begin())->availableForGet(ringbufferId);
395 : }
396 :
397 0 : size_t availableSamples = std::numeric_limits<size_t>::max();
398 :
399 0 : for (const auto& rbuf : *bindings) {
400 0 : const size_t nbSamples = rbuf->availableForGet(ringbufferId);
401 0 : if (nbSamples != 0)
402 0 : availableSamples = std::min(availableSamples, nbSamples);
403 : }
404 :
405 0 : return availableSamples != std::numeric_limits<size_t>::max() ? availableSamples : 0;
406 0 : }
407 :
408 : size_t
409 0 : RingBufferPool::discard(size_t toDiscard, const std::string& ringbufferId)
410 : {
411 0 : std::lock_guard lk(stateLock_);
412 :
413 0 : auto* const bindings = getReadBindings(ringbufferId);
414 0 : if (not bindings)
415 0 : return 0;
416 :
417 0 : for (const auto& rbuf : *bindings)
418 0 : rbuf->discard(toDiscard, ringbufferId);
419 :
420 0 : return toDiscard;
421 0 : }
422 :
423 : void
424 677 : RingBufferPool::flush(const std::string& ringbufferId)
425 : {
426 677 : std::lock_guard lk(stateLock_);
427 :
428 677 : auto* const bindings = getReadBindings(ringbufferId);
429 677 : if (not bindings)
430 200 : return;
431 :
432 2170 : for (const auto& rbuf : *bindings)
433 1693 : rbuf->flush(ringbufferId);
434 677 : }
435 :
436 : void
437 264 : RingBufferPool::flushAllBuffersLocked()
438 : {
439 2750 : for (auto item = ringBufferMap_.begin(); item != ringBufferMap_.end();) {
440 2486 : if (const auto rb = item->second.lock()) {
441 2135 : rb->flushAll();
442 2135 : ++item;
443 : } else {
444 : // Use this version of erase to avoid using invalidated iterator
445 351 : item = ringBufferMap_.erase(item);
446 2486 : }
447 : }
448 264 : }
449 :
450 : void
451 264 : RingBufferPool::flushAllBuffers()
452 : {
453 264 : std::lock_guard lk(stateLock_);
454 264 : flushAllBuffersLocked();
455 264 : }
456 :
457 : bool
458 0 : RingBufferPool::isAudioMeterActive(const std::string& id)
459 : {
460 0 : std::lock_guard lk(stateLock_);
461 0 : if (!id.empty()) {
462 0 : if (auto rb = getRingBufferLocked(id)) {
463 0 : return rb->isAudioMeterActive();
464 0 : }
465 : } else {
466 0 : for (auto item = ringBufferMap_.begin(); item != ringBufferMap_.end(); ++item) {
467 0 : if (const auto rb = item->second.lock()) {
468 0 : if (rb->isAudioMeterActive()) {
469 0 : return true;
470 : }
471 0 : }
472 : }
473 : }
474 0 : return false;
475 0 : }
476 :
477 : void
478 0 : RingBufferPool::setAudioMeterState(const std::string& id, bool state)
479 : {
480 0 : std::lock_guard lk(stateLock_);
481 0 : if (!id.empty()) {
482 0 : if (auto rb = getRingBufferLocked(id)) {
483 0 : rb->setAudioMeterState(state);
484 0 : }
485 : } else {
486 0 : for (auto item = ringBufferMap_.begin(); item != ringBufferMap_.end(); ++item) {
487 0 : if (const auto rb = item->second.lock()) {
488 0 : rb->setAudioMeterState(state);
489 0 : }
490 : }
491 : }
492 0 : }
493 :
494 : } // namespace jami
|