Line data Source code
1 : /*
2 : * Copyright (C) 2004-2024 Savoir-faire Linux Inc.
3 : *
4 : * Author: Alexandre Savard <alexandre.savard@savoirfairelinux.com>
5 : * Author: Yan Morin <yan.morin@savoirfairelinux.com>
6 : * Author: Laurielle Lea <laurielle.lea@savoirfairelinux.com>
7 : * Author: Adrien Beraud <adrien.beraud@gmail.com>
8 : *
9 : * Portions (c) Dominic Mazzoni (Audacity)
10 : *
11 : * This program is free software; you can redistribute it and/or modify
12 : * it under the terms of the GNU General Public License as published by
13 : * the Free Software Foundation; either version 3 of the License, or
14 : * (at your option) any later version.
15 : *
16 : * This program is distributed in the hope that it will be useful,
17 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 : * GNU General Public License for more details.
20 : *
21 : * You should have received a copy of the GNU General Public License
22 : * along with this program; if not, write to the Free Software
23 : * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
24 : */
25 :
26 : #include "ringbuffer.h"
27 : #include "logger.h"
28 : #include "client/ring_signal.h"
29 : #include "media_buffer.h"
30 : #include "libav_deps.h"
31 :
32 : #include <chrono>
33 : #include <cstdlib>
34 : #include <cstring>
35 : #include <algorithm>
36 :
37 : namespace jami {
38 :
39 : // corresponds to 160 ms (about 5 rtp packets)
40 : static const size_t MIN_BUFFER_SIZE = 1024;
41 :
42 : static constexpr const int RMS_SIGNAL_INTERVAL = 5;
43 :
44 475 : RingBuffer::RingBuffer(const std::string& rbuf_id, size_t /*size*/, AudioFormat format)
45 475 : : id(rbuf_id)
46 475 : , endPos_(0)
47 475 : , format_(format)
48 475 : , lock_()
49 475 : , not_empty_()
50 475 : , readoffsets_()
51 475 : , resizer_(format_, format_.sample_rate / 50, [this](std::shared_ptr<AudioFrame>&& frame) {
52 0 : putToBuffer(std::move(frame));
53 1900 : })
54 : {
55 1425 : JAMI_LOG("Create new RingBuffer {}", id);
56 475 : }
57 :
58 950 : RingBuffer::~RingBuffer()
59 : {
60 1425 : JAMI_LOG("Destroy RingBuffer {}", id);
61 475 : }
62 :
63 : void
64 6355 : RingBuffer::flush(const std::string& ringbufferId)
65 : {
66 6355 : storeReadOffset(endPos_, ringbufferId);
67 6355 : }
68 :
69 : void
70 6935 : RingBuffer::flushAll()
71 : {
72 13659 : for (auto& offset : readoffsets_)
73 6724 : offset.second.offset = endPos_;
74 6935 : }
75 :
76 : size_t
77 0 : RingBuffer::putLength() const
78 : {
79 0 : const size_t buffer_size = buffer_.size();
80 0 : if (buffer_size == 0)
81 0 : return 0;
82 0 : const size_t startPos = getSmallestReadOffset();
83 0 : return (endPos_ + buffer_size - startPos) % buffer_size;
84 : }
85 :
86 : size_t
87 5343 : RingBuffer::getLength(const std::string& ringbufferId) const
88 : {
89 5343 : const size_t buffer_size = buffer_.size();
90 5343 : if (buffer_size == 0)
91 0 : return 0;
92 5343 : return (endPos_ + buffer_size - getReadOffset(ringbufferId)) % buffer_size;
93 : }
94 :
95 : void
96 0 : RingBuffer::debug()
97 : {
98 0 : JAMI_DBG("Start=%zu; End=%zu; BufferSize=%zu", getSmallestReadOffset(), endPos_, buffer_.size());
99 0 : }
100 :
101 : size_t
102 5343 : RingBuffer::getReadOffset(const std::string& ringbufferId) const
103 : {
104 5343 : auto iter = readoffsets_.find(ringbufferId);
105 5343 : return (iter != readoffsets_.end()) ? iter->second.offset : 0;
106 : }
107 :
108 : size_t
109 0 : RingBuffer::getSmallestReadOffset() const
110 : {
111 0 : if (hasNoReadOffsets())
112 0 : return 0;
113 0 : size_t smallest = buffer_.size();
114 0 : for (auto const& iter : readoffsets_)
115 0 : smallest = std::min(smallest, iter.second.offset);
116 0 : return smallest;
117 : }
118 :
119 : void
120 6355 : RingBuffer::storeReadOffset(size_t offset, const std::string& ringbufferId)
121 : {
122 6355 : ReadOffsetMap::iterator iter = readoffsets_.find(ringbufferId);
123 :
124 6355 : if (iter != readoffsets_.end())
125 6355 : iter->second.offset = offset;
126 : else
127 0 : JAMI_ERROR("RingBuffer::storeReadOffset() failed: unknown ringbuffer '{}'", ringbufferId);
128 6355 : }
129 :
130 : void
131 1158 : RingBuffer::createReadOffset(const std::string& ringbufferId)
132 : {
133 1158 : std::lock_guard l(lock_);
134 1158 : if (!hasThisReadOffset(ringbufferId))
135 774 : readoffsets_.emplace(ringbufferId, ReadOffset {endPos_, {}});
136 1158 : }
137 :
138 : void
139 676 : RingBuffer::removeReadOffset(const std::string& ringbufferId)
140 : {
141 676 : std::lock_guard l(lock_);
142 676 : auto iter = readoffsets_.find(ringbufferId);
143 676 : if (iter != readoffsets_.end())
144 653 : readoffsets_.erase(iter);
145 676 : }
146 :
147 : bool
148 1158 : RingBuffer::hasThisReadOffset(const std::string& ringbufferId) const
149 : {
150 1158 : return readoffsets_.find(ringbufferId) != readoffsets_.end();
151 : }
152 :
153 : bool
154 0 : RingBuffer::hasNoReadOffsets() const
155 : {
156 0 : return readoffsets_.empty();
157 : }
158 :
159 : //
160 : // For the writer only:
161 : //
162 :
163 : void
164 0 : RingBuffer::put(std::shared_ptr<AudioFrame>&& data)
165 : {
166 0 : std::lock_guard l(writeLock_);
167 0 : resizer_.enqueue(resampler_.resample(std::move(data), format_));
168 0 : }
169 :
170 : // This one puts some data inside the ring buffer.
171 : void
172 0 : RingBuffer::putToBuffer(std::shared_ptr<AudioFrame>&& data)
173 : {
174 0 : std::lock_guard l(lock_);
175 0 : const size_t buffer_size = buffer_.size();
176 0 : if (buffer_size == 0)
177 0 : return;
178 :
179 0 : size_t len = buffer_size - putLength();
180 0 : if (len == 0)
181 0 : discard(1);
182 :
183 0 : size_t pos = endPos_;
184 :
185 0 : buffer_[pos] = std::move(data);
186 0 : const auto& newBuf = buffer_[pos];
187 0 : pos = (pos + 1) % buffer_size;
188 :
189 0 : endPos_ = pos;
190 :
191 0 : if (rmsSignal_) {
192 0 : ++rmsFrameCount_;
193 0 : rmsLevel_ += newBuf->calcRMS();
194 0 : if (rmsFrameCount_ == RMS_SIGNAL_INTERVAL) {
195 0 : emitSignal<libjami::AudioSignal::AudioMeter>(id, rmsLevel_ / RMS_SIGNAL_INTERVAL);
196 0 : rmsLevel_ = 0;
197 0 : rmsFrameCount_ = 0;
198 : }
199 : }
200 :
201 0 : for (auto& offset : readoffsets_) {
202 0 : if (offset.second.callback)
203 0 : offset.second.callback(newBuf);
204 : }
205 :
206 0 : not_empty_.notify_all();
207 0 : }
208 :
209 : //
210 : // For the reader only:
211 : //
212 :
213 : size_t
214 0 : RingBuffer::availableForGet(const std::string& ringbufferId) const
215 : {
216 : // Used space
217 0 : return getLength(ringbufferId);
218 : }
219 :
220 : std::shared_ptr<AudioFrame>
221 57338 : RingBuffer::get(const std::string& ringbufferId)
222 : {
223 57338 : std::lock_guard l(lock_);
224 :
225 57338 : auto offset = readoffsets_.find(ringbufferId);
226 57338 : if (offset == readoffsets_.end())
227 0 : return {};
228 :
229 57338 : const size_t buffer_size = buffer_.size();
230 57338 : if (buffer_size == 0)
231 0 : return {};
232 :
233 57338 : size_t startPos = offset->second.offset;
234 57338 : size_t len = (endPos_ + buffer_size - startPos) % buffer_size;
235 57338 : if (len == 0)
236 57338 : return {};
237 :
238 0 : auto ret = buffer_[startPos];
239 0 : offset->second.offset = (startPos + 1) % buffer_size;
240 0 : return ret;
241 57338 : }
242 :
243 : size_t
244 0 : RingBuffer::waitForDataAvailable(const std::string& ringbufferId, const time_point& deadline) const
245 : {
246 0 : std::unique_lock l(lock_);
247 :
248 0 : if (buffer_.empty())
249 0 : return 0;
250 0 : if (readoffsets_.find(ringbufferId) == readoffsets_.end())
251 0 : return 0;
252 :
253 0 : size_t getl = 0;
254 0 : auto check = [=, &getl] {
255 : // Re-find read_ptr: it may be destroyed during the wait
256 0 : const size_t buffer_size = buffer_.size();
257 0 : const auto read_ptr = readoffsets_.find(ringbufferId);
258 0 : if (buffer_size == 0 || read_ptr == readoffsets_.end())
259 0 : return true;
260 0 : getl = (endPos_ + buffer_size - read_ptr->second.offset) % buffer_size;
261 0 : return getl != 0;
262 0 : };
263 :
264 0 : if (deadline == time_point::max()) {
265 : // no timeout provided, wait as long as necessary
266 0 : not_empty_.wait(l, check);
267 : } else {
268 0 : not_empty_.wait_until(l, deadline, check);
269 : }
270 :
271 0 : return getl;
272 0 : }
273 :
274 : size_t
275 0 : RingBuffer::discard(size_t toDiscard, const std::string& ringbufferId)
276 : {
277 0 : std::lock_guard l(lock_);
278 :
279 0 : const size_t buffer_size = buffer_.size();
280 0 : if (buffer_size == 0)
281 0 : return 0;
282 :
283 0 : auto offset = readoffsets_.find(ringbufferId);
284 0 : if (offset == readoffsets_.end())
285 0 : return 0;
286 :
287 0 : size_t len = (endPos_ + buffer_size - offset->second.offset) % buffer_size;
288 0 : toDiscard = std::min(toDiscard, len);
289 :
290 0 : offset->second.offset = (offset->second.offset + toDiscard) % buffer_size;
291 0 : return toDiscard;
292 0 : }
293 :
294 : size_t
295 0 : RingBuffer::discard(size_t toDiscard)
296 : {
297 0 : const size_t buffer_size = buffer_.size();
298 0 : if (buffer_size == 0)
299 0 : return 0;
300 :
301 0 : for (auto& r : readoffsets_) {
302 0 : size_t dst = (r.second.offset + buffer_size - endPos_) % buffer_size;
303 0 : if (dst < toDiscard)
304 0 : r.second.offset = (r.second.offset + toDiscard - dst) % buffer_size;
305 : }
306 0 : return toDiscard;
307 : }
308 :
309 : } // namespace jami
|