LCOV - code coverage report
Current view: top level - foo/src/im - message_engine.cpp (source / functions) Hit Total Coverage
Test: jami-coverage-filtered.info Lines: 127 156 81.4 %
Date: 2025-09-15 07:46:53 Functions: 26 35 74.3 %

          Line data    Source code
       1             : /*
       2             :  *  Copyright (C) 2004-2025 Savoir-faire Linux Inc.
       3             :  *
       4             :  *  This program is free software: you can redistribute it and/or modify
       5             :  *  it under the terms of the GNU General Public License as published by
       6             :  *  the Free Software Foundation, either version 3 of the License, or
       7             :  *  (at your option) any later version.
       8             :  *
       9             :  *  This program is distributed in the hope that it will be useful,
      10             :  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
      11             :  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
      12             :  *  GNU General Public License for more details.
      13             :  *
      14             :  *  You should have received a copy of the GNU General Public License
      15             :  *  along with this program. If not, see <https://www.gnu.org/licenses/>.
      16             :  */
      17             : 
      18             : #include "message_engine.h"
      19             : #include "sip/sipaccountbase.h"
      20             : #include "manager.h"
      21             : #include "fileutils.h"
      22             : 
      23             : #include "client/ring_signal.h"
      24             : #include "jami/account_const.h"
      25             : 
      26             : #include <opendht/thread_pool.h>
      27             : #include <fmt/std.h>
      28             : 
      29             : #include <fstream>
      30             : 
      31             : namespace jami {
      32             : namespace im {
      33             : 
      34         665 : MessageEngine::MessageEngine(SIPAccountBase& acc, const std::filesystem::path& path)
      35         665 :     : account_(acc)
      36         665 :     , savePath_(path)
      37         665 :     , ioContext_(Manager::instance().ioContext())
      38         665 :     , saveTimer_(*ioContext_)
      39             : {
      40         665 :     dhtnet::fileutils::check_dir(savePath_.parent_path());
      41         665 : }
      42             : 
      43             : MessageToken
      44       13855 : MessageEngine::sendMessage(const std::string& to,
      45             :                            const std::string& deviceId,
      46             :                            const std::map<std::string, std::string>& payloads,
      47             :                            uint64_t refreshToken)
      48             : {
      49       13855 :     if (payloads.empty() or to.empty())
      50           0 :         return 0;
      51       13855 :     MessageToken token = 0;
      52             :     {
      53       13855 :         std::lock_guard lock(messagesMutex_);
      54       13855 :         auto& peerMessages = deviceId.empty() ? messages_[to] : messagesDevices_[deviceId];
      55       13855 :         if (refreshToken != 0) {
      56       11907 :             for (auto& m : peerMessages) {
      57         584 :                 if (m.token == refreshToken) {
      58         583 :                     token = refreshToken;
      59         583 :                     m.to = to;
      60         583 :                     m.payloads = payloads;
      61         583 :                     m.status = MessageStatus::IDLE;
      62         583 :                     break;
      63             :                 }
      64             :             }
      65             :         }
      66       13855 :         if (token == 0) {
      67       13272 :             token = std::uniform_int_distribution<MessageToken> {1, JAMI_ID_MAX_VAL}(account_.rand);
      68       13272 :             auto& m = peerMessages.emplace_back(Message {token});
      69       13272 :             m.to = to;
      70       13272 :             m.payloads = payloads;
      71             :         }
      72       13855 :         scheduleSave();
      73       13855 :     }
      74       13855 :     asio::post(*ioContext_, [this, to, deviceId]() {
      75       13855 :         retrySend(to, deviceId, true);
      76       13855 :     });
      77       13855 :     return token;
      78             : }
      79             : 
      80             : void
      81        1686 : MessageEngine::onPeerOnline(const std::string& peer,
      82             :                             const std::string& deviceId,
      83             :                             bool retryOnTimeout)
      84             : {
      85        1686 :     retrySend(peer, deviceId, retryOnTimeout);
      86        1686 : }
      87             : 
      88             : void
      89       15541 : MessageEngine::retrySend(const std::string& peer, const std::string& deviceId, bool retryOnTimeout)
      90             : {
      91             :     struct PendingMsg {
      92             :         MessageToken token;
      93             :         std::string to;
      94             :         std::map<std::string, std::string> payloads;
      95             :     };
      96       15541 :     std::vector<PendingMsg> pending {};
      97       15541 :     auto now = clock::now();
      98             :     {
      99       15541 :         std::lock_guard lock(messagesMutex_);
     100       15541 :         auto& m = deviceId.empty() ? messages_ : messagesDevices_;
     101       15541 :         auto p = m.find(deviceId.empty() ? peer : deviceId);
     102       15541 :         if (p == m.end())
     103        1010 :             return;
     104       14531 :         auto& messages = p->second;
     105             : 
     106       29211 :         for (auto& m: messages) {
     107       14680 :             if (m.status == MessageStatus::IDLE) {
     108       14676 :                 m.status = MessageStatus::SENDING;
     109       14676 :                 m.retried++;
     110       14676 :                 m.last_op = now;
     111       14676 :                 pending.emplace_back(PendingMsg {m.token, m.to, m.payloads});
     112             :             }
     113             :         }
     114       15541 :     }
     115             :     // avoid locking while calling callback
     116       29207 :     for (const auto& p : pending) {
     117       44028 :         JAMI_DEBUG("[Account {:s}] [message {:d}] Reattempt sending", account_.getAccountID(), p.token);
     118       14676 :         if (p.payloads.find("application/im-gitmessage-id") == p.payloads.end())
     119         468 :             emitSignal<libjami::ConfigurationSignal::AccountMessageStatusChanged>(
     120         468 :                 account_.getAccountID(),
     121             :                 "",
     122         468 :                 p.to,
     123         936 :                 std::to_string(p.token),
     124             :                 (int) libjami::Account::MessageStates::SENDING);
     125       14676 :         account_.sendMessage(p.to, deviceId, p.payloads, p.token, retryOnTimeout, false);
     126             :     }
     127       15541 : }
     128             : 
     129             : MessageStatus
     130           0 : MessageEngine::getStatus(MessageToken t) const
     131             : {
     132           0 :     std::lock_guard lock(messagesMutex_);
     133           0 :     for (const auto& p : messages_) {
     134           0 :         for (const auto& m : p.second) {
     135           0 :             if (m.token == t)
     136           0 :                 return m.status;
     137             :         }
     138             :     }
     139           0 :     return MessageStatus::UNKNOWN;
     140           0 : }
     141             : 
     142             : void
     143       14676 : MessageEngine::onMessageSent(const std::string& peer,
     144             :                              MessageToken token,
     145             :                              bool ok,
     146             :                              const std::string& deviceId)
     147             : {
     148       44028 :     JAMI_DEBUG("[Account {:s}] [message {:d}] Message sent: {:s}", account_.getAccountID(), token, ok ? "success"sv : "failure"sv);
     149       14676 :     std::lock_guard lock(messagesMutex_);
     150       14676 :     auto& m = deviceId.empty() ? messages_ : messagesDevices_;
     151             : 
     152       14676 :     auto p = m.find(deviceId.empty() ? peer : deviceId);
     153       14676 :     if (p == m.end()) {
     154           0 :         JAMI_WARNING("[Account {:s}] onMessageSent: Peer not found: id:{} device:{}", account_.getAccountID(), peer, deviceId);
     155           0 :         return;
     156             :     }
     157             : 
     158       14676 :     auto f = std::find_if(p->second.begin(), p->second.end(), [&](const Message& m) {
     159       14826 :         return m.token == token;
     160             :     });
     161       14676 :     if (f != p->second.end()) {
     162       29352 :         auto emit = f->payloads.find("application/im-gitmessage-id")
     163       29352 :                     == f->payloads.end();
     164       14676 :         if (f->status == MessageStatus::SENDING) {
     165       14673 :             if (ok) {
     166       12936 :                 f->status = MessageStatus::SENT;
     167       38808 :                 JAMI_LOG("[Account {:s}] [message {:d}] Status changed to SENT", account_.getAccountID(), token);
     168       12936 :                 if (emit)
     169         315 :                     emitSignal<libjami::ConfigurationSignal::AccountMessageStatusChanged>(
     170         315 :                         account_.getAccountID(),
     171             :                         "",
     172         315 :                         f->to,
     173         630 :                         std::to_string(token),
     174             :                         static_cast<int>(libjami::Account::MessageStates::SENT));
     175       12936 :                 p->second.erase(f);
     176       12936 :                 scheduleSave();
     177        1737 :             } else if (f->retried >= MAX_RETRIES) {
     178           4 :                 f->status = MessageStatus::FAILURE;
     179          12 :                 JAMI_WARNING("[Account {:s}] [message {:d}] Status changed to FAILURE", account_.getAccountID(), token);
     180           4 :                 if (emit)
     181           0 :                     emitSignal<libjami::ConfigurationSignal::AccountMessageStatusChanged>(
     182           0 :                         account_.getAccountID(),
     183             :                         "",
     184           0 :                         f->to,
     185           0 :                         std::to_string(token),
     186             :                         static_cast<int>(libjami::Account::MessageStates::FAILURE));
     187           4 :                 p->second.erase(f);
     188           4 :                 scheduleSave();
     189             :             } else {
     190        1733 :                 f->status = MessageStatus::IDLE;
     191        5199 :                 JAMI_DEBUG("[Account {:s}] [message {:d}] Status changed to IDLE", account_.getAccountID(), token);
     192             :             }
     193             :         } else {
     194           9 :             JAMI_DEBUG("[Account {:s}] [message {:d}] State is not SENDING", account_.getAccountID(), token);
     195             :         }
     196             :     } else {
     197           0 :         JAMI_DEBUG("[Account {:s}] [message {:d}] Unable to find message", account_.getAccountID(), token);
     198             :     }
     199       14676 : }
     200             : 
     201             : void
     202          25 : MessageEngine::load()
     203             : {
     204             :     try {
     205          25 :         decltype(messages_) root;
     206             :         {
     207          25 :             std::lock_guard lock(dhtnet::fileutils::getFileLock(savePath_));
     208          25 :             std::ifstream file;
     209          25 :             file.exceptions(std::ifstream::failbit | std::ifstream::badbit);
     210          25 :             file.open(savePath_);
     211           1 :             if (file.is_open()) {
     212           1 :                 msgpack::unpacker up;
     213           1 :                 up.reserve_buffer(UINT16_MAX);
     214           1 :                 while (file.read(up.buffer(), UINT16_MAX)) {
     215           0 :                     up.buffer_consumed(file.gcount());
     216           0 :                     msgpack::object_handle oh;
     217           0 :                     if (up.next(oh)) {
     218           0 :                         root = oh.get().as<std::map<std::string, std::list<Message>>>();
     219           0 :                         break;
     220             :                     }
     221           0 :                     up.reserve_buffer(UINT16_MAX);
     222           0 :                 }
     223           1 :             }
     224          50 :         }
     225           0 :         std::lock_guard lock(messagesMutex_);
     226           0 :         messages_ = std::move(root);
     227           0 :         if (not messages_.empty()) {
     228           0 :             JAMI_LOG("[Account {}] Loaded {} messages from {}",
     229             :                      account_.getAccountID(),
     230             :                      messages_.size(),
     231             :                      savePath_);
     232             :         }
     233          50 :     } catch (const std::exception& e) {
     234          75 :         JAMI_LOG("[Account {}] Unable to load messages from {}: {}",
     235             :                  account_.getAccountID(),
     236             :                  savePath_,
     237             :                  e.what());
     238          25 :     }
     239          25 : }
     240             : 
     241             : void
     242         134 : MessageEngine::save() const
     243             : {
     244         134 :     std::lock_guard lock(messagesMutex_);
     245         134 :     save_();
     246         134 : }
     247             : 
     248             : void
     249       26795 : MessageEngine::scheduleSave()
     250             : {
     251       26795 :     saveTimer_.expires_after(std::chrono::seconds(5));
     252       26795 :     saveTimer_.async_wait([this, w = account_.weak_from_this()](const std::error_code& ec) {
     253       26795 :         if (!ec)
     254         133 :             if (auto acc = w.lock())
     255         133 :                 save();
     256       26795 :     });
     257       26795 : }
     258             : 
     259             : void
     260         134 : MessageEngine::save_() const
     261             : {
     262             :     try {
     263         134 :         std::ofstream file;
     264         134 :         file.exceptions(std::ifstream::failbit | std::ifstream::badbit);
     265         134 :         file.open(savePath_, std::ios::trunc);
     266         134 :         if (file.is_open())
     267         134 :             msgpack::pack(file, messages_);
     268         134 :     } catch (const std::exception& e) {
     269           0 :         JAMI_ERROR("[Account {}] Unable to serialize pending messages: {}",
     270             :                  account_.getAccountID(), e.what());
     271           0 :     }
     272         134 : }
     273             : 
     274             : } // namespace im
     275             : } // namespace jami

Generated by: LCOV version 1.14