path: root/contrib/clickhouse/src/Processors/Transforms/ColumnGathererTransform.h
#pragma once

#include <IO/ReadBuffer.h>
#include <Common/PODArray.h>
#include <Processors/Merges/Algorithms/IMergingAlgorithm.h>
#include <Processors/Merges/IMergingTransform.h>


namespace Poco { class Logger; }


namespace DB
{


/// Tiny struct that stores the number of the part the current row was fetched from, plus a skip flag.
struct RowSourcePart
{
    UInt8 data = 0;

    RowSourcePart() = default;

    explicit RowSourcePart(size_t source_num, bool skip_flag = false)
    {
        static_assert(sizeof(*this) == 1, "Size of RowSourcePart is too big due to compiler settings");
        setSourceNum(source_num);
        setSkipFlag(skip_flag);
    }

    size_t getSourceNum() const { return data & MASK_NUMBER; }

    /// In the CollapsingMergeTree case the flag means "skip this row".
    bool getSkipFlag() const { return (data & MASK_FLAG) != 0; }

    void setSourceNum(size_t source_num)
    {
        data = (data & MASK_FLAG) | (static_cast<UInt8>(source_num) & MASK_NUMBER);
    }

    void setSkipFlag(bool flag)
    {
        data = flag ? data | MASK_FLAG : data & ~MASK_FLAG;
    }

    static constexpr size_t MAX_PARTS = 0x7F;
    static constexpr UInt8 MASK_NUMBER = 0x7F;
    static constexpr UInt8 MASK_FLAG = 0x80;
};
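/// Layout sketch (illustrative, not part of the original interface): the low 7 bits hold the
/// source number and the high bit holds the skip flag, so RowSourcePart(5, true) stores
/// data == 0x85. At most MAX_PARTS (127) source parts can be addressed by a single byte.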

using MergedRowSources = PODArray<RowSourcePart>;


/** Gathers a single stream from multiple streams according to a stream mask.
  * The mask maps each row number to the index of its source stream.
  * Streams should contain exactly one column.
  */
class ColumnGathererStream final : public IMergingAlgorithm
{
public:
    ColumnGathererStream(size_t num_inputs, ReadBuffer & row_sources_buf_, size_t block_preferred_size_ = DEFAULT_BLOCK_SIZE);

    void initialize(Inputs inputs) override;
    void consume(Input & input, size_t source_num) override;
    Status merge() override;

    /// for use in implementations of IColumn::gather()
    template <typename Column>
    void gather(Column & column_res);
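    /// A column implementation typically just delegates here; a minimal sketch (actual
    /// column classes may differ):
    ///     void ColumnVector<T>::gather(ColumnGathererStream & gatherer) { gatherer.gather(*this); }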

    UInt64 getMergedRows() const { return merged_rows; }
    UInt64 getMergedBytes() const { return merged_bytes; }

private:
    /// Cache required fields
    struct Source
    {
        ColumnPtr column;
        size_t pos = 0;
        size_t size = 0;

        void update(ColumnPtr column_)
        {
            column = std::move(column_);
            size = column->size();
            pos = 0;
        }
    };

    MutableColumnPtr result_column;

    std::vector<Source> sources;
    ReadBuffer & row_sources_buf;

    const size_t block_preferred_size;

    Source * source_to_fully_copy = nullptr;

    ssize_t next_required_source = -1;
    size_t cur_block_preferred_size = 0;

    UInt64 merged_rows = 0;
    UInt64 merged_bytes = 0;
};

class ColumnGathererTransform final : public IMergingTransform<ColumnGathererStream>
{
public:
    ColumnGathererTransform(
        const Block & header,
        size_t num_inputs,
        ReadBuffer & row_sources_buf_,
        size_t block_preferred_size_ = DEFAULT_BLOCK_SIZE);

    String getName() const override { return "ColumnGathererTransform"; }

    void work() override;

protected:
    void onFinish() override;
    UInt64 elapsed_ns = 0;

    Poco::Logger * log;
};
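/// Construction sketch with hypothetical names (the real wiring lives in the merge code):
/// the header must contain exactly one column, and row_sources_buf must yield the
/// RowSourcePart bytes written while the rows were merged.
///     auto gatherer = std::make_shared<ColumnGathererTransform>(
///         single_column_header, /*num_inputs=*/ parts_count, rows_sources_read_buf);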


template <typename Column>
void ColumnGathererStream::gather(Column & column_res)
{
    row_sources_buf.nextIfAtEnd();
    RowSourcePart * row_source_pos = reinterpret_cast<RowSourcePart *>(row_sources_buf.position());
    RowSourcePart * row_sources_end = reinterpret_cast<RowSourcePart *>(row_sources_buf.buffer().end());

    if (next_required_source == -1)
    {
        /// Start new column.
        cur_block_preferred_size = std::min(static_cast<size_t>(row_sources_end - row_source_pos), block_preferred_size);
        column_res.reserve(cur_block_preferred_size);
    }

    size_t cur_size = column_res.size();
    next_required_source = -1;

    while (row_source_pos < row_sources_end && cur_size < cur_block_preferred_size)
    {
        RowSourcePart row_source = *row_source_pos;
        size_t source_num = row_source.getSourceNum();
        Source & source = sources[source_num];
        bool source_skip = row_source.getSkipFlag();

        if (source.pos >= source.size) /// Fetch new block from source_num part
        {
            next_required_source = source_num;
            return;
        }

        ++row_source_pos;

        /// Consecutive optimization. TODO: precompute lengths
        size_t len = 1;
        size_t max_len = std::min(static_cast<size_t>(row_sources_end - row_source_pos), source.size - source.pos); // interval should be in the same block
        while (len < max_len && row_source_pos->data == row_source.data)
        {
            ++len;
            ++row_source_pos;
        }
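        /// Worked example (illustrative): if the mask reads {3, 3, 3, 7} starting at the current
        /// position and source 3 still has at least three unread rows, len becomes 3 and those
        /// rows are appended with a single insertRangeFrom call below instead of three insertFrom calls.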

        row_sources_buf.position() = reinterpret_cast<char *>(row_source_pos);

        if (!source_skip)
        {
            /// The whole block can be produced by copying the column pointer from the current block.
            if (source.pos == 0 && source.size == len)
            {
                /// If current block already contains data, return it.
                /// Whole column from current source will be returned on next read() iteration.
                source_to_fully_copy = &source;
                return;
            }
            else if (len == 1)
                column_res.insertFrom(*source.column, source.pos);
            else
                column_res.insertRangeFrom(*source.column, source.pos, len);

            cur_size += len;
        }

        source.pos += len;
    }
}

}