LCOV - code coverage report
Current view: top level - src/jamidht - sync_module.cpp (source / functions) Coverage Total Hit
Test: jami-coverage-filtered.info Lines: 88.5 % 139 123
Test Date: 2026-06-13 09:18:46 Functions: 63.6 % 33 21

            Line data    Source code
       1              : /*
       2              :  *  Copyright (C) 2004-2026 Savoir-faire Linux Inc.
       3              :  *
       4              :  *  This program is free software: you can redistribute it and/or modify
       5              :  *  it under the terms of the GNU General Public License as published by
       6              :  *  the Free Software Foundation, either version 3 of the License, or
       7              :  *  (at your option) any later version.
       8              :  *
       9              :  *  This program is distributed in the hope that it will be useful,
      10              :  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
      11              :  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
      12              :  *  GNU General Public License for more details.
      13              :  *
      14              :  *  You should have received a copy of the GNU General Public License
      15              :  *  along with this program. If not, see <https://www.gnu.org/licenses/>.
      16              :  */
      17              : 
      18              : #include "sync_module.h"
      19              : 
      20              : #include "jamidht/conversation_module.h"
      21              : #include "jamidht/archive_account_manager.h"
      22              : 
      23              : #include <dhtnet/multiplexed_socket.h>
      24              : #include <dhtnet/channel_utils.h>
      25              : #include <opendht/thread_pool.h>
      26              : 
      27              : namespace jami {
      28              : 
      29              : class SyncModule::Impl : public std::enable_shared_from_this<Impl>
      30              : {
      31              : public:
      32              :     Impl(const std::shared_ptr<JamiAccount>& account);
      33              : 
      34              :     std::weak_ptr<JamiAccount> account_;
      35              :     const std::string accountId_;
      36              : 
      37              :     // Sync connections
      38              :     std::recursive_mutex syncConnectionsMtx_;
      39              :     std::map<DeviceId /* deviceId */, std::vector<std::shared_ptr<dhtnet::ChannelSocket>>> syncConnections_;
      40              : 
      41              :     /**
      42              :      * Build SyncMsg and send it on socket
      43              :      * @param socket
      44              :      */
      45              :     void syncInfos(const std::shared_ptr<dhtnet::ChannelSocket>& socket, const std::shared_ptr<SyncMsg>& syncMsg);
      46              :     void onChannelShutdown(const std::shared_ptr<dhtnet::ChannelSocket>& socket, const DeviceId& device);
      47              : };
      48              : 
      49          658 : SyncModule::Impl::Impl(const std::shared_ptr<JamiAccount>& account)
      50          658 :     : account_(account)
      51         1316 :     , accountId_ {account->getAccountID()}
      52          658 : {}
      53              : 
      54              : void
      55          242 : SyncModule::Impl::syncInfos(const std::shared_ptr<dhtnet::ChannelSocket>& socket,
      56              :                             const std::shared_ptr<SyncMsg>& syncMsg)
      57              : {
      58          242 :     auto acc = account_.lock();
      59          242 :     if (!acc)
      60            0 :         return;
      61          242 :     msgpack::sbuffer buffer(UINT16_MAX); // Use max pkt size
      62          242 :     std::error_code ec;
      63          242 :     if (!syncMsg) {
      64              :         // Send contacts infos
      65              :         // This message can be big. TODO rewrite to only take UINT16_MAX bytes max or split it multiple
      66              :         // messages. For now, write 3 messages (UINT16_MAX*3 should be enough for all information).
      67          202 :         if (auto info = acc->accountManager()->getInfo()) {
      68          202 :             if (info->contacts) {
      69          202 :                 SyncMsg msg;
      70          202 :                 msg.ds = info->contacts->getSyncData();
      71          202 :                 msgpack::pack(buffer, msg);
      72          201 :                 socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
      73          202 :                 if (ec) {
      74            4 :                     JAMI_ERROR("[Account {}] [device {}] {:s}", accountId_, socket->deviceId(), ec.message());
      75            1 :                     return;
      76              :                 }
      77          202 :             }
      78              :         }
      79          201 :         buffer.clear();
      80              :         // Sync conversations
      81          201 :         auto c = ConversationModule::convInfos(acc->getAccountID());
      82          201 :         if (!c.empty()) {
      83           97 :             SyncMsg msg;
      84           97 :             msg.c = std::move(c);
      85           97 :             msgpack::pack(buffer, msg);
      86           97 :             socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
      87           97 :             if (ec) {
      88            0 :                 JAMI_ERROR("[Account {}] [device {}] {:s}", accountId_, socket->deviceId(), ec.message());
      89            0 :                 return;
      90              :             }
      91           97 :         }
      92          201 :         buffer.clear();
      93              :         // Sync requests
      94          201 :         auto cr = ConversationModule::convRequests(acc->getAccountID());
      95          201 :         if (!cr.empty()) {
      96           21 :             SyncMsg msg;
      97           21 :             msg.cr = std::move(cr);
      98           21 :             msgpack::pack(buffer, msg);
      99           21 :             socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
     100           21 :             if (ec) {
     101            0 :                 JAMI_ERROR("[Account {}] [device {}] {:s}", accountId_, socket->deviceId(), ec.message());
     102            0 :                 return;
     103              :             }
     104           21 :         }
     105          201 :         buffer.clear();
     106          201 :         auto convModule = acc->convModule(true);
     107          201 :         if (!convModule)
     108            0 :             return;
     109              :         // Sync conversation's preferences
     110          201 :         auto p = convModule->convPreferences();
     111          201 :         if (!p.empty()) {
     112            3 :             SyncMsg msg;
     113            3 :             msg.p = std::move(p);
     114            3 :             msgpack::pack(buffer, msg);
     115            3 :             socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
     116            3 :             if (ec) {
     117            0 :                 JAMI_ERROR("[Account {}] [device {}] {:s}", accountId_, socket->deviceId(), ec.message());
     118            0 :                 return;
     119              :             }
     120            3 :         }
     121          200 :         buffer.clear();
     122              :         // Sync read's status
     123          200 :         auto ms = convModule->convMessageStatus();
     124          201 :         if (!ms.empty()) {
     125           33 :             SyncMsg msg;
     126           33 :             msg.ms = std::move(ms);
     127           33 :             msgpack::pack(buffer, msg);
     128           33 :             socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
     129           33 :             if (ec) {
     130            4 :                 JAMI_ERROR("[Account {}] [device {}] {:s}", accountId_, socket->deviceId(), ec.message());
     131            1 :                 return;
     132              :             }
     133           33 :         }
     134          200 :         buffer.clear();
     135              : 
     136          204 :     } else {
     137           40 :         msgpack::pack(buffer, *syncMsg);
     138           40 :         socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
     139           40 :         if (ec)
     140            0 :             JAMI_ERROR("[Account {}] [device {}] {:s}", accountId_, socket->deviceId(), ec.message());
     141              :     }
     142          244 : }
     143              : 
     144              : ////////////////////////////////////////////////////////////////
     145              : 
     146          658 : SyncModule::SyncModule(const std::shared_ptr<JamiAccount>& account)
     147          658 :     : pimpl_ {std::make_shared<Impl>(account)}
     148          658 : {}
     149              : 
     150              : void
     151          135 : SyncModule::Impl::onChannelShutdown(const std::shared_ptr<dhtnet::ChannelSocket>& socket, const DeviceId& device)
     152              : {
     153          135 :     std::lock_guard lk(syncConnectionsMtx_);
     154          135 :     auto connectionsIt = syncConnections_.find(device);
     155          135 :     if (connectionsIt == syncConnections_.end()) {
     156            0 :         JAMI_WARNING("[Account {}] [device {}] onChannelShutdown: no connection found.", accountId_, device.to_view());
     157            0 :         return;
     158              :     }
     159          135 :     auto& connections = connectionsIt->second;
     160          135 :     auto conn = std::find(connections.begin(), connections.end(), socket);
     161          135 :     if (conn != connections.end())
     162          135 :         connections.erase(conn);
     163          540 :     JAMI_LOG("[Account {}] [device {}] removed connection, remaining: {:d}",
     164              :              accountId_,
     165              :              device.to_view(),
     166              :              connections.size());
     167          135 :     if (connections.empty())
     168           68 :         syncConnections_.erase(connectionsIt);
     169          135 : }
     170              : 
     171              : void
     172          135 : SyncModule::cacheSyncConnection(std::shared_ptr<dhtnet::ChannelSocket>&& socket,
     173              :                                 const std::string& peerId,
     174              :                                 const DeviceId& device)
     175              : {
     176          135 :     std::lock_guard lk(pimpl_->syncConnectionsMtx_);
     177          135 :     pimpl_->syncConnections_[device].emplace_back(socket);
     178              : 
     179          135 :     socket->setOnRecv(dhtnet::buildMsgpackReader<SyncMsg>([acc = pimpl_->account_, device, peerId](SyncMsg&& msg) {
     180          386 :         auto account = acc.lock();
     181          386 :         if (!account)
     182            0 :             return std::make_error_code(std::errc::operation_canceled);
     183              : 
     184              :         try {
     185          386 :             if (auto manager = account->accountManager())
     186          386 :                 manager->onSyncData(std::move(msg.ds), false);
     187              : 
     188          386 :             if (!msg.c.empty() || !msg.cr.empty() || !msg.p.empty() || !msg.ld.empty() || !msg.ms.empty())
     189          169 :                 if (auto cm = account->convModule(true))
     190          169 :                     cm->onSyncData(msg, peerId, device.toString());
     191            0 :         } catch (const std::exception& e) {
     192            0 :             JAMI_WARNING("[Account {}] [device {}] [convInfo] error on sync: {:s}",
     193              :                          account->getAccountID(),
     194              :                          device.to_view(),
     195              :                          e.what());
     196            0 :         }
     197          386 :         return std::error_code();
     198          386 :     }));
     199          135 :     socket->onShutdown([w = pimpl_->weak_from_this(), device, s = std::weak_ptr(socket)](const std::error_code&) {
     200          135 :         if (auto shared = w.lock())
     201          135 :             shared->onChannelShutdown(s.lock(), device);
     202          135 :     });
     203              : 
     204          135 :     dht::ThreadPool::io().run([w = pimpl_->weak_from_this(), socket = std::move(socket)]() {
     205          135 :         if (auto s = w.lock())
     206          135 :             s->syncInfos(socket, nullptr);
     207          135 :     });
     208          135 : }
     209              : 
     210              : bool
     211           68 : SyncModule::isConnected(const DeviceId& deviceId) const
     212              : {
     213           68 :     std::lock_guard lk(pimpl_->syncConnectionsMtx_);
     214           68 :     auto it = pimpl_->syncConnections_.find(deviceId);
     215           68 :     if (it == pimpl_->syncConnections_.end())
     216           68 :         return false;
     217            0 :     return !it->second.empty();
     218           68 : }
     219              : 
     220              : void
     221         2380 : SyncModule::syncWithConnected(const std::shared_ptr<SyncMsg>& syncMsg, const DeviceId& deviceId)
     222              : {
     223         2380 :     std::lock_guard lk(pimpl_->syncConnectionsMtx_);
     224         2380 :     size_t count = 0;
     225         2487 :     for (const auto& [did, sockets] : pimpl_->syncConnections_) {
     226          107 :         if (not sockets.empty() and (!deviceId || deviceId == did)) {
     227          107 :             count++;
     228          107 :             dht::ThreadPool::io().run([w = pimpl_->weak_from_this(), s = sockets.back(), syncMsg] {
     229          107 :                 if (auto sthis = w.lock())
     230          107 :                     sthis->syncInfos(s, syncMsg);
     231          107 :             });
     232              :         }
     233              :     }
     234         2380 :     if (count == 0) {
     235         9092 :         JAMI_WARNING("[Account {}] [device {}] no sync connection.", pimpl_->accountId_, deviceId.toString());
     236              :     } else {
     237          428 :         JAMI_DEBUG("[Account {}] [device {}] syncing with {:d} devices", pimpl_->accountId_, deviceId.to_view(), count);
     238              :     }
     239         2380 : }
     240              : 
     241              : } // namespace jami
        

Generated by: LCOV version 2.0-1