Line data Source code
1 : /*
2 : * Copyright (C) 2004-2024 Savoir-faire Linux Inc.
3 : *
4 : * Author: Alexandre Savard <alexandre.savard@savoirfairelinux.com>
5 : * Author: Guillaume Roguez <guillaume.roguez@savoirfairelinux.com>
6 : *
7 : * This program is free software; you can redistribute it and/or modify
8 : * it under the terms of the GNU General Public License as published by
9 : * the Free Software Foundation; either version 3 of the License, or
10 : * (at your option) any later version.
11 : *
12 : * This program is distributed in the hope that it will be useful,
13 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 : * GNU General Public License for more details.
16 : *
17 : * You should have received a copy of the GNU General Public License
18 : * along with this program; if not, write to the Free Software
19 : * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20 : */
21 :
22 : #include "ringbufferpool.h"
23 : #include "ringbuffer.h"
24 : #include "ring_types.h" // for SIZEBUF
25 : #include "logger.h"
26 :
27 : #include <limits>
28 : #include <utility> // for std::pair
29 : #include <cstring>
30 : #include <algorithm>
31 :
32 : namespace jami {
33 :
34 : const char* const RingBufferPool::DEFAULT_ID = "audiolayer_id";
35 :
36 38 : RingBufferPool::RingBufferPool()
37 38 : : defaultRingBuffer_(createRingBuffer(DEFAULT_ID))
38 38 : {}
39 :
40 38 : RingBufferPool::~RingBufferPool()
41 : {
42 38 : readBindingsMap_.clear();
43 38 : defaultRingBuffer_.reset();
44 :
45 : // Verify ringbuffer not removed yet
46 : // XXXX: With a good design this should never happen! :-P
47 226 : for (const auto& item : ringBufferMap_) {
48 188 : const auto& weak = item.second;
49 188 : if (not weak.expired())
50 0 : JAMI_WARNING("Leaking RingBuffer '{}'", item.first);
51 : }
52 38 : }
53 :
54 : void
55 0 : RingBufferPool::setInternalSamplingRate(unsigned sr)
56 : {
57 0 : std::lock_guard lk(stateLock_);
58 :
59 0 : if (sr != internalAudioFormat_.sample_rate) {
60 0 : flushAllBuffers();
61 0 : internalAudioFormat_.sample_rate = sr;
62 : }
63 0 : }
64 :
65 : void
66 0 : RingBufferPool::setInternalAudioFormat(AudioFormat format)
67 : {
68 0 : std::lock_guard lk(stateLock_);
69 :
70 0 : if (format != internalAudioFormat_) {
71 0 : flushAllBuffers();
72 0 : internalAudioFormat_ = format;
73 0 : for (auto& wrb : ringBufferMap_)
74 0 : if (auto rb = wrb.second.lock())
75 0 : rb->setFormat(internalAudioFormat_);
76 : }
77 0 : }
78 :
79 : std::shared_ptr<RingBuffer>
80 2724 : RingBufferPool::getRingBuffer(const std::string& id)
81 : {
82 2724 : std::lock_guard lk(stateLock_);
83 :
84 2724 : const auto& it = ringBufferMap_.find(id);
85 2724 : if (it != ringBufferMap_.cend()) {
86 2236 : if (const auto& sptr = it->second.lock())
87 2236 : return sptr;
88 7 : ringBufferMap_.erase(it);
89 : }
90 :
91 495 : return nullptr;
92 2724 : }
93 :
94 : std::shared_ptr<RingBuffer>
95 0 : RingBufferPool::getRingBuffer(const std::string& id) const
96 : {
97 0 : std::lock_guard lk(stateLock_);
98 :
99 0 : const auto& it = ringBufferMap_.find(id);
100 0 : if (it != ringBufferMap_.cend())
101 0 : return it->second.lock();
102 :
103 0 : return nullptr;
104 0 : }
105 :
106 : std::shared_ptr<RingBuffer>
107 866 : RingBufferPool::createRingBuffer(const std::string& id)
108 : {
109 866 : std::lock_guard lk(stateLock_);
110 :
111 866 : auto rbuf = getRingBuffer(id);
112 866 : if (rbuf) {
113 1113 : JAMI_DEBUG("Ringbuffer already exists for id '{}'", id);
114 371 : return rbuf;
115 : }
116 :
117 495 : rbuf.reset(new RingBuffer(id, SIZEBUF, internalAudioFormat_));
118 495 : ringBufferMap_.emplace(id, std::weak_ptr<RingBuffer>(rbuf));
119 495 : return rbuf;
120 866 : }
121 :
122 : const RingBufferPool::ReadBindings*
123 0 : RingBufferPool::getReadBindings(const std::string& ringbufferId) const
124 : {
125 0 : const auto& iter = readBindingsMap_.find(ringbufferId);
126 0 : return iter != readBindingsMap_.cend() ? &iter->second : nullptr;
127 : }
128 :
129 : RingBufferPool::ReadBindings*
130 46003 : RingBufferPool::getReadBindings(const std::string& ringbufferId)
131 : {
132 46003 : const auto& iter = readBindingsMap_.find(ringbufferId);
133 46003 : return iter != readBindingsMap_.cend() ? &iter->second : nullptr;
134 : }
135 :
136 : void
137 347 : RingBufferPool::removeReadBindings(const std::string& ringbufferId)
138 : {
139 347 : if (not readBindingsMap_.erase(ringbufferId))
140 0 : JAMI_ERROR("Ringbuffer {} does not exist!", ringbufferId);
141 347 : }
142 :
143 : /**
144 : * Make given ringbuffer a reader of given ring buffer
145 : */
146 : void
147 1293 : RingBufferPool::addReaderToRingBuffer(const std::shared_ptr<RingBuffer>& rbuf,
148 : const std::string& ringbufferId)
149 : {
150 1293 : if (ringbufferId != DEFAULT_ID and rbuf->getId() == ringbufferId)
151 30 : JAMI_WARNING("RingBuffer has a readoffset on itself");
152 :
153 1293 : rbuf->createReadOffset(ringbufferId);
154 1293 : readBindingsMap_[ringbufferId].insert(rbuf); // bindings list created if not existing
155 3879 : JAMI_DEBUG("Bind rbuf '{}' to ringbuffer '{}'", rbuf->getId(), ringbufferId);
156 1293 : }
157 :
158 : void
159 794 : RingBufferPool::removeReaderFromRingBuffer(const std::shared_ptr<RingBuffer>& rbuf,
160 : const std::string& ringbufferId)
161 : {
162 794 : if (auto bindings = getReadBindings(ringbufferId)) {
163 777 : bindings->erase(rbuf);
164 777 : if (bindings->empty())
165 347 : removeReadBindings(ringbufferId);
166 : }
167 :
168 794 : rbuf->removeReadOffset(ringbufferId);
169 794 : }
170 :
171 : void
172 537 : RingBufferPool::bindRingbuffers(const std::string& ringbufferId1, const std::string& ringbufferId2)
173 : {
174 1611 : JAMI_LOG("Bind ringbuffer {} to ringbuffer {}", ringbufferId1, ringbufferId2);
175 :
176 537 : const auto& rb1 = getRingBuffer(ringbufferId1);
177 537 : if (not rb1) {
178 0 : JAMI_ERROR("No ringbuffer associated with id '{}'", ringbufferId1);
179 0 : return;
180 : }
181 :
182 537 : const auto& rb2 = getRingBuffer(ringbufferId2);
183 537 : if (not rb2) {
184 0 : JAMI_ERROR("No ringbuffer associated to id '{}'", ringbufferId2);
185 0 : return;
186 : }
187 :
188 537 : std::lock_guard lk(stateLock_);
189 :
190 537 : addReaderToRingBuffer(rb1, ringbufferId2);
191 537 : addReaderToRingBuffer(rb2, ringbufferId1);
192 537 : }
193 :
194 : void
195 219 : RingBufferPool::bindHalfDuplexOut(const std::string& processId, const std::string& ringbufferId)
196 : {
197 : /* This method is used only for active ringbuffers, if this ringbuffer does not exist,
198 : * do nothing */
199 219 : if (const auto& rb = getRingBuffer(ringbufferId)) {
200 219 : std::lock_guard lk(stateLock_);
201 :
202 219 : addReaderToRingBuffer(rb, processId);
203 438 : }
204 219 : }
205 :
206 : void
207 0 : RingBufferPool::unbindRingbuffers(const std::string& ringbufferId1, const std::string& ringbufferId2)
208 : {
209 0 : JAMI_LOG("Unbind ringbuffers {} and {}", ringbufferId1, ringbufferId2);
210 :
211 0 : const auto& rb1 = getRingBuffer(ringbufferId1);
212 0 : if (not rb1) {
213 0 : JAMI_ERROR("No ringbuffer associated to id '{}'", ringbufferId1);
214 0 : return;
215 : }
216 :
217 0 : const auto& rb2 = getRingBuffer(ringbufferId2);
218 0 : if (not rb2) {
219 0 : JAMI_ERROR("No ringbuffer associated to id '{}'", ringbufferId2);
220 0 : return;
221 : }
222 :
223 0 : std::lock_guard lk(stateLock_);
224 :
225 0 : removeReaderFromRingBuffer(rb1, ringbufferId2);
226 0 : removeReaderFromRingBuffer(rb2, ringbufferId1);
227 0 : }
228 :
229 : void
230 38 : RingBufferPool::unBindHalfDuplexOut(const std::string& process_id, const std::string& ringbufferId)
231 : {
232 38 : std::lock_guard lk(stateLock_);
233 :
234 38 : if (const auto& rb = getRingBuffer(ringbufferId))
235 38 : removeReaderFromRingBuffer(rb, process_id);
236 38 : }
237 :
238 : void
239 7 : RingBufferPool::unBindAllHalfDuplexOut(const std::string& ringbufferId)
240 : {
241 7 : const auto& rb = getRingBuffer(ringbufferId);
242 7 : if (not rb) {
243 0 : JAMI_ERROR("No ringbuffer associated to id '{}'", ringbufferId);
244 0 : return;
245 : }
246 :
247 7 : std::lock_guard lk(stateLock_);
248 :
249 7 : auto bindings = getReadBindings(ringbufferId);
250 7 : if (not bindings)
251 0 : return;
252 :
253 7 : const auto bindings_copy = *bindings; // temporary copy
254 29 : for (const auto& rbuf : bindings_copy) {
255 22 : removeReaderFromRingBuffer(rb, rbuf->getId());
256 : }
257 7 : }
258 :
259 : void
260 484 : RingBufferPool::unBindAll(const std::string& ringbufferId)
261 : {
262 1452 : JAMI_LOG("Unbind ringbuffer {} from all bound ringbuffers", ringbufferId);
263 :
264 484 : const auto& rb = getRingBuffer(ringbufferId);
265 484 : if (not rb) {
266 0 : JAMI_ERROR("No ringbuffer associated to id '{}'", ringbufferId);
267 0 : return;
268 : }
269 :
270 484 : std::lock_guard lk(stateLock_);
271 :
272 484 : auto bindings = getReadBindings(ringbufferId);
273 484 : if (not bindings)
274 173 : return;
275 :
276 311 : const auto bindings_copy = *bindings; // temporary copy
277 678 : for (const auto& rbuf : bindings_copy) {
278 367 : removeReaderFromRingBuffer(rbuf, ringbufferId);
279 367 : removeReaderFromRingBuffer(rb, rbuf->getId());
280 : }
281 657 : }
282 :
283 : std::shared_ptr<AudioFrame>
284 44096 : RingBufferPool::getData(const std::string& ringbufferId)
285 : {
286 44096 : std::lock_guard lk(stateLock_);
287 :
288 44096 : const auto bindings = getReadBindings(ringbufferId);
289 44096 : if (not bindings)
290 339 : return {};
291 :
292 : // No mixing
293 43757 : if (bindings->size() == 1)
294 68700 : return (*bindings->cbegin())->get(ringbufferId);
295 :
296 9407 : auto mixBuffer = std::make_shared<AudioFrame>(internalAudioFormat_);
297 9407 : auto mixed = false;
298 32090 : for (const auto& rbuf : *bindings) {
299 22683 : if (auto b = rbuf->get(ringbufferId)) {
300 0 : mixed = true;
301 0 : mixBuffer->mix(*b);
302 :
303 : // voice is true if any of mixed frames has voice
304 0 : mixBuffer->has_voice |= b->has_voice;
305 22683 : }
306 : }
307 :
308 9407 : return mixed ? mixBuffer : nullptr;
309 44096 : }
310 :
311 : bool
312 0 : RingBufferPool::waitForDataAvailable(const std::string& ringbufferId,
313 : const std::chrono::microseconds& max_wait) const
314 : {
315 0 : std::unique_lock<std::recursive_mutex> lk(stateLock_);
316 :
317 : // convert to absolute time
318 0 : const auto deadline = std::chrono::high_resolution_clock::now() + max_wait;
319 :
320 0 : auto bindings = getReadBindings(ringbufferId);
321 0 : if (not bindings)
322 0 : return 0;
323 :
324 0 : const auto bindings_copy = *bindings; // temporary copy
325 0 : for (const auto& rbuf : bindings_copy) {
326 0 : lk.unlock();
327 0 : if (rbuf->waitForDataAvailable(ringbufferId, deadline) == 0)
328 0 : return false;
329 0 : lk.lock();
330 : }
331 0 : return true;
332 0 : }
333 :
334 : std::shared_ptr<AudioFrame>
335 0 : RingBufferPool::getAvailableData(const std::string& ringbufferId)
336 : {
337 0 : std::lock_guard lk(stateLock_);
338 :
339 0 : auto bindings = getReadBindings(ringbufferId);
340 0 : if (not bindings)
341 0 : return 0;
342 :
343 : // No mixing
344 0 : if (bindings->size() == 1) {
345 0 : return (*bindings->cbegin())->get(ringbufferId);
346 : }
347 :
348 0 : size_t availableFrames = 0;
349 :
350 0 : for (const auto& rbuf : *bindings)
351 0 : availableFrames = std::min(availableFrames, rbuf->availableForGet(ringbufferId));
352 :
353 0 : if (availableFrames == 0)
354 0 : return {};
355 :
356 0 : auto buf = std::make_shared<AudioFrame>(internalAudioFormat_);
357 0 : for (const auto& rbuf : *bindings) {
358 0 : if (auto b = rbuf->get(ringbufferId)) {
359 0 : buf->mix(*b);
360 :
361 : // voice is true if any of mixed frames has voice
362 0 : buf->has_voice |= b->has_voice;
363 0 : }
364 : }
365 :
366 0 : return buf;
367 0 : }
368 :
369 : size_t
370 0 : RingBufferPool::availableForGet(const std::string& ringbufferId) const
371 : {
372 0 : std::lock_guard lk(stateLock_);
373 :
374 0 : const auto bindings = getReadBindings(ringbufferId);
375 0 : if (not bindings)
376 0 : return 0;
377 :
378 : // No mixing
379 0 : if (bindings->size() == 1) {
380 0 : return (*bindings->begin())->availableForGet(ringbufferId);
381 : }
382 :
383 0 : size_t availableSamples = std::numeric_limits<size_t>::max();
384 :
385 0 : for (const auto& rbuf : *bindings) {
386 0 : const size_t nbSamples = rbuf->availableForGet(ringbufferId);
387 0 : if (nbSamples != 0)
388 0 : availableSamples = std::min(availableSamples, nbSamples);
389 : }
390 :
391 0 : return availableSamples != std::numeric_limits<size_t>::max() ? availableSamples : 0;
392 0 : }
393 :
394 : size_t
395 0 : RingBufferPool::discard(size_t toDiscard, const std::string& ringbufferId)
396 : {
397 0 : std::lock_guard lk(stateLock_);
398 :
399 0 : const auto bindings = getReadBindings(ringbufferId);
400 0 : if (not bindings)
401 0 : return 0;
402 :
403 0 : for (const auto& rbuf : *bindings)
404 0 : rbuf->discard(toDiscard, ringbufferId);
405 :
406 0 : return toDiscard;
407 0 : }
408 :
409 : void
410 622 : RingBufferPool::flush(const std::string& ringbufferId)
411 : {
412 622 : std::lock_guard lk(stateLock_);
413 :
414 622 : const auto bindings = getReadBindings(ringbufferId);
415 622 : if (not bindings)
416 136 : return;
417 :
418 7060 : for (const auto& rbuf : *bindings)
419 6574 : rbuf->flush(ringbufferId);
420 622 : }
421 :
422 : void
423 292 : RingBufferPool::flushAllBuffers()
424 : {
425 292 : std::lock_guard lk(stateLock_);
426 :
427 7287 : for (auto item = ringBufferMap_.begin(); item != ringBufferMap_.end();) {
428 6995 : if (const auto rb = item->second.lock()) {
429 6695 : rb->flushAll();
430 6695 : ++item;
431 : } else {
432 : // Use this version of erase to avoid using invalidated iterator
433 300 : item = ringBufferMap_.erase(item);
434 6995 : }
435 : }
436 292 : }
437 :
438 : bool
439 0 : RingBufferPool::isAudioMeterActive(const std::string& id)
440 : {
441 0 : std::lock_guard lk(stateLock_);
442 0 : if (!id.empty()) {
443 0 : if (auto rb = getRingBuffer(id)) {
444 0 : return rb->isAudioMeterActive();
445 0 : }
446 : } else {
447 0 : for (auto item = ringBufferMap_.begin(); item != ringBufferMap_.end(); ++item) {
448 0 : if (const auto rb = item->second.lock()) {
449 0 : if (rb->isAudioMeterActive()) {
450 0 : return true;
451 : }
452 0 : }
453 : }
454 : }
455 0 : return false;
456 0 : }
457 :
458 : void
459 0 : RingBufferPool::setAudioMeterState(const std::string& id, bool state)
460 : {
461 0 : std::lock_guard lk(stateLock_);
462 0 : if (!id.empty()) {
463 0 : if (auto rb = getRingBuffer(id)) {
464 0 : rb->setAudioMeterState(state);
465 0 : }
466 : } else {
467 0 : for (auto item = ringBufferMap_.begin(); item != ringBufferMap_.end(); ++item) {
468 0 : if (const auto rb = item->second.lock()) {
469 0 : rb->setAudioMeterState(state);
470 0 : }
471 : }
472 : }
473 0 : }
474 :
475 : } // namespace jami
|