Line data Source code
1 : /*
2 : * Copyright (C) 2004-2025 Savoir-faire Linux Inc.
3 : *
4 : * This program is free software: you can redistribute it and/or modify
5 : * it under the terms of the GNU General Public License as published by
6 : * the Free Software Foundation, either version 3 of the License, or
7 : * (at your option) any later version.
8 : *
9 : * This program is distributed in the hope that it will be useful,
10 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 : * GNU General Public License for more details.
13 : *
14 : * You should have received a copy of the GNU General Public License
15 : * along with this program. If not, see <https://www.gnu.org/licenses/>.
16 : */
17 :
18 : #include "ringbufferpool.h"
19 : #include "ringbuffer.h"
20 : #include "ring_types.h" // for SIZEBUF
21 : #include "logger.h"
22 :
23 : #include <limits>
24 : #include <utility> // for std::pair
25 : #include <cstring>
26 : #include <algorithm>
27 :
28 : namespace jami {
29 :
30 : const char* const RingBufferPool::DEFAULT_ID = "audiolayer_id";
31 :
32 39 : RingBufferPool::RingBufferPool()
33 39 : : defaultRingBuffer_(createRingBuffer(DEFAULT_ID))
34 39 : {}
35 :
36 39 : RingBufferPool::~RingBufferPool()
37 : {
38 39 : readBindingsMap_.clear();
39 39 : defaultRingBuffer_.reset();
40 :
41 : // Verify ringbuffer not removed yet
42 : // XXXX: With a good design this should never happen! :-P
43 197 : for (const auto& item : ringBufferMap_) {
44 158 : const auto& weak = item.second;
45 158 : if (not weak.expired())
46 0 : JAMI_WARNING("Leaking RingBuffer '{}'", item.first);
47 : }
48 39 : }
49 :
50 : void
51 0 : RingBufferPool::setInternalSamplingRate(unsigned sr)
52 : {
53 0 : std::lock_guard lk(stateLock_);
54 :
55 0 : if (sr != internalAudioFormat_.sample_rate) {
56 0 : flushAllBuffers();
57 0 : internalAudioFormat_.sample_rate = sr;
58 : }
59 0 : }
60 :
61 : void
62 0 : RingBufferPool::setInternalAudioFormat(AudioFormat format)
63 : {
64 0 : std::lock_guard lk(stateLock_);
65 :
66 0 : if (format != internalAudioFormat_) {
67 0 : flushAllBuffers();
68 0 : internalAudioFormat_ = format;
69 0 : for (auto& wrb : ringBufferMap_)
70 0 : if (auto rb = wrb.second.lock())
71 0 : rb->setFormat(internalAudioFormat_);
72 : }
73 0 : }
74 :
75 : std::shared_ptr<RingBuffer>
76 2440 : RingBufferPool::getRingBuffer(const std::string& id)
77 : {
78 2440 : std::lock_guard lk(stateLock_);
79 :
80 2440 : const auto& it = ringBufferMap_.find(id);
81 2440 : if (it != ringBufferMap_.cend()) {
82 1964 : if (const auto& sptr = it->second.lock())
83 1964 : return sptr;
84 14 : ringBufferMap_.erase(it);
85 : }
86 :
87 490 : return nullptr;
88 2440 : }
89 :
90 : std::shared_ptr<RingBuffer>
91 0 : RingBufferPool::getRingBuffer(const std::string& id) const
92 : {
93 0 : std::lock_guard lk(stateLock_);
94 :
95 0 : const auto& it = ringBufferMap_.find(id);
96 0 : if (it != ringBufferMap_.cend())
97 0 : return it->second.lock();
98 :
99 0 : return nullptr;
100 0 : }
101 :
102 : std::shared_ptr<RingBuffer>
103 871 : RingBufferPool::createRingBuffer(const std::string& id)
104 : {
105 871 : std::lock_guard lk(stateLock_);
106 :
107 871 : auto rbuf = getRingBuffer(id);
108 871 : if (rbuf) {
109 1143 : JAMI_DEBUG("Ringbuffer already exists for id '{}'", id);
110 381 : return rbuf;
111 : }
112 :
113 490 : rbuf.reset(new RingBuffer(id, SIZEBUF, internalAudioFormat_));
114 490 : ringBufferMap_.emplace(id, std::weak_ptr<RingBuffer>(rbuf));
115 490 : return rbuf;
116 871 : }
117 :
118 : const RingBufferPool::ReadBindings*
119 0 : RingBufferPool::getReadBindings(const std::string& ringbufferId) const
120 : {
121 0 : const auto& iter = readBindingsMap_.find(ringbufferId);
122 0 : return iter != readBindingsMap_.cend() ? &iter->second : nullptr;
123 : }
124 :
125 : RingBufferPool::ReadBindings*
126 48868 : RingBufferPool::getReadBindings(const std::string& ringbufferId)
127 : {
128 48868 : const auto& iter = readBindingsMap_.find(ringbufferId);
129 48868 : return iter != readBindingsMap_.cend() ? &iter->second : nullptr;
130 : }
131 :
132 : void
133 353 : RingBufferPool::removeReadBindings(const std::string& ringbufferId)
134 : {
135 353 : if (not readBindingsMap_.erase(ringbufferId))
136 0 : JAMI_ERROR("Ringbuffer {} does not exist!", ringbufferId);
137 353 : }
138 :
139 : void
140 798 : RingBufferPool::addReaderToRingBuffer(const std::shared_ptr<RingBuffer> &sourceBuffer,
141 : const std::string &readerBufferId) {
142 798 : if (readerBufferId != DEFAULT_ID and sourceBuffer->getId() == readerBufferId)
143 30 : JAMI_WARNING("RingBuffer has a readoffset on itself");
144 :
145 798 : sourceBuffer->createReadOffset(readerBufferId);
146 798 : readBindingsMap_[readerBufferId].insert(sourceBuffer);
147 798 : }
148 :
149 : void
150 842 : RingBufferPool::removeReaderFromRingBuffer(const std::shared_ptr<RingBuffer> &sourceBuffer,
151 : const std::string &readerBufferId) {
152 842 : if (auto bindings = getReadBindings(readerBufferId)) {
153 773 : bindings->erase(sourceBuffer);
154 773 : if (bindings->empty())
155 353 : removeReadBindings(readerBufferId);
156 : }
157 :
158 842 : sourceBuffer->removeReadOffset(readerBufferId);
159 842 : }
160 :
161 : void
162 385 : RingBufferPool::bindRingBuffers(const std::string &ringbufferId1,
163 : const std::string &ringbufferId2) {
164 1155 : JAMI_LOG("Bind ringbuffer {} to ringbuffer {}", ringbufferId1, ringbufferId2);
165 :
166 385 : const auto &rb1 = getRingBuffer(ringbufferId1);
167 385 : if (not rb1) {
168 0 : JAMI_ERROR("No ringbuffer associated with id '{}'", ringbufferId1);
169 0 : return;
170 : }
171 :
172 385 : const auto &rb2 = getRingBuffer(ringbufferId2);
173 385 : if (not rb2) {
174 0 : JAMI_ERROR("No ringbuffer associated to id '{}'", ringbufferId2);
175 0 : return;
176 : }
177 :
178 385 : std::lock_guard lk(stateLock_);
179 :
180 385 : addReaderToRingBuffer(rb1, ringbufferId2);
181 385 : addReaderToRingBuffer(rb2, ringbufferId1);
182 385 : }
183 :
184 : void
185 28 : RingBufferPool::bindHalfDuplexOut(const std::string &readerBufferId,
186 : const std::string &sourceBufferId) {
187 : /* This method is used only for active ringbuffers, if this ringbuffer does not exist,
188 : * do nothing */
189 28 : if (const auto &rb = getRingBuffer(sourceBufferId)) {
190 28 : std::lock_guard lk(stateLock_);
191 :
192 : // p1 est le binding de p2 (p2 lit le stream de p1)
193 28 : addReaderToRingBuffer(rb, readerBufferId);
194 56 : }
195 28 : }
196 :
197 : void
198 71 : RingBufferPool::unbindRingBuffers(const std::string &ringbufferId1,
199 : const std::string &ringbufferId2) {
200 213 : JAMI_LOG("Unbind ringbuffers {} and {}", ringbufferId1, ringbufferId2);
201 :
202 71 : const auto &rb1 = getRingBuffer(ringbufferId1);
203 71 : if (not rb1) {
204 0 : JAMI_ERROR("No ringbuffer associated to id '{}'", ringbufferId1);
205 0 : return;
206 : }
207 :
208 71 : const auto &rb2 = getRingBuffer(ringbufferId2);
209 71 : if (not rb2) {
210 0 : JAMI_ERROR("No ringbuffer associated to id '{}'", ringbufferId2);
211 0 : return;
212 : }
213 :
214 71 : std::lock_guard lk(stateLock_);
215 :
216 71 : removeReaderFromRingBuffer(rb1, ringbufferId2);
217 71 : removeReaderFromRingBuffer(rb2, ringbufferId1);
218 71 : }
219 :
220 : void
221 38 : RingBufferPool::unBindHalfDuplexOut(const std::string &readerBufferId,
222 : const std::string &sourceBufferId) {
223 38 : std::lock_guard lk(stateLock_);
224 :
225 38 : if (const auto &rb = getRingBuffer(sourceBufferId))
226 38 : removeReaderFromRingBuffer(rb, readerBufferId);
227 38 : }
228 :
229 : void
230 0 : RingBufferPool::unBindAllHalfDuplexOut(const std::string &ringbufferId) {
231 0 : const auto &rb = getRingBuffer(ringbufferId);
232 0 : if (not rb) {
233 0 : JAMI_ERROR("No ringbuffer associated to id '{}'", ringbufferId);
234 0 : return;
235 : }
236 0 : std::lock_guard lk(stateLock_);
237 0 : auto bindings = getReadBindings(ringbufferId);
238 0 : if (not bindings)
239 0 : return;
240 0 : const auto bindings_copy = *bindings; // temporary copy
241 0 : for (const auto &rbuf: bindings_copy) {
242 0 : removeReaderFromRingBuffer(rb, rbuf->getId());
243 : }
244 0 : }
245 :
246 : void
247 95 : RingBufferPool::unBindAllHalfDuplexIn(const std::string &sourceBufferId) {
248 95 : std::lock_guard lk(stateLock_);
249 95 : const std::shared_ptr<RingBuffer> &ringBuffer = getRingBuffer(sourceBufferId);
250 95 : const std::vector<std::string> &subscribers = ringBuffer->getSubscribers();
251 263 : for (const auto &subscriber: subscribers) {
252 168 : removeReaderFromRingBuffer(ringBuffer, subscriber);
253 : }
254 95 : }
255 :
256 : void
257 460 : RingBufferPool::unBindAll(const std::string &ringbufferId) {
258 1380 : JAMI_LOG("Unbind ringbuffer {} from all bound ringbuffers", ringbufferId);
259 :
260 460 : const auto &rb = getRingBuffer(ringbufferId);
261 460 : if (not rb) {
262 0 : JAMI_ERROR("No ringbuffer associated to id '{}'", ringbufferId);
263 0 : return;
264 : }
265 :
266 460 : std::lock_guard lk(stateLock_);
267 :
268 460 : auto bindings = getReadBindings(ringbufferId);
269 460 : if (not bindings)
270 235 : return;
271 :
272 225 : const auto bindings_copy = *bindings; // temporary copy
273 472 : for (const auto &rbuf: bindings_copy) {
274 247 : removeReaderFromRingBuffer(rbuf, ringbufferId);
275 247 : removeReaderFromRingBuffer(rb, rbuf->getId());
276 : }
277 695 : }
278 :
279 : std::shared_ptr<AudioFrame>
280 46998 : RingBufferPool::getData(const std::string& ringbufferId)
281 : {
282 46998 : std::lock_guard lk(stateLock_);
283 :
284 46998 : const auto bindings = getReadBindings(ringbufferId);
285 46998 : if (not bindings)
286 463 : return {};
287 :
288 : // No mixing
289 46535 : if (bindings->size() == 1)
290 72906 : return (*bindings->cbegin())->get(ringbufferId);
291 :
292 10082 : auto mixBuffer = std::make_shared<AudioFrame>(internalAudioFormat_);
293 10082 : auto mixed = false;
294 34105 : for (const auto& rbuf : *bindings) {
295 24023 : if (auto b = rbuf->get(ringbufferId)) {
296 0 : mixed = true;
297 0 : mixBuffer->mix(*b);
298 :
299 : // voice is true if any of mixed frames has voice
300 0 : mixBuffer->has_voice |= b->has_voice;
301 24023 : }
302 : }
303 :
304 10082 : return mixed ? mixBuffer : nullptr;
305 46998 : }
306 :
307 : bool
308 0 : RingBufferPool::waitForDataAvailable(const std::string& ringbufferId,
309 : const std::chrono::microseconds& max_wait) const
310 : {
311 0 : std::unique_lock<std::recursive_mutex> lk(stateLock_);
312 :
313 : // convert to absolute time
314 0 : const auto deadline = std::chrono::high_resolution_clock::now() + max_wait;
315 :
316 0 : auto bindings = getReadBindings(ringbufferId);
317 0 : if (not bindings)
318 0 : return 0;
319 :
320 0 : const auto bindings_copy = *bindings; // temporary copy
321 0 : for (const auto& rbuf : bindings_copy) {
322 0 : lk.unlock();
323 0 : if (rbuf->waitForDataAvailable(ringbufferId, deadline) == 0)
324 0 : return false;
325 0 : lk.lock();
326 : }
327 0 : return true;
328 0 : }
329 :
330 : std::shared_ptr<AudioFrame>
331 0 : RingBufferPool::getAvailableData(const std::string& ringbufferId)
332 : {
333 0 : std::lock_guard lk(stateLock_);
334 :
335 0 : auto bindings = getReadBindings(ringbufferId);
336 0 : if (not bindings)
337 0 : return 0;
338 :
339 : // No mixing
340 0 : if (bindings->size() == 1) {
341 0 : return (*bindings->cbegin())->get(ringbufferId);
342 : }
343 :
344 0 : size_t availableFrames = 0;
345 :
346 0 : for (const auto& rbuf : *bindings)
347 0 : availableFrames = std::min(availableFrames, rbuf->availableForGet(ringbufferId));
348 :
349 0 : if (availableFrames == 0)
350 0 : return {};
351 :
352 0 : auto buf = std::make_shared<AudioFrame>(internalAudioFormat_);
353 0 : for (const auto& rbuf : *bindings) {
354 0 : if (auto b = rbuf->get(ringbufferId)) {
355 0 : buf->mix(*b);
356 :
357 : // voice is true if any of mixed frames has voice
358 0 : buf->has_voice |= b->has_voice;
359 0 : }
360 : }
361 :
362 0 : return buf;
363 0 : }
364 :
365 : size_t
366 0 : RingBufferPool::availableForGet(const std::string& ringbufferId) const
367 : {
368 0 : std::lock_guard lk(stateLock_);
369 :
370 0 : const auto bindings = getReadBindings(ringbufferId);
371 0 : if (not bindings)
372 0 : return 0;
373 :
374 : // No mixing
375 0 : if (bindings->size() == 1) {
376 0 : return (*bindings->begin())->availableForGet(ringbufferId);
377 : }
378 :
379 0 : size_t availableSamples = std::numeric_limits<size_t>::max();
380 :
381 0 : for (const auto& rbuf : *bindings) {
382 0 : const size_t nbSamples = rbuf->availableForGet(ringbufferId);
383 0 : if (nbSamples != 0)
384 0 : availableSamples = std::min(availableSamples, nbSamples);
385 : }
386 :
387 0 : return availableSamples != std::numeric_limits<size_t>::max() ? availableSamples : 0;
388 0 : }
389 :
390 : size_t
391 0 : RingBufferPool::discard(size_t toDiscard, const std::string& ringbufferId)
392 : {
393 0 : std::lock_guard lk(stateLock_);
394 :
395 0 : const auto bindings = getReadBindings(ringbufferId);
396 0 : if (not bindings)
397 0 : return 0;
398 :
399 0 : for (const auto& rbuf : *bindings)
400 0 : rbuf->discard(toDiscard, ringbufferId);
401 :
402 0 : return toDiscard;
403 0 : }
404 :
405 : void
406 568 : RingBufferPool::flush(const std::string& ringbufferId)
407 : {
408 568 : std::lock_guard lk(stateLock_);
409 :
410 568 : const auto bindings = getReadBindings(ringbufferId);
411 568 : if (not bindings)
412 188 : return;
413 :
414 6271 : for (const auto& rbuf : *bindings)
415 5891 : rbuf->flush(ringbufferId);
416 568 : }
417 :
418 : void
419 299 : RingBufferPool::flushAllBuffers()
420 : {
421 299 : std::lock_guard lk(stateLock_);
422 :
423 6444 : for (auto item = ringBufferMap_.begin(); item != ringBufferMap_.end();) {
424 6145 : if (const auto rb = item->second.lock()) {
425 5827 : rb->flushAll();
426 5827 : ++item;
427 : } else {
428 : // Use this version of erase to avoid using invalidated iterator
429 318 : item = ringBufferMap_.erase(item);
430 6145 : }
431 : }
432 299 : }
433 :
434 : bool
435 0 : RingBufferPool::isAudioMeterActive(const std::string& id)
436 : {
437 0 : std::lock_guard lk(stateLock_);
438 0 : if (!id.empty()) {
439 0 : if (auto rb = getRingBuffer(id)) {
440 0 : return rb->isAudioMeterActive();
441 0 : }
442 : } else {
443 0 : for (auto item = ringBufferMap_.begin(); item != ringBufferMap_.end(); ++item) {
444 0 : if (const auto rb = item->second.lock()) {
445 0 : if (rb->isAudioMeterActive()) {
446 0 : return true;
447 : }
448 0 : }
449 : }
450 : }
451 0 : return false;
452 0 : }
453 :
454 : void
455 0 : RingBufferPool::setAudioMeterState(const std::string& id, bool state)
456 : {
457 0 : std::lock_guard lk(stateLock_);
458 0 : if (!id.empty()) {
459 0 : if (auto rb = getRingBuffer(id)) {
460 0 : rb->setAudioMeterState(state);
461 0 : }
462 : } else {
463 0 : for (auto item = ringBufferMap_.begin(); item != ringBufferMap_.end(); ++item) {
464 0 : if (const auto rb = item->second.lock()) {
465 0 : rb->setAudioMeterState(state);
466 0 : }
467 : }
468 : }
469 0 : }
470 :
471 : } // namespace jami
|