blob: 27e75a79b9731415c4159498641be8bce2f70b48 (
plain) (
blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
|
#pragma once
#include <Storages/MergeTree/MergeTreeBackgroundExecutor.h>
#include <Core/BackgroundSchedulePool.h>
#include <pcg_random.hpp>
namespace DB
{
/// Settings for background tasks scheduling. Each background assignee has one
/// BackgroundSchedulingPoolTask and depending on execution result may put this
/// task to sleep according to settings. Look at scheduleTask function for details.
struct BackgroundTaskSchedulingSettings
{
double thread_sleep_seconds_random_part = 1.0;
double thread_sleep_seconds_if_nothing_to_do = 0.1;
double task_sleep_seconds_when_no_work_max = 600;
/// For exponential backoff.
double task_sleep_seconds_when_no_work_multiplier = 1.1;
double task_sleep_seconds_when_no_work_random_part = 1.0;
/// Deprecated settings, don't affect background execution
double thread_sleep_seconds = 10;
double task_sleep_seconds_when_no_work_min = 10;
};
class MergeTreeData;
class BackgroundJobsAssignee : public WithContext
{
private:
MergeTreeData & data;
/// Settings for execution control of background scheduling task
BackgroundTaskSchedulingSettings sleep_settings;
/// Useful for random backoff timeouts generation
pcg64 rng;
/// How many times execution of background job failed or we have
/// no new jobs.
size_t no_work_done_count = 0;
/// Scheduling task which assign jobs in background pool
BackgroundSchedulePool::TaskHolder holder;
/// Mutex for thread safety
std::mutex holder_mutex;
public:
/// In case of ReplicatedMergeTree the first assignee will be responsible for
/// polling the replication queue and schedule operations according to the LogEntry type
/// e.g. merges, mutations and fetches. The same will be for Plain MergeTree except there is no
/// replication queue, so we will just scan parts and decide what to do.
/// Moving operations are the same for all types of MergeTree and also have their own timetable.
enum class Type
{
DataProcessing,
Moving
};
Type type{Type::DataProcessing};
void start();
void trigger();
void postpone();
void finish();
bool scheduleMergeMutateTask(ExecutableTaskPtr merge_task);
void scheduleFetchTask(ExecutableTaskPtr fetch_task);
void scheduleMoveTask(ExecutableTaskPtr move_task);
void scheduleCommonTask(ExecutableTaskPtr common_task, bool need_trigger);
/// Just call finish
~BackgroundJobsAssignee();
BackgroundJobsAssignee(
MergeTreeData & data_,
Type type,
ContextPtr global_context_);
private:
static String toString(Type type);
/// Function that executes in background scheduling pool
void threadFunc();
};
}
|