LCOV - code coverage report
Current view: top level - src/im - message_engine.cpp (source / functions) Hit Total Coverage
Test: jami-coverage-filtered.info Lines: 145 191 75.9 %
Date: 2024-04-26 09:41:19 Functions: 25 29 86.2 %

          Line data    Source code
       1             : /*
       2             :  *  Copyright (C) 2004-2024 Savoir-faire Linux Inc.
       3             :  *
       4             :  *  Author: Adrien BĂ©raud <adrien.beraud@savoirfairelinux.com>
       5             :  *
       6             :  *  This program is free software; you can redistribute it and/or modify
       7             :  *  it under the terms of the GNU General Public License as published by
       8             :  *  the Free Software Foundation; either version 3 of the License, or
       9             :  *  (at your option) any later version.
      10             :  *
      11             :  *  This program is distributed in the hope that it will be useful,
      12             :  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
      13             :  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
      14             :  *  GNU General Public License for more details.
      15             :  *
      16             :  *  You should have received a copy of the GNU General Public License
      17             :  *  along with this program. If not, see <http://www.gnu.org/licenses/>.
      18             :  */
      19             : 
      20             : #include "message_engine.h"
      21             : #include "sip/sipaccountbase.h"
      22             : #include "manager.h"
      23             : #include "fileutils.h"
      24             : 
      25             : #include "client/ring_signal.h"
      26             : #include "jami/account_const.h"
      27             : 
      28             : #include <opendht/thread_pool.h>
      29             : #include <json/json.h>
      30             : #include <fmt/std.h>
      31             : 
      32             : #include <fstream>
      33             : 
      34             : namespace jami {
      35             : namespace im {
      36             : 
      37         608 : MessageEngine::MessageEngine(SIPAccountBase& acc, const std::filesystem::path& path)
      38         608 :     : account_(acc)
      39         608 :     , savePath_(path)
      40             : {
      41         608 :     dhtnet::fileutils::check_dir(savePath_.parent_path());
      42         608 : }
      43             : 
      44             : MessageToken
      45       39205 : MessageEngine::sendMessage(const std::string& to,
      46             :                            const std::string& deviceId,
      47             :                            const std::map<std::string, std::string>& payloads,
      48             :                            uint64_t refreshToken)
      49             : {
      50       39205 :     if (payloads.empty() or to.empty())
      51           0 :         return 0;
      52             :     MessageToken token;
      53             :     {
      54       39201 :         std::lock_guard lock(messagesMutex_);
      55             : 
      56       39202 :         auto& peerMessages = deviceId.empty() ? messages_[to] : messagesDevices_[deviceId];
      57       39205 :         auto previousIt = peerMessages.find(refreshToken);
      58       39206 :         if (previousIt != peerMessages.end() && previousIt->second.status != MessageStatus::SENT) {
      59       44040 :             JAMI_DEBUG("[message {:d}] Replace content", refreshToken);
      60       14681 :             token = refreshToken;
      61       14681 :             previousIt->second.to = to;
      62       14681 :             previousIt->second.payloads = payloads;
      63       14681 :             previousIt->second.status = MessageStatus::IDLE;
      64             :         } else {
      65             :             do {
      66       24525 :                 token = std::uniform_int_distribution<MessageToken> {1, JAMI_ID_MAX_VAL}(
      67       24524 :                     account_.rand);
      68       24526 :             } while (peerMessages.find(token) != peerMessages.end());
      69       24525 :             auto m = peerMessages.emplace(token, Message {});
      70       24524 :             m.first->second.to = to;
      71       24523 :             m.first->second.payloads = payloads;
      72             :         }
      73       39209 :         save_();
      74       39210 :     }
      75       78420 :     runOnMainThread([this, to, deviceId]() { retrySend(to, true, deviceId); });
      76       39210 :     return token;
      77             : }
      78             : 
      79             : void
      80        2427 : MessageEngine::onPeerOnline(const std::string& peer,
      81             :                             bool retryOnTimeout,
      82             :                             const std::string& deviceId)
      83             : {
      84        2427 :     retrySend(peer, retryOnTimeout, deviceId);
      85        2427 : }
      86             : 
      87             : void
      88       41637 : MessageEngine::retrySend(const std::string& peer, bool retryOnTimeout, const std::string& deviceId)
      89             : {
      90       41637 :     if (account_.getRegistrationState() != RegistrationState::REGISTERED)
      91        1890 :         return;
      92             :     struct PendingMsg
      93             :     {
      94             :         MessageToken token;
      95             :         std::string to;
      96             :         std::map<std::string, std::string> payloads;
      97             :     };
      98       41625 :     std::vector<PendingMsg> pending {};
      99       41625 :     auto now = clock::now();
     100             :     {
     101       41625 :         std::lock_guard lock(messagesMutex_);
     102             : 
     103       41625 :         auto& m = deviceId.empty() ? messages_ : messagesDevices_;
     104       41625 :         auto p = m.find(deviceId.empty() ? peer : deviceId);
     105       41625 :         if (p == m.end())
     106        1879 :             return;
     107       39746 :         auto& messages = p->second;
     108             : 
     109     1075098 :         for (auto m = messages.begin(); m != messages.end(); ++m) {
     110     1035352 :             if (m->second.status == MessageStatus::UNKNOWN
     111     1035352 :                 || m->second.status == MessageStatus::IDLE) {
     112       35329 :                 m->second.status = MessageStatus::SENDING;
     113       35329 :                 m->second.retried++;
     114       35329 :                 m->second.last_op = now;
     115       35329 :                 pending.emplace_back(PendingMsg {m->first, m->second.to, m->second.payloads});
     116             :             }
     117             :         }
     118       41625 :     }
     119             :     // avoid locking while calling callback
     120       75075 :     for (const auto& p : pending) {
     121      105987 :         JAMI_DEBUG("[message {:d}] Retry sending", p.token);
     122       35329 :         if (p.payloads.find("application/im-gitmessage-id") == p.payloads.end())
     123        1466 :             emitSignal<libjami::ConfigurationSignal::AccountMessageStatusChanged>(
     124        1466 :                 account_.getAccountID(),
     125             :                 "",
     126        1466 :                 p.to,
     127        2932 :                 std::to_string(p.token),
     128             :                 (int) libjami::Account::MessageStates::SENDING);
     129       35329 :         account_.sendMessage(p.to, deviceId, p.payloads, p.token, retryOnTimeout, false);
     130             :     }
     131       41625 : }
     132             : 
     133             : MessageStatus
     134           0 : MessageEngine::getStatus(MessageToken t) const
     135             : {
     136           0 :     std::lock_guard lock(messagesMutex_);
     137           0 :     for (const auto& p : messages_) {
     138           0 :         const auto m = p.second.find(t);
     139           0 :         if (m != p.second.end())
     140           0 :             return m->second.status;
     141             :     }
     142           0 :     return MessageStatus::UNKNOWN;
     143           0 : }
     144             : 
     145             : void
     146       37800 : MessageEngine::onMessageSent(const std::string& peer,
     147             :                              MessageToken token,
     148             :                              bool ok,
     149             :                              const std::string& deviceId)
     150             : {
     151      113400 :     JAMI_DEBUG("[message {:d}] Message sent: {:s}", token, ok ? "success"sv : "failure"sv);
     152       37800 :     std::lock_guard lock(messagesMutex_);
     153       37800 :     auto& m = deviceId.empty() ? messages_ : messagesDevices_;
     154             : 
     155       37800 :     auto p = m.find(deviceId.empty() ? peer : deviceId);
     156       37800 :     if (p == m.end()) {
     157        7428 :         JAMI_WARNING("[message {:d}] Not found", token);
     158        2476 :         return;
     159             :     }
     160             : 
     161       35324 :     auto f = p->second.find(token);
     162       35324 :     if (f != p->second.end()) {
     163       70648 :         auto emit = f->second.payloads.find("application/im-gitmessage-id")
     164       70648 :                     == f->second.payloads.end();
     165       35324 :         if (f->second.status == MessageStatus::SENDING) {
     166       27207 :             if (ok) {
     167       22905 :                 f->second.status = MessageStatus::SENT;
     168       22905 :                 JAMI_DBG() << "[message " << token << "] Status changed to SENT";
     169       22905 :                 if (emit)
     170           1 :                     emitSignal<libjami::ConfigurationSignal::AccountMessageStatusChanged>(
     171           1 :                         account_.getAccountID(),
     172             :                         "",
     173           1 :                         f->second.to,
     174           2 :                         std::to_string(token),
     175             :                         static_cast<int>(libjami::Account::MessageStates::SENT));
     176       22905 :                 save_();
     177        4302 :             } else if (f->second.retried >= MAX_RETRIES) {
     178          57 :                 f->second.status = MessageStatus::FAILURE;
     179          57 :                 JAMI_DBG() << "[message " << token << "] Status changed to FAILURE";
     180          57 :                 if (emit)
     181           0 :                     emitSignal<libjami::ConfigurationSignal::AccountMessageStatusChanged>(
     182           0 :                         account_.getAccountID(),
     183             :                         "",
     184           0 :                         f->second.to,
     185           0 :                         std::to_string(token),
     186             :                         static_cast<int>(libjami::Account::MessageStates::FAILURE));
     187          57 :                 save_();
     188             :             } else {
     189        4245 :                 f->second.status = MessageStatus::IDLE;
     190       12735 :                 JAMI_DEBUG("[message {:d}] Status changed to IDLE", token);
     191             :             }
     192             :         } else {
     193       24351 :             JAMI_DEBUG("[message {:d}] State is not SENDING", token);
     194             :         }
     195             :     } else {
     196           0 :         JAMI_DEBUG("[message {:d}] Can't find message", token);
     197             :     }
     198       37800 : }
     199             : 
     200             : void
     201          24 : MessageEngine::load()
     202             : {
     203             :     try {
     204          24 :         Json::Value root;
     205             :         {
     206          24 :             std::lock_guard lock(dhtnet::fileutils::getFileLock(savePath_));
     207          24 :             std::ifstream file;
     208          24 :             file.exceptions(std::ifstream::failbit | std::ifstream::badbit);
     209          24 :             file.open(savePath_);
     210           0 :             if (file.is_open())
     211           0 :                 file >> root;
     212          48 :         }
     213           0 :         std::lock_guard lock(messagesMutex_);
     214           0 :         long unsigned loaded {0};
     215           0 :         for (auto i = root.begin(); i != root.end(); ++i) {
     216           0 :             auto to = i.key().asString();
     217           0 :             auto& pmessages = *i;
     218           0 :             auto& p = messages_[to];
     219           0 :             for (auto m = pmessages.begin(); m != pmessages.end(); ++m) {
     220           0 :                 const auto& jmsg = *m;
     221           0 :                 MessageToken token = from_hex_string(m.key().asString());
     222           0 :                 Message msg;
     223           0 :                 msg.status = (MessageStatus) jmsg["status"].asInt();
     224           0 :                 msg.to = jmsg["to"].asString();
     225           0 :                 auto wall_time = std::chrono::system_clock::from_time_t(jmsg["last_op"].asInt64());
     226           0 :                 msg.last_op = clock::now() + (wall_time - std::chrono::system_clock::now());
     227           0 :                 msg.retried = jmsg.get("retried", 0).asUInt();
     228           0 :                 const auto& pl = jmsg["payload"];
     229           0 :                 for (auto p = pl.begin(); p != pl.end(); ++p)
     230           0 :                     msg.payloads[p.key().asString()] = p->asString();
     231           0 :                 p.emplace(token, std::move(msg));
     232           0 :                 loaded++;
     233           0 :             }
     234           0 :         }
     235           0 :         if (loaded > 0) {
     236           0 :             JAMI_DBG("[Account %s] loaded %lu messages from %s",
     237             :                      account_.getAccountID().c_str(),
     238             :                      loaded,
     239             :                      savePath_.c_str());
     240             :         }
     241          48 :     } catch (const std::exception& e) {
     242          24 :         JAMI_DBG("[Account %s] couldn't load messages from %s: %s",
     243             :                  account_.getAccountID().c_str(),
     244             :                  savePath_.c_str(),
     245             :                  e.what());
     246          24 :     }
     247          24 : }
     248             : 
     249             : void
     250           0 : MessageEngine::save() const
     251             : {
     252           0 :     std::lock_guard lock(messagesMutex_);
     253           0 :     save_();
     254           0 : }
     255             : 
     256             : void
     257       62164 : MessageEngine::save_() const
     258             : {
     259             :     try {
     260       62164 :         Json::Value root(Json::objectValue);
     261     1310400 :         for (auto& c : messages_) {
     262     1245117 :             Json::Value peerRoot(Json::objectValue);
     263     3436473 :             for (auto& m : c.second) {
     264     2177059 :                 auto& v = m.second;
     265     2177059 :                 if (v.status == MessageStatus::FAILURE || v.status == MessageStatus::SENT
     266     2168385 :                     || v.status == MessageStatus::CANCELLED)
     267        8663 :                     continue;
     268     2168396 :                 Json::Value msg;
     269     2178884 :                 msg["status"] = (int) (v.status == MessageStatus::SENDING ? MessageStatus::IDLE
     270     4353230 :                                                                           : v.status);
     271     2181656 :                 msg["to"] = v.to;
     272     2172691 :                 auto wall_time = std::chrono::system_clock::now()
     273     4355447 :                                  + std::chrono::duration_cast<std::chrono::system_clock::duration>(v.last_op - clock::now());
     274     2174736 :                 msg["last_op"] = (Json::Value::Int64) std::chrono::system_clock::to_time_t(wall_time);
     275     2177889 :                 msg["retried"] = v.retried;
     276     2181139 :                 auto& payloads = msg["payload"];
     277     4355674 :                 for (const auto& p : v.payloads)
     278     2166049 :                     payloads[p.first] = p.second;
     279     2162004 :                 peerRoot[to_hex_string(m.first)] = std::move(msg);
     280     2181623 :             }
     281     1235628 :             if (peerRoot.size() == 0)
     282        9458 :                 continue;
     283     1235361 :             root[c.first] = std::move(peerRoot);
     284     1248267 :         }
     285             : 
     286             :         // Save asynchronously
     287      186321 :         dht::ThreadPool::computation().run([path = savePath_,
     288       62168 :                                             root = std::move(root),
     289       62172 :                                             accountID = account_.getAccountID()] {
     290       62170 :             std::lock_guard lock(dhtnet::fileutils::getFileLock(path));
     291             :             try {
     292       62169 :                 Json::StreamWriterBuilder wbuilder;
     293       62171 :                 wbuilder["commentStyle"] = "None";
     294       62170 :                 wbuilder["indentation"] = "";
     295       62169 :                 std::unique_ptr<Json::StreamWriter> writer(wbuilder.newStreamWriter());
     296       62167 :                 std::ofstream file;
     297       62171 :                 file.exceptions(std::ifstream::failbit | std::ifstream::badbit);
     298       62167 :                 file.open(path, std::ios::trunc);
     299       62161 :                 if (file.is_open())
     300       62162 :                     writer->write(root, &file);
     301       62176 :             } catch (const std::exception& e) {
     302          18 :                 JAMI_ERROR("[Account {:s}] Couldn't save messages to {:s}: {:s}",
     303             :                            accountID,
     304             :                            path.string(),
     305             :                            e.what());
     306           6 :             }
     307      186461 :             JAMI_DEBUG("[Account {:s}] saved {:d} messages to {:s}", accountID, root.size(), path.string());
     308       62171 :         });
     309       62172 :     } catch (const std::exception& e) {
     310           0 :         JAMI_ERR("[Account %s] couldn't save messages to %s: %s",
     311             :                  account_.getAccountID().c_str(),
     312             :                  savePath_.c_str(),
     313             :                  e.what());
     314           0 :     }
     315       62172 : }
     316             : 
     317             : } // namespace im
     318             : } // namespace jami

Generated by: LCOV version 1.14