LCOV - code coverage report
Current view: top level - src - scheduled_executor.cpp (source / functions) Hit Total Coverage
Test: jami-coverage-filtered.info Lines: 59 75 78.7 %
Date: 2024-04-23 08:02:50 Functions: 10 13 76.9 %

          Line data    Source code
       1             : /*
       2             :  *  Copyright (C) 2004-2024 Savoir-faire Linux Inc.
       3             :  *
       4             :  *  Author: Adrien BĂ©raud <adrien.beraud@savoirfairelinux.com>
       5             :  *
       6             :  *  This program is free software; you can redistribute it and/or modify
       7             :  *  it under the terms of the GNU General Public License as published by
       8             :  *  the Free Software Foundation; either version 3 of the License, or
       9             :  *  (at your option) any later version.
      10             :  *
      11             :  *  This program is distributed in the hope that it will be useful,
      12             :  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
      13             :  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
      14             :  *  GNU General Public License for more details.
      15             :  *
      16             :  *  You should have received a copy of the GNU General Public License
      17             :  *  along with this program; if not, write to the Free Software
      18             :  *  Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301 USA.
      19             :  */
      20             : #include "scheduled_executor.h"
      21             : #include "logger.h"
      22             : 
      23             : namespace jami {
      24             : 
      25             : std::atomic<uint64_t> task_cookie = {0};
      26             : 
      27          39 : ScheduledExecutor::ScheduledExecutor(const std::string& name)
      28          39 :     : name_(name)
      29          39 :     , running_(std::make_shared<std::atomic<bool>>(true))
      30          39 :     , thread_([this, is_running = running_] {
      31             :         // The thread needs its own reference of `running_` in case the
      32             :         // scheduler is destroyed within the thread because of a job
      33             : 
      34       89486 :         while (*is_running)
      35       89447 :             loop();
      36          39 :     })
      37          39 : {}
      38             : 
      39          78 : ScheduledExecutor::~ScheduledExecutor()
      40             : {
      41          39 :     stop();
      42             : 
      43          39 :     if (not thread_.joinable()) {
      44           0 :         return;
      45             :     }
      46             : 
      47             :     // Avoid deadlock
      48          39 :     if (std::this_thread::get_id() == thread_.get_id()) {
      49           0 :         thread_.detach();
      50             :     } else {
      51          39 :         thread_.join();
      52             :     }
      53          39 : }
      54             : 
      55             : void
      56          78 : ScheduledExecutor::stop()
      57             : {
      58          78 :     std::lock_guard lock(jobLock_);
      59          78 :     *running_ = false;
      60          78 :     jobs_.clear();
      61          78 :     cv_.notify_all();
      62          78 : }
      63             : 
      64             : void
      65       88287 : ScheduledExecutor::run(std::function<void()>&& job,
      66             :                        const char* filename, uint32_t linum)
      67             : {
      68       88287 :     std::lock_guard lock(jobLock_);
      69       88298 :     auto now = clock::now();
      70       88298 :     jobs_[now].emplace_back(std::move(job), filename, linum);
      71       88298 :     cv_.notify_all();
      72       88298 : }
      73             : 
      74             : std::shared_ptr<Task>
      75        2158 : ScheduledExecutor::schedule(std::function<void()>&& job, time_point t,
      76             :                             const char* filename, uint32_t linum)
      77             : {
      78        2158 :     auto ret = std::make_shared<Task>(std::move(job), filename, linum);
      79        2158 :     schedule(ret, t);
      80        2158 :     return ret;
      81           0 : }
      82             : 
      83             : std::shared_ptr<Task>
      84        2158 : ScheduledExecutor::scheduleIn(std::function<void()>&& job, duration dt,
      85             :                               const char* filename, uint32_t linum)
      86             : {
      87        6474 :     return schedule(std::move(job), clock::now() + dt,
      88        6474 :                     filename, linum);
      89             : }
      90             : 
      91             : std::shared_ptr<RepeatedTask>
      92           0 : ScheduledExecutor::scheduleAtFixedRate(std::function<bool()>&& job,
      93             :                                        duration dt,
      94             :                                        const char* filename, uint32_t linum)
      95             : {
      96           0 :     auto ret = std::make_shared<RepeatedTask>(std::move(job), filename, linum);
      97           0 :     reschedule(ret, clock::now(), dt);
      98           0 :     return ret;
      99           0 : }
     100             : 
     101             : void
     102           0 : ScheduledExecutor::reschedule(std::shared_ptr<RepeatedTask> task, time_point t, duration dt)
     103             : {
     104           0 :     const char* filename =  task->job().filename;
     105           0 :     uint32_t linenum = task->job().linum;
     106           0 :     schedule(std::make_shared<Task>([this, task = std::move(task), t, dt]() mutable {
     107           0 :         if (task->run(name_.c_str()))
     108           0 :                 reschedule(std::move(task), t + dt, dt);
     109           0 :     }, filename, linenum),
     110             :              t);
     111           0 : }
     112             : 
     113             : void
     114        2158 : ScheduledExecutor::schedule(std::shared_ptr<Task> task, time_point t)
     115             : {
     116        2158 :     const char* filename =  task->job().filename;
     117        2158 :     uint32_t linenum = task->job().linum;
     118        2158 :     std::lock_guard lock(jobLock_);
     119        3269 :     jobs_[t].emplace_back([task = std::move(task), this] { task->run(name_.c_str()); },
     120             :                             filename, linenum);
     121        2158 :     cv_.notify_all();
     122        2158 : }
     123             : 
     124             : void
     125       89447 : ScheduledExecutor::loop()
     126             : {
     127       89447 :     std::vector<Job> jobs;
     128             :     {
     129       89447 :         std::unique_lock lock(jobLock_);
     130      104337 :         while (*running_ and (jobs_.empty() or jobs_.begin()->first > clock::now())) {
     131       14890 :             if (jobs_.empty())
     132       12260 :                 cv_.wait(lock);
     133             :             else {
     134        2630 :                 auto nextJob = jobs_.begin()->first;
     135        2630 :                 cv_.wait_until(lock, nextJob);
     136             :             }
     137             :         }
     138       89447 :         if (not *running_)
     139          38 :             return;
     140       89409 :         jobs = std::move(jobs_.begin()->second);
     141       89409 :         jobs_.erase(jobs_.begin());
     142       89447 :     }
     143      178818 :     for (auto& job : jobs) {
     144             :         try {
     145       89409 :             job.fn();
     146           6 :         } catch (const std::exception& e) {
     147           6 :             JAMI_ERR("Exception running job: %s", e.what());
     148           6 :         }
     149             :     }
     150       89447 : }
     151             : 
     152             : } // namespace jami

Generated by: LCOV version 1.14