Line data Source code
1 : /*
2 : * Copyright (C) 2004-2026 Savoir-faire Linux Inc.
3 : *
4 : * This program is free software: you can redistribute it and/or modify
5 : * it under the terms of the GNU General Public License as published by
6 : * the Free Software Foundation, either version 3 of the License, or
7 : * (at your option) any later version.
8 : *
9 : * This program is distributed in the hope that it will be useful,
10 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 : * GNU General Public License for more details.
13 : *
14 : * You should have received a copy of the GNU General Public License
15 : * along with this program. If not, see <https://www.gnu.org/licenses/>.
16 : */
17 :
18 : #include "data_transfer.h"
19 :
20 : #include "base64.h"
21 : #include "fileutils.h"
22 : #include "manager.h"
23 : #include "client/jami_signal.h"
24 :
25 : #include <mutex>
26 : #include <cstdlib> // mkstemp
27 : #include <filesystem>
28 :
29 : #include <opendht/rng.h>
30 : #include <opendht/thread_pool.h>
31 :
32 : namespace jami {
33 :
34 : libjami::DataTransferId
35 64 : generateUID(std::mt19937_64& engine)
36 : {
37 64 : return std::uniform_int_distribution<libjami::DataTransferId> {1, JAMI_ID_MAX_VAL}(engine);
38 : }
39 :
40 : std::string
41 86 : getFileId(const std::string& commitId, const std::string& tid, const std::string& displayName)
42 : {
43 86 : auto extension = fileutils::getFileExtension(displayName);
44 86 : if (extension.empty())
45 164 : return fmt::format("{}_{}", commitId, tid);
46 8 : return fmt::format("{}_{}.{}", commitId, tid, extension);
47 : }
48 :
49 127 : FileInfo::FileInfo(const std::shared_ptr<dhtnet::ChannelSocket>& channel,
50 : const std::string& fileId,
51 : const std::string& interactionId,
52 127 : const libjami::DataTransferInfo& info)
53 127 : : fileId_(fileId)
54 127 : , interactionId_(interactionId)
55 127 : , info_(info)
56 254 : , channel_(channel)
57 127 : {}
58 :
59 : void
60 152 : FileInfo::emit(libjami::DataTransferEventCode code)
61 : {
62 152 : if (finishedCb_ && code >= libjami::DataTransferEventCode::finished)
63 90 : finishedCb_(uint32_t(code));
64 152 : if (interactionId_ != "") {
65 : // Else it's an internal transfer
66 34 : runOnMainThread([info = info_, iid = interactionId_, fid = fileId_, code]() {
67 34 : emitSignal<libjami::DataTransferSignal::DataTransferEvent>(info.accountId,
68 34 : info.conversationId,
69 34 : iid,
70 34 : fid,
71 : uint32_t(code));
72 34 : });
73 : }
74 152 : }
75 :
76 65 : OutgoingFile::OutgoingFile(const std::shared_ptr<dhtnet::ChannelSocket>& channel,
77 : const std::string& fileId,
78 : const std::string& interactionId,
79 : const libjami::DataTransferInfo& info,
80 : size_t start,
81 65 : size_t end)
82 : : FileInfo(channel, fileId, interactionId, info)
83 65 : , start_(start)
84 65 : , end_(end)
85 : {
86 65 : std::filesystem::path fpath(info_.path);
87 65 : if (!std::filesystem::is_regular_file(fpath)) {
88 70 : dht::ThreadPool::io().run([channel = std::move(channel_)] { channel->shutdown(); });
89 35 : return;
90 : }
91 30 : stream_.open(fpath, std::ios::binary | std::ios::in);
92 30 : if (!stream_ || !stream_.is_open()) {
93 0 : dht::ThreadPool::io().run([channel = std::move(channel_)] { channel->shutdown(); });
94 0 : return;
95 : }
96 65 : }
97 :
98 65 : OutgoingFile::~OutgoingFile()
99 : {
100 65 : if (stream_ && stream_.is_open())
101 0 : stream_.close();
102 65 : if (channel_) {
103 60 : dht::ThreadPool::io().run([channel = std::move(channel_)] { channel->shutdown(); });
104 : }
105 65 : }
106 :
107 : void
108 65 : OutgoingFile::process()
109 : {
110 65 : if (!channel_ or !stream_ or !stream_.is_open())
111 35 : return;
112 30 : auto correct = false;
113 30 : stream_.seekg(static_cast<long>(start_), std::ios::beg);
114 : try {
115 30 : std::vector<char> buffer(UINT16_MAX, 0);
116 30 : std::error_code ec;
117 30 : auto pos = start_;
118 266 : while (!stream_.eof()) {
119 238 : stream_.read(buffer.data(),
120 238 : end_ > start_ ? static_cast<long>(std::min(end_ - pos, buffer.size()))
121 238 : : static_cast<long>(buffer.size()));
122 238 : auto gcount = stream_.gcount();
123 238 : pos += gcount;
124 238 : channel_->write(reinterpret_cast<const uint8_t*>(buffer.data()), gcount, ec);
125 238 : if (ec)
126 2 : break;
127 : }
128 30 : if (!ec)
129 28 : correct = true;
130 30 : stream_.close();
131 30 : } catch (const std::exception& e) {
132 0 : JAMI_WARNING("Failed to read from stream: {}", e.what());
133 0 : }
134 30 : if (!isUserCancelled_) {
135 : // NOTE: emit(code) MUST be changed to improve handling of multiple destinations
136 : // But for now, we can just avoid to emit errors to the client, because for outgoing
137 : // transfer in a swarm, for outgoingFiles, we know that the file is ok. And the peer
138 : // will retry the transfer if they need, so we don't need to show errors.
139 30 : if (!interactionId_.empty() && !correct)
140 2 : return;
141 28 : auto code = correct ? libjami::DataTransferEventCode::finished : libjami::DataTransferEventCode::closed_by_peer;
142 28 : emit(code);
143 : }
144 : }
145 :
146 : void
147 0 : OutgoingFile::cancel()
148 : {
149 : // Remove link, not original file
150 0 : auto path = fileutils::get_data_dir() / "conversation_data" / info_.accountId / info_.conversationId / fileId_;
151 0 : if (std::filesystem::is_symlink(path))
152 0 : dhtnet::fileutils::remove(path);
153 0 : isUserCancelled_ = true;
154 0 : emit(libjami::DataTransferEventCode::closed_by_host);
155 0 : }
156 :
157 62 : IncomingFile::IncomingFile(const std::shared_ptr<dhtnet::ChannelSocket>& channel,
158 : const libjami::DataTransferInfo& info,
159 : const std::string& fileId,
160 : const std::string& interactionId,
161 62 : const std::string& sha3Sum)
162 : : FileInfo(channel, fileId, interactionId, info)
163 62 : , sha3Sum_(sha3Sum)
164 124 : , path_(info.path + ".tmp")
165 : {
166 62 : stream_.open(path_, std::ios::binary | std::ios::out | std::ios::app);
167 62 : if (!stream_)
168 0 : return;
169 :
170 62 : emit(libjami::DataTransferEventCode::ongoing);
171 0 : }
172 :
173 62 : IncomingFile::~IncomingFile()
174 : {
175 : {
176 62 : std::lock_guard<std::mutex> lk(streamMtx_);
177 62 : if (stream_ && stream_.is_open())
178 1 : stream_.close();
179 62 : }
180 62 : if (channel_)
181 122 : dht::ThreadPool::io().run([channel = std::move(channel_)] { channel->shutdown(); });
182 62 : }
183 :
184 : void
185 1 : IncomingFile::cancel()
186 : {
187 1 : isUserCancelled_ = true;
188 1 : emit(libjami::DataTransferEventCode::closed_by_peer);
189 1 : if (channel_)
190 2 : dht::ThreadPool::io().run([channel = std::move(channel_)] { channel->shutdown(); });
191 1 : }
192 :
193 : void
194 62 : IncomingFile::process()
195 : {
196 62 : channel_->setOnRecv([w = weak_from_this()](const uint8_t* buf, size_t len) {
197 223 : if (auto shared = w.lock()) {
198 223 : std::lock_guard<std::mutex> lk(shared->streamMtx_);
199 223 : if (shared->stream_.is_open())
200 223 : shared->stream_.write(reinterpret_cast<const char*>(buf), static_cast<long>(len));
201 223 : shared->info_.bytesProgress = shared->stream_.tellp();
202 223 : return static_cast<int>(len);
203 446 : }
204 : // Data received after destruction
205 0 : JAMI_ERROR("{} bytes received after IncomingFile destruction.", len);
206 0 : return -1;
207 : });
208 62 : channel_->onShutdown([w = weak_from_this()](const std::error_code& /*error_code*/) {
209 62 : auto shared = w.lock();
210 62 : if (!shared)
211 1 : return;
212 : {
213 61 : std::lock_guard<std::mutex> lk(shared->streamMtx_);
214 61 : if (shared->stream_ && shared->stream_.is_open())
215 61 : shared->stream_.close();
216 61 : }
217 61 : auto correct = shared->sha3Sum_.empty();
218 61 : std::error_code ec;
219 61 : if (!correct) {
220 18 : if (shared->isUserCancelled_) {
221 0 : std::filesystem::remove(shared->path_, ec);
222 18 : } else if (shared->info_.bytesProgress < shared->info_.totalSize) {
223 4 : JAMI_WARNING("Channel for {} shut down before transfer was complete (progress: {}/{})",
224 : shared->info_.path,
225 : shared->info_.bytesProgress,
226 : shared->info_.totalSize);
227 17 : } else if (shared->info_.totalSize != 0 && shared->info_.bytesProgress > shared->info_.totalSize) {
228 0 : JAMI_WARNING("Removing {} larger than announced: {}/{}",
229 : shared->path_,
230 : shared->info_.bytesProgress,
231 : shared->info_.totalSize);
232 0 : std::filesystem::remove(shared->path_, ec);
233 : } else {
234 17 : auto sha3Sum = fileutils::sha3File(shared->path_);
235 17 : if (shared->sha3Sum_ == sha3Sum) {
236 60 : JAMI_LOG("New file received: {}", shared->info_.path);
237 15 : correct = true;
238 : } else {
239 8 : JAMI_WARNING(
240 : "Removing {} with expected size ({} bytes) but invalid sha3sum (expected: {}, actual: {})",
241 : shared->path_,
242 : shared->info_.totalSize,
243 : shared->sha3Sum_,
244 : sha3Sum);
245 2 : std::filesystem::remove(shared->path_, ec);
246 : }
247 17 : }
248 18 : if (ec) {
249 0 : JAMI_ERROR("Failed to remove file {}: {}", shared->path_, ec.message());
250 : }
251 : }
252 61 : if (correct) {
253 58 : std::filesystem::rename(shared->path_, shared->info_.path, ec);
254 58 : if (ec) {
255 0 : JAMI_ERROR("Failed to rename file from {} to {}: {}", shared->path_, shared->info_.path, ec.message());
256 0 : correct = false;
257 : }
258 : }
259 61 : if (shared->isUserCancelled_)
260 0 : return;
261 61 : auto code = correct ? libjami::DataTransferEventCode::finished : libjami::DataTransferEventCode::closed_by_host;
262 61 : shared->emit(code);
263 61 : dht::ThreadPool::io().run([s = std::move(shared)] {});
264 62 : });
265 62 : }
266 :
267 : //==============================================================================
268 :
269 : class TransferManager::Impl
270 : {
271 : public:
272 1049 : Impl(const std::string& accountId, const std::string& accountUri, const std::string& to, const std::mt19937_64& rand)
273 1049 : : accountId_(accountId)
274 1049 : , accountUri_(accountUri)
275 1049 : , to_(to)
276 1049 : , rand_(rand)
277 : {
278 1049 : if (!to_.empty()) {
279 390 : conversationDataPath_ = fileutils::get_data_dir() / accountId_ / "conversation_data" / to_;
280 390 : dhtnet::fileutils::check_dir(conversationDataPath_);
281 390 : waitingPath_ = conversationDataPath_ / "waiting";
282 : }
283 1049 : profilesPath_ = fileutils::get_data_dir() / accountId_ / "profiles";
284 1049 : accountProfilePath_ = fileutils::get_data_dir() / accountId / "profile.vcf";
285 1049 : loadWaiting();
286 1049 : }
287 :
288 1048 : ~Impl()
289 : {
290 1048 : std::lock_guard lk {mapMutex_};
291 1085 : for (auto& [channel, _] : outgoings_) {
292 74 : dht::ThreadPool::io().run([c = std::move(channel)] { c->shutdown(); });
293 : }
294 1048 : outgoings_.clear();
295 1048 : incomings_.clear();
296 1048 : vcards_.clear();
297 1048 : }
298 :
299 1049 : void loadWaiting()
300 : {
301 : try {
302 : // read file
303 2098 : auto file = fileutils::loadFile(waitingPath_);
304 : // load values
305 0 : msgpack::object_handle oh = msgpack::unpack((const char*) file.data(), file.size());
306 0 : std::lock_guard lk {mapMutex_};
307 0 : oh.get().convert(waitingIds_);
308 1049 : } catch (const std::exception& e) {
309 1049 : return;
310 1049 : }
311 : }
312 21 : void saveWaiting()
313 : {
314 21 : std::ofstream file(waitingPath_, std::ios::trunc | std::ios::binary);
315 21 : msgpack::pack(file, waitingIds_);
316 21 : }
317 :
318 : std::string accountId_ {};
319 : std::string accountUri_ {};
320 : std::string to_ {};
321 : std::filesystem::path waitingPath_ {};
322 : std::filesystem::path profilesPath_ {};
323 : std::filesystem::path accountProfilePath_ {};
324 : std::filesystem::path conversationDataPath_ {};
325 :
326 : std::mutex mapMutex_ {};
327 : std::map<std::string, WaitingRequest> waitingIds_ {};
328 : std::map<std::shared_ptr<dhtnet::ChannelSocket>, std::shared_ptr<OutgoingFile>> outgoings_ {};
329 : std::map<std::string, std::shared_ptr<IncomingFile>> incomings_ {};
330 : std::map<std::pair<std::string, std::string>, std::shared_ptr<IncomingFile>> vcards_ {};
331 :
332 : std::mt19937_64 rand_;
333 : };
334 :
335 1049 : TransferManager::TransferManager(const std::string& accountId,
336 : const std::string& accountUri,
337 : const std::string& to,
338 1049 : const std::mt19937_64& rand)
339 1049 : : pimpl_ {std::make_unique<Impl>(accountId, accountUri, to, rand)}
340 1049 : {}
341 :
342 1048 : TransferManager::~TransferManager() {}
343 :
344 : void
345 67 : TransferManager::transferFile(const std::shared_ptr<dhtnet::ChannelSocket>& channel,
346 : const std::string& fileId,
347 : const std::string& interactionId,
348 : const std::string& path,
349 : size_t start,
350 : size_t end,
351 : OnFinishedCb onFinished)
352 : {
353 67 : std::lock_guard lk {pimpl_->mapMutex_};
354 67 : if (pimpl_->outgoings_.find(channel) != pimpl_->outgoings_.end())
355 2 : return;
356 65 : libjami::DataTransferInfo info;
357 65 : info.accountId = pimpl_->accountId_;
358 65 : info.conversationId = pimpl_->to_;
359 65 : info.path = path;
360 65 : auto f = std::make_shared<OutgoingFile>(channel, fileId, interactionId, info, start, end);
361 65 : f->onFinished([w = weak(), channel, onFinished = std::move(onFinished)](uint32_t code) {
362 28 : if (code == uint32_t(libjami::DataTransferEventCode::finished) && onFinished) {
363 6 : onFinished();
364 : }
365 : // schedule destroy outgoing transfer as not needed
366 28 : dht::ThreadPool().computation().run([w, channel] {
367 28 : if (auto sthis_ = w.lock()) {
368 28 : auto& pimpl = sthis_->pimpl_;
369 28 : std::lock_guard lk {pimpl->mapMutex_};
370 28 : auto itO = pimpl->outgoings_.find(channel);
371 28 : if (itO != pimpl->outgoings_.end())
372 28 : pimpl->outgoings_.erase(itO);
373 56 : }
374 28 : });
375 28 : });
376 65 : auto [outFile, _] = pimpl_->outgoings_.emplace(channel, std::move(f));
377 65 : dht::ThreadPool::io().run([w = std::weak_ptr<OutgoingFile>(outFile->second)] {
378 65 : if (auto of = w.lock())
379 65 : of->process();
380 65 : });
381 67 : }
382 :
383 : bool
384 1 : TransferManager::cancel(const std::string& fileId)
385 : {
386 1 : std::lock_guard lk {pimpl_->mapMutex_};
387 : // Remove from waiting, this avoid auto-download
388 1 : auto itW = pimpl_->waitingIds_.find(fileId);
389 1 : if (itW != pimpl_->waitingIds_.end()) {
390 1 : pimpl_->waitingIds_.erase(itW);
391 1 : JAMI_DBG() << "Cancel " << fileId;
392 1 : pimpl_->saveWaiting();
393 : }
394 1 : auto itC = pimpl_->incomings_.find(fileId);
395 1 : if (itC == pimpl_->incomings_.end())
396 0 : return false;
397 1 : itC->second->cancel();
398 1 : return true;
399 1 : }
400 :
401 : bool
402 2 : TransferManager::info(const std::string& fileId, std::string& path, int64_t& total, int64_t& progress) const noexcept
403 : {
404 2 : std::unique_lock lk {pimpl_->mapMutex_};
405 2 : if (pimpl_->to_.empty())
406 0 : return false;
407 :
408 2 : auto itI = pimpl_->incomings_.find(fileId);
409 2 : auto itW = pimpl_->waitingIds_.find(fileId);
410 2 : path = this->path(fileId).string();
411 2 : if (itI != pimpl_->incomings_.end()) {
412 0 : total = itI->second->info().totalSize;
413 0 : progress = itI->second->info().bytesProgress;
414 0 : return true;
415 2 : } else if (std::filesystem::is_regular_file(path)) {
416 1 : std::ifstream transfer(path, std::ios::binary);
417 1 : transfer.seekg(0, std::ios::end);
418 1 : progress = transfer.tellg();
419 1 : if (itW != pimpl_->waitingIds_.end()) {
420 0 : total = static_cast<int64_t>(itW->second.totalSize);
421 : } else {
422 : // If not waiting it's finished
423 1 : total = progress;
424 : }
425 1 : return true;
426 2 : } else if (itW != pimpl_->waitingIds_.end()) {
427 0 : total = static_cast<int64_t>(itW->second.totalSize);
428 0 : progress = 0;
429 0 : return true;
430 : }
431 : // Else we don't know infos there.
432 1 : progress = 0;
433 1 : return false;
434 2 : }
435 :
436 : void
437 13 : TransferManager::waitForTransfer(const std::string& fileId,
438 : const std::string& interactionId,
439 : const std::string& sha3sum,
440 : const std::string& path,
441 : std::size_t total)
442 : {
443 13 : std::unique_lock lk(pimpl_->mapMutex_);
444 13 : auto itW = pimpl_->waitingIds_.find(fileId);
445 13 : if (itW != pimpl_->waitingIds_.end())
446 1 : return;
447 12 : pimpl_->waitingIds_[fileId] = {fileId, interactionId, sha3sum, path, total};
448 12 : pimpl_->saveWaiting();
449 13 : }
450 :
451 : void
452 12 : TransferManager::onIncomingFileTransfer(const std::string& fileId,
453 : const std::shared_ptr<dhtnet::ChannelSocket>& channel,
454 : size_t start)
455 : {
456 12 : std::lock_guard lk(pimpl_->mapMutex_);
457 : // Check if not already an incoming file for this id and that we are waiting this file
458 12 : auto itC = pimpl_->incomings_.find(fileId);
459 12 : if (itC != pimpl_->incomings_.end()) {
460 0 : dht::ThreadPool().io().run([channel] { channel->shutdown(); });
461 0 : return;
462 : }
463 12 : auto itW = pimpl_->waitingIds_.find(fileId);
464 12 : if (itW == pimpl_->waitingIds_.end()) {
465 0 : dht::ThreadPool().io().run([channel] { channel->shutdown(); });
466 0 : return;
467 : }
468 :
469 12 : libjami::DataTransferInfo info;
470 12 : info.accountId = pimpl_->accountId_;
471 12 : info.conversationId = pimpl_->to_;
472 12 : info.path = itW->second.path;
473 12 : info.totalSize = static_cast<int64_t>(itW->second.totalSize);
474 12 : info.bytesProgress = static_cast<int64_t>(start);
475 :
476 : // Generate the file path within the conversation data directory
477 : // using the file id if no path has been specified, otherwise create
478 : // a symlink(Note: this will not work on Windows).
479 12 : auto filePath = path(fileId);
480 12 : if (info.path.empty()) {
481 0 : info.path = filePath.string();
482 : } else {
483 : // We don't need to check if this is an existing symlink here, as
484 : // the attempt to create one should report the error string correctly.
485 12 : fileutils::createFileLink(filePath, info.path);
486 : }
487 :
488 12 : auto ifile = std::make_shared<IncomingFile>(std::move(channel),
489 : info,
490 : fileId,
491 12 : itW->second.interactionId,
492 24 : itW->second.sha3sum);
493 12 : auto res = pimpl_->incomings_.emplace(fileId, std::move(ifile));
494 12 : if (res.second) {
495 12 : res.first->second->onFinished([w = weak(), fileId](uint32_t code) {
496 : // schedule destroy transfer as not needed
497 12 : dht::ThreadPool().computation().run([w, fileId, code] {
498 12 : if (auto sthis_ = w.lock()) {
499 12 : auto& pimpl = sthis_->pimpl_;
500 12 : std::lock_guard lk {pimpl->mapMutex_};
501 12 : auto itO = pimpl->incomings_.find(fileId);
502 12 : if (itO != pimpl->incomings_.end())
503 12 : pimpl->incomings_.erase(itO);
504 12 : if (code == uint32_t(libjami::DataTransferEventCode::finished)) {
505 8 : auto itW = pimpl->waitingIds_.find(fileId);
506 8 : if (itW != pimpl->waitingIds_.end()) {
507 8 : pimpl->waitingIds_.erase(itW);
508 8 : pimpl->saveWaiting();
509 : }
510 : }
511 24 : }
512 12 : });
513 12 : });
514 12 : res.first->second->process();
515 : }
516 12 : }
517 :
518 : std::filesystem::path
519 40 : TransferManager::path(const std::string& fileId) const
520 : {
521 40 : return pimpl_->conversationDataPath_ / fileId;
522 : }
523 :
524 : void
525 53 : TransferManager::onIncomingProfile(const std::shared_ptr<dhtnet::ChannelSocket>& channel, const std::string& sha3Sum)
526 : {
527 53 : if (!channel)
528 3 : return;
529 :
530 53 : auto chName = channel->name();
531 53 : std::string_view name = chName;
532 53 : auto sep = name.find_last_of('?');
533 53 : if (sep != std::string::npos)
534 8 : name = name.substr(0, sep);
535 :
536 53 : auto lastSep = name.find_last_of('/');
537 53 : auto fileId = name.substr(lastSep + 1);
538 :
539 53 : auto deviceId = channel->deviceId().toString();
540 53 : auto cert = channel->peerCertificate();
541 53 : if (!cert || !cert->issuer || fileId.find(".vcf") == std::string::npos)
542 2 : return;
543 :
544 59 : auto uri = fileId == "profile.vcf" ? cert->issuer->getId().toString()
545 59 : : std::string(fileId.substr(0, fileId.size() - 4 /*.vcf*/));
546 :
547 51 : std::lock_guard lk(pimpl_->mapMutex_);
548 51 : auto idx = std::make_pair(deviceId, uri);
549 : // Check if not already an incoming file for this id and that we are waiting this file
550 51 : auto itV = pimpl_->vcards_.find(idx);
551 51 : if (itV != pimpl_->vcards_.end()) {
552 2 : dht::ThreadPool().io().run([channel] { channel->shutdown(); });
553 1 : return;
554 : }
555 :
556 50 : auto tid = generateUID(pimpl_->rand_);
557 50 : libjami::DataTransferInfo info;
558 50 : info.accountId = pimpl_->accountId_;
559 50 : info.conversationId = pimpl_->to_;
560 :
561 100 : auto recvDir = fileutils::get_cache_dir() / pimpl_->accountId_ / "vcard";
562 50 : dhtnet::fileutils::recursive_mkdir(recvDir);
563 100 : info.path = (recvDir / fmt::format("{:s}_{:s}_{}", deviceId, uri, tid)).string();
564 :
565 50 : auto ifile = std::make_shared<IncomingFile>(std::move(channel), info, "profile.vcf", "", sha3Sum);
566 50 : auto res = pimpl_->vcards_.emplace(idx, std::move(ifile));
567 50 : if (res.second) {
568 200 : res.first->second->onFinished([w = weak(),
569 50 : uri = std::move(uri),
570 50 : deviceId = std::move(deviceId),
571 50 : accountId = pimpl_->accountId_,
572 50 : cert = std::move(cert),
573 : path = info.path](uint32_t code) {
574 250 : dht::ThreadPool().computation().run([w,
575 50 : uri = std::move(uri),
576 50 : deviceId = std::move(deviceId),
577 50 : accountId = std::move(accountId),
578 50 : path = std::move(path),
579 50 : code] {
580 50 : if (auto sthis_ = w.lock()) {
581 50 : auto& pimpl = sthis_->pimpl_;
582 :
583 50 : auto destPath = sthis_->profilePath(uri);
584 : try {
585 : // Move profile to destination path
586 50 : std::lock_guard lock(dhtnet::fileutils::getFileLock(destPath));
587 50 : dhtnet::fileutils::recursive_mkdir(destPath.parent_path());
588 50 : std::filesystem::rename(path, destPath);
589 50 : if (!pimpl->accountUri_.empty() && uri == pimpl->accountUri_) {
590 : // If this is the account profile, link or copy it to the account profile path
591 3 : if (!fileutils::createFileLink(pimpl->accountProfilePath_, destPath)) {
592 0 : std::error_code ec;
593 0 : std::filesystem::copy_file(destPath, pimpl->accountProfilePath_, ec);
594 : }
595 : }
596 50 : } catch (const std::exception& e) {
597 0 : JAMI_ERROR("{}", e.what());
598 0 : }
599 :
600 50 : std::lock_guard lk {pimpl->mapMutex_};
601 50 : auto itO = pimpl->vcards_.find({deviceId, uri});
602 50 : if (itO != pimpl->vcards_.end())
603 50 : pimpl->vcards_.erase(itO);
604 50 : if (code == uint32_t(libjami::DataTransferEventCode::finished)) {
605 50 : emitSignal<libjami::ConfigurationSignal::ProfileReceived>(accountId, uri, destPath.string());
606 : }
607 100 : }
608 50 : });
609 50 : });
610 50 : res.first->second->process();
611 : }
612 62 : }
613 :
614 : std::filesystem::path
615 61 : TransferManager::profilePath(const std::string& contactId) const
616 : {
617 122 : return pimpl_->profilesPath_ / fmt::format("{}.vcf", base64::encode(contactId));
618 : }
619 :
620 : std::vector<WaitingRequest>
621 2050 : TransferManager::waitingRequests() const
622 : {
623 2050 : std::vector<WaitingRequest> res;
624 2051 : std::lock_guard lk(pimpl_->mapMutex_);
625 2052 : for (const auto& [fileId, req] : pimpl_->waitingIds_) {
626 1 : auto itC = pimpl_->incomings_.find(fileId);
627 1 : if (itC == pimpl_->incomings_.end())
628 1 : res.emplace_back(req);
629 : }
630 4101 : return res;
631 2050 : }
632 :
633 : bool
634 4 : TransferManager::isWaiting(const std::string& fileId) const
635 : {
636 4 : std::lock_guard lk(pimpl_->mapMutex_);
637 8 : return pimpl_->waitingIds_.find(fileId) != pimpl_->waitingIds_.end();
638 4 : }
639 :
640 : } // namespace jami
|