Line data Source code
1 : /* 2 : * Copyright (C) 2004-2024 Savoir-faire Linux Inc. 3 : * 4 : * This program is free software: you can redistribute it and/or modify 5 : * it under the terms of the GNU General Public License as published by 6 : * the Free Software Foundation, either version 3 of the License, or 7 : * (at your option) any later version. 8 : * 9 : * This program is distributed in the hope that it will be useful, 10 : * but WITHOUT ANY WARRANTY; without even the implied warranty of 11 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 12 : * GNU General Public License for more details. 13 : * 14 : * You should have received a copy of the GNU General Public License 15 : * along with this program. If not, see <https://www.gnu.org/licenses/>. 16 : */ 17 : 18 : #include "ringbuffer.h" 19 : #include "logger.h" 20 : #include "client/ring_signal.h" 21 : #include "media_buffer.h" 22 : #include "libav_deps.h" 23 : 24 : #include <chrono> 25 : #include <cstdlib> 26 : #include <cstring> 27 : #include <algorithm> 28 : 29 : namespace jami { 30 : 31 : // corresponds to 160 ms (about 5 rtp packets) 32 : static const size_t MIN_BUFFER_SIZE = 1024; 33 : 34 : static constexpr const int RMS_SIGNAL_INTERVAL = 5; 35 : 36 525 : RingBuffer::RingBuffer(const std::string& rbuf_id, size_t /*size*/, AudioFormat format) 37 525 : : id(rbuf_id) 38 525 : , endPos_(0) 39 525 : , format_(format) 40 525 : , lock_() 41 525 : , not_empty_() 42 525 : , readoffsets_() 43 525 : , resizer_(format_, format_.sample_rate / 50, [this](std::shared_ptr<AudioFrame>&& frame) { 44 0 : putToBuffer(std::move(frame)); 45 2100 : }) 46 : { 47 1575 : JAMI_LOG("Create new RingBuffer {}", id); 48 525 : } 49 : 50 1050 : RingBuffer::~RingBuffer() 51 : { 52 1575 : JAMI_LOG("Destroy RingBuffer {}", id); 53 525 : } 54 : 55 : void 56 6403 : RingBuffer::flush(const std::string& ringbufferId) 57 : { 58 6403 : storeReadOffset(endPos_, ringbufferId); 59 6403 : } 60 : 61 : void 62 6614 : RingBuffer::flushAll() 63 : { 64 13030 : for (auto& offset : readoffsets_) 65 6416 : offset.second.offset = endPos_; 66 6614 : } 67 : 68 : size_t 69 0 : RingBuffer::putLength() const 70 : { 71 0 : const size_t buffer_size = buffer_.size(); 72 0 : if (buffer_size == 0) 73 0 : return 0; 74 0 : const size_t startPos = getSmallestReadOffset(); 75 0 : return (endPos_ + buffer_size - startPos) % buffer_size; 76 : } 77 : 78 : size_t 79 5412 : RingBuffer::getLength(const std::string& ringbufferId) const 80 : { 81 5412 : const size_t buffer_size = buffer_.size(); 82 5412 : if (buffer_size == 0) 83 0 : return 0; 84 5412 : return (endPos_ + buffer_size - getReadOffset(ringbufferId)) % buffer_size; 85 : } 86 : 87 : void 88 0 : RingBuffer::debug() 89 : { 90 0 : JAMI_DBG("Start=%zu; End=%zu; BufferSize=%zu", getSmallestReadOffset(), endPos_, buffer_.size()); 91 0 : } 92 : 93 : size_t 94 5412 : RingBuffer::getReadOffset(const std::string& ringbufferId) const 95 : { 96 5412 : auto iter = readoffsets_.find(ringbufferId); 97 5412 : return (iter != readoffsets_.end()) ? iter->second.offset : 0; 98 : } 99 : 100 : size_t 101 0 : RingBuffer::getSmallestReadOffset() const 102 : { 103 0 : if (hasNoReadOffsets()) 104 0 : return 0; 105 0 : size_t smallest = buffer_.size(); 106 0 : for (auto const& iter : readoffsets_) 107 0 : smallest = std::min(smallest, iter.second.offset); 108 0 : return smallest; 109 : } 110 : 111 : void 112 6403 : RingBuffer::storeReadOffset(size_t offset, const std::string& ringbufferId) 113 : { 114 6403 : ReadOffsetMap::iterator iter = readoffsets_.find(ringbufferId); 115 : 116 6403 : if (iter != readoffsets_.end()) 117 6403 : iter->second.offset = offset; 118 : else 119 0 : JAMI_ERROR("RingBuffer::storeReadOffset() failed: unknown ringbuffer '{}'", ringbufferId); 120 6403 : } 121 : 122 : void 123 836 : RingBuffer::createReadOffset(const std::string& ringbufferId) 124 : { 125 836 : std::lock_guard l(lock_); 126 836 : if (!hasThisReadOffset(ringbufferId)) 127 748 : readoffsets_.emplace(ringbufferId, ReadOffset {endPos_, {}}); 128 836 : } 129 : 130 : void 131 1257 : RingBuffer::removeReadOffset(const std::string& ringbufferId) 132 : { 133 1257 : std::lock_guard l(lock_); 134 1257 : auto iter = readoffsets_.find(ringbufferId); 135 1257 : if (iter != readoffsets_.end()) 136 663 : readoffsets_.erase(iter); 137 1257 : } 138 : 139 : bool 140 836 : RingBuffer::hasThisReadOffset(const std::string& ringbufferId) const 141 : { 142 836 : return readoffsets_.find(ringbufferId) != readoffsets_.end(); 143 : } 144 : 145 : bool 146 0 : RingBuffer::hasNoReadOffsets() const 147 : { 148 0 : return readoffsets_.empty(); 149 : } 150 : 151 : // 152 : // For the writer only: 153 : // 154 : 155 : void 156 0 : RingBuffer::put(std::shared_ptr<AudioFrame>&& data) 157 : { 158 0 : std::lock_guard l(writeLock_); 159 0 : resizer_.enqueue(resampler_.resample(std::move(data), format_)); 160 0 : } 161 : 162 : // This one puts some data inside the ring buffer. 163 : void 164 0 : RingBuffer::putToBuffer(std::shared_ptr<AudioFrame>&& data) 165 : { 166 0 : std::lock_guard l(lock_); 167 0 : const size_t buffer_size = buffer_.size(); 168 0 : if (buffer_size == 0) 169 0 : return; 170 : 171 0 : size_t len = buffer_size - putLength(); 172 0 : if (len == 0) 173 0 : discard(1); 174 : 175 0 : size_t pos = endPos_; 176 : 177 0 : buffer_[pos] = std::move(data); 178 0 : const auto& newBuf = buffer_[pos]; 179 0 : pos = (pos + 1) % buffer_size; 180 : 181 0 : endPos_ = pos; 182 : 183 0 : if (rmsSignal_) { 184 0 : ++rmsFrameCount_; 185 0 : rmsLevel_ += newBuf->calcRMS(); 186 0 : if (rmsFrameCount_ == RMS_SIGNAL_INTERVAL) { 187 0 : emitSignal<libjami::AudioSignal::AudioMeter>(id, rmsLevel_ / RMS_SIGNAL_INTERVAL); 188 0 : rmsLevel_ = 0; 189 0 : rmsFrameCount_ = 0; 190 : } 191 : } 192 : 193 0 : for (auto& offset : readoffsets_) { 194 0 : if (offset.second.callback) 195 0 : offset.second.callback(newBuf); 196 : } 197 : 198 0 : not_empty_.notify_all(); 199 0 : } 200 : 201 : // 202 : // For the reader only: 203 : // 204 : 205 : size_t 206 0 : RingBuffer::availableForGet(const std::string& ringbufferId) const 207 : { 208 : // Used space 209 0 : return getLength(ringbufferId); 210 : } 211 : 212 : std::shared_ptr<AudioFrame> 213 58779 : RingBuffer::get(const std::string& ringbufferId) 214 : { 215 58779 : std::lock_guard l(lock_); 216 : 217 58779 : auto offset = readoffsets_.find(ringbufferId); 218 58779 : if (offset == readoffsets_.end()) 219 0 : return {}; 220 : 221 58779 : const size_t buffer_size = buffer_.size(); 222 58779 : if (buffer_size == 0) 223 0 : return {}; 224 : 225 58779 : size_t startPos = offset->second.offset; 226 58779 : size_t len = (endPos_ + buffer_size - startPos) % buffer_size; 227 58779 : if (len == 0) 228 58779 : return {}; 229 : 230 0 : auto ret = buffer_[startPos]; 231 0 : offset->second.offset = (startPos + 1) % buffer_size; 232 0 : return ret; 233 58779 : } 234 : 235 : size_t 236 0 : RingBuffer::waitForDataAvailable(const std::string& ringbufferId, const time_point& deadline) const 237 : { 238 0 : std::unique_lock l(lock_); 239 : 240 0 : if (buffer_.empty()) 241 0 : return 0; 242 0 : if (readoffsets_.find(ringbufferId) == readoffsets_.end()) 243 0 : return 0; 244 : 245 0 : size_t getl = 0; 246 0 : auto check = [=, &getl] { 247 : // Re-find read_ptr: it may be destroyed during the wait 248 0 : const size_t buffer_size = buffer_.size(); 249 0 : const auto read_ptr = readoffsets_.find(ringbufferId); 250 0 : if (buffer_size == 0 || read_ptr == readoffsets_.end()) 251 0 : return true; 252 0 : getl = (endPos_ + buffer_size - read_ptr->second.offset) % buffer_size; 253 0 : return getl != 0; 254 0 : }; 255 : 256 0 : if (deadline == time_point::max()) { 257 : // no timeout provided, wait as long as necessary 258 0 : not_empty_.wait(l, check); 259 : } else { 260 0 : not_empty_.wait_until(l, deadline, check); 261 : } 262 : 263 0 : return getl; 264 0 : } 265 : 266 : size_t 267 0 : RingBuffer::discard(size_t toDiscard, const std::string& ringbufferId) 268 : { 269 0 : std::lock_guard l(lock_); 270 : 271 0 : const size_t buffer_size = buffer_.size(); 272 0 : if (buffer_size == 0) 273 0 : return 0; 274 : 275 0 : auto offset = readoffsets_.find(ringbufferId); 276 0 : if (offset == readoffsets_.end()) 277 0 : return 0; 278 : 279 0 : size_t len = (endPos_ + buffer_size - offset->second.offset) % buffer_size; 280 0 : toDiscard = std::min(toDiscard, len); 281 : 282 0 : offset->second.offset = (offset->second.offset + toDiscard) % buffer_size; 283 0 : return toDiscard; 284 0 : } 285 : 286 : size_t 287 0 : RingBuffer::discard(size_t toDiscard) 288 : { 289 0 : const size_t buffer_size = buffer_.size(); 290 0 : if (buffer_size == 0) 291 0 : return 0; 292 : 293 0 : for (auto& r : readoffsets_) { 294 0 : size_t dst = (r.second.offset + buffer_size - endPos_) % buffer_size; 295 0 : if (dst < toDiscard) 296 0 : r.second.offset = (r.second.offset + toDiscard - dst) % buffer_size; 297 : } 298 0 : return toDiscard; 299 : } 300 : 301 : } // namespace jami