1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
|
#pragma once
#include <Storages/MergeTree/IExecutableTask.h>
#include <Storages/MergeTree/ReplicatedMergeTreeQueue.h>
namespace DB
{
class StorageReplicatedMergeTree;
/**
* This is used as a base of MergeFromLogEntryTask and MutateFromLogEntryTaskBase
*/
class ReplicatedMergeMutateTaskBase : public IExecutableTask
{
public:
ReplicatedMergeMutateTaskBase(
Poco::Logger * log_,
StorageReplicatedMergeTree & storage_,
ReplicatedMergeTreeQueue::SelectedEntryPtr & selected_entry_,
IExecutableTask::TaskResultCallback & task_result_callback_)
: storage(storage_)
, selected_entry(selected_entry_)
, entry(*selected_entry->log_entry)
, log(log_)
/// This is needed to ask an asssignee to assign a new merge/mutate operation
/// It takes bool argument and true means that current task is successfully executed.
, task_result_callback(task_result_callback_)
{
}
~ReplicatedMergeMutateTaskBase() override = default;
void onCompleted() override;
StorageID getStorageID() const override;
String getQueryId() const override { return getStorageID().getShortName() + "::" + selected_entry->log_entry->new_part_name; }
bool executeStep() override;
protected:
using PartLogWriter = std::function<void(const ExecutionStatus &)>;
struct PrepareResult
{
bool prepared_successfully;
bool need_to_check_missing_part_in_fetch;
PartLogWriter part_log_writer;
};
virtual PrepareResult prepare() = 0;
virtual bool finalize(ReplicatedMergeMutateTaskBase::PartLogWriter write_part_log) = 0;
/// Will execute a part of inner MergeTask or MutateTask
virtual bool executeInnerTask() = 0;
StorageReplicatedMergeTree & storage;
/// A callback to reschedule merge_selecting_task after destroying merge_mutate_entry
/// The order is important, because merge_selecting_task may rely on the number of entries in MergeList
scope_guard finish_callback;
/// This is important not to execute the same mutation in parallel
/// selected_entry is a RAII class, so the time of living must be the same as for the whole task
ReplicatedMergeTreeQueue::SelectedEntryPtr selected_entry;
ReplicatedMergeTreeLogEntry & entry;
MergeList::EntryPtr merge_mutate_entry{nullptr};
Poco::Logger * log;
/// ProfileEvents for current part will be stored here
ProfileEvents::Counters profile_counters;
ContextMutablePtr task_context;
private:
enum class CheckExistingPartResult
{
PART_EXISTS,
OK
};
CheckExistingPartResult checkExistingPart();
bool executeImpl();
enum class State
{
NEED_PREPARE,
NEED_EXECUTE_INNER_MERGE,
NEED_FINALIZE,
SUCCESS
};
PartLogWriter part_log_writer{};
State state{State::NEED_PREPARE};
IExecutableTask::TaskResultCallback task_result_callback;
};
}
|