Line data Source code
1 : /*
2 : * Copyright (C) 2004-2024 Savoir-faire Linux Inc.
3 : *
4 : * Author: Adrien Béraud <adrien.beraud@savoirfairelinux.com>
5 : *
6 : * This program is free software; you can redistribute it and/or modify
7 : * it under the terms of the GNU General Public License as published by
8 : * the Free Software Foundation; either version 3 of the License, or
9 : * (at your option) any later version.
10 : *
11 : * This program is distributed in the hope that it will be useful,
12 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 : * GNU General Public License for more details.
15 : *
16 : * You should have received a copy of the GNU General Public License
17 : * along with this program; if not, write to the Free Software
18 : * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
19 : */
20 : #include "scheduled_executor.h"
21 : #include "logger.h"
22 :
23 : namespace jami {
24 :
// Atomic 64-bit counter, starting at zero. Nothing in the code visible in this
// file reads or writes it; presumably it hands out task identifiers
// elsewhere -- TODO confirm against the header that declares it.
std::atomic<uint64_t> task_cookie {0};
26 :
27 39 : ScheduledExecutor::ScheduledExecutor(const std::string& name)
28 39 : : name_(name)
29 39 : , running_(std::make_shared<std::atomic<bool>>(true))
30 39 : , thread_([this, is_running = running_] {
31 : // The thread needs its own reference of `running_` in case the
32 : // scheduler is destroyed within the thread because of a job
33 :
34 89486 : while (*is_running)
35 89447 : loop();
36 39 : })
37 39 : {}
38 :
39 78 : ScheduledExecutor::~ScheduledExecutor()
40 : {
41 39 : stop();
42 :
43 39 : if (not thread_.joinable()) {
44 0 : return;
45 : }
46 :
47 : // Avoid deadlock
48 39 : if (std::this_thread::get_id() == thread_.get_id()) {
49 0 : thread_.detach();
50 : } else {
51 39 : thread_.join();
52 : }
53 39 : }
54 :
55 : void
56 78 : ScheduledExecutor::stop()
57 : {
58 78 : std::lock_guard lock(jobLock_);
59 78 : *running_ = false;
60 78 : jobs_.clear();
61 78 : cv_.notify_all();
62 78 : }
63 :
64 : void
65 88287 : ScheduledExecutor::run(std::function<void()>&& job,
66 : const char* filename, uint32_t linum)
67 : {
68 88287 : std::lock_guard lock(jobLock_);
69 88298 : auto now = clock::now();
70 88298 : jobs_[now].emplace_back(std::move(job), filename, linum);
71 88298 : cv_.notify_all();
72 88298 : }
73 :
74 : std::shared_ptr<Task>
75 2158 : ScheduledExecutor::schedule(std::function<void()>&& job, time_point t,
76 : const char* filename, uint32_t linum)
77 : {
78 2158 : auto ret = std::make_shared<Task>(std::move(job), filename, linum);
79 2158 : schedule(ret, t);
80 2158 : return ret;
81 0 : }
82 :
83 : std::shared_ptr<Task>
84 2158 : ScheduledExecutor::scheduleIn(std::function<void()>&& job, duration dt,
85 : const char* filename, uint32_t linum)
86 : {
87 6474 : return schedule(std::move(job), clock::now() + dt,
88 6474 : filename, linum);
89 : }
90 :
91 : std::shared_ptr<RepeatedTask>
92 0 : ScheduledExecutor::scheduleAtFixedRate(std::function<bool()>&& job,
93 : duration dt,
94 : const char* filename, uint32_t linum)
95 : {
96 0 : auto ret = std::make_shared<RepeatedTask>(std::move(job), filename, linum);
97 0 : reschedule(ret, clock::now(), dt);
98 0 : return ret;
99 0 : }
100 :
101 : void
102 0 : ScheduledExecutor::reschedule(std::shared_ptr<RepeatedTask> task, time_point t, duration dt)
103 : {
104 0 : const char* filename = task->job().filename;
105 0 : uint32_t linenum = task->job().linum;
106 0 : schedule(std::make_shared<Task>([this, task = std::move(task), t, dt]() mutable {
107 0 : if (task->run(name_.c_str()))
108 0 : reschedule(std::move(task), t + dt, dt);
109 0 : }, filename, linenum),
110 : t);
111 0 : }
112 :
113 : void
114 2158 : ScheduledExecutor::schedule(std::shared_ptr<Task> task, time_point t)
115 : {
116 2158 : const char* filename = task->job().filename;
117 2158 : uint32_t linenum = task->job().linum;
118 2158 : std::lock_guard lock(jobLock_);
119 3269 : jobs_[t].emplace_back([task = std::move(task), this] { task->run(name_.c_str()); },
120 : filename, linenum);
121 2158 : cv_.notify_all();
122 2158 : }
123 :
124 : void
125 89447 : ScheduledExecutor::loop()
126 : {
127 89447 : std::vector<Job> jobs;
128 : {
129 89447 : std::unique_lock lock(jobLock_);
130 104337 : while (*running_ and (jobs_.empty() or jobs_.begin()->first > clock::now())) {
131 14890 : if (jobs_.empty())
132 12260 : cv_.wait(lock);
133 : else {
134 2630 : auto nextJob = jobs_.begin()->first;
135 2630 : cv_.wait_until(lock, nextJob);
136 : }
137 : }
138 89447 : if (not *running_)
139 38 : return;
140 89409 : jobs = std::move(jobs_.begin()->second);
141 89409 : jobs_.erase(jobs_.begin());
142 89447 : }
143 178818 : for (auto& job : jobs) {
144 : try {
145 89409 : job.fn();
146 6 : } catch (const std::exception& e) {
147 6 : JAMI_ERR("Exception running job: %s", e.what());
148 6 : }
149 : }
150 89447 : }
151 :
152 : } // namespace jami
|