1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
|
#pragma once
#include <Poco/Notification.h>
#include <Poco/NotificationQueue.h>
#include <Poco/Timestamp.h>
#include <thread>
#include <atomic>
#include <mutex>
#include <condition_variable>
#include <vector>
#include <map>
#include <functional>
#include <boost/noncopyable.hpp>
#include <Common/ZooKeeper/Types.h>
#include <Common/CurrentMetrics.h>
#include <Common/CurrentThread.h>
#include <Common/ThreadPool_fwd.h>
#include <base/scope_guard.h>
namespace DB
{
/// Forward declarations of types that are referenced below before (or without) being defined here:
/// BackgroundSchedulePoolTaskInfo and BackgroundSchedulePoolTaskHolder are defined later in this header,
/// TaskNotification is only named as a friend of BackgroundSchedulePoolTaskInfo.
class TaskNotification;
class BackgroundSchedulePoolTaskInfo;
class BackgroundSchedulePoolTaskHolder;
/** Executes functions scheduled at a specific point in time.
* Basically all tasks are added to a queue and processed by worker threads.
*
* The most important difference between this and BackgroundProcessingPool
* is that we have the guarantee that the same function is not executed by several workers at the same time.
*
* The usage scenario: instead of starting a separate thread for each task,
* register a task in BackgroundSchedulePool (see createTask) and, when you need to run the task,
* call its schedule or scheduleAfter(duration) method.
*/
class BackgroundSchedulePool
{
public:
/// The task class needs access to the private scheduling helpers and queues declared below.
friend class BackgroundSchedulePoolTaskInfo;
/// Short aliases for the task-related types used by the pool.
using TaskInfo = BackgroundSchedulePoolTaskInfo;
using TaskInfoPtr = std::shared_ptr<TaskInfo>;
/// Callable run by a task: takes no arguments and returns nothing.
using TaskFunc = std::function<void()>;
using TaskHolder = BackgroundSchedulePoolTaskHolder;
/// Delayed tasks keyed by timestamp; being a multimap, several tasks may share one timestamp.
using DelayedTasks = std::multimap<Poco::Timestamp, TaskInfoPtr>;
/// Creates a task for `function` and returns a holder for it.
/// NOTE(review): `log_name` is presumably used to identify the task in log messages - confirm in the implementation.
TaskHolder createTask(const std::string & log_name, const TaskFunc & function);
/// As for MergeTreeBackgroundExecutor we refuse to implement tasks eviction, because it will
/// be error prone. We support only increasing number of threads at runtime.
void increaseThreadsCount(size_t new_threads_count);
/// thread_name_ cannot be longer than 13 bytes (2 bytes are reserved for the "/D" suffix for delayExecutionThreadFunction())
/// NOTE(review): size_ is presumably the initial number of worker threads, and tasks_metric_ / size_metric_
/// presumably feed the tasks_metric / size_metric members - confirm in the implementation.
BackgroundSchedulePool(size_t size_, CurrentMetrics::Metric tasks_metric_, CurrentMetrics::Metric size_metric_, const char *thread_name_);
~BackgroundSchedulePool();
private:
/// BackgroundSchedulePool schedules a task on its own task queue, there's no need to construct/restore tracing context on this level.
/// This is also how ThreadPool class treats the tracing context. See ThreadPool for more information.
using Threads = std::vector<ThreadFromGlobalPoolNoTracingContextPropagation>;
/// NOTE(review): presumably the body of each worker thread stored in `threads` - confirm in the implementation.
void threadFunction();
/// NOTE(review): presumably the body of `delayed_thread`, which waits for the next delayed task - confirm in the implementation.
void delayExecutionThreadFunction();
/// NOTE(review): presumably puts the task into `tasks` for execution by a worker - confirm in the implementation.
void scheduleTask(TaskInfoPtr task_info);
/// Schedule task for execution after specified delay from now.
/// NOTE(review): `ms` is presumably milliseconds, and `task_schedule_mutex_lock` presumably proves that the
/// caller already holds the task's schedule_mutex - confirm in the implementation.
void scheduleDelayedTask(const TaskInfoPtr & task_info, size_t ms, std::lock_guard<std::mutex> & task_schedule_mutex_lock);
/// Remove task, that was scheduled with delay, from schedule.
void cancelDelayedTask(const TaskInfoPtr & task_info, std::lock_guard<std::mutex> & task_schedule_mutex_lock);
/// Atomic flag, initially false.
/// NOTE(review): presumably set when the pool is being destroyed so that the threads stop - confirm.
std::atomic<bool> shutdown {false};
/// Tasks.
/// NOTE(review): tasks_mutex / tasks_cond_var presumably guard and signal `tasks` - confirm.
/// NOTE(review): std::deque is used here but <deque> is not among the includes visible at the top of this header.
std::condition_variable tasks_cond_var;
std::mutex tasks_mutex;
std::deque<TaskInfoPtr> tasks;
Threads threads;
/// Delayed tasks.
/// NOTE(review): delayed_tasks_mutex / delayed_tasks_cond_var presumably guard and signal `delayed_tasks` - confirm.
std::condition_variable delayed_tasks_cond_var;
std::mutex delayed_tasks_mutex;
/// Thread waiting for next delayed task.
std::unique_ptr<ThreadFromGlobalPoolNoTracingContextPropagation> delayed_thread;
/// Tasks ordered by scheduled time.
DelayedTasks delayed_tasks;
/// Metric identifier and metric increment kept for the lifetime of the pool.
CurrentMetrics::Metric tasks_metric;
CurrentMetrics::Increment size_metric;
/// Name given to the pool's threads (see the length restriction at the constructor).
std::string thread_name;
};
/// A single task of a BackgroundSchedulePool.
/// Derives from std::enable_shared_from_this, so instances are meant to be owned through std::shared_ptr
/// (see BackgroundSchedulePoolTaskInfoPtr); copying is disabled via boost::noncopyable.
class BackgroundSchedulePoolTaskInfo : public std::enable_shared_from_this<BackgroundSchedulePoolTaskInfo>, private boost::noncopyable
{
public:
/// NOTE(review): presumably just stores the pool reference, the log name and the function in the
/// members of the same names - confirm in the implementation.
BackgroundSchedulePoolTaskInfo(BackgroundSchedulePool & pool_, const std::string & log_name_, const BackgroundSchedulePool::TaskFunc & function_);
/// Schedule for execution as soon as possible (if not already scheduled).
/// If the task was already scheduled with delay, the delay will be ignored.
/// NOTE(review): the bool result presumably tells whether the task was actually scheduled - confirm.
bool schedule();
/// Schedule for execution after specified delay.
/// If overwrite is set then the task will be re-scheduled (if it was already scheduled, i.e. delayed == true).
bool scheduleAfter(size_t milliseconds, bool overwrite = true);
/// Further attempts to schedule become no-op. Will wait till the end of the current execution of the task.
void deactivate();
/// NOTE(review): presumably the counterpart of deactivate(), allowing the task to be scheduled again - confirm.
void activate();
/// Atomically activate task and schedule it for execution.
bool activateAndSchedule();
/// get Coordination::WatchCallback needed for notifications from ZooKeeper watches.
Coordination::WatchCallback getWatchCallback();
/// Returns lock that protects from concurrent task execution.
/// This lock should not be held for a long time.
std::unique_lock<std::mutex> getExecLock();
private:
friend class TaskNotification;
friend class BackgroundSchedulePool;
/// NOTE(review): presumably runs `function` on a pool thread - confirm in the implementation.
void execute();
/// NOTE(review): `schedule_mutex_lock` presumably proves that the caller already holds schedule_mutex - confirm.
void scheduleImpl(std::lock_guard<std::mutex> & schedule_mutex_lock);
/// Pool this task was created for; held by reference, so the pool must outlive the task.
BackgroundSchedulePool & pool;
std::string log_name;
BackgroundSchedulePool::TaskFunc function;
/// NOTE(review): exec_mutex is presumably the mutex handed out by getExecLock(), and schedule_mutex
/// presumably guards the state flags below - confirm in the implementation.
std::mutex exec_mutex;
std::mutex schedule_mutex;
/// Invariants:
/// * If deactivated is true then scheduled, delayed and executing are all false.
/// * scheduled and delayed cannot be true at the same time.
bool deactivated = false;
bool scheduled = false;
bool delayed = false;
bool executing = false;
/// If the task is scheduled with delay, points to element of delayed_tasks.
BackgroundSchedulePool::DelayedTasks::iterator iterator;
};
/// Shared-ownership pointer to a task; the same type as BackgroundSchedulePool::TaskInfoPtr.
using BackgroundSchedulePoolTaskInfoPtr = std::shared_ptr<BackgroundSchedulePoolTaskInfo>;
/// Move-only handle to a task.
/// When a non-empty holder is destroyed, the task it refers to is deactivated.
class BackgroundSchedulePoolTaskHolder
{
public:
    /// Creates an empty holder that refers to no task.
    BackgroundSchedulePoolTaskHolder() = default;

