aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/Storages/MergeTree/ReplicatedMergeMutateTaskBase.h
blob: ba514f11f2097dba946d71f2bbe841f9a1528b3a (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
#pragma once


#include <Storages/MergeTree/IExecutableTask.h>
#include <Storages/MergeTree/ReplicatedMergeTreeQueue.h>


namespace DB
{

class StorageReplicatedMergeTree;

/**
 * Common base class for MergeFromLogEntryTask and MutateFromLogEntryTaskBase.
 *
 * Subclasses supply the three stage hooks (prepare / executeInnerTask / finalize);
 * the progress between stages is tracked by the private State enum.
 */
class ReplicatedMergeMutateTaskBase : public IExecutableTask
{
public:
    /// The selected queue entry and the result callback are copied into the task;
    /// the storage and the log entry are kept by reference.
    ReplicatedMergeMutateTaskBase(
        Poco::Logger * log_,
        StorageReplicatedMergeTree & storage_,
        ReplicatedMergeTreeQueue::SelectedEntryPtr & selected_entry_,
        IExecutableTask::TaskResultCallback & task_result_callback_)
        : storage(storage_)
        , selected_entry(selected_entry_)
        /// Bound through the member (not the argument): selected_entry is declared, and thus initialized, earlier.
        , entry(*selected_entry->log_entry)
        , log(log_)
        , task_result_callback(task_result_callback_)
    {
    }

    ~ReplicatedMergeMutateTaskBase() override = default;

    void onCompleted() override;
    StorageID getStorageID() const override;

    /// Query id has the form "<short storage name>::<new part name>".
    String getQueryId() const override
    {
        const auto & part_name = selected_entry->log_entry->new_part_name;
        return getStorageID().getShortName() + "::" + part_name;
    }

    bool executeStep() override;

protected:
    using PartLogWriter = std::function<void(const ExecutionStatus &)>;

    /// What prepare() hands back to the base class.
    struct PrepareResult
    {
        bool prepared_successfully;
        bool need_to_check_missing_part_in_fetch;
        PartLogWriter part_log_writer;
    };

    /// Stage hooks implemented by the concrete merge / mutate task.
    virtual PrepareResult prepare() = 0;
    virtual bool finalize(PartLogWriter write_part_log) = 0;

    /// Runs one portion of the wrapped MergeTask or MutateTask.
    virtual bool executeInnerTask() = 0;

    StorageReplicatedMergeTree & storage;

    /// Reschedules merge_selecting_task, and must do so only after merge_mutate_entry is gone,
    /// since merge_selecting_task may depend on how many entries MergeList holds.
    /// Members are destroyed in reverse declaration order, so this guard has to stay
    /// declared above merge_mutate_entry.
    scope_guard finish_callback;

    /// Holding the selected entry prevents the same mutation from being executed in parallel.
    /// It is a RAII object, so it has to live exactly as long as the task itself.
    ReplicatedMergeTreeQueue::SelectedEntryPtr selected_entry;
    ReplicatedMergeTreeLogEntry & entry;
    MergeList::EntryPtr merge_mutate_entry{nullptr};
    Poco::Logger * log;
    /// Collects ProfileEvents for the part currently being processed.
    ProfileEvents::Counters profile_counters;
    ContextMutablePtr task_context;

private:
    enum class CheckExistingPartResult
    {
        PART_EXISTS,
        OK,
    };

    CheckExistingPartResult checkExistingPart();
    bool executeImpl();

    /// Stages of the task.
    enum class State
    {
        NEED_PREPARE,
        NEED_EXECUTE_INNER_MERGE,
        NEED_FINALIZE,

        SUCCESS,
    };

    PartLogWriter part_log_writer;
    State state = State::NEED_PREPARE;

    /// Used to ask an assignee to assign a new merge/mutate operation.
    /// It takes a bool argument; true means the current task was executed successfully.
    IExecutableTask::TaskResultCallback task_result_callback;
};

}