aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/Processors/Merges/Algorithms/VersionedCollapsingAlgorithm.cpp
blob: e7a431dc1d00dc619da631b77552f1a8ed08afd3 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
#include <Processors/Merges/Algorithms/VersionedCollapsingAlgorithm.h>
#include <Columns/ColumnsNumber.h>
#include <IO/WriteBuffer.h>

namespace DB
{

/// Hard upper limit used when sizing the queue of pending rows in the constructor;
/// the same value is also forwarded to the base class constructor.
static constexpr size_t MAX_ROWS_IN_MULTIVERSION_QUEUE = 8192;

/// Sets up the merging state: forwards the shared merge parameters to the base class,
/// creates the output accumulator from an empty clone of the header columns,
/// sizes the queue of pending rows and resolves the position of the sign column.
VersionedCollapsingAlgorithm::VersionedCollapsingAlgorithm(
    const Block & header_,
    size_t num_inputs,
    SortDescription description_,
    const String & sign_column_,
    size_t max_block_size_rows_,
    size_t max_block_size_bytes_,
    WriteBuffer * out_row_sources_buf_,
    bool use_average_block_sizes)
    : IMergingAlgorithmWithSharedChunks(
        header_,
        num_inputs,
        std::move(description_),
        out_row_sources_buf_,
        MAX_ROWS_IN_MULTIVERSION_QUEUE)
    , merged_data(
        header_.cloneEmptyColumns(),
        use_average_block_sizes,
        max_block_size_rows_,
        max_block_size_bytes_)
    /// The requested block size is bounded to [3, MAX_ROWS_IN_MULTIVERSION_QUEUE]:
    /// 3 is a reasonable minimum size to collapse anything. One is subtracted
    /// to compensate for the +1 in FixedSizeDequeWithGaps's internal buffer.
    , max_rows_in_queue(std::clamp<size_t>(max_block_size_rows_, 3, MAX_ROWS_IN_MULTIVERSION_QUEUE) - 1)
    , current_keys(max_rows_in_queue)
{
    sign_column_number = header_.getPositionByName(sign_column_);
}

/// Appends the raw bytes of a single RowSourcePart to `buffer`.
/// A one-byte RowSourcePart goes through the single-character `write` overload,
/// any other size through the (pointer, length) overload.
inline ALWAYS_INLINE static void writeRowSourcePart(WriteBuffer & buffer, RowSourcePart row_source)
{
    const char * const raw_bytes = reinterpret_cast<const char *>(&row_source);

    if constexpr (sizeof(RowSourcePart) != 1)
    {
        buffer.write(raw_bytes, sizeof(RowSourcePart));
    }
    else
    {
        buffer.write(*raw_bytes);
    }
}

/// Moves the next `gap_size` entries from `current_row_sources` into the row sources
/// buffer exactly as they were queued (their flags are not modified here).
/// Does nothing when no row sources buffer was provided.
void VersionedCollapsingAlgorithm::insertGap(size_t gap_size)
{
    if (!out_row_sources_buf)
        return;

    for (size_t remaining = gap_size; remaining != 0; --remaining)
    {
        writeRowSourcePart(*out_row_sources_buf, current_row_sources.front());
        current_row_sources.pop();
    }
}

/// Appends `row` to the merged output and, if row sources are being recorded,
/// first flushes `skip_rows` queued entries unchanged and then writes the entry
/// for this row with its skip flag cleared.
void VersionedCollapsingAlgorithm::insertRow(size_t skip_rows, const RowRef & row)
{
    merged_data.insertRow(*row.all_columns, row.row_num, row.owned_chunk->getNumRows());

    /// Entries that precede this row in the row sources queue.
    insertGap(skip_rows);

    if (!out_row_sources_buf)
        return;

    /// The entry at the front now corresponds to the row that was just inserted.
    auto & kept_source = current_row_sources.front();
    kept_source.setSkipFlag(false);
    writeRowSourcePart(*out_row_sources_buf, kept_source);
    current_row_sources.pop();
}

/// Runs the merge until it has something to report and returns one of:
///  - Status(order) when the current chunk of input `order` has been exhausted
///    (per the comments below, this asks for the next block from that source);
///  - Status(chunk) when `merged_data` has accumulated enough rows;
///  - Status(chunk, true) after all inputs and the pending queue are drained
///    (NOTE(review): the `true` flag presumably marks the end of the merge - confirm against Status).
///
/// Rows with equal sort columns are buffered in `current_keys`. A row whose sign differs
/// from `sign_in_queue` cancels the most recently buffered row instead of being stored.
IMergingAlgorithm::Status VersionedCollapsingAlgorithm::merge()
{
    /// Take rows in sorted order and put them into `merged_data` until it has enough rows for a block.
    while (queue.isValid())
    {
        SortCursor current = queue.current();

        if (current->isLast() && skipLastRowFor(current->order))
        {
            /// Get the next block from the corresponding source, if there is one.
            queue.removeTop();
            return Status(current.impl->order);
        }

        RowRef current_row;

        /// Sign of the current row, read from the Int8 sign column of the cursor's chunk.
        Int8 sign = assert_cast<const ColumnInt8 &>(*current->all_columns[sign_column_number]).getData()[current->getRow()];

        setRowRef(current_row, current);

        /// At first, let's decide the number of rows needed to insert right now.
        size_t num_rows_to_insert = 0;
        if (!current_keys.empty())
        {
            /// Compare the sort columns with the most recently buffered row.
            auto key_differs = !current_row.hasEqualSortColumnsWith(current_keys.back());

            if (key_differs) /// Flush whole queue
                num_rows_to_insert = current_keys.size();
            else if (current_keys.size() >= max_rows_in_queue) /// Flush single row if queue is big
                num_rows_to_insert = 1;
        }

        /// Insert ready rows if any.
        while (num_rows_to_insert)
        {
            const auto & row = current_keys.front();
            /// Number of row-source entries that have to be written before this row's own entry.
            auto gap = current_keys.frontGap();

            insertRow(gap, row);

            current_keys.popFront();

            --num_rows_to_insert;

            /// It's ok to return here, because we didn't affect queue:
            /// the current row has not been consumed yet, so the next call examines it again
            /// and flushes whatever is still left in `current_keys`.
            if (merged_data.hasEnoughRows())
                return Status(merged_data.pull());
        }

        if (current_keys.empty())
        {
            /// First row of a new run: remember its sign as the sign of the queued rows.
            sign_in_queue = sign;
            current_keys.pushBack(current_row);
        }
        else /// If queue is not empty, then current_row has the same key as in current_keys queue
        {
            if (sign == sign_in_queue)
                current_keys.pushBack(current_row);
            else
            {
                /// Opposite sign: drop the last buffered row together with the current one.
                /// The gap of 2 stands for the two row-source entries that are later written via insertGap().
                current_keys.popBack();
                current_keys.pushGap(2);
            }
        }

        /// Every consumed row gets a row-source entry created with `true` as the second argument
        /// (presumably the skip flag, since insertRow() resets it with setSkipFlag(false) for rows that are kept).
        if (out_row_sources_buf)
            current_row_sources.emplace(current->order, true);

        if (!current->isLast())
        {
            queue.next();
        }
        else
        {
            /// We take next block from the corresponding source, if there is one.
            queue.removeTop();
            return Status(current.impl->order);
        }
    }

    /// All inputs are exhausted: flush the rows that are still buffered.
    while (!current_keys.empty())
    {
        const auto & row = current_keys.front();
        auto gap = current_keys.frontGap();

        insertRow(gap, row);
        current_keys.popFront();

        if (merged_data.hasEnoughRows())
            return Status(merged_data.pull());
    }

    /// Write information about last collapsed rows.
    insertGap(current_keys.frontGap());
    return Status(merged_data.pull(), true);
}

}