Line data Source code
1 : /*
2 : * Copyright (C) 2004-2026 Savoir-faire Linux Inc.
3 : *
4 : * This program is free software: you can redistribute it and/or modify
5 : * it under the terms of the GNU General Public License as published by
6 : * the Free Software Foundation, either version 3 of the License, or
7 : * (at your option) any later version.
8 : *
9 : * This program is distributed in the hope that it will be useful,
10 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 : * GNU General Public License for more details.
13 : *
14 : * You should have received a copy of the GNU General Public License
15 : * along with this program. If not, see <https://www.gnu.org/licenses/>.
16 : */
17 :
18 : #include "ringbuffer.h"
19 : #include "logger.h"
20 : #include "client/jami_signal.h"
21 : #include "media_buffer.h"
22 : #include "libav_deps.h"
23 :
24 : #include <chrono>
25 : #include <cstdlib>
26 : #include <cstring>
27 : #include <algorithm>
28 :
29 : namespace jami {
30 :
31 : static constexpr const int RMS_SIGNAL_INTERVAL = 5;
32 :
33 516 : RingBuffer::RingBuffer(const std::string& rbuf_id, AudioFormat format)
34 516 : : id(rbuf_id)
35 516 : , endPos_(0)
36 516 : , format_(format)
37 516 : , lock_()
38 516 : , not_empty_()
39 516 : , readoffsets_()
40 516 : , resizer_(format_, static_cast<int>(format_.sample_rate) / 50, [this](std::shared_ptr<AudioFrame>&& frame) {
41 0 : putToBuffer(std::move(frame));
42 3096 : })
43 : {
44 2064 : JAMI_LOG("Create new RingBuffer {}", id);
45 516 : }
46 :
47 1032 : RingBuffer::~RingBuffer()
48 : {
49 2064 : JAMI_LOG("Destroy RingBuffer {}", id);
50 516 : }
51 :
52 : void
53 1816 : RingBuffer::flush(const std::string& ringbufferId)
54 : {
55 1816 : storeReadOffset(endPos_, ringbufferId);
56 1816 : }
57 :
58 : void
59 2714 : RingBuffer::flushAll()
60 : {
61 5201 : for (auto& offset : readoffsets_)
62 2487 : offset.second.offset = endPos_;
63 2714 : }
64 :
65 : std::vector<std::string>
66 95 : RingBuffer::getSubscribers()
67 : {
68 95 : std::vector<std::string> subscribers;
69 259 : for (const auto& offset : readoffsets_) {
70 164 : subscribers.push_back(offset.first);
71 : }
72 95 : return subscribers;
73 0 : }
74 :
75 : size_t
76 0 : RingBuffer::putLength() const
77 : {
78 0 : const size_t buffer_size = buffer_.size();
79 0 : if (buffer_size == 0)
80 0 : return 0;
81 0 : const size_t startPos = getSmallestReadOffset();
82 0 : return (endPos_ + buffer_size - startPos) % buffer_size;
83 : }
84 :
85 : size_t
86 10 : RingBuffer::getLength(const std::string& ringbufferId) const
87 : {
88 10 : const size_t buffer_size = buffer_.size();
89 10 : if (buffer_size == 0)
90 0 : return 0;
91 10 : return (endPos_ + buffer_size - getReadOffset(ringbufferId)) % buffer_size;
92 : }
93 :
94 : void
95 0 : RingBuffer::debug()
96 : {
97 0 : JAMI_LOG("Start={}; End={}; BufferSize={}", getSmallestReadOffset(), endPos_, buffer_.size());
98 0 : }
99 :
100 : size_t
101 10 : RingBuffer::getReadOffset(const std::string& ringbufferId) const
102 : {
103 10 : auto iter = readoffsets_.find(ringbufferId);
104 10 : return (iter != readoffsets_.end()) ? iter->second.offset : 0;
105 : }
106 :
107 : size_t
108 0 : RingBuffer::getSmallestReadOffset() const
109 : {
110 0 : if (hasNoReadOffsets())
111 0 : return 0;
112 0 : size_t smallest = buffer_.size();
113 0 : for (auto const& iter : readoffsets_)
114 0 : smallest = std::min(smallest, iter.second.offset);
115 0 : return smallest;
116 : }
117 :
118 : void
119 1816 : RingBuffer::storeReadOffset(size_t offset, const std::string& ringbufferId)
120 : {
121 1816 : ReadOffsetMap::iterator iter = readoffsets_.find(ringbufferId);
122 :
123 1816 : if (iter != readoffsets_.end())
124 1816 : iter->second.offset = offset;
125 : else
126 0 : JAMI_ERROR("RingBuffer::storeReadOffset() failed: unknown ringbuffer '{}'", ringbufferId);
127 1816 : }
128 :
129 : void
130 978 : RingBuffer::createReadOffset(const std::string& ringbufferId)
131 : {
132 978 : std::lock_guard l(lock_);
133 978 : if (!hasThisReadOffset(ringbufferId))
134 780 : readoffsets_.emplace(ringbufferId, ReadOffset {endPos_, {}});
135 978 : }
136 :
137 : void
138 1009 : RingBuffer::removeReadOffset(const std::string& ringbufferId)
139 : {
140 1009 : std::lock_guard l(lock_);
141 1009 : auto iter = readoffsets_.find(ringbufferId);
142 1009 : if (iter != readoffsets_.end())
143 737 : readoffsets_.erase(iter);
144 1009 : }
145 :
146 : bool
147 978 : RingBuffer::hasThisReadOffset(const std::string& ringbufferId) const
148 : {
149 978 : return readoffsets_.find(ringbufferId) != readoffsets_.end();
150 : }
151 :
152 : bool
153 0 : RingBuffer::hasNoReadOffsets() const
154 : {
155 0 : return readoffsets_.empty();
156 : }
157 :
158 : //
159 : // For the writer only:
160 : //
161 :
162 : void
163 0 : RingBuffer::put(std::shared_ptr<AudioFrame>&& data)
164 : {
165 0 : std::lock_guard l(writeLock_);
166 0 : resizer_.enqueue(resampler_.resample(std::move(data), format_));
167 0 : }
168 :
169 : // This one puts some data inside the ring buffer.
170 : void
171 0 : RingBuffer::putToBuffer(std::shared_ptr<AudioFrame>&& data)
172 : {
173 0 : std::lock_guard l(lock_);
174 0 : const size_t buffer_size = buffer_.size();
175 0 : if (buffer_size == 0)
176 0 : return;
177 :
178 0 : size_t len = buffer_size - putLength();
179 0 : if (len == 0)
180 0 : discard(1);
181 :
182 0 : size_t pos = endPos_;
183 :
184 0 : buffer_[pos] = std::move(data);
185 0 : const auto& newBuf = buffer_[pos];
186 0 : pos = (pos + 1) % buffer_size;
187 :
188 0 : endPos_ = pos;
189 :
190 0 : if (rmsSignal_) {
191 0 : ++rmsFrameCount_;
192 0 : rmsLevel_ += newBuf->calcRMS();
193 0 : if (rmsFrameCount_ == RMS_SIGNAL_INTERVAL) {
194 0 : emitSignal<libjami::AudioSignal::AudioMeter>(id, rmsLevel_ / RMS_SIGNAL_INTERVAL);
195 0 : rmsLevel_ = 0;
196 0 : rmsFrameCount_ = 0;
197 : }
198 : }
199 :
200 0 : for (auto& offset : readoffsets_) {
201 0 : if (offset.second.callback)
202 0 : offset.second.callback(newBuf);
203 : }
204 :
205 0 : not_empty_.notify_all();
206 0 : }
207 :
208 : //
209 : // For the reader only:
210 : //
211 :
212 : size_t
213 0 : RingBuffer::availableForGet(const std::string& ringbufferId) const
214 : {
215 : // Used space
216 0 : return getLength(ringbufferId);
217 : }
218 :
219 : std::shared_ptr<AudioFrame>
220 49800 : RingBuffer::get(const std::string& ringbufferId)
221 : {
222 49800 : std::lock_guard l(lock_);
223 :
224 49800 : auto offset = readoffsets_.find(ringbufferId);
225 49800 : if (offset == readoffsets_.end())
226 0 : return {};
227 :
228 49800 : const size_t buffer_size = buffer_.size();
229 49800 : if (buffer_size == 0)
230 0 : return {};
231 :
232 49800 : size_t startPos = offset->second.offset;
233 49800 : size_t len = (endPos_ + buffer_size - startPos) % buffer_size;
234 49800 : if (len == 0)
235 49800 : return {};
236 :
237 0 : auto ret = buffer_[startPos];
238 0 : offset->second.offset = (startPos + 1) % buffer_size;
239 0 : return ret;
240 0 : }
241 :
242 : size_t
243 36015 : RingBuffer::waitForDataAvailable(const std::string& ringbufferId, const time_point& deadline) const
244 : {
245 36015 : std::unique_lock l(lock_);
246 :
247 36015 : if (buffer_.empty())
248 0 : return 0;
249 36015 : if (readoffsets_.find(ringbufferId) == readoffsets_.end())
250 0 : return 0;
251 :
252 36015 : size_t getl = 0;
253 72030 : auto check = [=, &getl] {
254 : // Re-find read_ptr: it may be destroyed during the wait
255 72030 : const size_t buffer_size = buffer_.size();
256 72030 : const auto read_ptr = readoffsets_.find(ringbufferId);
257 72030 : if (buffer_size == 0 || read_ptr == readoffsets_.end())
258 127 : return true;
259 71903 : getl = (endPos_ + buffer_size - read_ptr->second.offset) % buffer_size;
260 71903 : return getl != 0;
261 36015 : };
262 :
263 36015 : if (deadline == time_point::max()) {
264 : // no timeout provided, wait as long as necessary
265 0 : not_empty_.wait(l, check);
266 : } else {
267 36015 : not_empty_.wait_until(l, deadline, check);
268 : }
269 :
270 36015 : return getl;
271 36015 : }
272 :
273 : size_t
274 0 : RingBuffer::discard(size_t toDiscard, const std::string& ringbufferId)
275 : {
276 0 : std::lock_guard l(lock_);
277 :
278 0 : const size_t buffer_size = buffer_.size();
279 0 : if (buffer_size == 0)
280 0 : return 0;
281 :
282 0 : auto offset = readoffsets_.find(ringbufferId);
283 0 : if (offset == readoffsets_.end())
284 0 : return 0;
285 :
286 0 : size_t len = (endPos_ + buffer_size - offset->second.offset) % buffer_size;
287 0 : toDiscard = std::min(toDiscard, len);
288 :
289 0 : offset->second.offset = (offset->second.offset + toDiscard) % buffer_size;
290 0 : return toDiscard;
291 0 : }
292 :
293 : size_t
294 0 : RingBuffer::discard(size_t toDiscard)
295 : {
296 0 : const size_t buffer_size = buffer_.size();
297 0 : if (buffer_size == 0)
298 0 : return 0;
299 :
300 0 : for (auto& r : readoffsets_) {
301 0 : size_t dst = (r.second.offset + buffer_size - endPos_) % buffer_size;
302 0 : if (dst < toDiscard)
303 0 : r.second.offset = (r.second.offset + toDiscard - dst) % buffer_size;
304 : }
305 0 : return toDiscard;
306 : }
307 :
308 : } // namespace jami
|