Line data Source code
1 : /* 2 : * Copyright (C) 2004-2025 Savoir-faire Linux Inc. 3 : * 4 : * This program is free software: you can redistribute it and/or modify 5 : * it under the terms of the GNU General Public License as published by 6 : * the Free Software Foundation, either version 3 of the License, or 7 : * (at your option) any later version. 8 : * 9 : * This program is distributed in the hope that it will be useful, 10 : * but WITHOUT ANY WARRANTY; without even the implied warranty of 11 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 12 : * GNU General Public License for more details. 13 : * 14 : * You should have received a copy of the GNU General Public License 15 : * along with this program. If not, see <https://www.gnu.org/licenses/>. 16 : */ 17 : 18 : #include "ringbuffer.h" 19 : #include "logger.h" 20 : #include "client/ring_signal.h" 21 : #include "media_buffer.h" 22 : #include "libav_deps.h" 23 : 24 : #include <chrono> 25 : #include <cstdlib> 26 : #include <cstring> 27 : #include <algorithm> 28 : 29 : namespace jami { 30 : 31 : // corresponds to 160 ms (about 5 rtp packets) 32 : static const size_t MIN_BUFFER_SIZE = 1024; 33 : 34 : static constexpr const int RMS_SIGNAL_INTERVAL = 5; 35 : 36 526 : RingBuffer::RingBuffer(const std::string& rbuf_id, size_t /*size*/, AudioFormat format) 37 526 : : id(rbuf_id) 38 526 : , endPos_(0) 39 526 : , format_(format) 40 526 : , lock_() 41 526 : , not_empty_() 42 526 : , readoffsets_() 43 526 : , resizer_(format_, format_.sample_rate / 50, [this](std::shared_ptr<AudioFrame>&& frame) { 44 0 : putToBuffer(std::move(frame)); 45 2104 : }) 46 : { 47 1578 : JAMI_LOG("Create new RingBuffer {}", id); 48 526 : } 49 : 50 1052 : RingBuffer::~RingBuffer() 51 : { 52 1578 : JAMI_LOG("Destroy RingBuffer {}", id); 53 526 : } 54 : 55 : void 56 5891 : RingBuffer::flush(const std::string& ringbufferId) 57 : { 58 5891 : storeReadOffset(endPos_, ringbufferId); 59 5891 : } 60 : 61 : void 62 6255 : RingBuffer::flushAll() 63 : { 64 12322 : for (auto& offset : readoffsets_) 65 6067 : offset.second.offset = endPos_; 66 6255 : } 67 : 68 95 : std::vector<std::string> RingBuffer::getSubscribers() { 69 95 : std::vector<std::string> subscribers; 70 263 : for (const auto &offset: readoffsets_) { 71 168 : subscribers.push_back(offset.first); 72 : } 73 95 : return subscribers; 74 0 : } 75 : 76 : size_t 77 0 : RingBuffer::putLength() const 78 : { 79 0 : const size_t buffer_size = buffer_.size(); 80 0 : if (buffer_size == 0) 81 0 : return 0; 82 0 : const size_t startPos = getSmallestReadOffset(); 83 0 : return (endPos_ + buffer_size - startPos) % buffer_size; 84 : } 85 : 86 : size_t 87 5604 : RingBuffer::getLength(const std::string& ringbufferId) const 88 : { 89 5604 : const size_t buffer_size = buffer_.size(); 90 5604 : if (buffer_size == 0) 91 0 : return 0; 92 5604 : return (endPos_ + buffer_size - getReadOffset(ringbufferId)) % buffer_size; 93 : } 94 : 95 : void 96 0 : RingBuffer::debug() 97 : { 98 0 : JAMI_DBG("Start=%zu; End=%zu; BufferSize=%zu", getSmallestReadOffset(), endPos_, buffer_.size()); 99 0 : } 100 : 101 : size_t 102 5604 : RingBuffer::getReadOffset(const std::string& ringbufferId) const 103 : { 104 5604 : auto iter = readoffsets_.find(ringbufferId); 105 5604 : return (iter != readoffsets_.end()) ? iter->second.offset : 0; 106 : } 107 : 108 : size_t 109 0 : RingBuffer::getSmallestReadOffset() const 110 : { 111 0 : if (hasNoReadOffsets()) 112 0 : return 0; 113 0 : size_t smallest = buffer_.size(); 114 0 : for (auto const& iter : readoffsets_) 115 0 : smallest = std::min(smallest, iter.second.offset); 116 0 : return smallest; 117 : } 118 : 119 : void 120 5891 : RingBuffer::storeReadOffset(size_t offset, const std::string& ringbufferId) 121 : { 122 5891 : ReadOffsetMap::iterator iter = readoffsets_.find(ringbufferId); 123 : 124 5891 : if (iter != readoffsets_.end()) 125 5891 : iter->second.offset = offset; 126 : else 127 0 : JAMI_ERROR("RingBuffer::storeReadOffset() failed: unknown ringbuffer '{}'", ringbufferId); 128 5891 : } 129 : 130 : void 131 834 : RingBuffer::createReadOffset(const std::string& ringbufferId) 132 : { 133 834 : std::lock_guard l(lock_); 134 834 : if (!hasThisReadOffset(ringbufferId)) 135 788 : readoffsets_.emplace(ringbufferId, ReadOffset {endPos_, {}}); 136 834 : } 137 : 138 : void 139 842 : RingBuffer::removeReadOffset(const std::string& ringbufferId) 140 : { 141 842 : std::lock_guard l(lock_); 142 842 : auto iter = readoffsets_.find(ringbufferId); 143 842 : if (iter != readoffsets_.end()) 144 706 : readoffsets_.erase(iter); 145 842 : } 146 : 147 : bool 148 834 : RingBuffer::hasThisReadOffset(const std::string& ringbufferId) const 149 : { 150 834 : return readoffsets_.find(ringbufferId) != readoffsets_.end(); 151 : } 152 : 153 : bool 154 0 : RingBuffer::hasNoReadOffsets() const 155 : { 156 0 : return readoffsets_.empty(); 157 : } 158 : 159 : // 160 : // For the writer only: 161 : // 162 : 163 : void 164 0 : RingBuffer::put(std::shared_ptr<AudioFrame>&& data) 165 : { 166 0 : std::lock_guard l(writeLock_); 167 0 : resizer_.enqueue(resampler_.resample(std::move(data), format_)); 168 0 : } 169 : 170 : // This one puts some data inside the ring buffer. 171 : void 172 0 : RingBuffer::putToBuffer(std::shared_ptr<AudioFrame>&& data) 173 : { 174 0 : std::lock_guard l(lock_); 175 0 : const size_t buffer_size = buffer_.size(); 176 0 : if (buffer_size == 0) 177 0 : return; 178 : 179 0 : size_t len = buffer_size - putLength(); 180 0 : if (len == 0) 181 0 : discard(1); 182 : 183 0 : size_t pos = endPos_; 184 : 185 0 : buffer_[pos] = std::move(data); 186 0 : const auto& newBuf = buffer_[pos]; 187 0 : pos = (pos + 1) % buffer_size; 188 : 189 0 : endPos_ = pos; 190 : 191 0 : if (rmsSignal_) { 192 0 : ++rmsFrameCount_; 193 0 : rmsLevel_ += newBuf->calcRMS(); 194 0 : if (rmsFrameCount_ == RMS_SIGNAL_INTERVAL) { 195 0 : emitSignal<libjami::AudioSignal::AudioMeter>(id, rmsLevel_ / RMS_SIGNAL_INTERVAL); 196 0 : rmsLevel_ = 0; 197 0 : rmsFrameCount_ = 0; 198 : } 199 : } 200 : 201 0 : for (auto& offset : readoffsets_) { 202 0 : if (offset.second.callback) 203 0 : offset.second.callback(newBuf); 204 : } 205 : 206 0 : not_empty_.notify_all(); 207 0 : } 208 : 209 : // 210 : // For the reader only: 211 : // 212 : 213 : size_t 214 0 : RingBuffer::availableForGet(const std::string& ringbufferId) const 215 : { 216 : // Used space 217 0 : return getLength(ringbufferId); 218 : } 219 : 220 : std::shared_ptr<AudioFrame> 221 60476 : RingBuffer::get(const std::string& ringbufferId) 222 : { 223 60476 : std::lock_guard l(lock_); 224 : 225 60476 : auto offset = readoffsets_.find(ringbufferId); 226 60476 : if (offset == readoffsets_.end()) 227 0 : return {}; 228 : 229 60476 : const size_t buffer_size = buffer_.size(); 230 60476 : if (buffer_size == 0) 231 0 : return {}; 232 : 233 60476 : size_t startPos = offset->second.offset; 234 60476 : size_t len = (endPos_ + buffer_size - startPos) % buffer_size; 235 60476 : if (len == 0) 236 60476 : return {}; 237 : 238 0 : auto ret = buffer_[startPos]; 239 0 : offset->second.offset = (startPos + 1) % buffer_size; 240 0 : return ret; 241 60476 : } 242 : 243 : size_t 244 0 : RingBuffer::waitForDataAvailable(const std::string& ringbufferId, const time_point& deadline) const 245 : { 246 0 : std::unique_lock l(lock_); 247 : 248 0 : if (buffer_.empty()) 249 0 : return 0; 250 0 : if (readoffsets_.find(ringbufferId) == readoffsets_.end()) 251 0 : return 0; 252 : 253 0 : size_t getl = 0; 254 0 : auto check = [=, &getl] { 255 : // Re-find read_ptr: it may be destroyed during the wait 256 0 : const size_t buffer_size = buffer_.size(); 257 0 : const auto read_ptr = readoffsets_.find(ringbufferId); 258 0 : if (buffer_size == 0 || read_ptr == readoffsets_.end()) 259 0 : return true; 260 0 : getl = (endPos_ + buffer_size - read_ptr->second.offset) % buffer_size; 261 0 : return getl != 0; 262 0 : }; 263 : 264 0 : if (deadline == time_point::max()) { 265 : // no timeout provided, wait as long as necessary 266 0 : not_empty_.wait(l, check); 267 : } else { 268 0 : not_empty_.wait_until(l, deadline, check); 269 : } 270 : 271 0 : return getl; 272 0 : } 273 : 274 : size_t 275 0 : RingBuffer::discard(size_t toDiscard, const std::string& ringbufferId) 276 : { 277 0 : std::lock_guard l(lock_); 278 : 279 0 : const size_t buffer_size = buffer_.size(); 280 0 : if (buffer_size == 0) 281 0 : return 0; 282 : 283 0 : auto offset = readoffsets_.find(ringbufferId); 284 0 : if (offset == readoffsets_.end()) 285 0 : return 0; 286 : 287 0 : size_t len = (endPos_ + buffer_size - offset->second.offset) % buffer_size; 288 0 : toDiscard = std::min(toDiscard, len); 289 : 290 0 : offset->second.offset = (offset->second.offset + toDiscard) % buffer_size; 291 0 : return toDiscard; 292 0 : } 293 : 294 : size_t 295 0 : RingBuffer::discard(size_t toDiscard) 296 : { 297 0 : const size_t buffer_size = buffer_.size(); 298 0 : if (buffer_size == 0) 299 0 : return 0; 300 : 301 0 : for (auto& r : readoffsets_) { 302 0 : size_t dst = (r.second.offset + buffer_size - endPos_) % buffer_size; 303 0 : if (dst < toDiscard) 304 0 : r.second.offset = (r.second.offset + toDiscard - dst) % buffer_size; 305 : } 306 0 : return toDiscard; 307 : } 308 : 309 : } // namespace jami