Line data Source code
1 : /*
2 : * Copyright (C) 2004-2024 Savoir-faire Linux Inc.
3 : *
4 : * This program is free software: you can redistribute it and/or modify
5 : * it under the terms of the GNU General Public License as published by
6 : * the Free Software Foundation, either version 3 of the License, or
7 : * (at your option) any later version.
8 : *
9 : * This program is distributed in the hope that it will be useful,
10 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 : * GNU General Public License for more details.
13 : *
14 : * You should have received a copy of the GNU General Public License
15 : * along with this program. If not, see <https://www.gnu.org/licenses/>.
16 : */
17 :
18 : #include "message_engine.h"
19 : #include "sip/sipaccountbase.h"
20 : #include "manager.h"
21 : #include "fileutils.h"
22 :
23 : #include "client/ring_signal.h"
24 : #include "jami/account_const.h"
25 :
26 : #include <opendht/thread_pool.h>
27 : #include <fmt/std.h>
28 :
29 : #include <fstream>
30 :
31 : namespace jami {
32 : namespace im {
33 :
34 808 : MessageEngine::MessageEngine(SIPAccountBase& acc, const std::filesystem::path& path)
35 808 : : account_(acc)
36 808 : , savePath_(path)
37 808 : , ioContext_(Manager::instance().ioContext())
38 808 : , saveTimer_(*ioContext_)
39 : {
40 808 : dhtnet::fileutils::check_dir(savePath_.parent_path());
41 808 : }
42 :
43 : MessageToken
44 23152 : MessageEngine::sendMessage(const std::string& to,
45 : const std::string& deviceId,
46 : const std::map<std::string, std::string>& payloads,
47 : uint64_t refreshToken)
48 : {
49 23152 : if (payloads.empty() or to.empty())
50 0 : return 0;
51 23152 : MessageToken token = 0;
52 : {
53 23152 : std::lock_guard lock(messagesMutex_);
54 23151 : auto& peerMessages = deviceId.empty() ? messages_[to] : messagesDevices_[deviceId];
55 23153 : if (refreshToken != 0) {
56 16249 : for (auto& m : peerMessages) {
57 2294 : if (m.token == refreshToken) {
58 2031 : token = refreshToken;
59 2031 : m.to = to;
60 2031 : m.payloads = payloads;
61 2031 : m.status = MessageStatus::IDLE;
62 2031 : break;
63 : }
64 : }
65 : }
66 23154 : if (token == 0) {
67 21122 : token = std::uniform_int_distribution<MessageToken> {1, JAMI_ID_MAX_VAL}(account_.rand);
68 21120 : auto& m = peerMessages.emplace_back(Message {token});
69 21120 : m.to = to;
70 21121 : m.payloads = payloads;
71 : }
72 23153 : scheduleSave();
73 23154 : }
74 46308 : ioContext_->post([this, to, deviceId]() { retrySend(to, deviceId, true); });
75 23154 : return token;
76 : }
77 :
78 : void
79 4170 : MessageEngine::onPeerOnline(const std::string& peer,
80 : const std::string& deviceId,
81 : bool retryOnTimeout)
82 : {
83 4170 : retrySend(peer, deviceId, retryOnTimeout);
84 4171 : }
85 :
86 : void
87 27324 : MessageEngine::retrySend(const std::string& peer, const std::string& deviceId, bool retryOnTimeout)
88 : {
89 27324 : if (account_.getRegistrationState() != RegistrationState::REGISTERED)
90 2925 : return;
91 : struct PendingMsg {
92 : MessageToken token;
93 : std::string to;
94 : std::map<std::string, std::string> payloads;
95 : };
96 27319 : std::vector<PendingMsg> pending {};
97 27318 : auto now = clock::now();
98 : {
99 27318 : std::lock_guard lock(messagesMutex_);
100 27320 : auto& m = deviceId.empty() ? messages_ : messagesDevices_;
101 27320 : auto p = m.find(deviceId.empty() ? peer : deviceId);
102 27320 : if (p == m.end())
103 2919 : return;
104 24399 : auto& messages = p->second;
105 :
106 62382 : for (auto& m: messages) {
107 37983 : if (m.status == MessageStatus::IDLE) {
108 37932 : m.status = MessageStatus::SENDING;
109 37932 : m.retried++;
110 37932 : m.last_op = now;
111 37932 : pending.emplace_back(PendingMsg {m.token, m.to, m.payloads});
112 : }
113 : }
114 27319 : }
115 : // avoid locking while calling callback
116 62332 : for (const auto& p : pending) {
117 113796 : JAMI_DEBUG("[message {:d}] Reattempt sending", p.token);
118 37932 : if (p.payloads.find("application/im-gitmessage-id") == p.payloads.end())
119 19651 : emitSignal<libjami::ConfigurationSignal::AccountMessageStatusChanged>(
120 19651 : account_.getAccountID(),
121 : "",
122 19651 : p.to,
123 39302 : std::to_string(p.token),
124 : (int) libjami::Account::MessageStates::SENDING);
125 37932 : account_.sendMessage(p.to, deviceId, p.payloads, p.token, retryOnTimeout, false);
126 : }
127 27319 : }
128 :
129 : MessageStatus
130 0 : MessageEngine::getStatus(MessageToken t) const
131 : {
132 0 : std::lock_guard lock(messagesMutex_);
133 0 : for (const auto& p : messages_) {
134 0 : for (const auto& m : p.second) {
135 0 : if (m.token == t)
136 0 : return m.status;
137 : }
138 : }
139 0 : return MessageStatus::UNKNOWN;
140 0 : }
141 :
142 : void
143 59182 : MessageEngine::onMessageSent(const std::string& peer,
144 : MessageToken token,
145 : bool ok,
146 : const std::string& deviceId)
147 : {
148 177511 : JAMI_DEBUG("[message {:d}] Message sent: {:s}", token, ok ? "success"sv : "failure"sv);
149 59209 : std::lock_guard lock(messagesMutex_);
150 59208 : auto& m = deviceId.empty() ? messages_ : messagesDevices_;
151 :
152 59208 : auto p = m.find(deviceId.empty() ? peer : deviceId);
153 59208 : if (p == m.end()) {
154 0 : JAMI_WARNING("onMessageSent: Peer not found: id:{} device:{}", peer, deviceId);
155 0 : return;
156 : }
157 :
158 59207 : auto f = std::find_if(p->second.begin(), p->second.end(), [&](const Message& m) {
159 301294 : return m.token == token;
160 : });
161 59208 : if (f != p->second.end()) {
162 108374 : auto emit = f->payloads.find("application/im-gitmessage-id")
163 108374 : == f->payloads.end();
164 54187 : if (f->status == MessageStatus::SENDING) {
165 37890 : if (ok) {
166 19685 : f->status = MessageStatus::SENT;
167 59055 : JAMI_LOG("[message {:d}] Status changed to SENT", token);
168 19685 : if (emit)
169 4347 : emitSignal<libjami::ConfigurationSignal::AccountMessageStatusChanged>(
170 4347 : account_.getAccountID(),
171 : "",
172 4347 : f->to,
173 8694 : std::to_string(token),
174 : static_cast<int>(libjami::Account::MessageStates::SENT));
175 19685 : p->second.erase(f);
176 19685 : scheduleSave();
177 18205 : } else if (f->retried >= MAX_RETRIES) {
178 98 : f->status = MessageStatus::FAILURE;
179 294 : JAMI_WARNING("[message {:d}] Status changed to FAILURE", token);
180 98 : if (emit)
181 84 : emitSignal<libjami::ConfigurationSignal::AccountMessageStatusChanged>(
182 84 : account_.getAccountID(),
183 : "",
184 84 : f->to,
185 168 : std::to_string(token),
186 : static_cast<int>(libjami::Account::MessageStates::FAILURE));
187 98 : p->second.erase(f);
188 98 : scheduleSave();
189 : } else {
190 18107 : f->status = MessageStatus::IDLE;
191 54321 : JAMI_DEBUG("[message {:d}] Status changed to IDLE", token);
192 : }
193 : } else {
194 48891 : JAMI_DEBUG("[message {:d}] State is not SENDING", token);
195 : }
196 : } else {
197 15063 : JAMI_DEBUG("[message {:d}] Unable to find message", token);
198 : }
199 59208 : }
200 :
201 : void
202 25 : MessageEngine::load()
203 : {
204 : try {
205 25 : decltype(messages_) root;
206 : {
207 25 : std::lock_guard lock(dhtnet::fileutils::getFileLock(savePath_));
208 25 : std::ifstream file;
209 25 : file.exceptions(std::ifstream::failbit | std::ifstream::badbit);
210 25 : file.open(savePath_);
211 1 : if (file.is_open()) {
212 1 : msgpack::unpacker up;
213 1 : up.reserve_buffer(UINT16_MAX);
214 1 : while (file.read(up.buffer(), UINT16_MAX)) {
215 0 : up.buffer_consumed(file.gcount());
216 0 : msgpack::object_handle oh;
217 0 : if (up.next(oh)) {
218 0 : root = oh.get().as<std::map<std::string, std::list<Message>>>();
219 0 : break;
220 : }
221 0 : up.reserve_buffer(UINT16_MAX);
222 0 : }
223 1 : }
224 50 : }
225 0 : std::lock_guard lock(messagesMutex_);
226 0 : messages_ = std::move(root);
227 0 : if (not messages_.empty()) {
228 0 : JAMI_LOG("[Account {}] Loaded {} messages from {}",
229 : account_.getAccountID(),
230 : messages_.size(),
231 : savePath_);
232 : }
233 50 : } catch (const std::exception& e) {
234 75 : JAMI_LOG("[Account {}] Unable to load messages from {}: {}",
235 : account_.getAccountID(),
236 : savePath_,
237 : e.what());
238 25 : }
239 25 : }
240 :
241 : void
242 865 : MessageEngine::save() const
243 : {
244 865 : std::lock_guard lock(messagesMutex_);
245 865 : save_();
246 865 : }
247 :
248 : void
249 42934 : MessageEngine::scheduleSave()
250 : {
251 42934 : saveTimer_.expires_after(std::chrono::seconds(5));
252 42936 : saveTimer_.async_wait([this, w = account_.weak_from_this()](const std::error_code& ec) {
253 42937 : if (!ec)
254 864 : if (auto acc = w.lock())
255 864 : save();
256 42937 : });
257 42937 : }
258 :
259 : void
260 865 : MessageEngine::save_() const
261 : {
262 : try {
263 865 : std::ofstream file;
264 865 : file.exceptions(std::ifstream::failbit | std::ifstream::badbit);
265 865 : file.open(savePath_, std::ios::trunc);
266 865 : if (file.is_open())
267 865 : msgpack::pack(file, messages_);
268 865 : } catch (const std::exception& e) {
269 0 : JAMI_ERROR("[Account {}] Unable to serialize pending messages: {}",
270 : account_.getAccountID(), e.what());
271 0 : }
272 865 : }
273 :
274 : } // namespace im
275 : } // namespace jami
|