Line data Source code
1 : /* 2 : * Copyright (C) 2004-2025 Savoir-faire Linux Inc. 3 : * 4 : * This program is free software: you can redistribute it and/or modify 5 : * it under the terms of the GNU General Public License as published by 6 : * the Free Software Foundation, either version 3 of the License, or 7 : * (at your option) any later version. 8 : * 9 : * This program is distributed in the hope that it will be useful, 10 : * but WITHOUT ANY WARRANTY; without even the implied warranty of 11 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 12 : * GNU General Public License for more details. 13 : * 14 : * You should have received a copy of the GNU General Public License 15 : * along with this program. If not, see <https://www.gnu.org/licenses/>. 16 : */ 17 : 18 : #include "ringbuffer.h" 19 : #include "logger.h" 20 : #include "client/ring_signal.h" 21 : #include "media_buffer.h" 22 : #include "libav_deps.h" 23 : 24 : #include <chrono> 25 : #include <cstdlib> 26 : #include <cstring> 27 : #include <algorithm> 28 : 29 : namespace jami { 30 : 31 : static constexpr const int RMS_SIGNAL_INTERVAL = 5; 32 : 33 516 : RingBuffer::RingBuffer(const std::string& rbuf_id, AudioFormat format) 34 516 : : id(rbuf_id) 35 516 : , endPos_(0) 36 516 : , format_(format) 37 516 : , lock_() 38 516 : , not_empty_() 39 516 : , readoffsets_() 40 516 : , resizer_(format_, format_.sample_rate / 50, [this](std::shared_ptr<AudioFrame>&& frame) { 41 0 : putToBuffer(std::move(frame)); 42 2064 : }) 43 : { 44 2064 : JAMI_LOG("Create new RingBuffer {}", id); 45 516 : } 46 : 47 1032 : RingBuffer::~RingBuffer() 48 : { 49 2064 : JAMI_LOG("Destroy RingBuffer {}", id); 50 516 : } 51 : 52 : void 53 5937 : RingBuffer::flush(const std::string& ringbufferId) 54 : { 55 5937 : storeReadOffset(endPos_, ringbufferId); 56 5937 : } 57 : 58 : void 59 6216 : RingBuffer::flushAll() 60 : { 61 12166 : for (auto& offset : readoffsets_) 62 5950 : offset.second.offset = endPos_; 63 6216 : } 64 : 65 : std::vector<std::string> 66 93 : RingBuffer::getSubscribers() 67 : { 68 93 : std::vector<std::string> subscribers; 69 255 : for (const auto& offset : readoffsets_) { 70 162 : subscribers.push_back(offset.first); 71 : } 72 93 : return subscribers; 73 0 : } 74 : 75 : size_t 76 0 : RingBuffer::putLength() const 77 : { 78 0 : const size_t buffer_size = buffer_.size(); 79 0 : if (buffer_size == 0) 80 0 : return 0; 81 0 : const size_t startPos = getSmallestReadOffset(); 82 0 : return (endPos_ + buffer_size - startPos) % buffer_size; 83 : } 84 : 85 : size_t 86 10 : RingBuffer::getLength(const std::string& ringbufferId) const 87 : { 88 10 : const size_t buffer_size = buffer_.size(); 89 10 : if (buffer_size == 0) 90 0 : return 0; 91 10 : return (endPos_ + buffer_size - getReadOffset(ringbufferId)) % buffer_size; 92 : } 93 : 94 : void 95 0 : RingBuffer::debug() 96 : { 97 0 : JAMI_DBG("Start=%zu; End=%zu; BufferSize=%zu", getSmallestReadOffset(), endPos_, buffer_.size()); 98 0 : } 99 : 100 : size_t 101 10 : RingBuffer::getReadOffset(const std::string& ringbufferId) const 102 : { 103 10 : auto iter = readoffsets_.find(ringbufferId); 104 10 : return (iter != readoffsets_.end()) ? iter->second.offset : 0; 105 : } 106 : 107 : size_t 108 0 : RingBuffer::getSmallestReadOffset() const 109 : { 110 0 : if (hasNoReadOffsets()) 111 0 : return 0; 112 0 : size_t smallest = buffer_.size(); 113 0 : for (auto const& iter : readoffsets_) 114 0 : smallest = std::min(smallest, iter.second.offset); 115 0 : return smallest; 116 : } 117 : 118 : void 119 5937 : RingBuffer::storeReadOffset(size_t offset, const std::string& ringbufferId) 120 : { 121 5937 : ReadOffsetMap::iterator iter = readoffsets_.find(ringbufferId); 122 : 123 5937 : if (iter != readoffsets_.end()) 124 5937 : iter->second.offset = offset; 125 : else 126 0 : JAMI_ERROR("RingBuffer::storeReadOffset() failed: unknown ringbuffer '{}'", ringbufferId); 127 5937 : } 128 : 129 : void 130 788 : RingBuffer::createReadOffset(const std::string& ringbufferId) 131 : { 132 788 : std::lock_guard l(lock_); 133 788 : if (!hasThisReadOffset(ringbufferId)) 134 740 : readoffsets_.emplace(ringbufferId, ReadOffset {endPos_, {}}); 135 788 : } 136 : 137 : void 138 788 : RingBuffer::removeReadOffset(const std::string& ringbufferId) 139 : { 140 788 : std::lock_guard l(lock_); 141 788 : auto iter = readoffsets_.find(ringbufferId); 142 788 : if (iter != readoffsets_.end()) 143 658 : readoffsets_.erase(iter); 144 788 : } 145 : 146 : bool 147 788 : RingBuffer::hasThisReadOffset(const std::string& ringbufferId) const 148 : { 149 788 : return readoffsets_.find(ringbufferId) != readoffsets_.end(); 150 : } 151 : 152 : bool 153 0 : RingBuffer::hasNoReadOffsets() const 154 : { 155 0 : return readoffsets_.empty(); 156 : } 157 : 158 : // 159 : // For the writer only: 160 : // 161 : 162 : void 163 0 : RingBuffer::put(std::shared_ptr<AudioFrame>&& data) 164 : { 165 0 : std::lock_guard l(writeLock_); 166 0 : resizer_.enqueue(resampler_.resample(std::move(data), format_)); 167 0 : } 168 : 169 : // This one puts some data inside the ring buffer. 170 : void 171 0 : RingBuffer::putToBuffer(std::shared_ptr<AudioFrame>&& data) 172 : { 173 0 : std::lock_guard l(lock_); 174 0 : const size_t buffer_size = buffer_.size(); 175 0 : if (buffer_size == 0) 176 0 : return; 177 : 178 0 : size_t len = buffer_size - putLength(); 179 0 : if (len == 0) 180 0 : discard(1); 181 : 182 0 : size_t pos = endPos_; 183 : 184 0 : buffer_[pos] = std::move(data); 185 0 : const auto& newBuf = buffer_[pos]; 186 0 : pos = (pos + 1) % buffer_size; 187 : 188 0 : endPos_ = pos; 189 : 190 0 : if (rmsSignal_) { 191 0 : ++rmsFrameCount_; 192 0 : rmsLevel_ += newBuf->calcRMS(); 193 0 : if (rmsFrameCount_ == RMS_SIGNAL_INTERVAL) { 194 0 : emitSignal<libjami::AudioSignal::AudioMeter>(id, rmsLevel_ / RMS_SIGNAL_INTERVAL); 195 0 : rmsLevel_ = 0; 196 0 : rmsFrameCount_ = 0; 197 : } 198 : } 199 : 200 0 : for (auto& offset : readoffsets_) { 201 0 : if (offset.second.callback) 202 0 : offset.second.callback(newBuf); 203 : } 204 : 205 0 : not_empty_.notify_all(); 206 0 : } 207 : 208 : // 209 : // For the reader only: 210 : // 211 : 212 : size_t 213 0 : RingBuffer::availableForGet(const std::string& ringbufferId) const 214 : { 215 : // Used space 216 0 : return getLength(ringbufferId); 217 : } 218 : 219 : std::shared_ptr<AudioFrame> 220 50852 : RingBuffer::get(const std::string& ringbufferId) 221 : { 222 50852 : std::lock_guard l(lock_); 223 : 224 50852 : auto offset = readoffsets_.find(ringbufferId); 225 50852 : if (offset == readoffsets_.end()) 226 0 : return {}; 227 : 228 50852 : const size_t buffer_size = buffer_.size(); 229 50852 : if (buffer_size == 0) 230 0 : return {}; 231 : 232 50852 : size_t startPos = offset->second.offset; 233 50852 : size_t len = (endPos_ + buffer_size - startPos) % buffer_size; 234 50852 : if (len == 0) 235 50852 : return {}; 236 : 237 0 : auto ret = buffer_[startPos]; 238 0 : offset->second.offset = (startPos + 1) % buffer_size; 239 0 : return ret; 240 50852 : } 241 : 242 : size_t 243 0 : RingBuffer::waitForDataAvailable(const std::string& ringbufferId, const time_point& deadline) const 244 : { 245 0 : std::unique_lock l(lock_); 246 : 247 0 : if (buffer_.empty()) 248 0 : return 0; 249 0 : if (readoffsets_.find(ringbufferId) == readoffsets_.end()) 250 0 : return 0; 251 : 252 0 : size_t getl = 0; 253 0 : auto check = [=, &getl] { 254 : // Re-find read_ptr: it may be destroyed during the wait 255 0 : const size_t buffer_size = buffer_.size(); 256 0 : const auto read_ptr = readoffsets_.find(ringbufferId); 257 0 : if (buffer_size == 0 || read_ptr == readoffsets_.end()) 258 0 : return true; 259 0 : getl = (endPos_ + buffer_size - read_ptr->second.offset) % buffer_size; 260 0 : return getl != 0; 261 0 : }; 262 : 263 0 : if (deadline == time_point::max()) { 264 : // no timeout provided, wait as long as necessary 265 0 : not_empty_.wait(l, check); 266 : } else { 267 0 : not_empty_.wait_until(l, deadline, check); 268 : } 269 : 270 0 : return getl; 271 0 : } 272 : 273 : size_t 274 0 : RingBuffer::discard(size_t toDiscard, const std::string& ringbufferId) 275 : { 276 0 : std::lock_guard l(lock_); 277 : 278 0 : const size_t buffer_size = buffer_.size(); 279 0 : if (buffer_size == 0) 280 0 : return 0; 281 : 282 0 : auto offset = readoffsets_.find(ringbufferId); 283 0 : if (offset == readoffsets_.end()) 284 0 : return 0; 285 : 286 0 : size_t len = (endPos_ + buffer_size - offset->second.offset) % buffer_size; 287 0 : toDiscard = std::min(toDiscard, len); 288 : 289 0 : offset->second.offset = (offset->second.offset + toDiscard) % buffer_size; 290 0 : return toDiscard; 291 0 : } 292 : 293 : size_t 294 0 : RingBuffer::discard(size_t toDiscard) 295 : { 296 0 : const size_t buffer_size = buffer_.size(); 297 0 : if (buffer_size == 0) 298 0 : return 0; 299 : 300 0 : for (auto& r : readoffsets_) { 301 0 : size_t dst = (r.second.offset + buffer_size - endPos_) % buffer_size; 302 0 : if (dst < toDiscard) 303 0 : r.second.offset = (r.second.offset + toDiscard - dst) % buffer_size; 304 : } 305 0 : return toDiscard; 306 : } 307 : 308 : } // namespace jami