LCOV - code coverage report
Current view: top level - foo/src/jamidht - sync_module.cpp (source / functions) Hit Total Coverage
Test: jami-coverage-filtered.info Lines: 125 148 84.5 %
Date: 2025-08-24 09:11:10 Functions: 20 34 58.8 %

          Line data    Source code
       1             : /*
       2             :  *  Copyright (C) 2004-2025 Savoir-faire Linux Inc.
       3             :  *
       4             :  *  This program is free software: you can redistribute it and/or modify
       5             :  *  it under the terms of the GNU General Public License as published by
       6             :  *  the Free Software Foundation, either version 3 of the License, or
       7             :  *  (at your option) any later version.
       8             :  *
       9             :  *  This program is distributed in the hope that it will be useful,
      10             :  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
      11             :  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
      12             :  *  GNU General Public License for more details.
      13             :  *
      14             :  *  You should have received a copy of the GNU General Public License
      15             :  *  along with this program. If not, see <https://www.gnu.org/licenses/>.
      16             :  */
      17             : 
      18             : #include "sync_module.h"
      19             : 
      20             : #include "jamidht/conversation_module.h"
      21             : #include "jamidht/archive_account_manager.h"
      22             : 
      23             : #include <dhtnet/multiplexed_socket.h>
      24             : #include <opendht/thread_pool.h>
      25             : 
      26             : namespace jami {
      27             : 
      28             : class SyncModule::Impl : public std::enable_shared_from_this<Impl>
      29             : {
      30             : public:
      31             :     Impl(const std::shared_ptr<JamiAccount>& account);
      32             : 
      33             :     std::weak_ptr<JamiAccount> account_;
      34             :     const std::string accountId_;
      35             : 
      36             :     // Sync connections
      37             :     std::recursive_mutex syncConnectionsMtx_;
      38             :     std::map<DeviceId /* deviceId */, std::vector<std::shared_ptr<dhtnet::ChannelSocket>>>
      39             :         syncConnections_;
      40             : 
      41             :     /**
      42             :      * Build SyncMsg and send it on socket
      43             :      * @param socket
      44             :      */
      45             :     void syncInfos(const std::shared_ptr<dhtnet::ChannelSocket>& socket,
      46             :                    const std::shared_ptr<SyncMsg>& syncMsg);
      47             :     void onChannelShutdown(const std::shared_ptr<dhtnet::ChannelSocket>& socket,
      48             :                            const DeviceId& device);
      49             : };
      50             : 
      51         559 : SyncModule::Impl::Impl(const std::shared_ptr<JamiAccount>& account)
      52         559 :     : account_(account)
      53        1118 :     , accountId_ {account->getAccountID()}
      54         559 : {}
      55             : 
      56             : void
      57         200 : SyncModule::Impl::syncInfos(const std::shared_ptr<dhtnet::ChannelSocket>& socket,
      58             :                             const std::shared_ptr<SyncMsg>& syncMsg)
      59             : {
      60         200 :     auto acc = account_.lock();
      61         200 :     if (!acc)
      62           0 :         return;
      63         200 :     msgpack::sbuffer buffer(UINT16_MAX); // Use max pkt size
      64         200 :     std::error_code ec;
      65         200 :     if (!syncMsg) {
      66             :         // Send contacts infos
      67             :         // This message can be big. TODO rewrite to only take UINT16_MAX bytes max or split it multiple
      68             :         // messages. For now, write 3 messages (UINT16_MAX*3 should be enough for all information).
      69         156 :         if (auto info = acc->accountManager()->getInfo()) {
      70         156 :             if (info->contacts) {
      71         156 :                 SyncMsg msg;
      72         156 :                 msg.ds = info->contacts->getSyncData();
      73         156 :                 msgpack::pack(buffer, msg);
      74         156 :                 socket->write(reinterpret_cast<const unsigned char*>(buffer.data()),
      75             :                               buffer.size(),
      76             :                               ec);
      77         156 :                 if (ec) {
      78           0 :                     JAMI_ERROR("[Account {}] [device {}] {:s}",
      79             :                                accountId_,
      80             :                                socket->deviceId(),
      81             :                                ec.message());
      82           0 :                     return;
      83             :                 }
      84         156 :             }
      85             :         }
      86         156 :         buffer.clear();
      87             :         // Sync conversations
      88         156 :         auto c = ConversationModule::convInfos(acc->getAccountID());
      89         156 :         if (!c.empty()) {
      90          66 :             SyncMsg msg;
      91          66 :             msg.c = std::move(c);
      92          66 :             msgpack::pack(buffer, msg);
      93          66 :             socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
      94          66 :             if (ec) {
      95           0 :                 JAMI_ERROR("[Account {}] [device {}] {:s}",
      96             :                            accountId_,
      97             :                            socket->deviceId(),
      98             :                            ec.message());
      99           0 :                 return;
     100             :             }
     101          66 :         }
     102         156 :         buffer.clear();
     103             :         // Sync requests
     104         156 :         auto cr = ConversationModule::convRequests(acc->getAccountID());
     105         156 :         if (!cr.empty()) {
     106          19 :             SyncMsg msg;
     107          19 :             msg.cr = std::move(cr);
     108          19 :             msgpack::pack(buffer, msg);
     109          19 :             socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
     110          19 :             if (ec) {
     111           0 :                 JAMI_ERROR("[Account {}] [device {}] {:s}",
     112             :                            accountId_,
     113             :                            socket->deviceId(),
     114             :                            ec.message());
     115           0 :                 return;
     116             :             }
     117          19 :         }
     118         156 :         buffer.clear();
     119         156 :         auto convModule = acc->convModule(true);
     120         156 :         if (!convModule)
     121           0 :             return;
     122             :         // Sync conversation's preferences
     123         156 :         auto p = convModule->convPreferences();
     124         156 :         if (!p.empty()) {
     125           0 :             SyncMsg msg;
     126           0 :             msg.p = std::move(p);
     127           0 :             msgpack::pack(buffer, msg);
     128           0 :             socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
     129           0 :             if (ec) {
     130           0 :                 JAMI_ERROR("[Account {}] [device {}] {:s}",
     131             :                            accountId_,
     132             :                            socket->deviceId(),
     133             :                            ec.message());
     134           0 :                 return;
     135             :             }
     136           0 :         }
     137         156 :         buffer.clear();
     138             :         // Sync read's status
     139         156 :         auto ms = convModule->convMessageStatus();
     140         156 :         if (!ms.empty()) {
     141          41 :             SyncMsg msg;
     142          41 :             msg.ms = std::move(ms);
     143          41 :             msgpack::pack(buffer, msg);
     144          41 :             socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
     145          41 :             if (ec) {
     146           3 :                 JAMI_ERROR("[Account {}] [device {}] {:s}",
     147             :                            accountId_,
     148             :                            socket->deviceId(),
     149             :                            ec.message());
     150           1 :                 return;
     151             :             }
     152          41 :         }
     153         155 :         buffer.clear();
     154             : 
     155         159 :     } else {
     156          44 :         msgpack::pack(buffer, *syncMsg);
     157          44 :         socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
     158          44 :         if (ec)
     159           0 :             JAMI_ERROR("[Account {}] [device {}] {:s}",
     160             :                        accountId_,
     161             :                        socket->deviceId(),
     162             :                        ec.message());
     163             :     }
     164         201 : }
     165             : 
     166             : ////////////////////////////////////////////////////////////////
     167             : 
     168         559 : SyncModule::SyncModule(const std::shared_ptr<JamiAccount>& account)
     169         559 :     : pimpl_ {std::make_shared<Impl>(account)}
     170         559 : {}
     171             : 
     172             : void
     173         108 : SyncModule::Impl::onChannelShutdown(const std::shared_ptr<dhtnet::ChannelSocket>& socket,
     174             :                                     const DeviceId& device)
     175             : {
     176         108 :     std::lock_guard lk(syncConnectionsMtx_);
     177         108 :     auto connectionsIt = syncConnections_.find(device);
     178         108 :     if (connectionsIt == syncConnections_.end()) {
     179           0 :         JAMI_WARNING("[Account {}] [device {}] onChannelShutdown: no connection found.",
     180             :                      accountId_,
     181             :                      device.to_view());
     182           0 :         return;
     183             :     }
     184         108 :     auto& connections = connectionsIt->second;
     185         108 :     auto conn = std::find(connections.begin(), connections.end(), socket);
     186         108 :     if (conn != connections.end())
     187         108 :         connections.erase(conn);
     188         324 :     JAMI_LOG("[Account {}] [device {}] removed connection, remaining: {:d}",
     189             :              accountId_,
     190             :              device.to_view(),
     191             :              connections.size());
     192         108 :     if (connections.empty())
     193          56 :         syncConnections_.erase(connectionsIt);
     194         108 : }
     195             : 
     196             : void
     197         108 : SyncModule::cacheSyncConnection(std::shared_ptr<dhtnet::ChannelSocket>&& socket,
     198             :                                 const std::string& peerId,
     199             :                                 const DeviceId& device)
     200             : {
     201         108 :     std::lock_guard lk(pimpl_->syncConnectionsMtx_);
     202         108 :     pimpl_->syncConnections_[device].emplace_back(socket);
     203             : 
     204         108 :     socket->onShutdown([w = pimpl_->weak_from_this(), device, s = std::weak_ptr(socket)]() {
     205         108 :         if (auto shared = w.lock())
     206         108 :             shared->onChannelShutdown(s.lock(), device);
     207         108 :     });
     208             : 
     209             :     struct DecodingContext
     210             :     {
     211        5784 :         msgpack::unpacker pac {[](msgpack::type::object_type, std::size_t, void*) { return true; },
     212             :                                nullptr,
     213             :                                512};
     214             :     };
     215             : 
     216         108 :     socket->setOnRecv([acc = pimpl_->account_.lock(),
     217             :                        device,
     218             :                        peerId,
     219             :                        ctx = std::make_shared<DecodingContext>()](const uint8_t* buf, size_t len) {
     220         309 :         if (!buf || !acc)
     221           0 :             return len;
     222             : 
     223         309 :         ctx->pac.reserve_buffer(len);
     224         309 :         std::copy_n(buf, len, ctx->pac.buffer());
     225         309 :         ctx->pac.buffer_consumed(len);
     226             : 
     227         309 :         msgpack::object_handle oh;
     228             :         try {
     229         618 :             while (ctx->pac.next(oh)) {
     230         309 :                 SyncMsg msg;
     231         309 :                 oh.get().convert(msg);
     232         309 :                 if (auto manager = acc->accountManager())
     233         309 :                     manager->onSyncData(std::move(msg.ds), false);
     234             : 
     235         560 :                 if (!msg.c.empty() || !msg.cr.empty() || !msg.p.empty() || !msg.ld.empty()
     236         560 :                     || !msg.ms.empty())
     237         127 :                     if (auto cm = acc->convModule(true))
     238         127 :                         cm->onSyncData(msg, peerId, device.toString());
     239         309 :             }
     240           0 :         } catch (const std::exception& e) {
     241           0 :             JAMI_WARNING("[Account {}] [device {}] [convInfo] error on sync: {:s}",
     242             :                          acc->getAccountID(),
     243             :                          device.to_view(),
     244             :                          e.what());
     245           0 :         }
     246             : 
     247         309 :         return len;
     248         309 :     });
     249             : 
     250         108 :     dht::ThreadPool::io().run([w = pimpl_->weak_from_this(), socket]() {
     251         108 :         if (auto s = w.lock())
     252         108 :             s->syncInfos(socket, nullptr);
     253         108 :     });
     254         108 : }
     255             : 
     256             : bool
     257         111 : SyncModule::isConnected(const DeviceId& deviceId) const
     258             : {
     259         111 :     std::lock_guard lk(pimpl_->syncConnectionsMtx_);
     260         111 :     auto it = pimpl_->syncConnections_.find(deviceId);
     261         111 :     if (it == pimpl_->syncConnections_.end())
     262         104 :         return false;
     263           7 :     return !it->second.empty();
     264         111 : }
     265             : 
     266             : void
     267        2610 : SyncModule::syncWithConnected(const std::shared_ptr<SyncMsg>& syncMsg, const DeviceId& deviceId)
     268             : {
     269        2610 :     std::lock_guard lk(pimpl_->syncConnectionsMtx_);
     270        2610 :     size_t count = 0;
     271        2702 :     for (const auto& [did, sockets] : pimpl_->syncConnections_) {
     272          92 :         if (not sockets.empty() and (!deviceId || deviceId == did)) {
     273          92 :             count++;
     274          92 :             dht::ThreadPool::io().run([w = pimpl_->weak_from_this(), s = sockets.back(), syncMsg] {
     275          92 :                 if (auto sthis = w.lock())
     276          92 :                     sthis->syncInfos(s, syncMsg);
     277          92 :             });
     278             :         }
     279             :     }
     280        2610 :     if (count == 0) {
     281        7554 :         JAMI_WARNING("[Account {}] [device {}] no sync connection.",
     282             :                      pimpl_->accountId_,
     283             :                      deviceId.toString());
     284             :     } else {
     285         276 :         JAMI_DEBUG("[Account {}] [device {}] syncing with {:d} devices",
     286             :                    pimpl_->accountId_,
     287             :                    deviceId.to_view(),
     288             :                    count);
     289             :     }
     290        2610 : }
     291             : 
     292             : } // namespace jami

Generated by: LCOV version 1.14