LCOV - code coverage report
Current view: top level - src - scheduled_executor.cpp (source / functions) Hit Total Coverage
Test: jami-coverage-filtered.info Lines: 56 75 74.7 %
Date: 2024-12-21 08:56:24 Functions: 10 13 76.9 %

          Line data    Source code
       1             : /*
       2             :  *  Copyright (C) 2004-2024 Savoir-faire Linux Inc.
       3             :  *
       4             :  *  This program is free software: you can redistribute it and/or modify
       5             :  *  it under the terms of the GNU General Public License as published by
       6             :  *  the Free Software Foundation, either version 3 of the License, or
       7             :  *  (at your option) any later version.
       8             :  *
       9             :  *  This program is distributed in the hope that it will be useful,
      10             :  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
      11             :  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
      12             :  *  GNU General Public License for more details.
      13             :  *
      14             :  *  You should have received a copy of the GNU General Public License
      15             :  *  along with this program. If not, see <https://www.gnu.org/licenses/>.
      16             :  */
      17             : #include "scheduled_executor.h"
      18             : #include "logger.h"
      19             : 
      20             : namespace jami {
      21             : 
      22             : std::atomic<uint64_t> task_cookie = {0};
      23             : 
      24          39 : ScheduledExecutor::ScheduledExecutor(const std::string& name)
      25          39 :     : name_(name)
      26          39 :     , running_(std::make_shared<std::atomic<bool>>(true))
      27          39 :     , thread_([this, is_running = running_] {
      28             :         // The thread needs its own reference of `running_` in case the
      29             :         // scheduler is destroyed within the thread because of a job
      30             : 
      31       30735 :         while (*is_running)
      32       30696 :             loop();
      33          39 :     })
      34          39 : {}
      35             : 
      36          78 : ScheduledExecutor::~ScheduledExecutor()
      37             : {
      38          39 :     stop();
      39             : 
      40          39 :     if (not thread_.joinable()) {
      41           0 :         return;
      42             :     }
      43             : 
      44             :     // Avoid deadlock
      45          39 :     if (std::this_thread::get_id() == thread_.get_id()) {
      46           0 :         thread_.detach();
      47             :     } else {
      48          39 :         thread_.join();
      49             :     }
      50          39 : }
      51             : 
      52             : void
      53          78 : ScheduledExecutor::stop()
      54             : {
      55          78 :     std::lock_guard lock(jobLock_);
      56          78 :     *running_ = false;
      57          78 :     jobs_.clear();
      58          78 :     cv_.notify_all();
      59          78 : }
      60             : 
      61             : void
      62       29548 : ScheduledExecutor::run(std::function<void()>&& job,
      63             :                        const char* filename, uint32_t linum)
      64             : {
      65       29548 :     std::lock_guard lock(jobLock_);
      66       29548 :     auto now = clock::now();
      67       29548 :     jobs_[now].emplace_back(std::move(job), filename, linum);
      68       29548 :     cv_.notify_all();
      69       29548 : }
      70             : 
      71             : std::shared_ptr<Task>
      72        2157 : ScheduledExecutor::schedule(std::function<void()>&& job, time_point t,
      73             :                             const char* filename, uint32_t linum)
      74             : {
      75        2157 :     auto ret = std::make_shared<Task>(std::move(job), filename, linum);
      76        2157 :     schedule(ret, t);
      77        2157 :     return ret;
      78           0 : }
      79             : 
      80             : std::shared_ptr<Task>
      81        2157 : ScheduledExecutor::scheduleIn(std::function<void()>&& job, duration dt,
      82             :                               const char* filename, uint32_t linum)
      83             : {
      84        6471 :     return schedule(std::move(job), clock::now() + dt,
      85        6471 :                     filename, linum);
      86             : }
      87             : 
      88             : std::shared_ptr<RepeatedTask>
      89           0 : ScheduledExecutor::scheduleAtFixedRate(std::function<bool()>&& job,
      90             :                                        duration dt,
      91             :                                        const char* filename, uint32_t linum)
      92             : {
      93           0 :     auto ret = std::make_shared<RepeatedTask>(std::move(job), filename, linum);
      94           0 :     reschedule(ret, clock::now(), dt);
      95           0 :     return ret;
      96           0 : }
      97             : 
      98             : void
      99           0 : ScheduledExecutor::reschedule(std::shared_ptr<RepeatedTask> task, time_point t, duration dt)
     100             : {
     101           0 :     const char* filename =  task->job().filename;
     102           0 :     uint32_t linenum = task->job().linum;
     103           0 :     schedule(std::make_shared<Task>([this, task = std::move(task), t, dt]() mutable {
     104           0 :         if (task->run(name_.c_str()))
     105           0 :                 reschedule(std::move(task), t + dt, dt);
     106           0 :     }, filename, linenum),
     107             :              t);
     108           0 : }
     109             : 
     110             : void
     111        2157 : ScheduledExecutor::schedule(std::shared_ptr<Task> task, time_point t)
     112             : {
     113        2157 :     const char* filename =  task->job().filename;
     114        2157 :     uint32_t linenum = task->job().linum;
     115        2157 :     std::lock_guard lock(jobLock_);
     116        3267 :     jobs_[t].emplace_back([task = std::move(task), this] { task->run(name_.c_str()); },
     117             :                             filename, linenum);
     118        2157 :     cv_.notify_all();
     119        2157 : }
     120             : 
     121             : void
     122       30696 : ScheduledExecutor::loop()
     123             : {
     124       30696 :     std::vector<Job> jobs;
     125             :     {
     126       30696 :         std::unique_lock lock(jobLock_);
     127       49450 :         while (*running_ and (jobs_.empty() or jobs_.begin()->first > clock::now())) {
     128       18754 :             if (jobs_.empty())
     129       16192 :                 cv_.wait(lock);
     130             :             else {
     131        2562 :                 auto nextJob = jobs_.begin()->first;
     132        2562 :                 cv_.wait_until(lock, nextJob);
     133             :             }
     134             :         }
     135       30696 :         if (not *running_)
     136          38 :             return;
     137       30658 :         jobs = std::move(jobs_.begin()->second);
     138       30658 :         jobs_.erase(jobs_.begin());
     139       30696 :     }
     140       61316 :     for (auto& job : jobs) {
     141             :         try {
     142       30658 :             job.fn();
     143           0 :         } catch (const std::exception& e) {
     144           0 :             JAMI_ERR("Exception running job: %s", e.what());
     145           0 :         }
     146             :     }
     147       30696 : }
     148             : 
     149             : } // namespace jami

Generated by: LCOV version 1.14