LCOV - code coverage report
Current view: top level - foo/src - scheduled_executor.cpp (source / functions) Hit Total Coverage
Test: jami-coverage-filtered.info Lines: 55 75 73.3 %
Date: 2025-12-18 10:07:43 Functions: 10 13 76.9 %

          Line data    Source code
       1             : /*
       2             :  *  Copyright (C) 2004-2025 Savoir-faire Linux Inc.
       3             :  *
       4             :  *  This program is free software: you can redistribute it and/or modify
       5             :  *  it under the terms of the GNU General Public License as published by
       6             :  *  the Free Software Foundation, either version 3 of the License, or
       7             :  *  (at your option) any later version.
       8             :  *
       9             :  *  This program is distributed in the hope that it will be useful,
      10             :  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
      11             :  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
      12             :  *  GNU General Public License for more details.
      13             :  *
      14             :  *  You should have received a copy of the GNU General Public License
      15             :  *  along with this program. If not, see <https://www.gnu.org/licenses/>.
      16             :  */
      17             : #include "scheduled_executor.h"
      18             : #include "logger.h"
      19             : 
      20             : namespace jami {
      21             : 
      22             : std::atomic<uint64_t> task_cookie = {0};
      23             : 
      24          39 : ScheduledExecutor::ScheduledExecutor(const std::string& name)
      25          39 :     : name_(name)
      26          39 :     , running_(std::make_shared<std::atomic<bool>>(true))
      27          39 :     , thread_([this, is_running = running_] {
      28             :         // The thread needs its own reference of `running_` in case the
      29             :         // scheduler is destroyed within the thread because of a job
      30             : 
      31       14504 :         while (*is_running)
      32       14465 :             loop();
      33          39 :     })
      34          39 : {}
      35             : 
      36          78 : ScheduledExecutor::~ScheduledExecutor()
      37             : {
      38          39 :     stop();
      39             : 
      40          39 :     if (not thread_.joinable()) {
      41           0 :         return;
      42             :     }
      43             : 
      44             :     // Avoid deadlock
      45          39 :     if (std::this_thread::get_id() == thread_.get_id()) {
      46           0 :         thread_.detach();
      47             :     } else {
      48          39 :         thread_.join();
      49             :     }
      50          39 : }
      51             : 
      52             : void
      53          78 : ScheduledExecutor::stop()
      54             : {
      55          78 :     std::lock_guard lock(jobLock_);
      56          78 :     *running_ = false;
      57          78 :     jobs_.clear();
      58          78 :     cv_.notify_all();
      59          78 : }
      60             : 
      61             : void
      62       13332 : ScheduledExecutor::run(std::function<void()>&& job, const char* filename, uint32_t linum)
      63             : {
      64       13332 :     std::lock_guard lock(jobLock_);
      65       13332 :     auto now = clock::now();
      66       13332 :     jobs_[now].emplace_back(std::move(job), filename, linum);
      67       13332 :     cv_.notify_all();
      68       13332 : }
      69             : 
      70             : std::shared_ptr<Task>
      71        2152 : ScheduledExecutor::schedule(std::function<void()>&& job, time_point t, const char* filename, uint32_t linum)
      72             : {
      73        2152 :     auto ret = std::make_shared<Task>(std::move(job), filename, linum);
      74        2152 :     schedule(ret, t);
      75        2152 :     return ret;
      76           0 : }
      77             : 
      78             : std::shared_ptr<Task>
      79        2152 : ScheduledExecutor::scheduleIn(std::function<void()>&& job, duration dt, const char* filename, uint32_t linum)
      80             : {
      81        4304 :     return schedule(std::move(job), clock::now() + dt, filename, linum);
      82             : }
      83             : 
      84             : std::shared_ptr<RepeatedTask>
      85           0 : ScheduledExecutor::scheduleAtFixedRate(std::function<bool()>&& job, duration dt, const char* filename, uint32_t linum)
      86             : {
      87           0 :     auto ret = std::make_shared<RepeatedTask>(std::move(job), filename, linum);
      88           0 :     reschedule(ret, clock::now(), dt);
      89           0 :     return ret;
      90           0 : }
      91             : 
      92             : void
      93           0 : ScheduledExecutor::reschedule(std::shared_ptr<RepeatedTask> task, time_point t, duration dt)
      94             : {
      95           0 :     const char* filename = task->job().filename;
      96           0 :     uint32_t linenum = task->job().linum;
      97           0 :     schedule(std::make_shared<Task>(
      98           0 :                  [this, task = std::move(task), t, dt]() mutable {
      99           0 :                      if (task->run(name_.c_str()))
     100           0 :                          reschedule(std::move(task), t + dt, dt);
     101           0 :                  },
     102             :                  filename,
     103             :                  linenum),
     104             :              t);
     105           0 : }
     106             : 
     107             : void
     108        2152 : ScheduledExecutor::schedule(std::shared_ptr<Task> task, time_point t)
     109             : {
     110        2152 :     const char* filename = task->job().filename;
     111        2152 :     uint32_t linenum = task->job().linum;
     112        2152 :     std::lock_guard lock(jobLock_);
     113        3263 :     jobs_[t].emplace_back([task = std::move(task), this] { task->run(name_.c_str()); }, filename, linenum);
     114        2152 :     cv_.notify_all();
     115        2152 : }
     116             : 
     117             : void
     118       14465 : ScheduledExecutor::loop()
     119             : {
     120       14465 :     std::vector<Job> jobs;
     121             :     {
     122       14465 :         std::unique_lock lock(jobLock_);
     123       23753 :         while (*running_ and (jobs_.empty() or jobs_.begin()->first > clock::now())) {
     124        9288 :             if (jobs_.empty())
     125        6965 :                 cv_.wait(lock);
     126             :             else {
     127        2323 :                 auto nextJob = jobs_.begin()->first;
     128        2323 :                 cv_.wait_until(lock, nextJob);
     129             :             }
     130             :         }
     131       14465 :         if (not *running_)
     132          37 :             return;
     133       14428 :         jobs = std::move(jobs_.begin()->second);
     134       14428 :         jobs_.erase(jobs_.begin());
     135       14465 :     }
     136       28856 :     for (auto& job : jobs) {
     137             :         try {
     138       14428 :             job.fn();
     139           0 :         } catch (const std::exception& e) {
     140           0 :             JAMI_ERR("Exception running job: %s", e.what());
     141           0 :         }
     142             :     }
     143       14465 : }
     144             : 
     145             : } // namespace jami

Generated by: LCOV version 1.14