Line data Source code
1 : /*
2 : * Copyright (C) 2004-2025 Savoir-faire Linux Inc.
3 : *
4 : * This program is free software: you can redistribute it and/or modify
5 : * it under the terms of the GNU General Public License as published by
6 : * the Free Software Foundation, either version 3 of the License, or
7 : * (at your option) any later version.
8 : *
9 : * This program is distributed in the hope that it will be useful,
10 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 : * GNU General Public License for more details.
13 : *
14 : * You should have received a copy of the GNU General Public License
15 : * along with this program. If not, see <https://www.gnu.org/licenses/>.
16 : */
17 : #include "scheduled_executor.h"
18 : #include "logger.h"
19 :
20 : namespace jami {
21 :
// Process-wide atomic counter, starting at zero.
// NOTE(review): presumably used to hand out unique ids to tasks; its users are
// not visible in this file — confirm against the header.
std::atomic<uint64_t> task_cookie {0};
23 :
24 39 : ScheduledExecutor::ScheduledExecutor(const std::string& name)
25 39 : : name_(name)
26 39 : , running_(std::make_shared<std::atomic<bool>>(true))
27 39 : , thread_([this, is_running = running_] {
28 : // The thread needs its own reference of `running_` in case the
29 : // scheduler is destroyed within the thread because of a job
30 :
31 14504 : while (*is_running)
32 14465 : loop();
33 39 : })
34 39 : {}
35 :
36 78 : ScheduledExecutor::~ScheduledExecutor()
37 : {
38 39 : stop();
39 :
40 39 : if (not thread_.joinable()) {
41 0 : return;
42 : }
43 :
44 : // Avoid deadlock
45 39 : if (std::this_thread::get_id() == thread_.get_id()) {
46 0 : thread_.detach();
47 : } else {
48 39 : thread_.join();
49 : }
50 39 : }
51 :
52 : void
53 78 : ScheduledExecutor::stop()
54 : {
55 78 : std::lock_guard lock(jobLock_);
56 78 : *running_ = false;
57 78 : jobs_.clear();
58 78 : cv_.notify_all();
59 78 : }
60 :
61 : void
62 13332 : ScheduledExecutor::run(std::function<void()>&& job, const char* filename, uint32_t linum)
63 : {
64 13332 : std::lock_guard lock(jobLock_);
65 13332 : auto now = clock::now();
66 13332 : jobs_[now].emplace_back(std::move(job), filename, linum);
67 13332 : cv_.notify_all();
68 13332 : }
69 :
70 : std::shared_ptr<Task>
71 2152 : ScheduledExecutor::schedule(std::function<void()>&& job, time_point t, const char* filename, uint32_t linum)
72 : {
73 2152 : auto ret = std::make_shared<Task>(std::move(job), filename, linum);
74 2152 : schedule(ret, t);
75 2152 : return ret;
76 0 : }
77 :
78 : std::shared_ptr<Task>
79 2152 : ScheduledExecutor::scheduleIn(std::function<void()>&& job, duration dt, const char* filename, uint32_t linum)
80 : {
81 4304 : return schedule(std::move(job), clock::now() + dt, filename, linum);
82 : }
83 :
84 : std::shared_ptr<RepeatedTask>
85 0 : ScheduledExecutor::scheduleAtFixedRate(std::function<bool()>&& job, duration dt, const char* filename, uint32_t linum)
86 : {
87 0 : auto ret = std::make_shared<RepeatedTask>(std::move(job), filename, linum);
88 0 : reschedule(ret, clock::now(), dt);
89 0 : return ret;
90 0 : }
91 :
92 : void
93 0 : ScheduledExecutor::reschedule(std::shared_ptr<RepeatedTask> task, time_point t, duration dt)
94 : {
95 0 : const char* filename = task->job().filename;
96 0 : uint32_t linenum = task->job().linum;
97 0 : schedule(std::make_shared<Task>(
98 0 : [this, task = std::move(task), t, dt]() mutable {
99 0 : if (task->run(name_.c_str()))
100 0 : reschedule(std::move(task), t + dt, dt);
101 0 : },
102 : filename,
103 : linenum),
104 : t);
105 0 : }
106 :
107 : void
108 2152 : ScheduledExecutor::schedule(std::shared_ptr<Task> task, time_point t)
109 : {
110 2152 : const char* filename = task->job().filename;
111 2152 : uint32_t linenum = task->job().linum;
112 2152 : std::lock_guard lock(jobLock_);
113 3263 : jobs_[t].emplace_back([task = std::move(task), this] { task->run(name_.c_str()); }, filename, linenum);
114 2152 : cv_.notify_all();
115 2152 : }
116 :
117 : void
118 14465 : ScheduledExecutor::loop()
119 : {
120 14465 : std::vector<Job> jobs;
121 : {
122 14465 : std::unique_lock lock(jobLock_);
123 23753 : while (*running_ and (jobs_.empty() or jobs_.begin()->first > clock::now())) {
124 9288 : if (jobs_.empty())
125 6965 : cv_.wait(lock);
126 : else {
127 2323 : auto nextJob = jobs_.begin()->first;
128 2323 : cv_.wait_until(lock, nextJob);
129 : }
130 : }
131 14465 : if (not *running_)
132 37 : return;
133 14428 : jobs = std::move(jobs_.begin()->second);
134 14428 : jobs_.erase(jobs_.begin());
135 14465 : }
136 28856 : for (auto& job : jobs) {
137 : try {
138 14428 : job.fn();
139 0 : } catch (const std::exception& e) {
140 0 : JAMI_ERR("Exception running job: %s", e.what());
141 0 : }
142 : }
143 14465 : }
144 :
145 : } // namespace jami
|