LCOV - code coverage report
Current view: top level - foo/src/jamidht - sync_module.cpp (source / functions) Hit Total Coverage
Test: jami-coverage-filtered.info Lines: 124 139 89.2 %
Date: 2026-02-28 10:41:24 Functions: 21 33 63.6 %

          Line data    Source code
       1             : /*
       2             :  *  Copyright (C) 2004-2026 Savoir-faire Linux Inc.
       3             :  *
       4             :  *  This program is free software: you can redistribute it and/or modify
       5             :  *  it under the terms of the GNU General Public License as published by
       6             :  *  the Free Software Foundation, either version 3 of the License, or
       7             :  *  (at your option) any later version.
       8             :  *
       9             :  *  This program is distributed in the hope that it will be useful,
      10             :  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
      11             :  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
      12             :  *  GNU General Public License for more details.
      13             :  *
      14             :  *  You should have received a copy of the GNU General Public License
      15             :  *  along with this program. If not, see <https://www.gnu.org/licenses/>.
      16             :  */
      17             : 
      18             : #include "sync_module.h"
      19             : 
      20             : #include "jamidht/conversation_module.h"
      21             : #include "jamidht/archive_account_manager.h"
      22             : 
      23             : #include <dhtnet/multiplexed_socket.h>
      24             : #include <dhtnet/channel_utils.h>
      25             : #include <opendht/thread_pool.h>
      26             : 
      27             : namespace jami {
      28             : 
      29             : class SyncModule::Impl : public std::enable_shared_from_this<Impl> // Private implementation of SyncModule; callbacks reach it through weak_from_this()
      30             : {
      31             : public:
      32             :     Impl(const std::shared_ptr<JamiAccount>& account);
      33             : 
      34             :     std::weak_ptr<JamiAccount> account_; // Weak reference to the account; must be locked before use
      35             :     const std::string accountId_; // Account ID read once at construction; used in log messages
      36             : 
      37             :     // Sync connections
      38             :     std::recursive_mutex syncConnectionsMtx_; // Held around every access to syncConnections_
      39             :     std::map<DeviceId /* deviceId */, std::vector<std::shared_ptr<dhtnet::ChannelSocket>>> syncConnections_; // Cached sync sockets, grouped by peer device
      40             : 
      41             :     /**
      42             :      * Build SyncMsg and send it on socket
      43             :      * @param socket   Channel the packed data is written to
      44             :      */
      45             :     void syncInfos(const std::shared_ptr<dhtnet::ChannelSocket>& socket, const std::shared_ptr<SyncMsg>& syncMsg); // syncMsg: message to send as-is; when null, the full local state is sent instead
      46             :     void onChannelShutdown(const std::shared_ptr<dhtnet::ChannelSocket>& socket, const DeviceId& device); // Removes socket from the connections cached for device
      47             : };
      48             : 
      49         657 : SyncModule::Impl::Impl(const std::shared_ptr<JamiAccount>& account) // Keeps a weak reference to account and stores its ID
      50         657 :     : account_(account)
      51        1314 :     , accountId_ {account->getAccountID()} // account is dereferenced here, so it must not be null
      52         657 : {}
      53             : 
      54             : void // Writes sync data on socket: the given syncMsg, or every category of local state when syncMsg is null
      55         245 : SyncModule::Impl::syncInfos(const std::shared_ptr<dhtnet::ChannelSocket>& socket,
      56             :                             const std::shared_ptr<SyncMsg>& syncMsg)
      57             : {
      58         245 :     auto acc = account_.lock();
      59         245 :     if (!acc)
      60           0 :         return; // the account no longer exists: nothing to send
      61         245 :     msgpack::sbuffer buffer(UINT16_MAX); // Use max pkt size
      62         245 :     std::error_code ec; // receives the outcome of each socket->write()
      63         245 :     if (!syncMsg) { // no explicit message: send every category of local state
      64             :         // Send contacts infos
      65             :         // This message can be big. TODO: rewrite to only take UINT16_MAX bytes max, or split it into multiple
      66             :         // messages. For now, each category (contacts, conversations, requests, preferences, message status) is written as its own message.
      67         204 :         if (auto info = acc->accountManager()->getInfo()) {
      68         204 :             if (info->contacts) {
      69         204 :                 SyncMsg msg;
      70         204 :                 msg.ds = info->contacts->getSyncData();
      71         201 :                 msgpack::pack(buffer, msg);
      72         204 :                 socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
      73         204 :                 if (ec) {
      74           8 :                     JAMI_ERROR("[Account {}] [device {}] {:s}", accountId_, socket->deviceId(), ec.message());
      75           2 :                     return; // give up on the first write error; the remaining categories are not sent
      76             :                 }
      77         204 :             }
      78             :         }
      79         202 :         buffer.clear(); // empty the buffer before packing the next message
      80             :         // Sync conversations
      81         202 :         auto c = ConversationModule::convInfos(acc->getAccountID());
      82         202 :         if (!c.empty()) { // an empty category is skipped: no message is written for it
      83          93 :             SyncMsg msg;
      84          92 :             msg.c = std::move(c);
      85          93 :             msgpack::pack(buffer, msg);
      86          93 :             socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
      87          93 :             if (ec) {
      88           0 :                 JAMI_ERROR("[Account {}] [device {}] {:s}", accountId_, socket->deviceId(), ec.message());
      89           0 :                 return;
      90             :             }
      91          93 :         }
      92         202 :         buffer.clear();
      93             :         // Sync requests
      94         202 :         auto cr = ConversationModule::convRequests(acc->getAccountID());
      95         202 :         if (!cr.empty()) {
      96          23 :             SyncMsg msg;
      97          23 :             msg.cr = std::move(cr);
      98          23 :             msgpack::pack(buffer, msg);
      99          23 :             socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
     100          23 :             if (ec) {
     101           0 :                 JAMI_ERROR("[Account {}] [device {}] {:s}", accountId_, socket->deviceId(), ec.message());
     102           0 :                 return;
     103             :             }
     104          23 :         }
     105         202 :         buffer.clear();
     106         202 :         auto convModule = acc->convModule(true); // the two remaining categories are read from the conversation module
     107         202 :         if (!convModule)
     108           0 :             return; // no conversation module: preferences and message status are not sent
     109             :         // Sync conversation's preferences
     110         202 :         auto p = convModule->convPreferences();
     111         202 :         if (!p.empty()) {
     112           2 :             SyncMsg msg;
     113           2 :             msg.p = std::move(p);
     114           2 :             msgpack::pack(buffer, msg);
     115           2 :             socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
     116           2 :             if (ec) {
     117           0 :                 JAMI_ERROR("[Account {}] [device {}] {:s}", accountId_, socket->deviceId(), ec.message());
     118           0 :                 return;
     119             :             }
     120           2 :         }
     121         202 :         buffer.clear();
     122             :         // Sync read's status
     123         202 :         auto ms = convModule->convMessageStatus();
     124         202 :         if (!ms.empty()) {
     125          30 :             SyncMsg msg;
     126          30 :             msg.ms = std::move(ms);
     127          30 :             msgpack::pack(buffer, msg);
     128          30 :             socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
     129          30 :             if (ec) {
     130           4 :                 JAMI_ERROR("[Account {}] [device {}] {:s}", accountId_, socket->deviceId(), ec.message());
     131           1 :                 return;
     132             :             }
     133          30 :         }
     134         201 :         buffer.clear();
     135             : 
     136         205 :     } else { // explicit message: pack and write it as-is
     137          41 :         msgpack::pack(buffer, *syncMsg);
     138          41 :         socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
     139          41 :         if (ec) // a failed write is only logged here
     140           0 :             JAMI_ERROR("[Account {}] [device {}] {:s}", accountId_, socket->deviceId(), ec.message());
     141             :     }
     142         248 : }
     143             : 
     144             : ////////////////////////////////////////////////////////////////
     145             : 
     146         657 : SyncModule::SyncModule(const std::shared_ptr<JamiAccount>& account) // Creates the shared Impl that holds all of the module's state
     147         657 :     : pimpl_ {std::make_shared<Impl>(account)} // made with make_shared so that weak_from_this() works on the Impl
     148         657 : {}
     149             : 
     150             : void // Drops socket from the connections cached for device, and drops the device entry once it has none left
     151         136 : SyncModule::Impl::onChannelShutdown(const std::shared_ptr<dhtnet::ChannelSocket>& socket, const DeviceId& device)
     152             : {
     153         136 :     std::lock_guard lk(syncConnectionsMtx_);
     154         136 :     auto connectionsIt = syncConnections_.find(device);
     155         136 :     if (connectionsIt == syncConnections_.end()) {
     156           0 :         JAMI_WARNING("[Account {}] [device {}] onChannelShutdown: no connection found.", accountId_, device.to_view());
     157           0 :         return;
     158             :     }
     159         136 :     auto& connections = connectionsIt->second;
     160         136 :     auto conn = std::find(connections.begin(), connections.end(), socket); // compares the shared_ptr values, i.e. the pointed-to socket
     161         136 :     if (conn != connections.end())
     162         136 :         connections.erase(conn);
     163         544 :     JAMI_LOG("[Account {}] [device {}] removed connection, remaining: {:d}", // logged even when socket was not found in the list
     164             :              accountId_,
     165             :              device.to_view(),
     166             :              connections.size());
     167         136 :     if (connections.empty())
     168          70 :         syncConnections_.erase(connectionsIt); // no empty vector is left behind for this device
     169         136 : }
     170             : 
     171             : void // Caches socket as a sync connection for device, installs its receive/shutdown callbacks, then hands a full sync to dht::ThreadPool::io()
     172         136 : SyncModule::cacheSyncConnection(std::shared_ptr<dhtnet::ChannelSocket>&& socket,
     173             :                                 const std::string& peerId,
     174             :                                 const DeviceId& device)
     175             : {
     176         136 :     std::lock_guard lk(pimpl_->syncConnectionsMtx_);
     177         136 :     pimpl_->syncConnections_[device].emplace_back(socket); // copies the pointer: socket is a named reference and is still used below
     178             : 
     179         135 :     socket->setOnRecv(dhtnet::buildMsgpackReader<SyncMsg>([acc = pimpl_->account_, device, peerId](SyncMsg&& msg) { // invoked with each received SyncMsg (presumably decoded by buildMsgpackReader)
     180         382 :         auto account = acc.lock();
     181         382 :         if (!account)
     182           0 :             return std::make_error_code(std::errc::operation_canceled); // the account is gone
     183             : 
     184             :         try {
     185         382 :             if (auto manager = account->accountManager())
     186         382 :                 manager->onSyncData(std::move(msg.ds), false); // msg.ds is moved-from after this call
     187             : 
     188         382 :             if (!msg.c.empty() || !msg.cr.empty() || !msg.p.empty() || !msg.ld.empty() || !msg.ms.empty()) // only involve the conversation module when one of these fields is non-empty
     189         162 :                 if (auto cm = account->convModule(true))
     190         162 :                     cm->onSyncData(msg, peerId, device.toString());
     191           0 :         } catch (const std::exception& e) { // errors are logged, not propagated: success is still returned below
     192           0 :             JAMI_WARNING("[Account {}] [device {}] [convInfo] error on sync: {:s}",
     193             :                          account->getAccountID(),
     194             :                          device.to_view(),
     195             :                          e.what());
     196           0 :         }
     197         382 :         return std::error_code();
     198         382 :     }));
     199         136 :     socket->onShutdown([w = pimpl_->weak_from_this(), device, s = std::weak_ptr(socket)](const std::error_code&) { // captures only weak references to the Impl and to the socket
     200         136 :         if (auto shared = w.lock())
     201         136 :             shared->onChannelShutdown(s.lock(), device); // s.lock() yields null if the socket no longer exists
     202         136 :     });
     203             : 
     204         136 :     dht::ThreadPool::io().run([w = pimpl_->weak_from_this(), socket = std::move(socket)]() { // the task takes over the caller's socket pointer
     205         136 :         if (auto s = w.lock())
     206         136 :             s->syncInfos(socket, nullptr); // nullptr: send the full local state
     207         136 :     });
     208         136 : }
     209             : 
     210             : bool // Returns true when at least one sync socket is cached for deviceId
     211          74 : SyncModule::isConnected(const DeviceId& deviceId) const
     212             : {
     213          74 :     std::lock_guard lk(pimpl_->syncConnectionsMtx_);
     214          74 :     auto it = pimpl_->syncConnections_.find(deviceId);
     215          74 :     if (it == pimpl_->syncConnections_.end())
     216          72 :         return false; // no entry for this device
     217           2 :     return !it->second.empty(); // an entry with an empty socket list also counts as not connected
     218          74 : }
     219             : 
     220             : void // Hands a syncInfos() task to dht::ThreadPool::io() for each matching device that has a cached socket, then logs how many devices matched
     221        2378 : SyncModule::syncWithConnected(const std::shared_ptr<SyncMsg>& syncMsg, const DeviceId& deviceId)
     222             : {
     223        2378 :     std::lock_guard lk(pimpl_->syncConnectionsMtx_);
     224        2378 :     size_t count = 0; // number of devices a task was queued for
     225        2487 :     for (const auto& [did, sockets] : pimpl_->syncConnections_) {
     226         109 :         if (not sockets.empty() and (!deviceId || deviceId == did)) { // a deviceId for which !deviceId holds selects every device
     227         108 :             count++;
     228         108 :             dht::ThreadPool::io().run([w = pimpl_->weak_from_this(), s = sockets.back(), syncMsg] { // uses only the last socket cached for the device
     229         109 :                 if (auto sthis = w.lock())
     230         109 :                     sthis->syncInfos(s, syncMsg); // a null syncMsg makes syncInfos send the full local state
     231         109 :             });
     232             :         }
     233             :     }
     234        2378 :     if (count == 0) {
     235        9076 :         JAMI_WARNING("[Account {}] [device {}] no sync connection.", pimpl_->accountId_, deviceId.toString());
     236             :     } else {
     237         436 :         JAMI_DEBUG("[Account {}] [device {}] syncing with {:d} devices", pimpl_->accountId_, deviceId.to_view(), count);
     238             :     }
     239        2378 : }
     240             : 
     241             : } // namespace jami

Generated by: LCOV version 1.14