Line data Source code
1 : /*
2 : * Copyright (C) 2004-2024 Savoir-faire Linux Inc.
3 : *
4 : * This program is free software: you can redistribute it and/or modify
5 : * it under the terms of the GNU General Public License as published by
6 : * the Free Software Foundation, either version 3 of the License, or
7 : * (at your option) any later version.
8 : *
9 : * This program is distributed in the hope that it will be useful,
10 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 : * GNU General Public License for more details.
13 : *
14 : * You should have received a copy of the GNU General Public License
15 : * along with this program. If not, see <https://www.gnu.org/licenses/>.
16 : */
17 : #include "scheduled_executor.h"
18 : #include "logger.h"
19 :
20 : namespace jami {
21 :
// Global atomic counter, starting at zero.
// NOTE(review): not referenced anywhere in this file; presumably declared
// `extern` in a header and used to tag tasks — confirm against the header.
std::atomic<uint64_t> task_cookie {0};
23 :
24 39 : ScheduledExecutor::ScheduledExecutor(const std::string& name)
25 39 : : name_(name)
26 39 : , running_(std::make_shared<std::atomic<bool>>(true))
27 39 : , thread_([this, is_running = running_] {
28 : // The thread needs its own reference of `running_` in case the
29 : // scheduler is destroyed within the thread because of a job
30 :
31 30735 : while (*is_running)
32 30696 : loop();
33 39 : })
34 39 : {}
35 :
36 78 : ScheduledExecutor::~ScheduledExecutor()
37 : {
38 39 : stop();
39 :
40 39 : if (not thread_.joinable()) {
41 0 : return;
42 : }
43 :
44 : // Avoid deadlock
45 39 : if (std::this_thread::get_id() == thread_.get_id()) {
46 0 : thread_.detach();
47 : } else {
48 39 : thread_.join();
49 : }
50 39 : }
51 :
52 : void
53 78 : ScheduledExecutor::stop()
54 : {
55 78 : std::lock_guard lock(jobLock_);
56 78 : *running_ = false;
57 78 : jobs_.clear();
58 78 : cv_.notify_all();
59 78 : }
60 :
61 : void
62 29548 : ScheduledExecutor::run(std::function<void()>&& job,
63 : const char* filename, uint32_t linum)
64 : {
65 29548 : std::lock_guard lock(jobLock_);
66 29548 : auto now = clock::now();
67 29548 : jobs_[now].emplace_back(std::move(job), filename, linum);
68 29548 : cv_.notify_all();
69 29548 : }
70 :
71 : std::shared_ptr<Task>
72 2157 : ScheduledExecutor::schedule(std::function<void()>&& job, time_point t,
73 : const char* filename, uint32_t linum)
74 : {
75 2157 : auto ret = std::make_shared<Task>(std::move(job), filename, linum);
76 2157 : schedule(ret, t);
77 2157 : return ret;
78 0 : }
79 :
80 : std::shared_ptr<Task>
81 2157 : ScheduledExecutor::scheduleIn(std::function<void()>&& job, duration dt,
82 : const char* filename, uint32_t linum)
83 : {
84 6471 : return schedule(std::move(job), clock::now() + dt,
85 6471 : filename, linum);
86 : }
87 :
88 : std::shared_ptr<RepeatedTask>
89 0 : ScheduledExecutor::scheduleAtFixedRate(std::function<bool()>&& job,
90 : duration dt,
91 : const char* filename, uint32_t linum)
92 : {
93 0 : auto ret = std::make_shared<RepeatedTask>(std::move(job), filename, linum);
94 0 : reschedule(ret, clock::now(), dt);
95 0 : return ret;
96 0 : }
97 :
98 : void
99 0 : ScheduledExecutor::reschedule(std::shared_ptr<RepeatedTask> task, time_point t, duration dt)
100 : {
101 0 : const char* filename = task->job().filename;
102 0 : uint32_t linenum = task->job().linum;
103 0 : schedule(std::make_shared<Task>([this, task = std::move(task), t, dt]() mutable {
104 0 : if (task->run(name_.c_str()))
105 0 : reschedule(std::move(task), t + dt, dt);
106 0 : }, filename, linenum),
107 : t);
108 0 : }
109 :
110 : void
111 2157 : ScheduledExecutor::schedule(std::shared_ptr<Task> task, time_point t)
112 : {
113 2157 : const char* filename = task->job().filename;
114 2157 : uint32_t linenum = task->job().linum;
115 2157 : std::lock_guard lock(jobLock_);
116 3267 : jobs_[t].emplace_back([task = std::move(task), this] { task->run(name_.c_str()); },
117 : filename, linenum);
118 2157 : cv_.notify_all();
119 2157 : }
120 :
121 : void
122 30696 : ScheduledExecutor::loop()
123 : {
124 30696 : std::vector<Job> jobs;
125 : {
126 30696 : std::unique_lock lock(jobLock_);
127 49450 : while (*running_ and (jobs_.empty() or jobs_.begin()->first > clock::now())) {
128 18754 : if (jobs_.empty())
129 16192 : cv_.wait(lock);
130 : else {
131 2562 : auto nextJob = jobs_.begin()->first;
132 2562 : cv_.wait_until(lock, nextJob);
133 : }
134 : }
135 30696 : if (not *running_)
136 38 : return;
137 30658 : jobs = std::move(jobs_.begin()->second);
138 30658 : jobs_.erase(jobs_.begin());
139 30696 : }
140 61316 : for (auto& job : jobs) {
141 : try {
142 30658 : job.fn();
143 0 : } catch (const std::exception& e) {
144 0 : JAMI_ERR("Exception running job: %s", e.what());
145 0 : }
146 : }
147 30696 : }
148 :
149 : } // namespace jami
|