Line data Source code
1 : /*
2 : * Copyright (C) 2004-2024 Savoir-faire Linux Inc.
3 : *
4 : * Author: Adrien BĂ©raud <adrien.beraud@savoirfairelinux.com>
5 : *
6 : * This program is free software; you can redistribute it and/or modify
7 : * it under the terms of the GNU General Public License as published by
8 : * the Free Software Foundation; either version 3 of the License, or
9 : * (at your option) any later version.
10 : *
11 : * This program is distributed in the hope that it will be useful,
12 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 : * GNU General Public License for more details.
15 : *
16 : * You should have received a copy of the GNU General Public License
17 : * along with this program. If not, see <http://www.gnu.org/licenses/>.
18 : */
19 :
20 : #include "message_engine.h"
21 : #include "sip/sipaccountbase.h"
22 : #include "manager.h"
23 : #include "fileutils.h"
24 :
25 : #include "client/ring_signal.h"
26 : #include "jami/account_const.h"
27 :
28 : #include <opendht/thread_pool.h>
29 : #include <json/json.h>
30 : #include <fmt/std.h>
31 :
32 : #include <fstream>
33 :
34 : namespace jami {
35 : namespace im {
36 :
37 608 : MessageEngine::MessageEngine(SIPAccountBase& acc, const std::filesystem::path& path)
38 608 : : account_(acc)
39 608 : , savePath_(path)
40 : {
41 608 : dhtnet::fileutils::check_dir(savePath_.parent_path());
42 608 : }
43 :
44 : MessageToken
45 39205 : MessageEngine::sendMessage(const std::string& to,
46 : const std::string& deviceId,
47 : const std::map<std::string, std::string>& payloads,
48 : uint64_t refreshToken)
49 : {
50 39205 : if (payloads.empty() or to.empty())
51 0 : return 0;
52 : MessageToken token;
53 : {
54 39201 : std::lock_guard lock(messagesMutex_);
55 :
56 39202 : auto& peerMessages = deviceId.empty() ? messages_[to] : messagesDevices_[deviceId];
57 39205 : auto previousIt = peerMessages.find(refreshToken);
58 39206 : if (previousIt != peerMessages.end() && previousIt->second.status != MessageStatus::SENT) {
59 44040 : JAMI_DEBUG("[message {:d}] Replace content", refreshToken);
60 14681 : token = refreshToken;
61 14681 : previousIt->second.to = to;
62 14681 : previousIt->second.payloads = payloads;
63 14681 : previousIt->second.status = MessageStatus::IDLE;
64 : } else {
65 : do {
66 24525 : token = std::uniform_int_distribution<MessageToken> {1, JAMI_ID_MAX_VAL}(
67 24524 : account_.rand);
68 24526 : } while (peerMessages.find(token) != peerMessages.end());
69 24525 : auto m = peerMessages.emplace(token, Message {});
70 24524 : m.first->second.to = to;
71 24523 : m.first->second.payloads = payloads;
72 : }
73 39209 : save_();
74 39210 : }
75 78420 : runOnMainThread([this, to, deviceId]() { retrySend(to, true, deviceId); });
76 39210 : return token;
77 : }
78 :
79 : void
80 2427 : MessageEngine::onPeerOnline(const std::string& peer,
81 : bool retryOnTimeout,
82 : const std::string& deviceId)
83 : {
84 2427 : retrySend(peer, retryOnTimeout, deviceId);
85 2427 : }
86 :
87 : void
88 41637 : MessageEngine::retrySend(const std::string& peer, bool retryOnTimeout, const std::string& deviceId)
89 : {
90 41637 : if (account_.getRegistrationState() != RegistrationState::REGISTERED)
91 1890 : return;
92 : struct PendingMsg
93 : {
94 : MessageToken token;
95 : std::string to;
96 : std::map<std::string, std::string> payloads;
97 : };
98 41625 : std::vector<PendingMsg> pending {};
99 41625 : auto now = clock::now();
100 : {
101 41625 : std::lock_guard lock(messagesMutex_);
102 :
103 41625 : auto& m = deviceId.empty() ? messages_ : messagesDevices_;
104 41625 : auto p = m.find(deviceId.empty() ? peer : deviceId);
105 41625 : if (p == m.end())
106 1879 : return;
107 39746 : auto& messages = p->second;
108 :
109 1075098 : for (auto m = messages.begin(); m != messages.end(); ++m) {
110 1035352 : if (m->second.status == MessageStatus::UNKNOWN
111 1035352 : || m->second.status == MessageStatus::IDLE) {
112 35329 : m->second.status = MessageStatus::SENDING;
113 35329 : m->second.retried++;
114 35329 : m->second.last_op = now;
115 35329 : pending.emplace_back(PendingMsg {m->first, m->second.to, m->second.payloads});
116 : }
117 : }
118 41625 : }
119 : // avoid locking while calling callback
120 75075 : for (const auto& p : pending) {
121 105987 : JAMI_DEBUG("[message {:d}] Retry sending", p.token);
122 35329 : if (p.payloads.find("application/im-gitmessage-id") == p.payloads.end())
123 1466 : emitSignal<libjami::ConfigurationSignal::AccountMessageStatusChanged>(
124 1466 : account_.getAccountID(),
125 : "",
126 1466 : p.to,
127 2932 : std::to_string(p.token),
128 : (int) libjami::Account::MessageStates::SENDING);
129 35329 : account_.sendMessage(p.to, deviceId, p.payloads, p.token, retryOnTimeout, false);
130 : }
131 41625 : }
132 :
133 : MessageStatus
134 0 : MessageEngine::getStatus(MessageToken t) const
135 : {
136 0 : std::lock_guard lock(messagesMutex_);
137 0 : for (const auto& p : messages_) {
138 0 : const auto m = p.second.find(t);
139 0 : if (m != p.second.end())
140 0 : return m->second.status;
141 : }
142 0 : return MessageStatus::UNKNOWN;
143 0 : }
144 :
145 : void
146 37800 : MessageEngine::onMessageSent(const std::string& peer,
147 : MessageToken token,
148 : bool ok,
149 : const std::string& deviceId)
150 : {
151 113400 : JAMI_DEBUG("[message {:d}] Message sent: {:s}", token, ok ? "success"sv : "failure"sv);
152 37800 : std::lock_guard lock(messagesMutex_);
153 37800 : auto& m = deviceId.empty() ? messages_ : messagesDevices_;
154 :
155 37800 : auto p = m.find(deviceId.empty() ? peer : deviceId);
156 37800 : if (p == m.end()) {
157 7428 : JAMI_WARNING("[message {:d}] Not found", token);
158 2476 : return;
159 : }
160 :
161 35324 : auto f = p->second.find(token);
162 35324 : if (f != p->second.end()) {
163 70648 : auto emit = f->second.payloads.find("application/im-gitmessage-id")
164 70648 : == f->second.payloads.end();
165 35324 : if (f->second.status == MessageStatus::SENDING) {
166 27207 : if (ok) {
167 22905 : f->second.status = MessageStatus::SENT;
168 22905 : JAMI_DBG() << "[message " << token << "] Status changed to SENT";
169 22905 : if (emit)
170 1 : emitSignal<libjami::ConfigurationSignal::AccountMessageStatusChanged>(
171 1 : account_.getAccountID(),
172 : "",
173 1 : f->second.to,
174 2 : std::to_string(token),
175 : static_cast<int>(libjami::Account::MessageStates::SENT));
176 22905 : save_();
177 4302 : } else if (f->second.retried >= MAX_RETRIES) {
178 57 : f->second.status = MessageStatus::FAILURE;
179 57 : JAMI_DBG() << "[message " << token << "] Status changed to FAILURE";
180 57 : if (emit)
181 0 : emitSignal<libjami::ConfigurationSignal::AccountMessageStatusChanged>(
182 0 : account_.getAccountID(),
183 : "",
184 0 : f->second.to,
185 0 : std::to_string(token),
186 : static_cast<int>(libjami::Account::MessageStates::FAILURE));
187 57 : save_();
188 : } else {
189 4245 : f->second.status = MessageStatus::IDLE;
190 12735 : JAMI_DEBUG("[message {:d}] Status changed to IDLE", token);
191 : }
192 : } else {
193 24351 : JAMI_DEBUG("[message {:d}] State is not SENDING", token);
194 : }
195 : } else {
196 0 : JAMI_DEBUG("[message {:d}] Can't find message", token);
197 : }
198 37800 : }
199 :
200 : void
201 24 : MessageEngine::load()
202 : {
203 : try {
204 24 : Json::Value root;
205 : {
206 24 : std::lock_guard lock(dhtnet::fileutils::getFileLock(savePath_));
207 24 : std::ifstream file;
208 24 : file.exceptions(std::ifstream::failbit | std::ifstream::badbit);
209 24 : file.open(savePath_);
210 0 : if (file.is_open())
211 0 : file >> root;
212 48 : }
213 0 : std::lock_guard lock(messagesMutex_);
214 0 : long unsigned loaded {0};
215 0 : for (auto i = root.begin(); i != root.end(); ++i) {
216 0 : auto to = i.key().asString();
217 0 : auto& pmessages = *i;
218 0 : auto& p = messages_[to];
219 0 : for (auto m = pmessages.begin(); m != pmessages.end(); ++m) {
220 0 : const auto& jmsg = *m;
221 0 : MessageToken token = from_hex_string(m.key().asString());
222 0 : Message msg;
223 0 : msg.status = (MessageStatus) jmsg["status"].asInt();
224 0 : msg.to = jmsg["to"].asString();
225 0 : auto wall_time = std::chrono::system_clock::from_time_t(jmsg["last_op"].asInt64());
226 0 : msg.last_op = clock::now() + (wall_time - std::chrono::system_clock::now());
227 0 : msg.retried = jmsg.get("retried", 0).asUInt();
228 0 : const auto& pl = jmsg["payload"];
229 0 : for (auto p = pl.begin(); p != pl.end(); ++p)
230 0 : msg.payloads[p.key().asString()] = p->asString();
231 0 : p.emplace(token, std::move(msg));
232 0 : loaded++;
233 0 : }
234 0 : }
235 0 : if (loaded > 0) {
236 0 : JAMI_DBG("[Account %s] loaded %lu messages from %s",
237 : account_.getAccountID().c_str(),
238 : loaded,
239 : savePath_.c_str());
240 : }
241 48 : } catch (const std::exception& e) {
242 24 : JAMI_DBG("[Account %s] couldn't load messages from %s: %s",
243 : account_.getAccountID().c_str(),
244 : savePath_.c_str(),
245 : e.what());
246 24 : }
247 24 : }
248 :
249 : void
250 0 : MessageEngine::save() const
251 : {
252 0 : std::lock_guard lock(messagesMutex_);
253 0 : save_();
254 0 : }
255 :
256 : void
257 62164 : MessageEngine::save_() const
258 : {
259 : try {
260 62164 : Json::Value root(Json::objectValue);
261 1310400 : for (auto& c : messages_) {
262 1245117 : Json::Value peerRoot(Json::objectValue);
263 3436473 : for (auto& m : c.second) {
264 2177059 : auto& v = m.second;
265 2177059 : if (v.status == MessageStatus::FAILURE || v.status == MessageStatus::SENT
266 2168385 : || v.status == MessageStatus::CANCELLED)
267 8663 : continue;
268 2168396 : Json::Value msg;
269 2178884 : msg["status"] = (int) (v.status == MessageStatus::SENDING ? MessageStatus::IDLE
270 4353230 : : v.status);
271 2181656 : msg["to"] = v.to;
272 2172691 : auto wall_time = std::chrono::system_clock::now()
273 4355447 : + std::chrono::duration_cast<std::chrono::system_clock::duration>(v.last_op - clock::now());
274 2174736 : msg["last_op"] = (Json::Value::Int64) std::chrono::system_clock::to_time_t(wall_time);
275 2177889 : msg["retried"] = v.retried;
276 2181139 : auto& payloads = msg["payload"];
277 4355674 : for (const auto& p : v.payloads)
278 2166049 : payloads[p.first] = p.second;
279 2162004 : peerRoot[to_hex_string(m.first)] = std::move(msg);
280 2181623 : }
281 1235628 : if (peerRoot.size() == 0)
282 9458 : continue;
283 1235361 : root[c.first] = std::move(peerRoot);
284 1248267 : }
285 :
286 : // Save asynchronously
287 186321 : dht::ThreadPool::computation().run([path = savePath_,
288 62168 : root = std::move(root),
289 62172 : accountID = account_.getAccountID()] {
290 62170 : std::lock_guard lock(dhtnet::fileutils::getFileLock(path));
291 : try {
292 62169 : Json::StreamWriterBuilder wbuilder;
293 62171 : wbuilder["commentStyle"] = "None";
294 62170 : wbuilder["indentation"] = "";
295 62169 : std::unique_ptr<Json::StreamWriter> writer(wbuilder.newStreamWriter());
296 62167 : std::ofstream file;
297 62171 : file.exceptions(std::ifstream::failbit | std::ifstream::badbit);
298 62167 : file.open(path, std::ios::trunc);
299 62161 : if (file.is_open())
300 62162 : writer->write(root, &file);
301 62176 : } catch (const std::exception& e) {
302 18 : JAMI_ERROR("[Account {:s}] Couldn't save messages to {:s}: {:s}",
303 : accountID,
304 : path.string(),
305 : e.what());
306 6 : }
307 186461 : JAMI_DEBUG("[Account {:s}] saved {:d} messages to {:s}", accountID, root.size(), path.string());
308 62171 : });
309 62172 : } catch (const std::exception& e) {
310 0 : JAMI_ERR("[Account %s] couldn't save messages to %s: %s",
311 : account_.getAccountID().c_str(),
312 : savePath_.c_str(),
313 : e.what());
314 0 : }
315 62172 : }
316 :
317 : } // namespace im
318 : } // namespace jami
|