LCOV - code coverage report
Current view: top level - src/im - message_engine.cpp (source / functions) Coverage Total Hit
Test: jami-coverage-filtered.info Lines: 79.6 % 152 121
Test Date: 2026-06-13 09:18:46 Functions: 74.3 % 35 26

            Line data    Source code
       1              : /*
       2              :  *  Copyright (C) 2004-2026 Savoir-faire Linux Inc.
       3              :  *
       4              :  *  This program is free software: you can redistribute it and/or modify
       5              :  *  it under the terms of the GNU General Public License as published by
       6              :  *  the Free Software Foundation, either version 3 of the License, or
       7              :  *  (at your option) any later version.
       8              :  *
       9              :  *  This program is distributed in the hope that it will be useful,
      10              :  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
      11              :  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
      12              :  *  GNU General Public License for more details.
      13              :  *
      14              :  *  You should have received a copy of the GNU General Public License
      15              :  *  along with this program. If not, see <https://www.gnu.org/licenses/>.
      16              :  */
      17              : 
      18              : #include "message_engine.h"
      19              : #include "sip/sipaccountbase.h"
      20              : #include "manager.h"
      21              : 
      22              : #include "client/jami_signal.h"
      23              : #include "jami/account_const.h"
      24              : 
      25              : #include <dhtnet/fileutils.h>
      26              : #include <opendht/thread_pool.h>
      27              : #include <fmt/std.h>
      28              : 
      29              : #include <fstream>
      30              : 
      31              : namespace jami {
      32              : namespace im {
      33              : 
      34          802 : MessageEngine::MessageEngine(SIPAccountBase& acc, const std::filesystem::path& path)
      35          802 :     : account_(acc)
      36          802 :     , savePath_(path)
      37          802 :     , ioContext_(Manager::instance().ioContext())
      38          802 :     , saveTimer_(*ioContext_)
      39              : {
      40          802 :     dhtnet::fileutils::check_dir(savePath_.parent_path());
      41          802 : }
      42              : 
      43              : MessageToken
      44        17000 : MessageEngine::sendMessage(const std::string& to,
      45              :                            const std::string& deviceId,
      46              :                            const std::map<std::string, std::string>& payloads,
      47              :                            uint64_t refreshToken)
      48              : {
      49        17000 :     if (payloads.empty() or to.empty())
      50            0 :         return 0;
      51        17004 :     MessageToken token = 0;
      52              :     {
      53        17004 :         std::lock_guard lock(messagesMutex_);
      54        17012 :         auto& peerMessages = deviceId.empty() ? messages_[to] : messagesDevices_[deviceId];
      55        16999 :         if (refreshToken != 0) {
      56        14835 :             for (auto& m : peerMessages) {
      57         1161 :                 if (m.token == refreshToken) {
      58         1161 :                     token = refreshToken;
      59         1161 :                     m.to = to;
      60         1161 :                     m.payloads = payloads;
      61         1161 :                     m.status = MessageStatus::IDLE;
      62         1161 :                     break;
      63              :                 }
      64              :             }
      65              :         }
      66        17006 :         if (token == 0) {
      67        15851 :             token = std::uniform_int_distribution<MessageToken> {1, JAMI_ID_MAX_VAL}(account_.rand);
      68        15857 :             auto& m = peerMessages.emplace_back(Message {token});
      69        15825 :             m.to = to;
      70        15838 :             m.payloads = payloads;
      71              :         }
      72        17005 :         scheduleSave();
      73        17022 :     }
      74        34046 :     asio::post(*ioContext_, [this, to, deviceId]() { retrySend(to, deviceId, true); });
      75        17009 :     return token;
      76              : }
      77              : 
      78              : void
      79         3379 : MessageEngine::onPeerOnline(const std::string& peer, const std::string& deviceId, bool retryOnTimeout)
      80              : {
      81         3379 :     retrySend(peer, deviceId, retryOnTimeout);
      82         3380 : }
      83              : 
      84              : void
      85        20402 : MessageEngine::retrySend(const std::string& peer, const std::string& deviceId, bool retryOnTimeout)
      86              : {
      87              :     struct PendingMsg
      88              :     {
      89              :         MessageToken token;
      90              :         std::string to;
      91              :         std::map<std::string, std::string> payloads;
      92              :     };
      93        20402 :     std::vector<PendingMsg> pending {};
      94        20402 :     auto now = clock::now();
      95              :     {
      96        20404 :         std::lock_guard lock(messagesMutex_);
      97        20404 :         auto& m = deviceId.empty() ? messages_ : messagesDevices_;
      98        20403 :         auto p = m.find(deviceId.empty() ? peer : deviceId);
      99        20401 :         if (p == m.end())
     100         2653 :             return;
     101        17750 :         auto& messages = p->second;
     102              : 
     103        35186 :         for (auto& m : messages) {
     104        17438 :             if (m.status == MessageStatus::IDLE) {
     105        17438 :                 m.status = MessageStatus::SENDING;
     106        17438 :                 m.retried++;
     107        17438 :                 m.last_op = now;
     108        17438 :                 pending.emplace_back(PendingMsg {m.token, m.to, m.payloads});
     109              :             }
     110              :         }
     111        20401 :     }
     112              :     // avoid locking while calling callback
     113        35188 :     for (const auto& p : pending) {
     114        69751 :         JAMI_DEBUG("[Account {:s}] [message {:d}] Reattempt sending", account_.getAccountID(), p.token);
     115        52314 :         if (p.payloads.find("application/im-gitmessage-id") == p.payloads.end())
     116          597 :             emitSignal<libjami::ConfigurationSignal::AccountMessageStatusChanged>(
     117          597 :                 account_.getAccountID(),
     118              :                 "",
     119          597 :                 p.to,
     120         1194 :                 std::to_string(p.token),
     121              :                 (int) libjami::Account::MessageStates::SENDING);
     122        17438 :         account_.sendMessage(p.to, deviceId, p.payloads, p.token, retryOnTimeout, false);
     123              :     }
     124        20403 : }
     125              : 
     126              : MessageStatus
     127            0 : MessageEngine::getStatus(MessageToken t) const
     128              : {
     129            0 :     std::lock_guard lock(messagesMutex_);
     130            0 :     for (const auto& p : messages_) {
     131            0 :         for (const auto& m : p.second) {
     132            0 :             if (m.token == t)
     133            0 :                 return m.status;
     134              :         }
     135              :     }
     136            0 :     return MessageStatus::UNKNOWN;
     137            0 : }
     138              : 
     139              : void
     140        17438 : MessageEngine::onMessageSent(const std::string& peer, MessageToken token, bool ok, const std::string& deviceId)
     141              : {
     142        69740 :     JAMI_DEBUG("[Account {:s}] [message {:d}] Message sent: {:s}",
     143              :                account_.getAccountID(),
     144              :                token,
     145              :                ok ? "success"sv : "failure"sv);
     146        17437 :     std::lock_guard lock(messagesMutex_);
     147        17438 :     auto& m = deviceId.empty() ? messages_ : messagesDevices_;
     148              : 
     149        17438 :     auto p = m.find(deviceId.empty() ? peer : deviceId);
     150        17438 :     if (p == m.end()) {
     151            0 :         JAMI_WARNING("[Account {:s}] onMessageSent: Peer not found: id:{} device:{}",
     152              :                      account_.getAccountID(),
     153              :                      peer,
     154              :                      deviceId);
     155            0 :         return;
     156              :     }
     157              : 
     158        35063 :     auto f = std::find_if(p->second.begin(), p->second.end(), [&](const Message& m) { return m.token == token; });
     159        17438 :     if (f != p->second.end()) {
     160        34876 :         auto emit = f->payloads.find("application/im-gitmessage-id") == f->payloads.end();
     161        17438 :         if (f->status == MessageStatus::SENDING) {
     162        17426 :             if (ok) {
     163        15548 :                 f->status = MessageStatus::SENT;
     164        62192 :                 JAMI_LOG("[Account {:s}] [message {:d}] Status changed to SENT", account_.getAccountID(), token);
     165        15548 :                 if (emit)
     166          404 :                     emitSignal<libjami::ConfigurationSignal::AccountMessageStatusChanged>(
     167          404 :                         account_.getAccountID(),
     168              :                         "",
     169          404 :                         f->to,
     170          808 :                         std::to_string(token),
     171              :                         static_cast<int>(libjami::Account::MessageStates::SENT));
     172        15548 :                 p->second.erase(f);
     173        15548 :                 scheduleSave();
     174         1878 :             } else if (f->retried >= MAX_RETRIES) {
     175            5 :                 f->status = MessageStatus::FAILURE;
     176           20 :                 JAMI_WARNING("[Account {:s}] [message {:d}] Status changed to FAILURE", account_.getAccountID(), token);
     177            5 :                 if (emit)
     178            0 :                     emitSignal<libjami::ConfigurationSignal::AccountMessageStatusChanged>(
     179            0 :                         account_.getAccountID(),
     180              :                         "",
     181            0 :                         f->to,
     182            0 :                         std::to_string(token),
     183              :                         static_cast<int>(libjami::Account::MessageStates::FAILURE));
     184            5 :                 p->second.erase(f);
     185            5 :                 scheduleSave();
     186              :             } else {
     187         1873 :                 f->status = MessageStatus::IDLE;
     188         7492 :                 JAMI_DEBUG("[Account {:s}] [message {:d}] Status changed to IDLE", account_.getAccountID(), token);
     189              :             }
     190              :         } else {
     191           48 :             JAMI_DEBUG("[Account {:s}] [message {:d}] State is not SENDING", account_.getAccountID(), token);
     192              :         }
     193              :     } else {
     194            0 :         JAMI_DEBUG("[Account {:s}] [message {:d}] Unable to find message", account_.getAccountID(), token);
     195              :     }
     196        17438 : }
     197              : 
     198              : void
     199           25 : MessageEngine::load()
     200              : {
     201              :     try {
     202           25 :         decltype(messages_) root;
     203              :         {
     204           25 :             std::lock_guard lock(dhtnet::fileutils::getFileLock(savePath_));
     205           25 :             std::ifstream file;
     206           25 :             file.exceptions(std::ifstream::failbit | std::ifstream::badbit);
     207           25 :             file.open(savePath_);
     208            1 :             if (file.is_open()) {
     209            1 :                 msgpack::unpacker up;
     210            1 :                 up.reserve_buffer(UINT16_MAX);
     211            1 :                 while (file.read(up.buffer(), UINT16_MAX)) {
     212            0 :                     up.buffer_consumed(file.gcount());
     213            0 :                     msgpack::object_handle oh;
     214            0 :                     if (up.next(oh)) {
     215            0 :                         root = oh.get().as<std::map<std::string, std::list<Message>>>();
     216            0 :                         break;
     217              :                     }
     218            0 :                     up.reserve_buffer(UINT16_MAX);
     219            0 :                 }
     220            0 :             }
     221            0 :         }
     222            0 :         std::lock_guard lock(messagesMutex_);
     223            0 :         messages_ = std::move(root);
     224            0 :         if (not messages_.empty()) {
     225            0 :             JAMI_LOG("[Account {}] Loaded {} messages from {}", account_.getAccountID(), messages_.size(), savePath_);
     226              :         }
     227           50 :     } catch (const std::exception& e) {
     228          100 :         JAMI_LOG("[Account {}] Unable to load messages from {}: {}", account_.getAccountID(), savePath_, e.what());
     229           25 :     }
     230           25 : }
     231              : 
     232              : void
     233          129 : MessageEngine::save() const
     234              : {
     235          129 :     std::lock_guard lock(messagesMutex_);
     236          129 :     save_();
     237          129 : }
     238              : 
     239              : void
     240        32560 : MessageEngine::scheduleSave()
     241              : {
     242        32560 :     saveTimer_.expires_after(std::chrono::seconds(5));
     243        32576 :     saveTimer_.async_wait([this, w = account_.weak_from_this()](const std::error_code& ec) {
     244        32576 :         if (!ec)
     245          128 :             if (auto acc = w.lock())
     246          128 :                 save();
     247        32576 :     });
     248        32572 : }
     249              : 
     250              : void
     251          129 : MessageEngine::save_() const
     252              : {
     253              :     try {
     254          129 :         std::ofstream file;
     255          129 :         file.exceptions(std::ifstream::failbit | std::ifstream::badbit);
     256          129 :         file.open(savePath_, std::ios::trunc);
     257          129 :         if (file.is_open())
     258          129 :             msgpack::pack(file, messages_);
     259          129 :     } catch (const std::exception& e) {
     260            0 :         JAMI_ERROR("[Account {}] Unable to serialize pending messages: {}", account_.getAccountID(), e.what());
     261            0 :     }
     262          129 : }
     263              : 
     264              : } // namespace im
     265              : } // namespace jami
        

Generated by: LCOV version 2.0-1