aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/Storages/MergeTree/IntersectionsIndexes.h
blob: d9445f446ce3d060d781a4477f35f72c0541d8ec (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
#pragma once

#include <fmt/format.h>
#include <Storages/MergeTree/RequestResponse.h>

namespace DB
{

/// Error codes used below; only declared here (extern), the definitions are in another translation unit.
namespace ErrorCodes { extern const int LOGICAL_ERROR; }

/// A part (or a projection of a part) to be read: its identifying names plus
/// its block-number range. Block ranges are treated as closed segments
/// [begin, end], the same convention PartSegments uses for intersection checks.
struct PartToRead
{
    PartBlockRange range;

    /// Identity of a part: the part name together with the projection name.
    struct PartAndProjectionNames
    {
        String part;
        String projection;

        /// Lexicographic order: by part name first, then by projection name.
        bool operator<(const PartAndProjectionNames & rhs) const
        {
            if (part == rhs.part)
                return projection < rhs.projection;
            return part < rhs.part;
        }

        bool operator==(const PartAndProjectionNames & rhs) const
        {
            return part == rhs.part && projection == rhs.projection;
        }
    };

    PartAndProjectionNames name;

    /// Equal only when both the block range and the names match.
    bool operator==(const PartToRead & rhs) const
    {
        return range == rhs.range && name == rhs.name;
    }

    /// Ordering used by std::set<PartToRead>: *this precedes rhs iff its whole
    /// closed range [begin, end] lies strictly before rhs's range.
    /// Comparing two distinct objects whose closed ranges share at least one
    /// block number is a logic error and throws LOGICAL_ERROR. The intersection
    /// test is symmetric, so the outcome never depends on argument order
    /// (a comparator must behave the same for (a, b) and (b, a)).
    /// Comparing an object with itself returns false, as a strict weak
    /// ordering requires (debug STL builds check comp(x, x) explicitly).
    bool operator<(const PartToRead & rhs) const
    {
        if (this != &rhs)
        {
            /// Closed segments [a, b] and [c, d] intersect iff a <= d && c <= b.
            /// We allow only consecutive non-intersecting ranges.
            const bool intersection = range.begin <= rhs.range.end && rhs.range.begin <= range.end;
            if (intersection)
                throw Exception(ErrorCodes::LOGICAL_ERROR, "Got intersecting parts. First [{}, {}]. Second [{}, {}]",
                    range.begin, range.end, rhs.range.begin, rhs.range.end);
        }
        /// Ranges are disjoint here, so "ends before rhs begins" fully decides the order.
        return range.end < rhs.range.begin;
    }
};

/// MergeTreeDataPart is described as a segment (min block and max block)
/// During request handling we have to know how many intersection
/// current part has with already saved parts in our state.
struct PartSegments
{
    enum class IntersectionResult
    {
        NO_INTERSECTION,
        EXACTLY_ONE_INTERSECTION,
        REJECT
    };

    void addPart(PartToRead part) { segments.insert(std::move(part)); }

    IntersectionResult getIntersectionResult(PartToRead part)
    {
        bool intersected_before = false;
        for (const auto & segment: segments)
        {
            auto are_intersect = [](auto & x, auto & y)
            {
                /// <= is important here, because we are working with segments [a, b]
                if ((x.begin <= y.begin) && (y.begin <= x.end))
                    return true;
                if ((y.begin <= x.begin) && (x.begin <= y.end))
                    return true;
                return false;
            };

            if (are_intersect(segment.range, part.range))
            {
                /// We have two or possibly more intersections
                if (intersected_before)
                    return IntersectionResult::REJECT;

                /// We have intersection with part with different name
                /// or with different min or max block
                /// It could happens if we have merged part on one replica
                /// but not on another.
                if (segment != part)
                    return IntersectionResult::REJECT;

                /// We allow only the intersection with the same part as we have
                intersected_before = true;
            }
        }

        return intersected_before ? IntersectionResult::EXACTLY_ONE_INTERSECTION : IntersectionResult::NO_INTERSECTION;
    }

    using OrderedSegments = std::set<PartToRead>;
    OrderedSegments segments;
};

/// This is used only in parallel reading from replicas
/// This struct is an ordered set of half intervals and it is responsible for
/// giving an inversion of that intervals (e.g. [a, b) => {[-inf, a), [b, +inf)})
/// or giving an intersection of two sets of intervals
/// This is needed, because MarkRange is actually a half-opened interval
/// and during the query execution we receive some kind of request from every replica
/// to read some ranges from a specific part.
/// We have to avoid the situation, where some range is read twice.
/// This struct helps us to do it using only two operations (intersection and inversion)
/// over a set of half opened intervals.
struct HalfIntervals
{
    static HalfIntervals initializeWithEntireSpace()
    {
        auto left_inf = std::numeric_limits<decltype(MarkRange::begin)>::min();
        auto right_inf = std::numeric_limits<decltype(MarkRange::end)>::max();
        return HalfIntervals{{{left_inf, right_inf}}};
    }

    static HalfIntervals initializeFromMarkRanges(MarkRanges ranges)
    {
        OrderedRanges new_intervals;
        for (const auto & range : ranges)
            new_intervals.insert(range);

        return HalfIntervals{std::move(new_intervals)};
    }

    MarkRanges convertToMarkRangesFinal()
    {
        MarkRanges result;
        std::copy(intervals.begin(), intervals.end(), std::back_inserter(result));
        return result;
    }

    HalfIntervals & intersect(const HalfIntervals & rhs)
    {
        /**
         * first   [   ) [   ) [   ) [  ) [  )
         * second    [       ) [ ) [   )  [    )
         */
        OrderedRanges intersected;

        const auto & first_intervals = intervals;
        auto first = first_intervals.begin();
        const auto & second_intervals = rhs.intervals;
        auto second = second_intervals.begin();

        while (first != first_intervals.end() && second != second_intervals.end())
        {
            auto curr_intersection = MarkRange{
                std::max(second->begin, first->begin),
                std::min(second->end, first->end)
            };

            /// Insert only if segments are intersect
            if (curr_intersection.begin < curr_intersection.end)
                intersected.insert(std::move(curr_intersection));

            if (first->end <= second->end)
                ++first;
            else
                ++second;
        }

        std::swap(intersected, intervals);

        return *this;
    }

    HalfIntervals & negate()
    {
        auto left_inf = std::numeric_limits<decltype(MarkRange::begin)>::min();
        auto right_inf = std::numeric_limits<decltype(MarkRange::end)>::max();

        if (intervals.empty())
        {
            intervals.insert(MarkRange{left_inf, right_inf});
            return *this;
        }

        OrderedRanges new_ranges;

        /// Possibly add (-inf; begin)
        if (auto begin = intervals.begin()->begin; begin != left_inf)
            new_ranges.insert(MarkRange{left_inf, begin});

        auto prev = intervals.begin();
        for (auto it = std::next(intervals.begin()); it != intervals.end(); ++it)
        {
            if (prev->end != it->begin)
                new_ranges.insert(MarkRange{prev->end, it->begin});
            prev = it;
        }

        /// Try to add (end; +inf)
        if (auto end = intervals.rbegin()->end; end != right_inf)
            new_ranges.insert(MarkRange{end, right_inf});

        std::swap(new_ranges, intervals);

        return *this;
    }

    bool operator==(const HalfIntervals & rhs) const
    {
        return intervals == rhs.intervals;
    }

    using OrderedRanges = std::set<MarkRange>;
    OrderedRanges intervals;
};


/// Debug printer: writes every interval as "(begin, end) " (each entry is
/// followed by a space), in ascending order, and returns the stream.
[[ maybe_unused ]] static std::ostream & operator<< (std::ostream & out, const HalfIntervals & ranges)
{
    const auto & all = ranges.intervals;
    for (auto it = all.cbegin(); it != all.cend(); ++it)
    {
        const auto & interval = *it;
        out << fmt::format("({}, {}) ", interval.begin, interval.end);
    }
    return out;
}

/// Non-mutating intersection for tests: returns first ∩ second and leaves both
/// arguments untouched (the work is done on a copy of `first`).
[[ maybe_unused ]] static HalfIntervals getIntersection(const HalfIntervals & first, const HalfIntervals & second)
{
    HalfIntervals intersection(first);
    intersection.intersect(second);
    return intersection;
}

}