Line data Source code
1 : /*
2 : * Copyright (C) 2004-2026 Savoir-faire Linux Inc.
3 : *
4 : * This program is free software: you can redistribute it and/or modify
5 : * it under the terms of the GNU General Public License as published by
6 : * the Free Software Foundation, either version 3 of the License, or
7 : * (at your option) any later version.
8 : *
9 : * This program is distributed in the hope that it will be useful,
10 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 : * GNU General Public License for more details.
13 : *
14 : * You should have received a copy of the GNU General Public License
15 : * along with this program. If not, see <https://www.gnu.org/licenses/>.
16 : */
17 :
18 : #include "data_transfer.h"
19 :
20 : #include "base64.h"
21 : #include "fileutils.h"
22 : #include "manager.h"
23 : #include "client/jami_signal.h"
24 :
25 : #include <mutex>
26 : #include <cstdlib> // mkstemp
27 : #include <filesystem>
28 :
29 : #include <opendht/rng.h>
30 : #include <opendht/thread_pool.h>
31 :
32 : namespace jami {
33 :
34 : libjami::DataTransferId
35 66 : generateUID(std::mt19937_64& engine)
36 : {
37 66 : return std::uniform_int_distribution<libjami::DataTransferId> {1, JAMI_ID_MAX_VAL}(engine);
38 : }
39 :
40 : std::string
41 85 : getFileId(const std::string& commitId, const std::string& tid, const std::string& displayName)
42 : {
43 85 : auto extension = fileutils::getFileExtension(displayName);
44 85 : if (extension.empty())
45 162 : return fmt::format("{}_{}", commitId, tid);
46 8 : return fmt::format("{}_{}.{}", commitId, tid, extension);
47 : }
48 :
49 133 : FileInfo::FileInfo(const std::shared_ptr<dhtnet::ChannelSocket>& channel,
50 : const std::string& fileId,
51 : const std::string& interactionId,
52 133 : const libjami::DataTransferInfo& info)
53 133 : : fileId_(fileId)
54 133 : , interactionId_(interactionId)
55 133 : , info_(info)
56 266 : , channel_(channel)
57 133 : {}
58 :
59 : void
60 157 : FileInfo::emit(libjami::DataTransferEventCode code)
61 : {
62 157 : if (finishedCb_ && code >= libjami::DataTransferEventCode::finished)
63 93 : finishedCb_(uint32_t(code));
64 157 : if (interactionId_ != "") {
65 : // Else it's an internal transfer
66 34 : runOnMainThread([info = info_, iid = interactionId_, fid = fileId_, code]() {
67 34 : emitSignal<libjami::DataTransferSignal::DataTransferEvent>(info.accountId,
68 34 : info.conversationId,
69 34 : iid,
70 34 : fid,
71 : uint32_t(code));
72 34 : });
73 : }
74 157 : }
75 :
76 69 : OutgoingFile::OutgoingFile(const std::shared_ptr<dhtnet::ChannelSocket>& channel,
77 : const std::string& fileId,
78 : const std::string& interactionId,
79 : const libjami::DataTransferInfo& info,
80 : size_t start,
81 69 : size_t end)
82 : : FileInfo(channel, fileId, interactionId, info)
83 69 : , start_(start)
84 69 : , end_(end)
85 : {
86 69 : std::filesystem::path fpath(info_.path);
87 69 : if (!std::filesystem::is_regular_file(fpath)) {
88 76 : dht::ThreadPool::io().run([channel = std::move(channel_)] { channel->shutdown(); });
89 38 : return;
90 : }
91 31 : stream_.open(fpath, std::ios::binary | std::ios::in);
92 31 : if (!stream_ || !stream_.is_open()) {
93 0 : dht::ThreadPool::io().run([channel = std::move(channel_)] { channel->shutdown(); });
94 0 : return;
95 : }
96 69 : }
97 :
98 69 : OutgoingFile::~OutgoingFile()
99 : {
100 69 : if (stream_ && stream_.is_open())
101 0 : stream_.close();
102 69 : if (channel_) {
103 62 : dht::ThreadPool::io().run([channel = std::move(channel_)] { channel->shutdown(); });
104 : }
105 69 : }
106 :
107 : void
108 69 : OutgoingFile::process()
109 : {
110 69 : if (!channel_ or !stream_ or !stream_.is_open())
111 38 : return;
112 31 : auto correct = false;
113 31 : stream_.seekg(static_cast<long>(start_), std::ios::beg);
114 : try {
115 31 : std::vector<char> buffer(UINT16_MAX, 0);
116 31 : std::error_code ec;
117 31 : auto pos = start_;
118 272 : while (!stream_.eof()) {
119 243 : stream_.read(buffer.data(),
120 243 : end_ > start_ ? static_cast<long>(std::min(end_ - pos, buffer.size()))
121 243 : : static_cast<long>(buffer.size()));
122 243 : auto gcount = stream_.gcount();
123 243 : pos += gcount;
124 243 : channel_->write(reinterpret_cast<const uint8_t*>(buffer.data()), gcount, ec);
125 243 : if (ec)
126 2 : break;
127 : }
128 31 : if (!ec)
129 29 : correct = true;
130 31 : stream_.close();
131 31 : } catch (const std::exception& e) {
132 0 : JAMI_WARNING("Failed to read from stream: {}", e.what());
133 0 : }
134 31 : if (!isUserCancelled_) {
135 : // NOTE: emit(code) MUST be changed to improve handling of multiple destinations
136 : // But for now, we can just avoid to emit errors to the client, because for outgoing
137 : // transfer in a swarm, for outgoingFiles, we know that the file is ok. And the peer
138 : // will retry the transfer if they need, so we don't need to show errors.
139 31 : if (!interactionId_.empty() && !correct)
140 2 : return;
141 29 : auto code = correct ? libjami::DataTransferEventCode::finished : libjami::DataTransferEventCode::closed_by_peer;
142 29 : emit(code);
143 : }
144 : }
145 :
146 : void
147 0 : OutgoingFile::cancel()
148 : {
149 : // Remove link, not original file
150 0 : auto path = fileutils::get_data_dir() / "conversation_data" / info_.accountId / info_.conversationId / fileId_;
151 0 : if (std::filesystem::is_symlink(path))
152 0 : dhtnet::fileutils::remove(path);
153 0 : isUserCancelled_ = true;
154 0 : emit(libjami::DataTransferEventCode::closed_by_host);
155 0 : }
156 :
157 64 : IncomingFile::IncomingFile(const std::shared_ptr<dhtnet::ChannelSocket>& channel,
158 : const libjami::DataTransferInfo& info,
159 : const std::string& fileId,
160 : const std::string& interactionId,
161 64 : const std::string& sha3Sum)
162 : : FileInfo(channel, fileId, interactionId, info)
163 64 : , sha3Sum_(sha3Sum)
164 128 : , path_(info.path + ".tmp")
165 : {
166 64 : stream_.open(path_, std::ios::binary | std::ios::out | std::ios::app);
167 64 : if (!stream_)
168 0 : return;
169 :
170 64 : emit(libjami::DataTransferEventCode::ongoing);
171 0 : }
172 :
173 64 : IncomingFile::~IncomingFile()
174 : {
175 : {
176 64 : std::lock_guard<std::mutex> lk(streamMtx_);
177 64 : if (stream_ && stream_.is_open())
178 1 : stream_.close();
179 64 : }
180 64 : if (channel_)
181 126 : dht::ThreadPool::io().run([channel = std::move(channel_)] { channel->shutdown(); });
182 64 : }
183 :
184 : void
185 1 : IncomingFile::cancel()
186 : {
187 1 : isUserCancelled_ = true;
188 1 : emit(libjami::DataTransferEventCode::closed_by_peer);
189 1 : if (channel_)
190 2 : dht::ThreadPool::io().run([channel = std::move(channel_)] { channel->shutdown(); });
191 1 : }
192 :
193 : void
194 64 : IncomingFile::process()
195 : {
196 64 : channel_->setOnRecv([w = weak_from_this()](const uint8_t* buf, size_t len) {
197 223 : if (auto shared = w.lock()) {
198 223 : std::lock_guard<std::mutex> lk(shared->streamMtx_);
199 223 : if (shared->stream_.is_open())
200 223 : shared->stream_.write(reinterpret_cast<const char*>(buf), static_cast<long>(len));
201 223 : shared->info_.bytesProgress = shared->stream_.tellp();
202 223 : return static_cast<int>(len);
203 446 : }
204 : // Data received after destruction
205 0 : JAMI_ERROR("{} bytes received after IncomingFile destruction.", len);
206 0 : return -1;
207 : });
208 64 : channel_->onShutdown([w = weak_from_this()](const std::error_code& /*error_code*/) {
209 64 : auto shared = w.lock();
210 64 : if (!shared)
211 1 : return;
212 : {
213 63 : std::lock_guard<std::mutex> lk(shared->streamMtx_);
214 63 : if (shared->stream_ && shared->stream_.is_open())
215 63 : shared->stream_.close();
216 63 : }
217 63 : auto correct = shared->sha3Sum_.empty();
218 63 : std::error_code ec;
219 63 : if (!correct) {
220 18 : if (shared->isUserCancelled_) {
221 0 : std::filesystem::remove(shared->path_, ec);
222 18 : } else if (shared->info_.bytesProgress < shared->info_.totalSize) {
223 4 : JAMI_WARNING("Channel for {} shut down before transfer was complete (progress: {}/{})",
224 : shared->info_.path,
225 : shared->info_.bytesProgress,
226 : shared->info_.totalSize);
227 17 : } else if (shared->info_.totalSize != 0 && shared->info_.bytesProgress > shared->info_.totalSize) {
228 0 : JAMI_WARNING("Removing {} larger than announced: {}/{}",
229 : shared->path_,
230 : shared->info_.bytesProgress,
231 : shared->info_.totalSize);
232 0 : std::filesystem::remove(shared->path_, ec);
233 : } else {
234 17 : auto sha3Sum = fileutils::sha3File(shared->path_);
235 17 : if (shared->sha3Sum_ == sha3Sum) {
236 60 : JAMI_LOG("New file received: {}", shared->info_.path);
237 15 : correct = true;
238 : } else {
239 8 : JAMI_WARNING(
240 : "Removing {} with expected size ({} bytes) but invalid sha3sum (expected: {}, actual: {})",
241 : shared->path_,
242 : shared->info_.totalSize,
243 : shared->sha3Sum_,
244 : sha3Sum);
245 2 : std::filesystem::remove(shared->path_, ec);
246 : }
247 17 : }
248 18 : if (ec) {
249 0 : JAMI_ERROR("Failed to remove file {}: {}", shared->path_, ec.message());
250 : }
251 : }
252 63 : if (correct) {
253 60 : std::filesystem::rename(shared->path_, shared->info_.path, ec);
254 60 : if (ec) {
255 0 : JAMI_ERROR("Failed to rename file from {} to {}: {}", shared->path_, shared->info_.path, ec.message());
256 0 : correct = false;
257 : }
258 : }
259 63 : if (shared->isUserCancelled_)
260 0 : return;
261 63 : auto code = correct ? libjami::DataTransferEventCode::finished : libjami::DataTransferEventCode::closed_by_host;
262 63 : shared->emit(code);
263 63 : dht::ThreadPool::io().run([s = std::move(shared)] {});
264 64 : });
265 64 : }
266 :
267 : //==============================================================================
268 :
269 : class TransferManager::Impl
270 : {
271 : public:
272 958 : Impl(const std::string& accountId, const std::string& accountUri, const std::string& to, const std::mt19937_64& rand)
273 958 : : accountId_(accountId)
274 958 : , accountUri_(accountUri)
275 958 : , to_(to)
276 958 : , rand_(rand)
277 : {
278 958 : if (!to_.empty()) {
279 391 : conversationDataPath_ = fileutils::get_data_dir() / accountId_ / "conversation_data" / to_;
280 391 : dhtnet::fileutils::check_dir(conversationDataPath_);
281 391 : waitingPath_ = conversationDataPath_ / "waiting";
282 : }
283 958 : profilesPath_ = fileutils::get_data_dir() / accountId_ / "profiles";
284 958 : accountProfilePath_ = fileutils::get_data_dir() / accountId / "profile.vcf";
285 958 : loadWaiting();
286 958 : }
287 :
288 958 : ~Impl()
289 : {
290 958 : std::lock_guard lk {mapMutex_};
291 998 : for (auto& [channel, _] : outgoings_) {
292 80 : dht::ThreadPool::io().run([c = std::move(channel)] { c->shutdown(); });
293 : }
294 958 : outgoings_.clear();
295 958 : incomings_.clear();
296 958 : vcards_.clear();
297 958 : }
298 :
299 958 : void loadWaiting()
300 : {
301 : try {
302 : // read file
303 1916 : auto file = fileutils::loadFile(waitingPath_);
304 : // load values
305 0 : msgpack::object_handle oh = msgpack::unpack((const char*) file.data(), file.size());
306 0 : std::lock_guard lk {mapMutex_};
307 0 : oh.get().convert(waitingIds_);
308 958 : } catch (const std::exception& e) {
309 958 : return;
310 958 : }
311 : }
312 21 : void saveWaiting()
313 : {
314 21 : std::ofstream file(waitingPath_, std::ios::trunc | std::ios::binary);
315 21 : msgpack::pack(file, waitingIds_);
316 21 : }
317 :
318 : std::string accountId_ {};
319 : std::string accountUri_ {};
320 : std::string to_ {};
321 : std::filesystem::path waitingPath_ {};
322 : std::filesystem::path profilesPath_ {};
323 : std::filesystem::path accountProfilePath_ {};
324 : std::filesystem::path conversationDataPath_ {};
325 :
326 : std::mutex mapMutex_ {};
327 : std::map<std::string, WaitingRequest> waitingIds_ {};
328 : std::map<std::shared_ptr<dhtnet::ChannelSocket>, std::shared_ptr<OutgoingFile>> outgoings_ {};
329 : std::map<std::string, std::shared_ptr<IncomingFile>> incomings_ {};
330 : std::map<std::pair<std::string, std::string>, std::shared_ptr<IncomingFile>> vcards_ {};
331 :
332 : std::mt19937_64 rand_;
333 : };
334 :
335 958 : TransferManager::TransferManager(const std::string& accountId,
336 : const std::string& accountUri,
337 : const std::string& to,
338 958 : const std::mt19937_64& rand)
339 958 : : pimpl_ {std::make_unique<Impl>(accountId, accountUri, to, rand)}
340 958 : {}
341 :
342 958 : TransferManager::~TransferManager() {}
343 :
344 : void
345 71 : TransferManager::transferFile(const std::shared_ptr<dhtnet::ChannelSocket>& channel,
346 : const std::string& fileId,
347 : const std::string& interactionId,
348 : const std::string& path,
349 : size_t start,
350 : size_t end,
351 : OnFinishedCb onFinished)
352 : {
353 71 : std::lock_guard lk {pimpl_->mapMutex_};
354 71 : if (pimpl_->outgoings_.find(channel) != pimpl_->outgoings_.end())
355 2 : return;
356 69 : libjami::DataTransferInfo info;
357 69 : info.accountId = pimpl_->accountId_;
358 69 : info.conversationId = pimpl_->to_;
359 69 : info.path = path;
360 69 : auto f = std::make_shared<OutgoingFile>(channel, fileId, interactionId, info, start, end);
361 69 : f->onFinished([w = weak(), channel, onFinished = std::move(onFinished)](uint32_t code) {
362 29 : if (code == uint32_t(libjami::DataTransferEventCode::finished) && onFinished) {
363 6 : onFinished();
364 : }
365 : // schedule destroy outgoing transfer as not needed
366 29 : dht::ThreadPool().computation().run([w, channel] {
367 29 : if (auto sthis_ = w.lock()) {
368 29 : auto& pimpl = sthis_->pimpl_;
369 29 : std::lock_guard lk {pimpl->mapMutex_};
370 29 : auto itO = pimpl->outgoings_.find(channel);
371 29 : if (itO != pimpl->outgoings_.end())
372 29 : pimpl->outgoings_.erase(itO);
373 58 : }
374 29 : });
375 29 : });
376 69 : auto [outFile, _] = pimpl_->outgoings_.emplace(channel, std::move(f));
377 69 : dht::ThreadPool::io().run([w = std::weak_ptr<OutgoingFile>(outFile->second)] {
378 69 : if (auto of = w.lock())
379 69 : of->process();
380 69 : });
381 71 : }
382 :
383 : bool
384 1 : TransferManager::cancel(const std::string& fileId)
385 : {
386 1 : std::lock_guard lk {pimpl_->mapMutex_};
387 : // Remove from waiting, this avoid auto-download
388 1 : auto itW = pimpl_->waitingIds_.find(fileId);
389 1 : if (itW != pimpl_->waitingIds_.end()) {
390 1 : pimpl_->waitingIds_.erase(itW);
391 1 : JAMI_DBG() << "Cancel " << fileId;
392 1 : pimpl_->saveWaiting();
393 : }
394 1 : auto itC = pimpl_->incomings_.find(fileId);
395 1 : if (itC == pimpl_->incomings_.end())
396 0 : return false;
397 1 : itC->second->cancel();
398 1 : return true;
399 1 : }
400 :
401 : bool
402 2 : TransferManager::info(const std::string& fileId, std::string& path, int64_t& total, int64_t& progress) const noexcept
403 : {
404 2 : std::unique_lock lk {pimpl_->mapMutex_};
405 2 : if (pimpl_->to_.empty())
406 0 : return false;
407 :
408 2 : auto itI = pimpl_->incomings_.find(fileId);
409 2 : auto itW = pimpl_->waitingIds_.find(fileId);
410 2 : path = this->path(fileId).string();
411 2 : if (itI != pimpl_->incomings_.end()) {
412 0 : total = itI->second->info().totalSize;
413 0 : progress = itI->second->info().bytesProgress;
414 0 : return true;
415 2 : } else if (std::filesystem::is_regular_file(path)) {
416 1 : std::ifstream transfer(path, std::ios::binary);
417 1 : transfer.seekg(0, std::ios::end);
418 1 : progress = transfer.tellg();
419 1 : if (itW != pimpl_->waitingIds_.end()) {
420 0 : total = static_cast<int64_t>(itW->second.totalSize);
421 : } else {
422 : // If not waiting it's finished
423 1 : total = progress;
424 : }
425 1 : return true;
426 2 : } else if (itW != pimpl_->waitingIds_.end()) {
427 0 : total = static_cast<int64_t>(itW->second.totalSize);
428 0 : progress = 0;
429 0 : return true;
430 : }
431 : // Else we don't know infos there.
432 1 : progress = 0;
433 1 : return false;
434 2 : }
435 :
436 : void
437 13 : TransferManager::waitForTransfer(const std::string& fileId,
438 : const std::string& interactionId,
439 : const std::string& sha3sum,
440 : const std::string& path,
441 : std::size_t total)
442 : {
443 13 : std::unique_lock lk(pimpl_->mapMutex_);
444 13 : auto itW = pimpl_->waitingIds_.find(fileId);
445 13 : if (itW != pimpl_->waitingIds_.end())
446 1 : return;
447 12 : pimpl_->waitingIds_[fileId] = {fileId, interactionId, sha3sum, path, total};
448 12 : pimpl_->saveWaiting();
449 13 : }
450 :
451 : void
452 12 : TransferManager::onIncomingFileTransfer(const std::string& fileId,
453 : const std::shared_ptr<dhtnet::ChannelSocket>& channel,
454 : size_t start)
455 : {
456 12 : std::lock_guard lk(pimpl_->mapMutex_);
457 : // Check if not already an incoming file for this id and that we are waiting this file
458 12 : auto itC = pimpl_->incomings_.find(fileId);
459 12 : if (itC != pimpl_->incomings_.end()) {
460 0 : dht::ThreadPool().io().run([channel] { channel->shutdown(); });
461 0 : return;
462 : }
463 12 : auto itW = pimpl_->waitingIds_.find(fileId);
464 12 : if (itW == pimpl_->waitingIds_.end()) {
465 0 : dht::ThreadPool().io().run([channel] { channel->shutdown(); });
466 0 : return;
467 : }
468 :
469 12 : libjami::DataTransferInfo info;
470 12 : info.accountId = pimpl_->accountId_;
471 12 : info.conversationId = pimpl_->to_;
472 12 : info.path = itW->second.path;
473 12 : info.totalSize = static_cast<int64_t>(itW->second.totalSize);
474 12 : info.bytesProgress = static_cast<int64_t>(start);
475 :
476 : // Generate the file path within the conversation data directory
477 : // using the file id if no path has been specified, otherwise create
478 : // a symlink(Note: this will not work on Windows).
479 12 : auto filePath = path(fileId);
480 12 : if (info.path.empty()) {
481 0 : info.path = filePath.string();
482 : } else {
483 : // We don't need to check if this is an existing symlink here, as
484 : // the attempt to create one should report the error string correctly.
485 12 : fileutils::createFileLink(filePath, info.path);
486 : }
487 :
488 12 : auto ifile = std::make_shared<IncomingFile>(std::move(channel),
489 : info,
490 : fileId,
491 12 : itW->second.interactionId,
492 24 : itW->second.sha3sum);
493 12 : auto res = pimpl_->incomings_.emplace(fileId, std::move(ifile));
494 12 : if (res.second) {
495 12 : res.first->second->onFinished([w = weak(), fileId](uint32_t code) {
496 : // schedule destroy transfer as not needed
497 12 : dht::ThreadPool().computation().run([w, fileId, code] {
498 12 : if (auto sthis_ = w.lock()) {
499 12 : auto& pimpl = sthis_->pimpl_;
500 12 : std::lock_guard lk {pimpl->mapMutex_};
501 12 : auto itO = pimpl->incomings_.find(fileId);
502 12 : if (itO != pimpl->incomings_.end())
503 12 : pimpl->incomings_.erase(itO);
504 12 : if (code == uint32_t(libjami::DataTransferEventCode::finished)) {
505 8 : auto itW = pimpl->waitingIds_.find(fileId);
506 8 : if (itW != pimpl->waitingIds_.end()) {
507 8 : pimpl->waitingIds_.erase(itW);
508 8 : pimpl->saveWaiting();
509 : }
510 : }
511 24 : }
512 12 : });
513 12 : });
514 12 : res.first->second->process();
515 : }
516 12 : }
517 :
518 : std::filesystem::path
519 39 : TransferManager::path(const std::string& fileId) const
520 : {
521 39 : return pimpl_->conversationDataPath_ / fileId;
522 : }
523 :
524 : void
525 54 : TransferManager::onIncomingProfile(const std::shared_ptr<dhtnet::ChannelSocket>& channel, const std::string& sha3Sum)
526 : {
527 54 : if (!channel)
528 2 : return;
529 :
530 54 : auto chName = channel->name();
531 54 : std::string_view name = chName;
532 54 : auto sep = name.find_last_of('?');
533 54 : if (sep != std::string::npos)
534 7 : name = name.substr(0, sep);
535 :
536 54 : auto lastSep = name.find_last_of('/');
537 54 : auto fileId = name.substr(lastSep + 1);
538 :
539 54 : auto deviceId = channel->deviceId().toString();
540 54 : auto cert = channel->peerCertificate();
541 54 : if (!cert || !cert->issuer || fileId.find(".vcf") == std::string::npos)
542 2 : return;
543 :
544 59 : auto uri = fileId == "profile.vcf" ? cert->issuer->getId().toString()
545 59 : : std::string(fileId.substr(0, fileId.size() - 4 /*.vcf*/));
546 :
547 52 : std::lock_guard lk(pimpl_->mapMutex_);
548 52 : auto idx = std::make_pair(deviceId, uri);
549 : // Check if not already an incoming file for this id and that we are waiting this file
550 52 : auto itV = pimpl_->vcards_.find(idx);
551 52 : if (itV != pimpl_->vcards_.end()) {
552 0 : dht::ThreadPool().io().run([channel] { channel->shutdown(); });
553 0 : return;
554 : }
555 :
556 52 : auto tid = generateUID(pimpl_->rand_);
557 52 : libjami::DataTransferInfo info;
558 52 : info.accountId = pimpl_->accountId_;
559 52 : info.conversationId = pimpl_->to_;
560 :
561 104 : auto recvDir = fileutils::get_cache_dir() / pimpl_->accountId_ / "vcard";
562 52 : dhtnet::fileutils::recursive_mkdir(recvDir);
563 104 : info.path = (recvDir / fmt::format("{:s}_{:s}_{}", deviceId, uri, tid)).string();
564 :
565 52 : auto ifile = std::make_shared<IncomingFile>(std::move(channel), info, "profile.vcf", "", sha3Sum);
566 52 : auto res = pimpl_->vcards_.emplace(idx, std::move(ifile));
567 52 : if (res.second) {
568 208 : res.first->second->onFinished([w = weak(),
569 52 : uri = std::move(uri),
570 52 : deviceId = std::move(deviceId),
571 52 : accountId = pimpl_->accountId_,
572 52 : cert = std::move(cert),
573 : path = info.path](uint32_t code) {
574 260 : dht::ThreadPool().computation().run([w,
575 52 : uri = std::move(uri),
576 52 : deviceId = std::move(deviceId),
577 52 : accountId = std::move(accountId),
578 52 : path = std::move(path),
579 52 : code] {
580 52 : if (auto sthis_ = w.lock()) {
581 52 : auto& pimpl = sthis_->pimpl_;
582 :
583 52 : auto destPath = sthis_->profilePath(uri);
584 : try {
585 : // Move profile to destination path
586 52 : std::lock_guard lock(dhtnet::fileutils::getFileLock(destPath));
587 52 : dhtnet::fileutils::recursive_mkdir(destPath.parent_path());
588 52 : std::filesystem::rename(path, destPath);
589 52 : if (!pimpl->accountUri_.empty() && uri == pimpl->accountUri_) {
590 : // If this is the account profile, link or copy it to the account profile path
591 3 : if (!fileutils::createFileLink(pimpl->accountProfilePath_, destPath)) {
592 0 : std::error_code ec;
593 0 : std::filesystem::copy_file(destPath, pimpl->accountProfilePath_, ec);
594 : }
595 : }
596 52 : } catch (const std::exception& e) {
597 0 : JAMI_ERROR("{}", e.what());
598 0 : }
599 :
600 52 : std::lock_guard lk {pimpl->mapMutex_};
601 52 : auto itO = pimpl->vcards_.find({deviceId, uri});
602 52 : if (itO != pimpl->vcards_.end())
603 52 : pimpl->vcards_.erase(itO);
604 52 : if (code == uint32_t(libjami::DataTransferEventCode::finished)) {
605 52 : emitSignal<libjami::ConfigurationSignal::ProfileReceived>(accountId, uri, destPath.string());
606 : }
607 104 : }
608 52 : });
609 52 : });
610 52 : res.first->second->process();
611 : }
612 58 : }
613 :
614 : std::filesystem::path
615 65 : TransferManager::profilePath(const std::string& contactId) const
616 : {
617 130 : return pimpl_->profilesPath_ / fmt::format("{}.vcf", base64::encode(contactId));
618 : }
619 :
620 : std::vector<WaitingRequest>
621 1850 : TransferManager::waitingRequests() const
622 : {
623 1850 : std::vector<WaitingRequest> res;
624 1849 : std::lock_guard lk(pimpl_->mapMutex_);
625 1851 : for (const auto& [fileId, req] : pimpl_->waitingIds_) {
626 1 : auto itC = pimpl_->incomings_.find(fileId);
627 1 : if (itC == pimpl_->incomings_.end())
628 1 : res.emplace_back(req);
629 : }
630 3700 : return res;
631 1850 : }
632 :
633 : bool
634 4 : TransferManager::isWaiting(const std::string& fileId) const
635 : {
636 4 : std::lock_guard lk(pimpl_->mapMutex_);
637 8 : return pimpl_->waitingIds_.find(fileId) != pimpl_->waitingIds_.end();
638 4 : }
639 :
640 : } // namespace jami
|