Line data Source code
1 : /*
2 : * Copyright (C) 2004-2025 Savoir-faire Linux Inc.
3 : *
4 : * This program is free software: you can redistribute it and/or modify
5 : * it under the terms of the GNU General Public License as published by
6 : * the Free Software Foundation, either version 3 of the License, or
7 : * (at your option) any later version.
8 : *
9 : * This program is distributed in the hope that it will be useful,
10 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 : * GNU General Public License for more details.
13 : *
14 : * You should have received a copy of the GNU General Public License
15 : * along with this program. If not, see <https://www.gnu.org/licenses/>.
16 : */
17 : #include "scheduled_executor.h"
18 : #include "logger.h"
19 :
20 : namespace jami {
21 :
22 : std::atomic<uint64_t> task_cookie = {0};
23 :
24 40 : ScheduledExecutor::ScheduledExecutor(const std::string& name)
25 40 : : name_(name)
26 40 : , running_(std::make_shared<std::atomic<bool>>(true))
27 40 : , thread_([this, is_running = running_] {
28 : // The thread needs its own reference of `running_` in case the
29 : // scheduler is destroyed within the thread because of a job
30 :
31 11769 : while (*is_running)
32 11729 : loop();
33 40 : })
34 40 : {}
35 :
36 80 : ScheduledExecutor::~ScheduledExecutor()
37 : {
38 40 : stop();
39 :
40 40 : if (not thread_.joinable()) {
41 0 : return;
42 : }
43 :
44 : // Avoid deadlock
45 40 : if (std::this_thread::get_id() == thread_.get_id()) {
46 0 : thread_.detach();
47 : } else {
48 40 : thread_.join();
49 : }
50 40 : }
51 :
52 : void
53 80 : ScheduledExecutor::stop()
54 : {
55 80 : std::lock_guard lock(jobLock_);
56 80 : *running_ = false;
57 80 : jobs_.clear();
58 80 : cv_.notify_all();
59 80 : }
60 :
61 : void
62 10576 : ScheduledExecutor::run(std::function<void()>&& job,
63 : const char* filename, uint32_t linum)
64 : {
65 10576 : std::lock_guard lock(jobLock_);
66 10576 : auto now = clock::now();
67 10576 : jobs_[now].emplace_back(std::move(job), filename, linum);
68 10576 : cv_.notify_all();
69 10576 : }
70 :
71 : std::shared_ptr<Task>
72 2157 : ScheduledExecutor::schedule(std::function<void()>&& job, time_point t,
73 : const char* filename, uint32_t linum)
74 : {
75 2157 : auto ret = std::make_shared<Task>(std::move(job), filename, linum);
76 2157 : schedule(ret, t);
77 2157 : return ret;
78 0 : }
79 :
80 : std::shared_ptr<Task>
81 2157 : ScheduledExecutor::scheduleIn(std::function<void()>&& job, duration dt,
82 : const char* filename, uint32_t linum)
83 : {
84 6471 : return schedule(std::move(job), clock::now() + dt,
85 6471 : filename, linum);
86 : }
87 :
88 : std::shared_ptr<RepeatedTask>
89 0 : ScheduledExecutor::scheduleAtFixedRate(std::function<bool()>&& job,
90 : duration dt,
91 : const char* filename, uint32_t linum)
92 : {
93 0 : auto ret = std::make_shared<RepeatedTask>(std::move(job), filename, linum);
94 0 : reschedule(ret, clock::now(), dt);
95 0 : return ret;
96 0 : }
97 :
98 : void
99 0 : ScheduledExecutor::reschedule(std::shared_ptr<RepeatedTask> task, time_point t, duration dt)
100 : {
101 0 : const char* filename = task->job().filename;
102 0 : uint32_t linenum = task->job().linum;
103 0 : schedule(std::make_shared<Task>([this, task = std::move(task), t, dt]() mutable {
104 0 : if (task->run(name_.c_str()))
105 0 : reschedule(std::move(task), t + dt, dt);
106 0 : }, filename, linenum),
107 : t);
108 0 : }
109 :
110 : void
111 2157 : ScheduledExecutor::schedule(std::shared_ptr<Task> task, time_point t)
112 : {
113 2157 : const char* filename = task->job().filename;
114 2157 : uint32_t linenum = task->job().linum;
115 2157 : std::lock_guard lock(jobLock_);
116 3271 : jobs_[t].emplace_back([task = std::move(task), this] { task->run(name_.c_str()); },
117 : filename, linenum);
118 2157 : cv_.notify_all();
119 2157 : }
120 :
121 : void
122 11729 : ScheduledExecutor::loop()
123 : {
124 11729 : std::vector<Job> jobs;
125 : {
126 11729 : std::unique_lock lock(jobLock_);
127 18135 : while (*running_ and (jobs_.empty() or jobs_.begin()->first > clock::now())) {
128 6406 : if (jobs_.empty())
129 3941 : cv_.wait(lock);
130 : else {
131 2465 : auto nextJob = jobs_.begin()->first;
132 2465 : cv_.wait_until(lock, nextJob);
133 : }
134 : }
135 11729 : if (not *running_)
136 39 : return;
137 11690 : jobs = std::move(jobs_.begin()->second);
138 11690 : jobs_.erase(jobs_.begin());
139 11729 : }
140 23380 : for (auto& job : jobs) {
141 : try {
142 11690 : job.fn();
143 0 : } catch (const std::exception& e) {
144 0 : JAMI_ERR("Exception running job: %s", e.what());
145 0 : }
146 : }
147 11729 : }
148 :
149 : } // namespace jami
|