aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/Storages/MergeTree/MergeList.h
blob: d40af6abf4328cf657ef73841001555228d1d73a (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
#pragma once

#include <Core/Names.h>
#include <Core/Field.h>
#include <Common/Stopwatch.h>
#include <Common/CurrentMetrics.h>
#include <Common/MemoryTracker.h>
#include <Common/ThreadStatus.h>
#include <Storages/MergeTree/MergeType.h>
#include <Storages/MergeTree/MergeAlgorithm.h>
#include <Storages/MergeTree/MergeTreePartInfo.h>
#include <Storages/MergeTree/BackgroundProcessList.h>
#include <Interpreters/StorageID.h>
#include <boost/noncopyable.hpp>
#include <memory>
#include <list>
#include <mutex>
#include <atomic>


/// Declaration of the `Merge` metric; the definition lives outside this header.
/// It is passed to the BackgroundProcessList base by the MergeList constructor below.
namespace CurrentMetrics
{
    extern const Metric Merge;
}

namespace DB
{

/// Plain-value description of a single merge, as returned by MergeListElement::getInfo().
/// It is the info type that MergeList passes to BackgroundProcessList.
///
/// Every scalar member is value-initialized, so a default-constructed MergeInfo never
/// carries indeterminate values (the same `{}` convention MergeListElement uses for its
/// own counters). Member order is unchanged, and the struct remains an aggregate.
struct MergeInfo
{
    std::string database;
    std::string table;
    std::string result_part_name;
    std::string result_part_path;
    Array source_part_names;
    Array source_part_paths;
    std::string partition_id;
    std::string partition;
    bool is_mutation{};
    Float64 elapsed{};
    Float64 progress{};
    UInt64 num_parts{};
    UInt64 total_size_bytes_compressed{};
    UInt64 total_size_bytes_uncompressed{};
    UInt64 total_size_marks{};
    UInt64 total_rows_count{};
    UInt64 bytes_read_uncompressed{};
    UInt64 bytes_written_uncompressed{};
    UInt64 rows_read{};
    UInt64 rows_written{};
    UInt64 columns_written{};
    UInt64 memory_usage{};
    UInt64 thread_id{};
    std::string merge_type;
    std::string merge_algorithm;
};

/// Forward declaration: this header only needs a shared pointer to the type
/// (taken by the MergeListElement constructor), not its definition.
struct FutureMergedMutatedPart;
using FutureMergedMutatedPartPtr = std::shared_ptr<FutureMergedMutatedPart>;

/// Alias for the BackgroundProcessListEntry instantiation used for merges.
struct MergeListElement;
using MergeListEntry = BackgroundProcessListEntry<MergeListElement, MergeInfo>;

/// Forward declaration; not referenced in the visible part of this header.
struct Settings;


/// State of one merge tracked by MergeList (the element type of its BackgroundProcessList base).
/// Non-copyable. The constructor and destructor are declared here and defined elsewhere.
/// NOTE(review): members without an in-class initializer (thread_id, merge_type, merge_algorithm)
/// are presumably set by the constructor — confirm against its definition.
struct MergeListElement : boost::noncopyable
{
    const StorageID table_id;
    /// Compared against the requested partition by MergeList::cancelPartMutations / cancelInPartition.
    std::string partition_id;
    std::string partition;

    const std::string result_part_name;
    const std::string result_part_path;
    /// Its data version and min_block are inspected by the MergeList cancel methods.
    MergeTreePartInfo result_part_info;
    bool is_mutation{};

    UInt64 num_parts{};
    Names source_part_names;
    Names source_part_paths;
    /// Compared with the mutation version in MergeList::cancelPartMutations.
    Int64 source_data_version{};

    Stopwatch watch;
    std::atomic<Float64> progress{};
    /// Set to true by MergeList::cancelPartMutations and MergeList::cancelInPartition.
    std::atomic<bool> is_cancelled{};

    UInt64 total_size_bytes_compressed{};
    UInt64 total_size_bytes_uncompressed{};
    UInt64 total_size_marks{};
    UInt64 total_rows_count{};
    std::atomic<UInt64> bytes_read_uncompressed{};
    std::atomic<UInt64> bytes_written_uncompressed{};

    /// In case of the Vertical algorithm, these are accurate only for primary key columns.
    std::atomic<UInt64> rows_read{};
    std::atomic<UInt64> rows_written{};

    /// Updated only for the Vertical algorithm.
    std::atomic<UInt64> columns_written{};

    UInt64 thread_id;
    /// Checked with isTTLMergeType() in MergeList::onEntryDestroy.
    MergeType merge_type;
    /// Detected after the merge has already started.
    std::atomic<MergeAlgorithm> merge_algorithm;

    ThreadGroupPtr thread_group;

    /// Defined outside this header.
    MergeListElement(
        const StorageID & table_id_,
        FutureMergedMutatedPartPtr future_part,
        const ContextPtr & context);

    /// Returns a MergeInfo describing this element; defined outside this header.
    MergeInfo getInfo() const;

    /// Returns the memory tracker of thread_group. Dereferences thread_group without a null check.
    /// NOTE(review): assumes thread_group is always set by the constructor — confirm.
    const MemoryTracker & getMemoryTracker() const { return thread_group->memory_tracker; }

    /// Returns a pointer to this element.
    MergeListElement * ptr() { return this; }

    /// Returns a reference to this element.
    MergeListElement & ref() { return *this; }

    /// Defined outside this header.
    ~MergeListElement();
};

/** Registry of the merges that are running right now.
  * Serves as the data source for the system.merges table.
  */
class MergeList final : public BackgroundProcessList<MergeListElement, MergeInfo>
{
private:
    using Parent = BackgroundProcessList<MergeListElement, MergeInfo>;

    /// Number of TTL merges currently booked; see bookMergeWithTTL().
    std::atomic<size_t> merges_with_ttl_counter = 0;

public:
    MergeList() : Parent(CurrentMetrics::Merge) {}

    /// Override of the base-class hook: gives back the TTL slot when the entry is a TTL merge.
    void onEntryDestroy(const Parent::Entry & entry) override
    {
        if (!isTTLMergeType(entry->merge_type))
            return;

        merges_with_ttl_counter.fetch_sub(1);
    }

    /// Marks as cancelled every entry of `table_id` whose source data version is below
    /// `mutation_version` while its result part's data version is at or above it.
    /// An empty `partition_id` matches all partitions.
    void cancelPartMutations(const StorageID & table_id, const String & partition_id, Int64 mutation_version)
    {
        const bool any_partition = partition_id.empty();

        std::lock_guard lock{mutex};
        for (auto & element : entries)
        {
            if (!any_partition && element.partition_id != partition_id)
                continue;
            if (!(element.table_id == table_id))
                continue;
            if (element.source_data_version >= mutation_version)
                continue;
            if (element.result_part_info.getDataVersion() < mutation_version)
                continue;

            element.is_cancelled.store(true);
        }
    }

    /// Marks as cancelled every entry of `table_id` in `partition_id` whose result part
    /// starts before `delimiting_block_number`.
    void cancelInPartition(const StorageID & table_id, const String & partition_id, Int64 delimiting_block_number)
    {
        std::lock_guard lock{mutex};
        for (auto & element : entries)
        {
            if (!(element.table_id == table_id))
                continue;
            if (element.partition_id != partition_id)
                continue;
            if (element.result_part_info.min_block >= delimiting_block_number)
                continue;

            element.is_cancelled.store(true);
        }
    }

    /// A merge goes through two phases: assignment and execution. Entries are added to
    /// the merge list at execution time, but the list is consulted during assignment.
    /// That leads to a logical race condition: more TTL merges could be assigned than
    /// allowed. To avoid it, a TTL merge is "booked" at assignment time and released
    /// once the merge has been executed.
    ///
    /// NOTE: Not important for replicated merge tree, where the number of merges is
    /// checked twice: at assignment and in the queue before execution.
    void bookMergeWithTTL()
    {
        merges_with_ttl_counter.fetch_add(1);
    }

    /// Releases a booking made with bookMergeWithTTL().
    void cancelMergeWithTTL()
    {
        merges_with_ttl_counter.fetch_sub(1);
    }

    /// Current number of booked TTL merges.
    size_t getMergesWithTTLCount() const
    {
        return merges_with_ttl_counter.load();
    }
};

}