LCOV - code coverage report
Current view: top level - src/jamidht - sync_module.cpp (source / functions) Hit Total Coverage
Test: jami-coverage-filtered.info Lines: 120 144 83.3 %
Date: 2024-05-01 08:46:49 Functions: 13 25 52.0 %

          Line data    Source code
       1             : /*
       2             :  *  Copyright (C) 2021-2024 Savoir-faire Linux Inc.
       3             :  *
       4             :  *  Author: Sébastien Blin <sebastien.blin@savoirfairelinux.com>
       5             :  *
       6             :  *  This program is free software; you can redistribute it and/or modify
       7             :  *  it under the terms of the GNU General Public License as published by
       8             :  *  the Free Software Foundation; either version 3 of the License, or
       9             :  *  (at your option) any later version.
      10             :  *
      11             :  *  This program is distributed in the hope that it will be useful,
      12             :  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
      13             :  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
      14             :  *  GNU General Public License for more details.
      15             :  *
      16             :  *  You should have received a copy of the GNU General Public License
      17             :  *  along with this program; if not, write to the Free Software
      18             :  *  Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301 USA.
      19             :  */
      20             : 
      21             : #include "sync_module.h"
      22             : 
      23             : #include "jamidht/conversation_module.h"
      24             : #include "jamidht/archive_account_manager.h"
      25             : #include <dhtnet/multiplexed_socket.h>
      26             : 
      27             : namespace jami {
      28             : 
      29             : class SyncModule::Impl : public std::enable_shared_from_this<Impl>
      30             : {
      31             : public:
      32             :     Impl(std::weak_ptr<JamiAccount>&& account);
      33             : 
      34             :     std::weak_ptr<JamiAccount> account_;
      35             : 
      36             :     // Sync connections
      37             :     std::recursive_mutex syncConnectionsMtx_;
      38             :     std::map<DeviceId /* deviceId */, std::vector<std::shared_ptr<dhtnet::ChannelSocket>>> syncConnections_;
      39             : 
      40         168 :     std::weak_ptr<Impl> weak() { return std::static_pointer_cast<Impl>(shared_from_this()); }
      41             : 
      42             :     /**
      43             :      * Build SyncMsg and send it on socket
      44             :      * @param socket
      45             :      */
      46             :     void syncInfos(const std::shared_ptr<dhtnet::ChannelSocket>& socket,
      47             :                    const std::shared_ptr<SyncMsg>& syncMsg);
      48             : };
      49             : 
      50         289 : SyncModule::Impl::Impl(std::weak_ptr<JamiAccount>&& account)
      51         289 :     : account_(account)
      52         289 : {}
      53             : 
      54             : void
      55         240 : SyncModule::Impl::syncInfos(const std::shared_ptr<dhtnet::ChannelSocket>& socket,
      56             :                             const std::shared_ptr<SyncMsg>& syncMsg)
      57             : {
      58         240 :     auto acc = account_.lock();
      59         239 :     if (!acc)
      60           0 :         return;
      61         239 :     Json::Value syncValue;
      62         240 :     msgpack::sbuffer buffer(UINT16_MAX); // Use max pkt size
      63         240 :     std::error_code ec;
      64         240 :     if (!syncMsg) {
      65             :         // Send contacts infos
      66             :         // This message can be big. TODO rewrite to only take UINT16_MAX bytes max or split it multiple
      67             :         // messages. For now, write 3 messages (UINT16_MAX*3 should be enough for all informations).
      68         222 :         if (auto info = acc->accountManager()->getInfo()) {
      69         221 :             if (info->contacts) {
      70         222 :                 SyncMsg msg;
      71         221 :                 msg.ds = info->contacts->getSyncData();
      72         221 :                 msgpack::pack(buffer, msg);
      73         221 :                 socket->write(reinterpret_cast<const unsigned char*>(buffer.data()),
      74             :                               buffer.size(),
      75             :                               ec);
      76         222 :                 if (ec) {
      77           0 :                     JAMI_ERROR("{:s}", ec.message());
      78           0 :                     return;
      79             :                 }
      80         222 :             }
      81             :         }
      82         222 :         buffer.clear();
      83             :         // Sync conversations
      84         221 :         auto c = ConversationModule::convInfos(acc->getAccountID());
      85         222 :         if (!c.empty()) {
      86          86 :             SyncMsg msg;
      87          86 :             msg.c = std::move(c);
      88          86 :             msgpack::pack(buffer, msg);
      89          86 :             socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
      90          86 :             if (ec) {
      91           6 :                 JAMI_ERROR("{:s}", ec.message());
      92           2 :                 return;
      93             :             }
      94          86 :         }
      95         220 :         buffer.clear();
      96             :         // Sync requests
      97         220 :         auto cr = ConversationModule::convRequests(acc->getAccountID());
      98         220 :         if (!cr.empty()) {
      99          20 :             SyncMsg msg;
     100          20 :             msg.cr = std::move(cr);
     101          20 :             msgpack::pack(buffer, msg);
     102          20 :             socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
     103          20 :             if (ec) {
     104           0 :                 JAMI_ERROR("{:s}", ec.message());
     105           0 :                 return;
     106             :             }
     107          20 :         }
     108             : 
     109         220 :         auto convModule = acc->convModule(true);
     110         220 :         if (!convModule)
     111           0 :             return;
     112             :         // Sync conversation's preferences
     113         220 :         auto p = convModule->convPreferences();
     114         220 :         if (!p.empty()) {
     115           0 :             SyncMsg msg;
     116           0 :             msg.p = std::move(p);
     117           0 :             msgpack::pack(buffer, msg);
     118           0 :             socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
     119           0 :             if (ec) {
     120           0 :                 JAMI_ERROR("{:s}", ec.message());
     121           0 :                 return;
     122             :             }
     123           0 :         }
     124         220 :         buffer.clear();
     125             :         // Sync read's status
     126         220 :         auto ms = convModule->convMessageStatus();
     127         220 :         if (!ms.empty()) {
     128          54 :             SyncMsg msg;
     129          54 :             msg.ms = std::move(ms);
     130          54 :             msgpack::pack(buffer, msg);
     131          54 :             socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
     132          54 :             if (ec) {
     133           0 :                 JAMI_ERROR("{:s}", ec.message());
     134           0 :                 return;
     135             :             }
     136          54 :         }
     137         220 :         buffer.clear();
     138             : 
     139         222 :     } else {
     140          18 :         msgpack::pack(buffer, *syncMsg);
     141          18 :         socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
     142          18 :         if (ec)
     143           0 :             JAMI_ERROR("{:s}", ec.message());
     144             :     }
     145         244 : }
     146             : 
     147             : ////////////////////////////////////////////////////////////////
     148             : 
     149         289 : SyncModule::SyncModule(std::weak_ptr<JamiAccount>&& account)
     150         289 :     : pimpl_ {std::make_shared<Impl>(std::move(account))}
     151         289 : {}
     152             : 
     153             : void
     154         111 : SyncModule::cacheSyncConnection(std::shared_ptr<dhtnet::ChannelSocket>&& socket,
     155             :                                 const std::string& peerId,
     156             :                                 const DeviceId& device)
     157             : {
     158         111 :     std::lock_guard lk(pimpl_->syncConnectionsMtx_);
     159         112 :     pimpl_->syncConnections_[device].emplace_back(socket);
     160             : 
     161         112 :     socket->onShutdown([w = pimpl_->weak(), peerId, device, socket]() {
     162          56 :         auto shared = w.lock();
     163          56 :         if (!shared)
     164           0 :             return;
     165          56 :         std::lock_guard lk(shared->syncConnectionsMtx_);
     166          56 :         auto& connections = shared->syncConnections_[device];
     167          56 :         auto conn = connections.begin();
     168         166 :         while (conn != connections.end()) {
     169         110 :             if (*conn == socket)
     170          56 :                 conn = connections.erase(conn);
     171             :             else
     172          54 :                 conn++;
     173             :         }
     174          56 :     });
     175             : 
     176             : 
     177             :     struct DecodingContext
     178             :     {
     179        7071 :         msgpack::unpacker pac {[](msgpack::type::object_type, std::size_t, void*) { return true; },
     180             :                                nullptr,
     181             :                                512};
     182             :     };
     183             : 
     184         112 :     socket->setOnRecv([acc = pimpl_->account_.lock(), device, peerId,
     185             :                        ctx = std::make_shared<DecodingContext>()
     186             :     ](const uint8_t* buf, size_t len) {
     187         384 :         if (!buf || !acc)
     188           0 :             return len;
     189             : 
     190         384 :         ctx->pac.reserve_buffer(len);
     191         384 :         std::copy_n(buf, len, ctx->pac.buffer());
     192         384 :         ctx->pac.buffer_consumed(len);
     193             : 
     194         384 :         msgpack::object_handle oh;
     195         384 :         SyncMsg msg;
     196             : 
     197             :         try {
     198         769 :             while (ctx->pac.next(oh)) {
     199         384 :                 oh.get().convert(msg);
     200         384 :                 if (auto manager = acc->accountManager())
     201         382 :                     manager->onSyncData(std::move(msg.ds), false);
     202             : 
     203         384 :                 if (!msg.c.empty() || !msg.cr.empty() || !msg.p.empty() || !msg.ld.empty() || !msg.ms.empty())
     204         162 :                     if (auto cm = acc->convModule(true))
     205         162 :                         cm->onSyncData(msg, peerId, device.toString());
     206             :             }
     207           0 :         } catch (const std::exception& e) {
     208           0 :             JAMI_WARNING("[convInfo] error on sync: {:s}", e.what());
     209           0 :         }
     210             : 
     211         384 :         return len;
     212         384 :     });
     213             : 
     214         112 :     pimpl_->syncInfos(socket, nullptr);
     215         111 : }
     216             : 
     217             : void
     218          56 : SyncModule::syncWith(const DeviceId& deviceId,
     219             :                      const std::shared_ptr<dhtnet::ChannelSocket>& socket,
     220             :                      const std::shared_ptr<SyncMsg>& syncMsg)
     221             : {
     222          56 :     if (!socket)
     223           0 :         return;
     224             :     {
     225          56 :         std::lock_guard lk(pimpl_->syncConnectionsMtx_);
     226          56 :         socket->onShutdown([w = pimpl_->weak(), socket, deviceId]() {
     227             :             // When sock is shutdown update syncConnections_ to be able to resync asap
     228          56 :             auto shared = w.lock();
     229          56 :             if (!shared)
     230           0 :                 return;
     231          56 :             std::lock_guard lk(shared->syncConnectionsMtx_);
     232          56 :             auto& connections = shared->syncConnections_[deviceId];
     233          56 :             auto conn = connections.begin();
     234         195 :             while (conn != connections.end()) {
     235         139 :                 if (*conn == socket)
     236         112 :                     conn = connections.erase(conn);
     237             :                 else
     238          27 :                     conn++;
     239             :             }
     240          56 :             if (connections.empty()) {
     241          29 :                 shared->syncConnections_.erase(deviceId);
     242             :             }
     243          56 :         });
     244          56 :         pimpl_->syncConnections_[deviceId].emplace_back(socket);
     245          56 :     }
     246          56 :     pimpl_->syncInfos(socket, syncMsg);
     247             : }
     248             : 
     249             : void
     250        1326 : SyncModule::syncWithConnected(const std::shared_ptr<SyncMsg>& syncMsg, const DeviceId& deviceId)
     251             : {
     252        1326 :     std::lock_guard lk(pimpl_->syncConnectionsMtx_);
     253        1399 :     for (auto& [did, sockets] : pimpl_->syncConnections_) {
     254          73 :         if (not sockets.empty()) {
     255          72 :             if (!deviceId || deviceId == did) {
     256          72 :                 pimpl_->syncInfos(sockets[0], syncMsg);
     257             :             }
     258             :         }
     259             :     }
     260        1326 : }
     261             : } // namespace jami

Generated by: LCOV version 1.14