LCOV - code coverage report
Current view: top level - src/jamidht - sync_module.cpp (source / functions) Hit Total Coverage
Test: jami-coverage-filtered.info Lines: 117 133 88.0 %
Date: 2024-11-13 09:04:27 Functions: 12 24 50.0 %

          Line data    Source code
       1             : /*
       2             :  *  Copyright (C) 2004-2024 Savoir-faire Linux Inc.
       3             :  *
       4             :  *  This program is free software: you can redistribute it and/or modify
       5             :  *  it under the terms of the GNU General Public License as published by
       6             :  *  the Free Software Foundation, either version 3 of the License, or
       7             :  *  (at your option) any later version.
       8             :  *
       9             :  *  This program is distributed in the hope that it will be useful,
      10             :  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
      11             :  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
      12             :  *  GNU General Public License for more details.
      13             :  *
      14             :  *  You should have received a copy of the GNU General Public License
      15             :  *  along with this program. If not, see <https://www.gnu.org/licenses/>.
      16             :  */
      17             : 
      18             : #include "sync_module.h"
      19             : 
      20             : #include "jamidht/conversation_module.h"
      21             : #include "jamidht/archive_account_manager.h"
      22             : #include <dhtnet/multiplexed_socket.h>
      23             : 
      24             : namespace jami {
      25             : 
      26             : class SyncModule::Impl : public std::enable_shared_from_this<Impl>
      27             : {
      28             : public:
      29             :     Impl(std::weak_ptr<JamiAccount>&& account);
      30             : 
      31             :     std::weak_ptr<JamiAccount> account_;
      32             : 
      33             :     // Sync connections
      34             :     std::recursive_mutex syncConnectionsMtx_;
      35             :     std::map<DeviceId /* deviceId */, std::vector<std::shared_ptr<dhtnet::ChannelSocket>>> syncConnections_;
      36             : 
      37             :     /**
      38             :      * Build SyncMsg and send it on socket
      39             :      * @param socket
      40             :      */
      41             :     void syncInfos(const std::shared_ptr<dhtnet::ChannelSocket>& socket,
      42             :                    const std::shared_ptr<SyncMsg>& syncMsg);
      43             :     void onChannelShutdown(const std::shared_ptr<dhtnet::ChannelSocket>& socket, const DeviceId& device);
      44             : };
      45             : 
      46         383 : SyncModule::Impl::Impl(std::weak_ptr<JamiAccount>&& account)
      47         383 :     : account_(account)
      48         383 : {}
      49             : 
      50             : void
      51         223 : SyncModule::Impl::syncInfos(const std::shared_ptr<dhtnet::ChannelSocket>& socket,
      52             :                             const std::shared_ptr<SyncMsg>& syncMsg)
      53             : {
      54         223 :     auto acc = account_.lock();
      55         223 :     if (!acc)
      56           0 :         return;
      57         223 :     msgpack::sbuffer buffer(UINT16_MAX); // Use max pkt size
      58         223 :     std::error_code ec;
      59         223 :     if (!syncMsg) {
      60             :         // Send contacts infos
      61             :         // This message can be big. TODO rewrite to only take UINT16_MAX bytes max or split it multiple
      62             :         // messages. For now, write 3 messages (UINT16_MAX*3 should be enough for all information).
      63         201 :         if (auto info = acc->accountManager()->getInfo()) {
      64         201 :             if (info->contacts) {
      65         201 :                 SyncMsg msg;
      66         202 :                 msg.ds = info->contacts->getSyncData();
      67         202 :                 msgpack::pack(buffer, msg);
      68         202 :                 socket->write(reinterpret_cast<const unsigned char*>(buffer.data()),
      69             :                               buffer.size(),
      70             :                               ec);
      71         202 :                 if (ec) {
      72           3 :                     JAMI_ERROR("{:s}", ec.message());
      73           1 :                     return;
      74             :                 }
      75         202 :             }
      76             :         }
      77         201 :         buffer.clear();
      78             :         // Sync conversations
      79         201 :         auto c = ConversationModule::convInfos(acc->getAccountID());
      80         201 :         if (!c.empty()) {
      81          92 :             SyncMsg msg;
      82          92 :             msg.c = std::move(c);
      83          92 :             msgpack::pack(buffer, msg);
      84          92 :             socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
      85          92 :             if (ec) {
      86           0 :                 JAMI_ERROR("{:s}", ec.message());
      87           0 :                 return;
      88             :             }
      89          92 :         }
      90         200 :         buffer.clear();
      91             :         // Sync requests
      92         200 :         auto cr = ConversationModule::convRequests(acc->getAccountID());
      93         201 :         if (!cr.empty()) {
      94          20 :             SyncMsg msg;
      95          20 :             msg.cr = std::move(cr);
      96          20 :             msgpack::pack(buffer, msg);
      97          20 :             socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
      98          20 :             if (ec) {
      99           0 :                 JAMI_ERROR("{:s}", ec.message());
     100           0 :                 return;
     101             :             }
     102          20 :         }
     103             : 
     104         201 :         auto convModule = acc->convModule(true);
     105         201 :         if (!convModule)
     106           0 :             return;
     107             :         // Sync conversation's preferences
     108         201 :         auto p = convModule->convPreferences();
     109         201 :         if (!p.empty()) {
     110           3 :             SyncMsg msg;
     111           3 :             msg.p = std::move(p);
     112           3 :             msgpack::pack(buffer, msg);
     113           3 :             socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
     114           3 :             if (ec) {
     115           0 :                 JAMI_ERROR("{:s}", ec.message());
     116           0 :                 return;
     117             :             }
     118           3 :         }
     119         201 :         buffer.clear();
     120             :         // Sync read's status
     121         201 :         auto ms = convModule->convMessageStatus();
     122         201 :         if (!ms.empty()) {
     123          57 :             SyncMsg msg;
     124          57 :             msg.ms = std::move(ms);
     125          57 :             msgpack::pack(buffer, msg);
     126          57 :             socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
     127          57 :             if (ec) {
     128           0 :                 JAMI_ERROR("{:s}", ec.message());
     129           0 :                 return;
     130             :             }
     131          57 :         }
     132         201 :         buffer.clear();
     133             : 
     134         201 :     } else {
     135          22 :         msgpack::pack(buffer, *syncMsg);
     136          22 :         socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
     137          22 :         if (ec)
     138           0 :             JAMI_ERROR("{:s}", ec.message());
     139             :     }
     140         225 : }
     141             : 
     142             : ////////////////////////////////////////////////////////////////
     143             : 
     144         383 : SyncModule::SyncModule(std::weak_ptr<JamiAccount>&& account)
     145         383 :     : pimpl_ {std::make_shared<Impl>(std::move(account))}
     146         383 : {}
     147             : 
     148             : void
     149         138 : SyncModule::Impl::onChannelShutdown(const std::shared_ptr<dhtnet::ChannelSocket>& socket, const DeviceId& device)
     150             : {
     151         138 :     std::lock_guard lk(syncConnectionsMtx_);
     152         138 :     auto connectionsIt = syncConnections_.find(device);
     153         138 :     if (connectionsIt == syncConnections_.end())
     154           0 :         return;
     155         138 :     auto& connections = connectionsIt->second;
     156         138 :     auto conn = std::find(connections.begin(), connections.end(), socket);
     157         138 :     if (conn != connections.end())
     158         138 :         connections.erase(conn);
     159         138 :     if (connections.empty())
     160          72 :         syncConnections_.erase(connectionsIt);
     161         138 : }
     162             : 
     163             : void
     164         138 : SyncModule::cacheSyncConnection(std::shared_ptr<dhtnet::ChannelSocket>&& socket,
     165             :                                 const std::string& peerId,
     166             :                                 const DeviceId& device)
     167             : {
     168         138 :     std::lock_guard lk(pimpl_->syncConnectionsMtx_);
     169         138 :     pimpl_->syncConnections_[device].emplace_back(socket);
     170             : 
     171         138 :     socket->onShutdown([w = pimpl_->weak_from_this(), device, s=std::weak_ptr(socket)]() {
     172         138 :         if (auto shared = w.lock())
     173         138 :             shared->onChannelShutdown(s.lock(), device);
     174         138 :     });
     175             : 
     176             :     struct DecodingContext
     177             :     {
     178        6841 :         msgpack::unpacker pac {[](msgpack::type::object_type, std::size_t, void*) { return true; },
     179             :                                nullptr,
     180             :                                512};
     181             :     };
     182             : 
     183         138 :     socket->setOnRecv([acc = pimpl_->account_.lock(), device, peerId,
     184             :                        ctx = std::make_shared<DecodingContext>()
     185             :     ](const uint8_t* buf, size_t len) {
     186         375 :         if (!buf || !acc)
     187           0 :             return len;
     188             : 
     189         375 :         ctx->pac.reserve_buffer(len);
     190         375 :         std::copy_n(buf, len, ctx->pac.buffer());
     191         375 :         ctx->pac.buffer_consumed(len);
     192             : 
     193         375 :         msgpack::object_handle oh;
     194             :         try {
     195         750 :             while (ctx->pac.next(oh)) {
     196         375 :                 SyncMsg msg;
     197         375 :                 oh.get().convert(msg);
     198         375 :                 if (auto manager = acc->accountManager())
     199         375 :                     manager->onSyncData(std::move(msg.ds), false);
     200             : 
     201         375 :                 if (!msg.c.empty() || !msg.cr.empty() || !msg.p.empty() || !msg.ld.empty() || !msg.ms.empty())
     202         177 :                     if (auto cm = acc->convModule(true))
     203         177 :                         cm->onSyncData(msg, peerId, device.toString());
     204         375 :             }
     205           0 :         } catch (const std::exception& e) {
     206           0 :             JAMI_WARNING("[convInfo] error on sync: {:s}", e.what());
     207           0 :         }
     208             : 
     209         375 :         return len;
     210         375 :     });
     211             : 
     212         137 :     pimpl_->syncInfos(socket, nullptr);
     213         138 : }
     214             : 
     215             : bool
     216         112 : SyncModule::isConnected(const DeviceId& deviceId) const
     217             : {
     218         112 :     std::lock_guard lk(pimpl_->syncConnectionsMtx_);
     219         112 :     auto it = pimpl_->syncConnections_.find(deviceId);
     220         112 :     if (it == pimpl_->syncConnections_.end())
     221          83 :         return false;
     222          29 :     return !it->second.empty();
     223         112 : }
     224             : 
     225             : void
     226        1552 : SyncModule::syncWithConnected(const std::shared_ptr<SyncMsg>& syncMsg, const DeviceId& deviceId)
     227             : {
     228        1552 :     std::lock_guard lk(pimpl_->syncConnectionsMtx_);
     229        1638 :     for (auto& [did, sockets] : pimpl_->syncConnections_) {
     230          86 :         if (not sockets.empty()) {
     231          86 :             if (!deviceId || deviceId == did) {
     232          86 :                 pimpl_->syncInfos(sockets[0], syncMsg);
     233             :             }
     234             :         }
     235             :     }
     236        1552 : }
     237             : } // namespace jami

Generated by: LCOV version 1.14