LCOV - code coverage report
Current view: top level - foo/src/jamidht - sync_module.cpp (source / functions) Hit Total Coverage
Test: jami-coverage-filtered.info Lines: 124 139 89.2 %
Date: 2025-12-18 10:07:43 Functions: 21 33 63.6 %

          Line data    Source code
       1             : /*
       2             :  *  Copyright (C) 2004-2025 Savoir-faire Linux Inc.
       3             :  *
       4             :  *  This program is free software: you can redistribute it and/or modify
       5             :  *  it under the terms of the GNU General Public License as published by
       6             :  *  the Free Software Foundation, either version 3 of the License, or
       7             :  *  (at your option) any later version.
       8             :  *
       9             :  *  This program is distributed in the hope that it will be useful,
      10             :  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
      11             :  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
      12             :  *  GNU General Public License for more details.
      13             :  *
      14             :  *  You should have received a copy of the GNU General Public License
      15             :  *  along with this program. If not, see <https://www.gnu.org/licenses/>.
      16             :  */
      17             : 
      18             : #include "sync_module.h"
      19             : 
      20             : #include "jamidht/conversation_module.h"
      21             : #include "jamidht/archive_account_manager.h"
      22             : 
      23             : #include <dhtnet/multiplexed_socket.h>
      24             : #include <dhtnet/channel_utils.h>
      25             : #include <opendht/thread_pool.h>
      26             : 
      27             : namespace jami {
      28             : 
      29             : class SyncModule::Impl : public std::enable_shared_from_this<Impl>
      30             : {
      31             : public:
      32             :     Impl(const std::shared_ptr<JamiAccount>& account);
      33             : 
      34             :     std::weak_ptr<JamiAccount> account_;
      35             :     const std::string accountId_;
      36             : 
      37             :     // Sync connections
      38             :     std::recursive_mutex syncConnectionsMtx_;
      39             :     std::map<DeviceId /* deviceId */, std::vector<std::shared_ptr<dhtnet::ChannelSocket>>> syncConnections_;
      40             : 
      41             :     /**
      42             :      * Build SyncMsg and send it on socket
      43             :      * @param socket
      44             :      */
      45             :     void syncInfos(const std::shared_ptr<dhtnet::ChannelSocket>& socket, const std::shared_ptr<SyncMsg>& syncMsg);
      46             :     void onChannelShutdown(const std::shared_ptr<dhtnet::ChannelSocket>& socket, const DeviceId& device);
      47             : };
      48             : 
      49         657 : SyncModule::Impl::Impl(const std::shared_ptr<JamiAccount>& account)
      50         657 :     : account_(account)
      51        1314 :     , accountId_ {account->getAccountID()}
      52         657 : {}
      53             : 
      54             : void
      55         231 : SyncModule::Impl::syncInfos(const std::shared_ptr<dhtnet::ChannelSocket>& socket,
      56             :                             const std::shared_ptr<SyncMsg>& syncMsg)
      57             : {
      58         231 :     auto acc = account_.lock();
      59         231 :     if (!acc)
      60           0 :         return;
      61         231 :     msgpack::sbuffer buffer(UINT16_MAX); // Use max pkt size
      62         231 :     std::error_code ec;
      63         231 :     if (!syncMsg) {
      64             :         // Send contacts infos
      65             :         // This message can be big. TODO rewrite to only take UINT16_MAX bytes max or split it multiple
      66             :         // messages. For now, write 3 messages (UINT16_MAX*3 should be enough for all information).
      67         188 :         if (auto info = acc->accountManager()->getInfo()) {
      68         188 :             if (info->contacts) {
      69         188 :                 SyncMsg msg;
      70         187 :                 msg.ds = info->contacts->getSyncData();
      71         188 :                 msgpack::pack(buffer, msg);
      72         188 :                 socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
      73         188 :                 if (ec) {
      74          12 :                     JAMI_ERROR("[Account {}] [device {}] {:s}", accountId_, socket->deviceId(), ec.message());
      75           3 :                     return;
      76             :                 }
      77         188 :             }
      78             :         }
      79         185 :         buffer.clear();
      80             :         // Sync conversations
      81         185 :         auto c = ConversationModule::convInfos(acc->getAccountID());
      82         185 :         if (!c.empty()) {
      83          88 :             SyncMsg msg;
      84          88 :             msg.c = std::move(c);
      85          88 :             msgpack::pack(buffer, msg);
      86          88 :             socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
      87          88 :             if (ec) {
      88           0 :                 JAMI_ERROR("[Account {}] [device {}] {:s}", accountId_, socket->deviceId(), ec.message());
      89           0 :                 return;
      90             :             }
      91          88 :         }
      92         185 :         buffer.clear();
      93             :         // Sync requests
      94         185 :         auto cr = ConversationModule::convRequests(acc->getAccountID());
      95         185 :         if (!cr.empty()) {
      96          19 :             SyncMsg msg;
      97          19 :             msg.cr = std::move(cr);
      98          19 :             msgpack::pack(buffer, msg);
      99          19 :             socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
     100          19 :             if (ec) {
     101           0 :                 JAMI_ERROR("[Account {}] [device {}] {:s}", accountId_, socket->deviceId(), ec.message());
     102           0 :                 return;
     103             :             }
     104          19 :         }
     105         185 :         buffer.clear();
     106         185 :         auto convModule = acc->convModule(true);
     107         185 :         if (!convModule)
     108           0 :             return;
     109             :         // Sync conversation's preferences
     110         185 :         auto p = convModule->convPreferences();
     111         185 :         if (!p.empty()) {
     112           2 :             SyncMsg msg;
     113           2 :             msg.p = std::move(p);
     114           2 :             msgpack::pack(buffer, msg);
     115           2 :             socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
     116           2 :             if (ec) {
     117           0 :                 JAMI_ERROR("[Account {}] [device {}] {:s}", accountId_, socket->deviceId(), ec.message());
     118           0 :                 return;
     119             :             }
     120           2 :         }
     121         185 :         buffer.clear();
     122             :         // Sync read's status
     123         184 :         auto ms = convModule->convMessageStatus();
     124         184 :         if (!ms.empty()) {
     125          29 :             SyncMsg msg;
     126          29 :             msg.ms = std::move(ms);
     127          29 :             msgpack::pack(buffer, msg);
     128          29 :             socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
     129          29 :             if (ec) {
     130           8 :                 JAMI_ERROR("[Account {}] [device {}] {:s}", accountId_, socket->deviceId(), ec.message());
     131           2 :                 return;
     132             :             }
     133          29 :         }
     134         182 :         buffer.clear();
     135             : 
     136         191 :     } else {
     137          43 :         msgpack::pack(buffer, *syncMsg);
     138          43 :         socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
     139          43 :         if (ec)
     140           0 :             JAMI_ERROR("[Account {}] [device {}] {:s}", accountId_, socket->deviceId(), ec.message());
     141             :     }
     142         236 : }
     143             : 
     144             : ////////////////////////////////////////////////////////////////
     145             : 
     146         657 : SyncModule::SyncModule(const std::shared_ptr<JamiAccount>& account)
     147         657 :     : pimpl_ {std::make_shared<Impl>(account)}
     148         657 : {}
     149             : 
     150             : void
     151         128 : SyncModule::Impl::onChannelShutdown(const std::shared_ptr<dhtnet::ChannelSocket>& socket, const DeviceId& device)
     152             : {
     153         128 :     std::lock_guard lk(syncConnectionsMtx_);
     154         128 :     auto connectionsIt = syncConnections_.find(device);
     155         128 :     if (connectionsIt == syncConnections_.end()) {
     156           0 :         JAMI_WARNING("[Account {}] [device {}] onChannelShutdown: no connection found.", accountId_, device.to_view());
     157           0 :         return;
     158             :     }
     159         128 :     auto& connections = connectionsIt->second;
     160         128 :     auto conn = std::find(connections.begin(), connections.end(), socket);
     161         128 :     if (conn != connections.end())
     162         128 :         connections.erase(conn);
     163         512 :     JAMI_LOG("[Account {}] [device {}] removed connection, remaining: {:d}",
     164             :              accountId_,
     165             :              device.to_view(),
     166             :              connections.size());
     167         128 :     if (connections.empty())
     168          70 :         syncConnections_.erase(connectionsIt);
     169         128 : }
     170             : 
     171             : void
     172         128 : SyncModule::cacheSyncConnection(std::shared_ptr<dhtnet::ChannelSocket>&& socket,
     173             :                                 const std::string& peerId,
     174             :                                 const DeviceId& device)
     175             : {
     176         128 :     std::lock_guard lk(pimpl_->syncConnectionsMtx_);
     177         128 :     pimpl_->syncConnections_[device].emplace_back(socket);
     178             : 
     179         128 :     socket->setOnRecv(dhtnet::buildMsgpackReader<SyncMsg>([acc = pimpl_->account_, device, peerId](SyncMsg&& msg) {
     180         357 :         auto account = acc.lock();
     181         357 :         if (!account)
     182           0 :             return std::make_error_code(std::errc::operation_canceled);
     183             : 
     184             :         try {
     185         357 :             if (auto manager = account->accountManager())
     186         357 :                 manager->onSyncData(std::move(msg.ds), false);
     187             : 
     188         357 :             if (!msg.c.empty() || !msg.cr.empty() || !msg.p.empty() || !msg.ld.empty() || !msg.ms.empty())
     189         154 :                 if (auto cm = account->convModule(true))
     190         154 :                     cm->onSyncData(msg, peerId, device.toString());
     191           0 :         } catch (const std::exception& e) {
     192           0 :             JAMI_WARNING("[Account {}] [device {}] [convInfo] error on sync: {:s}",
     193             :                          account->getAccountID(),
     194             :                          device.to_view(),
     195             :                          e.what());
     196           0 :         }
     197         357 :         return std::error_code();
     198         357 :     }));
     199         128 :     socket->onShutdown([w = pimpl_->weak_from_this(), device, s = std::weak_ptr(socket)](const std::error_code&) {
     200         128 :         if (auto shared = w.lock())
     201         128 :             shared->onChannelShutdown(s.lock(), device);
     202         128 :     });
     203             : 
     204         127 :     dht::ThreadPool::io().run([w = pimpl_->weak_from_this(), socket = std::move(socket)]() {
     205         128 :         if (auto s = w.lock())
     206         128 :             s->syncInfos(socket, nullptr);
     207         128 :     });
     208         128 : }
     209             : 
     210             : bool
     211          72 : SyncModule::isConnected(const DeviceId& deviceId) const
     212             : {
     213          72 :     std::lock_guard lk(pimpl_->syncConnectionsMtx_);
     214          72 :     auto it = pimpl_->syncConnections_.find(deviceId);
     215          72 :     if (it == pimpl_->syncConnections_.end())
     216          68 :         return false;
     217           4 :     return !it->second.empty();
     218          72 : }
     219             : 
     220             : void
     221        2368 : SyncModule::syncWithConnected(const std::shared_ptr<SyncMsg>& syncMsg, const DeviceId& deviceId)
     222             : {
     223        2368 :     std::lock_guard lk(pimpl_->syncConnectionsMtx_);
     224        2368 :     size_t count = 0;
     225        2471 :     for (const auto& [did, sockets] : pimpl_->syncConnections_) {
     226         103 :         if (not sockets.empty() and (!deviceId || deviceId == did)) {
     227         103 :             count++;
     228         103 :             dht::ThreadPool::io().run([w = pimpl_->weak_from_this(), s = sockets.back(), syncMsg] {
     229         103 :                 if (auto sthis = w.lock())
     230         103 :                     sthis->syncInfos(s, syncMsg);
     231         103 :             });
     232             :         }
     233             :     }
     234        2368 :     if (count == 0) {
     235        9060 :         JAMI_WARNING("[Account {}] [device {}] no sync connection.", pimpl_->accountId_, deviceId.toString());
     236             :     } else {
     237         412 :         JAMI_DEBUG("[Account {}] [device {}] syncing with {:d} devices", pimpl_->accountId_, deviceId.to_view(), count);
     238             :     }
     239        2368 : }
     240             : 
     241             : } // namespace jami

Generated by: LCOV version 1.14