Line data Source code
1 : /*
2 : * Copyright (C) 2004-2024 Savoir-faire Linux Inc.
3 : *
4 : * This program is free software: you can redistribute it and/or modify
5 : * it under the terms of the GNU General Public License as published by
6 : * the Free Software Foundation, either version 3 of the License, or
7 : * (at your option) any later version.
8 : *
9 : * This program is distributed in the hope that it will be useful,
10 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 : * GNU General Public License for more details.
13 : *
14 : * You should have received a copy of the GNU General Public License
15 : * along with this program. If not, see <https://www.gnu.org/licenses/>.
16 : */
17 :
18 : #include "ringbufferpool.h"
19 : #include "ringbuffer.h"
20 : #include "ring_types.h" // for SIZEBUF
21 : #include "logger.h"
22 :
23 : #include <limits>
24 : #include <utility> // for std::pair
25 : #include <cstring>
26 : #include <algorithm>
27 :
28 : namespace jami {
29 :
30 : const char* const RingBufferPool::DEFAULT_ID = "audiolayer_id";
31 :
32 38 : RingBufferPool::RingBufferPool()
33 38 : : defaultRingBuffer_(createRingBuffer(DEFAULT_ID))
34 38 : {}
35 :
36 38 : RingBufferPool::~RingBufferPool()
37 : {
38 38 : readBindingsMap_.clear();
39 38 : defaultRingBuffer_.reset();
40 :
41 : // Verify ringbuffer not removed yet
42 : // XXXX: With a good design this should never happen! :-P
43 196 : for (const auto& item : ringBufferMap_) {
44 158 : const auto& weak = item.second;
45 158 : if (not weak.expired())
46 0 : JAMI_WARNING("Leaking RingBuffer '{}'", item.first);
47 : }
48 38 : }
49 :
50 : void
51 0 : RingBufferPool::setInternalSamplingRate(unsigned sr)
52 : {
53 0 : std::lock_guard lk(stateLock_);
54 :
55 0 : if (sr != internalAudioFormat_.sample_rate) {
56 0 : flushAllBuffers();
57 0 : internalAudioFormat_.sample_rate = sr;
58 : }
59 0 : }
60 :
61 : void
62 0 : RingBufferPool::setInternalAudioFormat(AudioFormat format)
63 : {
64 0 : std::lock_guard lk(stateLock_);
65 :
66 0 : if (format != internalAudioFormat_) {
67 0 : flushAllBuffers();
68 0 : internalAudioFormat_ = format;
69 0 : for (auto& wrb : ringBufferMap_)
70 0 : if (auto rb = wrb.second.lock())
71 0 : rb->setFormat(internalAudioFormat_);
72 : }
73 0 : }
74 :
75 : std::shared_ptr<RingBuffer>
76 2443 : RingBufferPool::getRingBuffer(const std::string& id)
77 : {
78 2443 : std::lock_guard lk(stateLock_);
79 :
80 2443 : const auto& it = ringBufferMap_.find(id);
81 2443 : if (it != ringBufferMap_.cend()) {
82 1968 : if (const auto& sptr = it->second.lock())
83 1968 : return sptr;
84 14 : ringBufferMap_.erase(it);
85 : }
86 :
87 489 : return nullptr;
88 2443 : }
89 :
90 : std::shared_ptr<RingBuffer>
91 0 : RingBufferPool::getRingBuffer(const std::string& id) const
92 : {
93 0 : std::lock_guard lk(stateLock_);
94 :
95 0 : const auto& it = ringBufferMap_.find(id);
96 0 : if (it != ringBufferMap_.cend())
97 0 : return it->second.lock();
98 :
99 0 : return nullptr;
100 0 : }
101 :
102 : std::shared_ptr<RingBuffer>
103 872 : RingBufferPool::createRingBuffer(const std::string& id)
104 : {
105 872 : std::lock_guard lk(stateLock_);
106 :
107 872 : auto rbuf = getRingBuffer(id);
108 872 : if (rbuf) {
109 1149 : JAMI_DEBUG("Ringbuffer already exists for id '{}'", id);
110 383 : return rbuf;
111 : }
112 :
113 489 : rbuf.reset(new RingBuffer(id, SIZEBUF, internalAudioFormat_));
114 489 : ringBufferMap_.emplace(id, std::weak_ptr<RingBuffer>(rbuf));
115 489 : return rbuf;
116 872 : }
117 :
118 : const RingBufferPool::ReadBindings*
119 0 : RingBufferPool::getReadBindings(const std::string& ringbufferId) const
120 : {
121 0 : const auto& iter = readBindingsMap_.find(ringbufferId);
122 0 : return iter != readBindingsMap_.cend() ? &iter->second : nullptr;
123 : }
124 :
125 : RingBufferPool::ReadBindings*
126 47912 : RingBufferPool::getReadBindings(const std::string& ringbufferId)
127 : {
128 47912 : const auto& iter = readBindingsMap_.find(ringbufferId);
129 47912 : return iter != readBindingsMap_.cend() ? &iter->second : nullptr;
130 : }
131 :
132 : void
133 354 : RingBufferPool::removeReadBindings(const std::string& ringbufferId)
134 : {
135 354 : if (not readBindingsMap_.erase(ringbufferId))
136 0 : JAMI_ERROR("Ringbuffer {} does not exist!", ringbufferId);
137 354 : }
138 :
139 : /**
140 : * Make given ringbuffer a reader of given ring buffer
141 : */
142 : void
143 800 : RingBufferPool::addReaderToRingBuffer(const std::shared_ptr<RingBuffer>& rbuf,
144 : const std::string& ringbufferId)
145 : {
146 800 : if (ringbufferId != DEFAULT_ID and rbuf->getId() == ringbufferId)
147 30 : JAMI_WARNING("RingBuffer has a readoffset on itself");
148 :
149 800 : rbuf->createReadOffset(ringbufferId);
150 800 : readBindingsMap_[ringbufferId].insert(rbuf); // bindings list created if not existing
151 2400 : JAMI_DEBUG("Bind rbuf '{}' to ringbuffer '{}'", rbuf->getId(), ringbufferId);
152 800 : }
153 :
154 : void
155 1257 : RingBufferPool::removeReaderFromRingBuffer(const std::shared_ptr<RingBuffer>& rbuf,
156 : const std::string& ringbufferId)
157 : {
158 1257 : if (auto bindings = getReadBindings(ringbufferId)) {
159 741 : bindings->erase(rbuf);
160 741 : if (bindings->empty())
161 354 : removeReadBindings(ringbufferId);
162 : }
163 :
164 1257 : rbuf->removeReadOffset(ringbufferId);
165 1257 : }
166 :
167 : void
168 386 : RingBufferPool::bindRingbuffers(const std::string& ringbufferId1, const std::string& ringbufferId2)
169 : {
170 1158 : JAMI_LOG("Bind ringbuffer {} to ringbuffer {}", ringbufferId1, ringbufferId2);
171 :
172 386 : const auto& rb1 = getRingBuffer(ringbufferId1);
173 386 : if (not rb1) {
174 0 : JAMI_ERROR("No ringbuffer associated with id '{}'", ringbufferId1);
175 0 : return;
176 : }
177 :
178 386 : const auto& rb2 = getRingBuffer(ringbufferId2);
179 386 : if (not rb2) {
180 0 : JAMI_ERROR("No ringbuffer associated to id '{}'", ringbufferId2);
181 0 : return;
182 : }
183 :
184 386 : std::lock_guard lk(stateLock_);
185 :
186 386 : addReaderToRingBuffer(rb1, ringbufferId2);
187 386 : addReaderToRingBuffer(rb2, ringbufferId1);
188 386 : }
189 :
190 : void
191 28 : RingBufferPool::bindHalfDuplexOut(const std::string& processId, const std::string& ringbufferId)
192 : {
193 : /* This method is used only for active ringbuffers, if this ringbuffer does not exist,
194 : * do nothing */
195 28 : if (const auto& rb = getRingBuffer(ringbufferId)) {
196 28 : std::lock_guard lk(stateLock_);
197 :
198 28 : addReaderToRingBuffer(rb, processId);
199 56 : }
200 28 : }
201 :
202 : void
203 71 : RingBufferPool::unbindRingbuffers(const std::string& ringbufferId1, const std::string& ringbufferId2)
204 : {
205 213 : JAMI_LOG("Unbind ringbuffers {} and {}", ringbufferId1, ringbufferId2);
206 :
207 71 : const auto& rb1 = getRingBuffer(ringbufferId1);
208 71 : if (not rb1) {
209 0 : JAMI_ERROR("No ringbuffer associated to id '{}'", ringbufferId1);
210 0 : return;
211 : }
212 :
213 71 : const auto& rb2 = getRingBuffer(ringbufferId2);
214 71 : if (not rb2) {
215 0 : JAMI_ERROR("No ringbuffer associated to id '{}'", ringbufferId2);
216 0 : return;
217 : }
218 :
219 71 : std::lock_guard lk(stateLock_);
220 :
221 71 : removeReaderFromRingBuffer(rb1, ringbufferId2);
222 71 : removeReaderFromRingBuffer(rb2, ringbufferId1);
223 71 : }
224 :
225 : void
226 38 : RingBufferPool::unBindHalfDuplexOut(const std::string& process_id, const std::string& ringbufferId)
227 : {
228 38 : std::lock_guard lk(stateLock_);
229 :
230 38 : if (const auto& rb = getRingBuffer(ringbufferId))
231 38 : removeReaderFromRingBuffer(rb, process_id);
232 38 : }
233 :
234 : void
235 95 : RingBufferPool::unBindAllHalfDuplexOut(const std::string& ringbufferId)
236 : {
237 95 : const auto& rb = getRingBuffer(ringbufferId);
238 95 : if (not rb) {
239 0 : JAMI_ERROR("No ringbuffer associated to id '{}'", ringbufferId);
240 0 : return;
241 : }
242 :
243 95 : std::lock_guard lk(stateLock_);
244 :
245 95 : auto bindings = getReadBindings(ringbufferId);
246 95 : if (not bindings)
247 54 : return;
248 :
249 41 : const auto bindings_copy = *bindings; // temporary copy
250 614 : for (const auto& rbuf : bindings_copy) {
251 573 : removeReaderFromRingBuffer(rb, rbuf->getId());
252 : }
253 149 : }
254 :
255 : void
256 460 : RingBufferPool::unBindAll(const std::string& ringbufferId)
257 : {
258 1380 : JAMI_LOG("Unbind ringbuffer {} from all bound ringbuffers", ringbufferId);
259 :
260 460 : const auto& rb = getRingBuffer(ringbufferId);
261 460 : if (not rb) {
262 0 : JAMI_ERROR("No ringbuffer associated to id '{}'", ringbufferId);
263 0 : return;
264 : }
265 :
266 460 : std::lock_guard lk(stateLock_);
267 :
268 460 : auto bindings = getReadBindings(ringbufferId);
269 460 : if (not bindings)
270 234 : return;
271 :
272 226 : const auto bindings_copy = *bindings; // temporary copy
273 478 : for (const auto& rbuf : bindings_copy) {
274 252 : removeReaderFromRingBuffer(rbuf, ringbufferId);
275 252 : removeReaderFromRingBuffer(rb, rbuf->getId());
276 : }
277 694 : }
278 :
279 : std::shared_ptr<AudioFrame>
280 45522 : RingBufferPool::getData(const std::string& ringbufferId)
281 : {
282 45522 : std::lock_guard lk(stateLock_);
283 :
284 45522 : const auto bindings = getReadBindings(ringbufferId);
285 45522 : if (not bindings)
286 522 : return {};
287 :
288 : // No mixing
289 45000 : if (bindings->size() == 1)
290 70132 : return (*bindings->cbegin())->get(ringbufferId);
291 :
292 9934 : auto mixBuffer = std::make_shared<AudioFrame>(internalAudioFormat_);
293 9934 : auto mixed = false;
294 33647 : for (const auto& rbuf : *bindings) {
295 23713 : if (auto b = rbuf->get(ringbufferId)) {
296 0 : mixed = true;
297 0 : mixBuffer->mix(*b);
298 :
299 : // voice is true if any of mixed frames has voice
300 0 : mixBuffer->has_voice |= b->has_voice;
301 23713 : }
302 : }
303 :
304 9934 : return mixed ? mixBuffer : nullptr;
305 45522 : }
306 :
307 : bool
308 0 : RingBufferPool::waitForDataAvailable(const std::string& ringbufferId,
309 : const std::chrono::microseconds& max_wait) const
310 : {
311 0 : std::unique_lock<std::recursive_mutex> lk(stateLock_);
312 :
313 : // convert to absolute time
314 0 : const auto deadline = std::chrono::high_resolution_clock::now() + max_wait;
315 :
316 0 : auto bindings = getReadBindings(ringbufferId);
317 0 : if (not bindings)
318 0 : return 0;
319 :
320 0 : const auto bindings_copy = *bindings; // temporary copy
321 0 : for (const auto& rbuf : bindings_copy) {
322 0 : lk.unlock();
323 0 : if (rbuf->waitForDataAvailable(ringbufferId, deadline) == 0)
324 0 : return false;
325 0 : lk.lock();
326 : }
327 0 : return true;
328 0 : }
329 :
330 : std::shared_ptr<AudioFrame>
331 0 : RingBufferPool::getAvailableData(const std::string& ringbufferId)
332 : {
333 0 : std::lock_guard lk(stateLock_);
334 :
335 0 : auto bindings = getReadBindings(ringbufferId);
336 0 : if (not bindings)
337 0 : return 0;
338 :
339 : // No mixing
340 0 : if (bindings->size() == 1) {
341 0 : return (*bindings->cbegin())->get(ringbufferId);
342 : }
343 :
344 0 : size_t availableFrames = 0;
345 :
346 0 : for (const auto& rbuf : *bindings)
347 0 : availableFrames = std::min(availableFrames, rbuf->availableForGet(ringbufferId));
348 :
349 0 : if (availableFrames == 0)
350 0 : return {};
351 :
352 0 : auto buf = std::make_shared<AudioFrame>(internalAudioFormat_);
353 0 : for (const auto& rbuf : *bindings) {
354 0 : if (auto b = rbuf->get(ringbufferId)) {
355 0 : buf->mix(*b);
356 :
357 : // voice is true if any of mixed frames has voice
358 0 : buf->has_voice |= b->has_voice;
359 0 : }
360 : }
361 :
362 0 : return buf;
363 0 : }
364 :
365 : size_t
366 0 : RingBufferPool::availableForGet(const std::string& ringbufferId) const
367 : {
368 0 : std::lock_guard lk(stateLock_);
369 :
370 0 : const auto bindings = getReadBindings(ringbufferId);
371 0 : if (not bindings)
372 0 : return 0;
373 :
374 : // No mixing
375 0 : if (bindings->size() == 1) {
376 0 : return (*bindings->begin())->availableForGet(ringbufferId);
377 : }
378 :
379 0 : size_t availableSamples = std::numeric_limits<size_t>::max();
380 :
381 0 : for (const auto& rbuf : *bindings) {
382 0 : const size_t nbSamples = rbuf->availableForGet(ringbufferId);
383 0 : if (nbSamples != 0)
384 0 : availableSamples = std::min(availableSamples, nbSamples);
385 : }
386 :
387 0 : return availableSamples != std::numeric_limits<size_t>::max() ? availableSamples : 0;
388 0 : }
389 :
390 : size_t
391 0 : RingBufferPool::discard(size_t toDiscard, const std::string& ringbufferId)
392 : {
393 0 : std::lock_guard lk(stateLock_);
394 :
395 0 : const auto bindings = getReadBindings(ringbufferId);
396 0 : if (not bindings)
397 0 : return 0;
398 :
399 0 : for (const auto& rbuf : *bindings)
400 0 : rbuf->discard(toDiscard, ringbufferId);
401 :
402 0 : return toDiscard;
403 0 : }
404 :
405 : void
406 578 : RingBufferPool::flush(const std::string& ringbufferId)
407 : {
408 578 : std::lock_guard lk(stateLock_);
409 :
410 578 : const auto bindings = getReadBindings(ringbufferId);
411 578 : if (not bindings)
412 189 : return;
413 :
414 6792 : for (const auto& rbuf : *bindings)
415 6403 : rbuf->flush(ringbufferId);
416 578 : }
417 :
418 : void
419 301 : RingBufferPool::flushAllBuffers()
420 : {
421 301 : std::lock_guard lk(stateLock_);
422 :
423 6795 : for (auto item = ringBufferMap_.begin(); item != ringBufferMap_.end();) {
424 6494 : if (const auto rb = item->second.lock()) {
425 6177 : rb->flushAll();
426 6177 : ++item;
427 : } else {
428 : // Use this version of erase to avoid using invalidated iterator
429 317 : item = ringBufferMap_.erase(item);
430 6494 : }
431 : }
432 301 : }
433 :
434 : bool
435 0 : RingBufferPool::isAudioMeterActive(const std::string& id)
436 : {
437 0 : std::lock_guard lk(stateLock_);
438 0 : if (!id.empty()) {
439 0 : if (auto rb = getRingBuffer(id)) {
440 0 : return rb->isAudioMeterActive();
441 0 : }
442 : } else {
443 0 : for (auto item = ringBufferMap_.begin(); item != ringBufferMap_.end(); ++item) {
444 0 : if (const auto rb = item->second.lock()) {
445 0 : if (rb->isAudioMeterActive()) {
446 0 : return true;
447 : }
448 0 : }
449 : }
450 : }
451 0 : return false;
452 0 : }
453 :
454 : void
455 0 : RingBufferPool::setAudioMeterState(const std::string& id, bool state)
456 : {
457 0 : std::lock_guard lk(stateLock_);
458 0 : if (!id.empty()) {
459 0 : if (auto rb = getRingBuffer(id)) {
460 0 : rb->setAudioMeterState(state);
461 0 : }
462 : } else {
463 0 : for (auto item = ringBufferMap_.begin(); item != ringBufferMap_.end(); ++item) {
464 0 : if (const auto rb = item->second.lock()) {
465 0 : rb->setAudioMeterState(state);
466 0 : }
467 : }
468 : }
469 0 : }
470 :
471 : } // namespace jami
|