Line data Source code
1 : /*
2 : * Copyright (C) 2004-2026 Savoir-faire Linux Inc.
3 : *
4 : * This program is free software: you can redistribute it and/or modify
5 : * it under the terms of the GNU General Public License as published by
6 : * the Free Software Foundation, either version 3 of the License, or
7 : * (at your option) any later version.
8 : *
9 : * This program is distributed in the hope that it will be useful,
10 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 : * GNU General Public License for more details.
13 : *
14 : * You should have received a copy of the GNU General Public License
15 : * along with this program. If not, see <https://www.gnu.org/licenses/>.
16 : */
17 :
18 : #include "message_engine.h"
19 : #include "sip/sipaccountbase.h"
20 : #include "manager.h"
21 :
22 : #include "client/jami_signal.h"
23 : #include "jami/account_const.h"
24 :
25 : #include <dhtnet/fileutils.h>
26 : #include <opendht/thread_pool.h>
27 : #include <fmt/std.h>
28 :
29 : #include <fstream>
30 :
31 : namespace jami {
32 : namespace im {
33 :
34 786 : MessageEngine::MessageEngine(SIPAccountBase& acc, const std::filesystem::path& path)
35 786 : : account_(acc)
36 786 : , savePath_(path)
37 786 : , ioContext_(Manager::instance().ioContext())
38 786 : , saveTimer_(*ioContext_)
39 : {
40 786 : dhtnet::fileutils::check_dir(savePath_.parent_path());
41 786 : }
42 :
43 : MessageToken
44 14410 : MessageEngine::sendMessage(const std::string& to,
45 : const std::string& deviceId,
46 : const std::map<std::string, std::string>& payloads,
47 : uint64_t refreshToken)
48 : {
49 14410 : if (payloads.empty() or to.empty())
50 0 : return 0;
51 14410 : MessageToken token = 0;
52 : {
53 14410 : std::lock_guard lock(messagesMutex_);
54 14416 : auto& peerMessages = deviceId.empty() ? messages_[to] : messagesDevices_[deviceId];
55 14414 : if (refreshToken != 0) {
56 12183 : for (auto& m : peerMessages) {
57 1060 : if (m.token == refreshToken) {
58 1059 : token = refreshToken;
59 1059 : m.to = to;
60 1059 : m.payloads = payloads;
61 1059 : m.status = MessageStatus::IDLE;
62 1059 : break;
63 : }
64 : }
65 : }
66 14419 : if (token == 0) {
67 13361 : token = std::uniform_int_distribution<MessageToken> {1, JAMI_ID_MAX_VAL}(account_.rand);
68 13360 : auto& m = peerMessages.emplace_back(Message {token});
69 13348 : m.to = to;
70 13352 : m.payloads = payloads;
71 : }
72 14413 : scheduleSave();
73 14425 : }
74 28850 : asio::post(*ioContext_, [this, to, deviceId]() { retrySend(to, deviceId, true); });
75 14420 : return token;
76 : }
77 :
78 : void
79 1710 : MessageEngine::onPeerOnline(const std::string& peer, const std::string& deviceId, bool retryOnTimeout)
80 : {
81 1710 : retrySend(peer, deviceId, retryOnTimeout);
82 1710 : }
83 :
84 : void
85 16135 : MessageEngine::retrySend(const std::string& peer, const std::string& deviceId, bool retryOnTimeout)
86 : {
87 : struct PendingMsg
88 : {
89 : MessageToken token;
90 : std::string to;
91 : std::map<std::string, std::string> payloads;
92 : };
93 16135 : std::vector<PendingMsg> pending {};
94 16134 : auto now = clock::now();
95 : {
96 16135 : std::lock_guard lock(messagesMutex_);
97 16136 : auto& m = deviceId.empty() ? messages_ : messagesDevices_;
98 16135 : auto p = m.find(deviceId.empty() ? peer : deviceId);
99 16132 : if (p == m.end())
100 1055 : return;
101 15078 : auto& messages = p->second;
102 :
103 30038 : for (auto& m : messages) {
104 14958 : if (m.status == MessageStatus::IDLE) {
105 14952 : m.status = MessageStatus::SENDING;
106 14952 : m.retried++;
107 14952 : m.last_op = now;
108 14952 : pending.emplace_back(PendingMsg {m.token, m.to, m.payloads});
109 : }
110 : }
111 16134 : }
112 : // avoid locking while calling callback
113 30032 : for (const auto& p : pending) {
114 59812 : JAMI_DEBUG("[Account {:s}] [message {:d}] Reattempt sending", account_.getAccountID(), p.token);
115 14953 : if (p.payloads.find("application/im-gitmessage-id") == p.payloads.end())
116 584 : emitSignal<libjami::ConfigurationSignal::AccountMessageStatusChanged>(
117 584 : account_.getAccountID(),
118 : "",
119 584 : p.to,
120 1168 : std::to_string(p.token),
121 : (int) libjami::Account::MessageStates::SENDING);
122 14953 : account_.sendMessage(p.to, deviceId, p.payloads, p.token, retryOnTimeout, false);
123 : }
124 16134 : }
125 :
126 : MessageStatus
127 0 : MessageEngine::getStatus(MessageToken t) const
128 : {
129 0 : std::lock_guard lock(messagesMutex_);
130 0 : for (const auto& p : messages_) {
131 0 : for (const auto& m : p.second) {
132 0 : if (m.token == t)
133 0 : return m.status;
134 : }
135 : }
136 0 : return MessageStatus::UNKNOWN;
137 0 : }
138 :
139 : void
140 14952 : MessageEngine::onMessageSent(const std::string& peer, MessageToken token, bool ok, const std::string& deviceId)
141 : {
142 59804 : JAMI_DEBUG("[Account {:s}] [message {:d}] Message sent: {:s}",
143 : account_.getAccountID(),
144 : token,
145 : ok ? "success"sv : "failure"sv);
146 14952 : std::lock_guard lock(messagesMutex_);
147 14953 : auto& m = deviceId.empty() ? messages_ : messagesDevices_;
148 :
149 14953 : auto p = m.find(deviceId.empty() ? peer : deviceId);
150 14953 : if (p == m.end()) {
151 0 : JAMI_WARNING("[Account {:s}] onMessageSent: Peer not found: id:{} device:{}",
152 : account_.getAccountID(),
153 : peer,
154 : deviceId);
155 0 : return;
156 : }
157 :
158 30089 : auto f = std::find_if(p->second.begin(), p->second.end(), [&](const Message& m) { return m.token == token; });
159 14953 : if (f != p->second.end()) {
160 14953 : auto emit = f->payloads.find("application/im-gitmessage-id") == f->payloads.end();
161 14953 : if (f->status == MessageStatus::SENDING) {
162 14946 : if (ok) {
163 12970 : f->status = MessageStatus::SENT;
164 51879 : JAMI_LOG("[Account {:s}] [message {:d}] Status changed to SENT", account_.getAccountID(), token);
165 12970 : if (emit)
166 395 : emitSignal<libjami::ConfigurationSignal::AccountMessageStatusChanged>(
167 395 : account_.getAccountID(),
168 : "",
169 395 : f->to,
170 790 : std::to_string(token),
171 : static_cast<int>(libjami::Account::MessageStates::SENT));
172 12970 : p->second.erase(f);
173 12970 : scheduleSave();
174 1976 : } else if (f->retried >= MAX_RETRIES) {
175 2 : f->status = MessageStatus::FAILURE;
176 8 : JAMI_WARNING("[Account {:s}] [message {:d}] Status changed to FAILURE", account_.getAccountID(), token);
177 2 : if (emit)
178 0 : emitSignal<libjami::ConfigurationSignal::AccountMessageStatusChanged>(
179 0 : account_.getAccountID(),
180 : "",
181 0 : f->to,
182 0 : std::to_string(token),
183 : static_cast<int>(libjami::Account::MessageStates::FAILURE));
184 2 : p->second.erase(f);
185 2 : scheduleSave();
186 : } else {
187 1974 : f->status = MessageStatus::IDLE;
188 7896 : JAMI_DEBUG("[Account {:s}] [message {:d}] Status changed to IDLE", account_.getAccountID(), token);
189 : }
190 : } else {
191 28 : JAMI_DEBUG("[Account {:s}] [message {:d}] State is not SENDING", account_.getAccountID(), token);
192 : }
193 : } else {
194 0 : JAMI_DEBUG("[Account {:s}] [message {:d}] Unable to find message", account_.getAccountID(), token);
195 : }
196 14953 : }
197 :
198 : void
199 10 : MessageEngine::load()
200 : {
201 : try {
202 10 : decltype(messages_) root;
203 : {
204 10 : std::lock_guard lock(dhtnet::fileutils::getFileLock(savePath_));
205 10 : std::ifstream file;
206 10 : file.exceptions(std::ifstream::failbit | std::ifstream::badbit);
207 10 : file.open(savePath_);
208 1 : if (file.is_open()) {
209 1 : msgpack::unpacker up;
210 1 : up.reserve_buffer(UINT16_MAX);
211 1 : while (file.read(up.buffer(), UINT16_MAX)) {
212 0 : up.buffer_consumed(file.gcount());
213 0 : msgpack::object_handle oh;
214 0 : if (up.next(oh)) {
215 0 : root = oh.get().as<std::map<std::string, std::list<Message>>>();
216 0 : break;
217 : }
218 0 : up.reserve_buffer(UINT16_MAX);
219 0 : }
220 1 : }
221 20 : }
222 0 : std::lock_guard lock(messagesMutex_);
223 0 : messages_ = std::move(root);
224 0 : if (not messages_.empty()) {
225 0 : JAMI_LOG("[Account {}] Loaded {} messages from {}", account_.getAccountID(), messages_.size(), savePath_);
226 : }
227 20 : } catch (const std::exception& e) {
228 40 : JAMI_LOG("[Account {}] Unable to load messages from {}: {}", account_.getAccountID(), savePath_, e.what());
229 10 : }
230 10 : }
231 :
232 : void
233 152 : MessageEngine::save() const
234 : {
235 152 : std::lock_guard lock(messagesMutex_);
236 152 : save_();
237 152 : }
238 :
239 : void
240 27383 : MessageEngine::scheduleSave()
241 : {
242 27383 : saveTimer_.expires_after(std::chrono::seconds(5));
243 27396 : saveTimer_.async_wait([this, w = account_.weak_from_this()](const std::error_code& ec) {
244 27397 : if (!ec)
245 151 : if (auto acc = w.lock())
246 151 : save();
247 27397 : });
248 27397 : }
249 :
250 : void
251 152 : MessageEngine::save_() const
252 : {
253 : try {
254 152 : std::ofstream file;
255 152 : file.exceptions(std::ifstream::failbit | std::ifstream::badbit);
256 152 : file.open(savePath_, std::ios::trunc);
257 151 : if (file.is_open())
258 151 : msgpack::pack(file, messages_);
259 153 : } catch (const std::exception& e) {
260 4 : JAMI_ERROR("[Account {}] Unable to serialize pending messages: {}", account_.getAccountID(), e.what());
261 1 : }
262 152 : }
263 :
264 : } // namespace im
265 : } // namespace jami
|