Line data Source code
1 : /*
2 : * Copyright (C) 2004-2024 Savoir-faire Linux Inc.
3 : *
4 : * This program is free software: you can redistribute it and/or modify
5 : * it under the terms of the GNU General Public License as published by
6 : * the Free Software Foundation, either version 3 of the License, or
7 : * (at your option) any later version.
8 : *
9 : * This program is distributed in the hope that it will be useful,
10 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 : * GNU General Public License for more details.
13 : *
14 : * You should have received a copy of the GNU General Public License
15 : * along with this program. If not, see <https://www.gnu.org/licenses/>.
16 : */
17 : #pragma once
18 :
19 : #include <thread>
20 : #include <functional>
21 : #include <map>
22 : #include <vector>
23 : #include <chrono>
24 : #include <memory>
25 : #include <atomic>
26 : #include <mutex>
27 : #include <condition_variable>
28 : #include <ciso646>
29 : #include <string>
30 :
31 : #include "noncopyable.h"
32 :
33 : #include "tracepoint.h"
34 : #include "trace-tools.h"
35 :
36 : namespace jami {
37 :
// Shared counter, declared here and defined in another translation unit.
// Task and RepeatedTask each read-and-increment it in their constructor to
// obtain the cookie they pass to the tracepoints.
extern std::atomic<uint64_t> task_cookie;
39 :
/**
 * A runnable function.
 *
 * Bundles a void() callable with the file name and line number handed to the
 * constructor. The file name pointer is stored as-is (not copied), so the
 * pointed-to string must outlive the Job.
 */
struct Job {
    Job(std::function<void()>&& f, const char* file, uint32_t l)
        : fn(std::move(f)), filename(file), linum(l)
    {}

    std::function<void()> fn;
    const char* filename;
    uint32_t linum;

    /** True while a callable is attached, false once reset() was called. */
    inline operator bool() const
    {
        return fn != nullptr;
    }

    /** Drop the callable (and whatever it captured). */
    void reset()
    {
        fn = nullptr;
    }
};
61 :
/**
 * A runnable function that reports whether it wants to run again.
 *
 * Bundles a bool() callable with the file name and line number handed to the
 * constructor. The file name pointer is stored as-is (not copied), so the
 * pointed-to string must outlive the RepeatedJob.
 */
struct RepeatedJob {
    RepeatedJob(std::function<bool()>&& f, const char* file, uint32_t l)
        : fn(std::move(f))
        , filename(file)
        , linum(l) { }

    std::function<bool()> fn;
    const char* filename;
    uint32_t linum;

    /**
     * True while a callable is attached, false once reset() was called.
     * Const-qualified (like Job::operator bool) so it is usable on const
     * objects and from const member functions.
     */
    inline operator bool() const {
        return static_cast<bool>(fn);
    }

    /** Drop the callable (and whatever it captured). */
    void reset() {
        fn = {};
    }
};
80 :
81 : /**
82 : * A Job that can be disposed
83 : */
84 : class Task
85 : {
86 : public:
87 2158 : Task(std::function<void()>&& fn, const char* filename, uint32_t linum)
88 2158 : : job_(std::move(fn), filename, linum)
89 2158 : , cookie_(task_cookie++) { }
90 :
91 : #pragma GCC diagnostic push
92 : #pragma GCC diagnostic ignored "-Wunused-parameter"
93 1113 : void run(const char* executor_name)
94 : {
95 1113 : if (job_.fn) {
96 : jami_tracepoint(scheduled_executor_task_begin,
97 : executor_name,
98 : job_.filename, job_.linum,
99 : cookie_);
100 1113 : job_.fn();
101 : jami_tracepoint(scheduled_executor_task_end,
102 : cookie_);
103 : }
104 1113 : }
105 : #pragma GCC pop
106 :
107 0 : void cancel() { job_.reset(); }
108 : bool isCancelled() const { return !job_; }
109 :
110 4316 : Job& job() { return job_; }
111 :
112 : private:
113 : Job job_;
114 : uint64_t cookie_;
115 : };
116 :
117 : /**
118 : * A RepeatedJob that can be disposed
119 : */
120 : class RepeatedTask
121 : {
122 : public:
123 0 : RepeatedTask(std::function<bool()>&& fn, const char* filename,
124 : uint32_t linum)
125 0 : : job_(std::move(fn), filename, linum)
126 0 : , cookie_(task_cookie++) { }
127 :
128 : #pragma GCC diagnostic push
129 : #pragma GCC diagnostic ignored "-Wunused-parameter"
130 0 : bool run(const char* executor_name)
131 : {
132 : bool cont;
133 0 : std::lock_guard l(lock_);
134 :
135 0 : if (not cancel_.load() and job_.fn) {
136 : jami_tracepoint(scheduled_executor_task_begin,
137 : executor_name,
138 : job_.filename, job_.linum,
139 : cookie_);
140 0 : cont = job_.fn();
141 : jami_tracepoint(scheduled_executor_task_end,
142 : cookie_);
143 :
144 : } else {
145 0 : cont = false;
146 : }
147 :
148 0 : if (not cont) {
149 0 : cancel_.store(true);
150 0 : job_.reset();
151 : }
152 :
153 0 : return static_cast<bool>(job_);
154 0 : }
155 : #pragma GCC pop
156 :
157 : void cancel() { cancel_.store(true); }
158 :
159 : void destroy()
160 : {
161 : cancel();
162 : std::lock_guard l(lock_);
163 : job_.reset();
164 : }
165 :
166 : bool isCancelled() const { return cancel_.load(); }
167 :
168 0 : RepeatedJob& job() { return job_; }
169 :
170 : private:
171 : NON_COPYABLE(RepeatedTask);
172 : RepeatedJob job_;
173 : mutable std::mutex lock_;
174 : std::atomic_bool cancel_ {false};
175 : uint64_t cookie_;
176 : };
177 :
178 : class ScheduledExecutor
179 : {
180 : public:
181 : using clock = std::chrono::steady_clock;
182 : using time_point = clock::time_point;
183 : using duration = clock::duration;
184 :
185 : ScheduledExecutor(const std::string& name_);
186 : ~ScheduledExecutor();
187 :
188 : /**
189 : * Schedule job to be run ASAP
190 : */
191 : void run(std::function<void()>&& job,
192 : const char* filename=CURRENT_FILENAME(),
193 : uint32_t linum=CURRENT_LINE());
194 :
195 : /**
196 : * Schedule job to be run at time t
197 : */
198 : std::shared_ptr<Task> schedule(std::function<void()>&& job, time_point t,
199 : const char* filename=CURRENT_FILENAME(),
200 : uint32_t linum=CURRENT_LINE());
201 :
202 : /**
203 : * Schedule job to be run after delay dt
204 : */
205 : std::shared_ptr<Task> scheduleIn(std::function<void()>&& job, duration dt,
206 : const char* filename=CURRENT_FILENAME(),
207 : uint32_t linum=CURRENT_LINE());
208 :
209 : /**
210 : * Schedule job to be run every dt, starting now.
211 : */
212 : std::shared_ptr<RepeatedTask> scheduleAtFixedRate(std::function<bool()>&& job,
213 : duration dt,
214 : const char* filename=CURRENT_FILENAME(),
215 : uint32_t linum=CURRENT_LINE());
216 :
217 : /**
218 : * Stop the scheduler, it is unable to be reversed
219 : */
220 : void stop();
221 :
222 : private:
223 : NON_COPYABLE(ScheduledExecutor);
224 :
225 : void loop();
226 : void schedule(std::shared_ptr<Task>, time_point t);
227 : void reschedule(std::shared_ptr<RepeatedTask>, time_point t, duration dt);
228 :
229 : std::string name_;
230 : std::shared_ptr<std::atomic<bool>> running_;
231 : std::map<time_point, std::vector<Job>> jobs_ {};
232 : std::mutex jobLock_ {};
233 : std::condition_variable cv_ {};
234 : std::thread thread_;
235 : };
236 :
237 : } // namespace jami
|