aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/Storages/MergeTree/MarkRange.cpp
blob: bd8546f04cc7c5c6a8302fd8a5e7e3c50e9d223e (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
#include "MarkRange.h"

#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>

namespace DB
{

namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;
}

/// Number of marks covered by this range, computed as the distance between `end` and `begin`.
size_t MarkRange::getNumberOfMarks() const
{
    const size_t marks_in_range = end - begin;
    return marks_in_range;
}

/// Two ranges are equal when both their `begin` and their `end` coincide.
bool MarkRange::operator==(const MarkRange & rhs) const
{
    if (begin != rhs.begin)
        return false;
    return end == rhs.end;
}

/// Ordering of mark ranges: returns true when this range starts before `rhs` and ends
/// no later than `rhs` begins.
///
/// Only non-intersecting ranges may be compared. If the operands are distinct objects and the
/// beginning of either one lies inside the other, an exception with LOGICAL_ERROR is thrown.
bool MarkRange::operator<(const MarkRange & rhs) const
{
    /// Comparing an object with itself skips the intersection check.
    const bool same_object = (this == &rhs);

    if (!same_object)
    {
        /// The ranges intersect when the beginning of one of them falls inside the other one.
        const bool rhs_starts_inside_this = begin <= rhs.begin && rhs.begin < end;
        const bool this_starts_inside_rhs = rhs.begin <= begin && begin < rhs.end;

        if (rhs_starts_inside_this || this_starts_inside_rhs)
            throw Exception(
                ErrorCodes::LOGICAL_ERROR,
                "Intersecting mark ranges are not allowed, it is a bug! "
                "First range ({}, {}), second range ({}, {})",
                begin, end, rhs.begin, rhs.end);
    }

    if (begin >= rhs.begin)
        return false;
    return end <= rhs.begin;
}

/// Returns the largest `end` among the given ranges, or 0 if `ranges` is empty.
size_t getLastMark(const MarkRanges & ranges)
{
    size_t last_mark = 0;
    for (const auto & range : ranges)
    {
        if (last_mark < range.end)
            last_mark = range.end;
    }
    return last_mark;
}

/// Renders the ranges as "(begin, end), (begin, end), ..."; an empty container yields an empty string.
std::string toString(const MarkRanges & ranges)
{
    std::string text;
    bool is_first = true;
    for (const auto & range : ranges)
    {
        /// Separator goes between elements only, never before the first one.
        if (!is_first)
            text += ", ";
        is_first = false;

        text += "(";
        text += std::to_string(range.begin);
        text += ", ";
        text += std::to_string(range.end);
        text += ")";
    }
    return text;
}

void assertSortedAndNonIntersecting(const MarkRanges & ranges)
{
    MarkRanges ranges_copy(ranges.begin(), ranges.end());
    /// Should also throw an exception if interseting range is found during comparison.
    std::sort(ranges_copy.begin(), ranges_copy.end());
    if (ranges_copy != ranges)
        throw Exception(
            ErrorCodes::LOGICAL_ERROR, "Expected sorted and non intersecting ranges. Ranges: {}",
            toString(ranges));
}

/// Total number of marks over all ranges in this container (sum of the per-range counts).
size_t MarkRanges::getNumberOfMarks() const
{
    size_t total_marks = 0;
    for (auto it = this->begin(); it != this->end(); ++it)
        total_marks += it->getNumberOfMarks();
    return total_marks;
}

/// Writes the container to `out` in little-endian binary form:
/// first the number of ranges, then `begin` and `end` of every range in order.
void MarkRanges::serialize(WriteBuffer & out) const
{
    writeBinaryLittleEndian(this->size(), out);

    for (const auto & range : *this)
    {
        writeBinaryLittleEndian(range.begin, out);
        writeBinaryLittleEndian(range.end, out);
    }
}

/// Human-readable summary: the number of ranges followed by the comma-joined ranges themselves.
String MarkRanges::describe() const
{
    const auto ranges_count = this->size();
    return fmt::format("Size: {}, Data: {}", ranges_count, fmt::join(*this, ","));
}

/// Reads the container from `in` in the layout produced by serialize():
/// a little-endian element count followed by `begin` and `end` of every range.
/// The container is resized to the count that was read, and each element is then filled in place.
void MarkRanges::deserialize(ReadBuffer & in)
{
    size_t ranges_count = 0;
    readBinaryLittleEndian(ranges_count, in);

    this->resize(ranges_count);
    for (auto & range : *this)
    {
        readBinaryLittleEndian(range.begin, in);
        readBinaryLittleEndian(range.end, in);
    }
}

}