Line data Source code
1 : /*
2 : * Copyright (C) 2004-2024 Savoir-faire Linux Inc.
3 : *
4 : * Author: Adrien Béraud <adrien.beraud@savoirfairelinux.com>
5 : *
6 : * This program is free software; you can redistribute it and/or modify
7 : * it under the terms of the GNU General Public License as published by
8 : * the Free Software Foundation; either version 3 of the License, or
9 : * (at your option) any later version.
10 : *
11 : * This program is distributed in the hope that it will be useful,
12 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 : * GNU General Public License for more details.
15 : *
16 : * You should have received a copy of the GNU General Public License
17 : * along with this program; if not, write to the Free Software
18 : * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
19 : */
20 : #pragma once
21 :
22 : #include <thread>
23 : #include <functional>
24 : #include <map>
25 : #include <vector>
26 : #include <chrono>
27 : #include <memory>
28 : #include <atomic>
29 : #include <mutex>
30 : #include <condition_variable>
31 : #include <ciso646>
32 : #include <string>
33 :
34 : #include "noncopyable.h"
35 :
36 : #include "tracepoint.h"
37 : #include "trace-tools.h"
38 :
39 : namespace jami {
40 :
/**
 * Shared counter from which Task and RepeatedTask take their cookie at
 * construction (post-increment). Only declared here; the definition is not
 * part of this header.
 */
41 : extern std::atomic<uint64_t> task_cookie;
42 :
/**
 * A unit of work: a callable bundled with the source location recorded by
 * whoever created it.
 */
struct Job
{
    std::function<void()> fn; ///< Callable to execute; empty after reset()
    const char* filename;     ///< Source file given at construction (pointer stored as-is, not copied)
    uint32_t linum;           ///< Source line given at construction

    /**
     * @param f    callable to take ownership of (moved from)
     * @param file source file name to record
     * @param l    source line number to record
     */
    Job(std::function<void()>&& f, const char* file, uint32_t l)
        : fn {std::move(f)}
        , filename {file}
        , linum {l}
    {}

    /** @return true while a callable is still attached */
    operator bool() const { return fn != nullptr; }

    /** Detach the callable, leaving the job empty. */
    void reset() { fn = nullptr; }
};
64 :
/**
 * A repeatable unit of work: a callable returning bool, bundled with the
 * source location recorded by whoever created it.
 *
 * RepeatedTask::run() keeps the job only while the callable returns true.
 */
struct RepeatedJob {
    /**
     * @param f    callable to take ownership of (moved from)
     * @param file source file name to record (pointer stored as-is)
     * @param l    source line number to record
     */
    RepeatedJob(std::function<bool()>&& f, const char* file, uint32_t l)
        : fn(std::move(f))
        , filename(file)
        , linum(l) { }

    std::function<bool()> fn;
    const char* filename;
    uint32_t linum;

    /**
     * @return true while a callable is still attached.
     *
     * const-qualified (like Job::operator bool) so that emptiness can be
     * checked through a const reference or from a const member function.
     */
    inline operator bool() const {
        return static_cast<bool>(fn);
    }

    /** Detach the callable, leaving the job empty. */
    void reset() {
        fn = {};
    }
};
83 :
84 : /**
85 : * A Job that can be disposed
86 : */
87 : class Task
88 : {
89 : public:
90 2158 : Task(std::function<void()>&& fn, const char* filename, uint32_t linum)
91 2158 : : job_(std::move(fn), filename, linum)
92 2158 : , cookie_(task_cookie++) { }
93 :
94 : #pragma GCC diagnostic push
95 : #pragma GCC diagnostic ignored "-Wunused-parameter"
96 1114 : void run(const char* executor_name)
97 : {
98 1114 : if (job_.fn) {
99 : jami_tracepoint(scheduled_executor_task_begin,
100 : executor_name,
101 : job_.filename, job_.linum,
102 : cookie_);
103 1114 : job_.fn();
104 : jami_tracepoint(scheduled_executor_task_end,
105 : cookie_);
106 : }
107 1114 : }
108 : #pragma GCC pop
109 :
110 0 : void cancel() { job_.reset(); }
111 : bool isCancelled() const { return !job_; }
112 :
113 4316 : Job& job() { return job_; }
114 :
115 : private:
116 : Job job_;
117 : uint64_t cookie_;
118 : };
119 :
120 : /**
121 : * A RepeatedJob that can be disposed
122 : */
123 : class RepeatedTask
124 : {
125 : public:
126 0 : RepeatedTask(std::function<bool()>&& fn, const char* filename,
127 : uint32_t linum)
128 0 : : job_(std::move(fn), filename, linum)
129 0 : , cookie_(task_cookie++) { }
130 :
131 : #pragma GCC diagnostic push
132 : #pragma GCC diagnostic ignored "-Wunused-parameter"
133 0 : bool run(const char* executor_name)
134 : {
135 : bool cont;
136 0 : std::lock_guard l(lock_);
137 :
138 0 : if (not cancel_.load() and job_.fn) {
139 : jami_tracepoint(scheduled_executor_task_begin,
140 : executor_name,
141 : job_.filename, job_.linum,
142 : cookie_);
143 0 : cont = job_.fn();
144 : jami_tracepoint(scheduled_executor_task_end,
145 : cookie_);
146 :
147 : } else {
148 0 : cont = false;
149 : }
150 :
151 0 : if (not cont) {
152 0 : cancel_.store(true);
153 0 : job_.reset();
154 : }
155 :
156 0 : return static_cast<bool>(job_);
157 0 : }
158 : #pragma GCC pop
159 :
160 : void cancel() { cancel_.store(true); }
161 :
162 : void destroy()
163 : {
164 : cancel();
165 : std::lock_guard l(lock_);
166 : job_.reset();
167 : }
168 :
169 : bool isCancelled() const { return cancel_.load(); }
170 :
171 0 : RepeatedJob& job() { return job_; }
172 :
173 : private:
174 : NON_COPYABLE(RepeatedTask);
175 : RepeatedJob job_;
176 : mutable std::mutex lock_;
177 : std::atomic_bool cancel_ {false};
178 : uint64_t cookie_;
179 : };
180 :
181 : class ScheduledExecutor
182 : {
183 : public:
184 : using clock = std::chrono::steady_clock;
185 : using time_point = clock::time_point;
186 : using duration = clock::duration;
187 :
188 : ScheduledExecutor(const std::string& name_);
189 : ~ScheduledExecutor();
190 :
191 : /**
192 : * Schedule job to be run ASAP
193 : */
194 : void run(std::function<void()>&& job,
195 : const char* filename=CURRENT_FILENAME(),
196 : uint32_t linum=CURRENT_LINE());
197 :
198 : /**
199 : * Schedule job to be run at time t
200 : */
201 : std::shared_ptr<Task> schedule(std::function<void()>&& job, time_point t,
202 : const char* filename=CURRENT_FILENAME(),
203 : uint32_t linum=CURRENT_LINE());
204 :
205 : /**
206 : * Schedule job to be run after delay dt
207 : */
208 : std::shared_ptr<Task> scheduleIn(std::function<void()>&& job, duration dt,
209 : const char* filename=CURRENT_FILENAME(),
210 : uint32_t linum=CURRENT_LINE());
211 :
212 : /**
213 : * Schedule job to be run every dt, starting now.
214 : */
215 : std::shared_ptr<RepeatedTask> scheduleAtFixedRate(std::function<bool()>&& job,
216 : duration dt,
217 : const char* filename=CURRENT_FILENAME(),
218 : uint32_t linum=CURRENT_LINE());
219 :
220 : /**
221 : * Stop the scheduler, can't be reversed
222 : */
223 : void stop();
224 :
225 : private:
226 : NON_COPYABLE(ScheduledExecutor);
227 :
228 : void loop();
229 : void schedule(std::shared_ptr<Task>, time_point t);
230 : void reschedule(std::shared_ptr<RepeatedTask>, time_point t, duration dt);
231 :
232 : std::string name_;
233 : std::shared_ptr<std::atomic<bool>> running_;
234 : std::map<time_point, std::vector<Job>> jobs_ {};
235 : std::mutex jobLock_ {};
236 : std::condition_variable cv_ {};
237 : std::thread thread_;
238 : };
239 :
240 : } // namespace jami
|