    /// Wraps an existing task; the holder keeps its own shared reference to it.
    explicit BackgroundSchedulePoolTaskHolder(const BackgroundSchedulePoolTaskInfoPtr & task_info_)
        : task_info{task_info_}
    {
    }

    /// Copying is forbidden: two holders of one task would both deactivate it on destruction.
    BackgroundSchedulePoolTaskHolder(const BackgroundSchedulePoolTaskHolder & other) = delete;
    BackgroundSchedulePoolTaskHolder & operator=(const BackgroundSchedulePoolTaskHolder & other) noexcept = delete;

    /// Moving transfers the task pointer; the moved-from holder becomes empty,
    /// so its destructor does not deactivate anything.
    /// NOTE(review): the defaulted move assignment drops the task previously held by `*this`
    /// without calling deactivate() on it - confirm that this is intended.
    BackgroundSchedulePoolTaskHolder(BackgroundSchedulePoolTaskHolder && other) noexcept = default;
    BackgroundSchedulePoolTaskHolder & operator=(BackgroundSchedulePoolTaskHolder && other) noexcept = default;

    /// Deactivates the held task, if any.
    ~BackgroundSchedulePoolTaskHolder()
    {
        if (!task_info)
            return;

        task_info->deactivate();
    }

    /// True when the holder refers to a task.
    explicit operator bool() const { return static_cast<bool>(task_info); }

    /// Access to the task's interface; yields nullptr for an empty holder.
    BackgroundSchedulePoolTaskInfo * operator->() { return task_info.get(); }
    const BackgroundSchedulePoolTaskInfo * operator->() const { return task_info.get(); }

private:
    BackgroundSchedulePoolTaskInfoPtr task_info;
};
}
|