LCOV - code coverage report
Current view: top level - foo/src/jamidht - sync_module.cpp (source / functions) Hit Total Coverage
Test: jami-coverage-filtered.info Lines: 133 148 89.9 %
Date: 2025-10-16 08:11:43 Functions: 22 34 64.7 %

          Line data    Source code
       1             : /*
       2             :  *  Copyright (C) 2004-2025 Savoir-faire Linux Inc.
       3             :  *
       4             :  *  This program is free software: you can redistribute it and/or modify
       5             :  *  it under the terms of the GNU General Public License as published by
       6             :  *  the Free Software Foundation, either version 3 of the License, or
       7             :  *  (at your option) any later version.
       8             :  *
       9             :  *  This program is distributed in the hope that it will be useful,
      10             :  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
      11             :  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
      12             :  *  GNU General Public License for more details.
      13             :  *
      14             :  *  You should have received a copy of the GNU General Public License
      15             :  *  along with this program. If not, see <https://www.gnu.org/licenses/>.
      16             :  */
      17             : 
      18             : #include "sync_module.h"
      19             : 
      20             : #include "jamidht/conversation_module.h"
      21             : #include "jamidht/archive_account_manager.h"
      22             : 
      23             : #include <dhtnet/multiplexed_socket.h>
      24             : #include <opendht/thread_pool.h>
      25             : 
      26             : namespace jami {
      27             : 
      28             : class SyncModule::Impl : public std::enable_shared_from_this<Impl>
      29             : {
      30             : public:
      31             :     Impl(const std::shared_ptr<JamiAccount>& account);
      32             : 
      33             :     std::weak_ptr<JamiAccount> account_;
      34             :     const std::string accountId_;
      35             : 
      36             :     // Sync connections
      37             :     std::recursive_mutex syncConnectionsMtx_;
      38             :     std::map<DeviceId /* deviceId */, std::vector<std::shared_ptr<dhtnet::ChannelSocket>>>
      39             :         syncConnections_;
      40             : 
      41             :     /**
      42             :      * Build SyncMsg and send it on socket
      43             :      * @param socket
      44             :      */
      45             :     void syncInfos(const std::shared_ptr<dhtnet::ChannelSocket>& socket,
      46             :                    const std::shared_ptr<SyncMsg>& syncMsg);
      47             :     void onChannelShutdown(const std::shared_ptr<dhtnet::ChannelSocket>& socket,
      48             :                            const DeviceId& device);
      49             : };
      50             : 
      51         686 : SyncModule::Impl::Impl(const std::shared_ptr<JamiAccount>& account)
      52         686 :     : account_(account)
      53        1372 :     , accountId_ {account->getAccountID()}
      54         686 : {}
      55             : 
      56             : void
      57       17905 : SyncModule::Impl::syncInfos(const std::shared_ptr<dhtnet::ChannelSocket>& socket,
      58             :                             const std::shared_ptr<SyncMsg>& syncMsg)
      59             : {
      60       17905 :     auto acc = account_.lock();
      61       17905 :     if (!acc)
      62           0 :         return;
      63       17905 :     msgpack::sbuffer buffer(UINT16_MAX); // Use max pkt size
      64       17905 :     std::error_code ec;
      65       17905 :     if (!syncMsg) {
      66             :         // Send contacts infos
      67             :         // This message can be big. TODO rewrite to only take UINT16_MAX bytes max or split it multiple
      68             :         // messages. For now, write 3 messages (UINT16_MAX*3 should be enough for all information).
      69         195 :         if (auto info = acc->accountManager()->getInfo()) {
      70         195 :             if (info->contacts) {
      71         195 :                 SyncMsg msg;
      72         195 :                 msg.ds = info->contacts->getSyncData();
      73         195 :                 msgpack::pack(buffer, msg);
      74         195 :                 socket->write(reinterpret_cast<const unsigned char*>(buffer.data()),
      75             :                               buffer.size(),
      76             :                               ec);
      77         195 :                 if (ec) {
      78           3 :                     JAMI_ERROR("[Account {}] [device {}] {:s}",
      79             :                                accountId_,
      80             :                                socket->deviceId(),
      81             :                                ec.message());
      82           1 :                     return;
      83             :                 }
      84         195 :             }
      85             :         }
      86         194 :         buffer.clear();
      87             :         // Sync conversations
      88         194 :         auto c = ConversationModule::convInfos(acc->getAccountID());
      89         194 :         if (!c.empty()) {
      90          88 :             SyncMsg msg;
      91          88 :             msg.c = std::move(c);
      92          88 :             msgpack::pack(buffer, msg);
      93          88 :             socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
      94          88 :             if (ec) {
      95           0 :                 JAMI_ERROR("[Account {}] [device {}] {:s}",
      96             :                            accountId_,
      97             :                            socket->deviceId(),
      98             :                            ec.message());
      99           0 :                 return;
     100             :             }
     101          88 :         }
     102         194 :         buffer.clear();
     103             :         // Sync requests
     104         194 :         auto cr = ConversationModule::convRequests(acc->getAccountID());
     105         194 :         if (!cr.empty()) {
     106          22 :             SyncMsg msg;
     107          22 :             msg.cr = std::move(cr);
     108          22 :             msgpack::pack(buffer, msg);
     109          22 :             socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
     110          22 :             if (ec) {
     111           0 :                 JAMI_ERROR("[Account {}] [device {}] {:s}",
     112             :                            accountId_,
     113             :                            socket->deviceId(),
     114             :                            ec.message());
     115           0 :                 return;
     116             :             }
     117          22 :         }
     118         194 :         buffer.clear();
     119         194 :         auto convModule = acc->convModule(true);
     120         194 :         if (!convModule)
     121           0 :             return;
     122             :         // Sync conversation's preferences
     123         194 :         auto p = convModule->convPreferences();
     124         194 :         if (!p.empty()) {
     125           3 :             SyncMsg msg;
     126           3 :             msg.p = std::move(p);
     127           3 :             msgpack::pack(buffer, msg);
     128           3 :             socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
     129           3 :             if (ec) {
     130           0 :                 JAMI_ERROR("[Account {}] [device {}] {:s}",
     131             :                            accountId_,
     132             :                            socket->deviceId(),
     133             :                            ec.message());
     134           0 :                 return;
     135             :             }
     136           3 :         }
     137         194 :         buffer.clear();
     138             :         // Sync read's status
     139         194 :         auto ms = convModule->convMessageStatus();
     140         194 :         if (!ms.empty()) {
     141          50 :             SyncMsg msg;
     142          50 :             msg.ms = std::move(ms);
     143          50 :             msgpack::pack(buffer, msg);
     144          50 :             socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
     145          50 :             if (ec) {
     146           6 :                 JAMI_ERROR("[Account {}] [device {}] {:s}",
     147             :                            accountId_,
     148             :                            socket->deviceId(),
     149             :                            ec.message());
     150           2 :                 return;
     151             :             }
     152          50 :         }
     153         192 :         buffer.clear();
     154             : 
     155         200 :     } else {
     156       17710 :         msgpack::pack(buffer, *syncMsg);
     157       17710 :         socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
     158       17710 :         if (ec)
     159           0 :             JAMI_ERROR("[Account {}] [device {}] {:s}",
     160             :                        accountId_,
     161             :                        socket->deviceId(),
     162             :                        ec.message());
     163             :     }
     164       17908 : }
     165             : 
     166             : ////////////////////////////////////////////////////////////////
     167             : 
     168         686 : SyncModule::SyncModule(const std::shared_ptr<JamiAccount>& account)
     169         686 :     : pimpl_ {std::make_shared<Impl>(account)}
     170         686 : {}
     171             : 
     172             : void
     173         132 : SyncModule::Impl::onChannelShutdown(const std::shared_ptr<dhtnet::ChannelSocket>& socket,
     174             :                                     const DeviceId& device)
     175             : {
     176         132 :     std::lock_guard lk(syncConnectionsMtx_);
     177         132 :     auto connectionsIt = syncConnections_.find(device);
     178         132 :     if (connectionsIt == syncConnections_.end()) {
     179           0 :         JAMI_WARNING("[Account {}] [device {}] onChannelShutdown: no connection found.",
     180             :                      accountId_,
     181             :                      device.to_view());
     182           0 :         return;
     183             :     }
     184         132 :     auto& connections = connectionsIt->second;
     185         132 :     auto conn = std::find(connections.begin(), connections.end(), socket);
     186         132 :     if (conn != connections.end())
     187         132 :         connections.erase(conn);
     188         396 :     JAMI_LOG("[Account {}] [device {}] removed connection, remaining: {:d}",
     189             :              accountId_,
     190             :              device.to_view(),
     191             :              connections.size());
     192         132 :     if (connections.empty())
     193          70 :         syncConnections_.erase(connectionsIt);
     194         132 : }
     195             : 
     196             : void
     197         132 : SyncModule::cacheSyncConnection(std::shared_ptr<dhtnet::ChannelSocket>&& socket,
     198             :                                 const std::string& peerId,
     199             :                                 const DeviceId& device)
     200             : {
     201         132 :     std::lock_guard lk(pimpl_->syncConnectionsMtx_);
     202         132 :     pimpl_->syncConnections_[device].emplace_back(socket);
     203             : 
     204         132 :     socket->onShutdown([w = pimpl_->weak_from_this(), device, s = std::weak_ptr(socket)]() {
     205         132 :         if (auto shared = w.lock())
     206         132 :             shared->onChannelShutdown(s.lock(), device);
     207         132 :     });
     208             : 
     209             :     struct DecodingContext
     210             :     {
     211      431083 :         msgpack::unpacker pac {[](msgpack::type::object_type, std::size_t, void*) { return true; },
     212             :                                nullptr,
     213             :                                512};
     214             :     };
     215             : 
     216         132 :     socket->setOnRecv([acc = pimpl_->account_.lock(),
     217             :                        device,
     218             :                        peerId,
     219             :                        ctx = std::make_shared<DecodingContext>()](const uint8_t* buf, size_t len) {
     220       18050 :         if (!buf || !acc)
     221           0 :             return len;
     222             : 
     223       18050 :         ctx->pac.reserve_buffer(len);
     224       18050 :         std::copy_n(buf, len, ctx->pac.buffer());
     225       18050 :         ctx->pac.buffer_consumed(len);
     226             : 
     227       18050 :         msgpack::object_handle oh;
     228             :         try {
     229       36100 :             while (ctx->pac.next(oh)) {
     230       18050 :                 SyncMsg msg;
     231       18050 :                 oh.get().convert(msg);
     232       18050 :                 if (auto manager = acc->accountManager())
     233       18050 :                     manager->onSyncData(std::move(msg.ds), false);
     234             : 
     235       36019 :                 if (!msg.c.empty() || !msg.cr.empty() || !msg.p.empty() || !msg.ld.empty()
     236       36019 :                     || !msg.ms.empty())
     237         170 :                     if (auto cm = acc->convModule(true))
     238         170 :                         cm->onSyncData(msg, peerId, device.toString());
     239       18050 :             }
     240           0 :         } catch (const std::exception& e) {
     241           0 :             JAMI_WARNING("[Account {}] [device {}] [convInfo] error on sync: {:s}",
     242             :                          acc->getAccountID(),
     243             :                          device.to_view(),
     244             :                          e.what());
     245           0 :         }
     246             : 
     247       18050 :         return len;
     248       18050 :     });
     249             : 
     250         132 :     dht::ThreadPool::io().run([w = pimpl_->weak_from_this(), socket]() {
     251         132 :         if (auto s = w.lock())
     252         132 :             s->syncInfos(socket, nullptr);
     253         132 :     });
     254         132 : }
     255             : 
     256             : bool
     257          71 : SyncModule::isConnected(const DeviceId& deviceId) const
     258             : {
     259          71 :     std::lock_guard lk(pimpl_->syncConnectionsMtx_);
     260          71 :     auto it = pimpl_->syncConnections_.find(deviceId);
     261          71 :     if (it == pimpl_->syncConnections_.end())
     262          70 :         return false;
     263           1 :     return !it->second.empty();
     264          71 : }
     265             : 
     266             : void
     267       20073 : SyncModule::syncWithConnected(const std::shared_ptr<SyncMsg>& syncMsg, const DeviceId& deviceId)
     268             : {
     269       20073 :     std::lock_guard lk(pimpl_->syncConnectionsMtx_);
     270       20073 :     size_t count = 0;
     271       37846 :     for (const auto& [did, sockets] : pimpl_->syncConnections_) {
     272       17773 :         if (not sockets.empty() and (!deviceId || deviceId == did)) {
     273       17773 :             count++;
     274       17773 :             dht::ThreadPool::io().run([w = pimpl_->weak_from_this(), s = sockets.back(), syncMsg] {
     275       17773 :                 if (auto sthis = w.lock())
     276       17773 :                     sthis->syncInfos(s, syncMsg);
     277       17773 :             });
     278             :         }
     279             :     }
     280       20073 :     if (count == 0) {
     281        6900 :         JAMI_WARNING("[Account {}] [device {}] no sync connection.",
     282             :                      pimpl_->accountId_,
     283             :                      deviceId.toString());
     284             :     } else {
     285       53319 :         JAMI_DEBUG("[Account {}] [device {}] syncing with {:d} devices",
     286             :                    pimpl_->accountId_,
     287             :                    deviceId.to_view(),
     288             :                    count);
     289             :     }
     290       20073 : }
     291             : 
     292             : } // namespace jami

Generated by: LCOV version 1.14