aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/Processors/Transforms/WindowTransform.h
blob: de3e82d15ee640660ce1529036f700b35de7f1c8 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
#pragma once

#include <Interpreters/WindowDescription.h>

#include <Processors/IProcessor.h>

#include <Common/AlignedBuffer.h>

#include <deque>

/// See https://stackoverflow.com/questions/72533435/error-zero-as-null-pointer-constant-while-comparing-template-class-using-spaces
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wzero-as-null-pointer-constant"


namespace DB
{

class ExpressionActions;
using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;

class Arena;

// Runtime data for computing one window function.
// WindowTransform keeps one such workspace per window function (see its
// `workspaces` member).
struct WindowFunctionWorkspace
{
    // The aggregate function of this workspace. Ignored when
    // `window_function_impl` is set (see below).
    AggregateFunctionPtr aggregate_function;

    // This field is set for pure window functions. When set, we ignore the
    // window_function.aggregate_function, and work through this interface
    // instead.
    // Raw pointer, null by default. NOTE(review): presumably non-owning --
    // confirm where it is assigned.
    IWindowFunction * window_function_impl = nullptr;

    // Indices of the argument columns of this function.
    // NOTE(review): presumably positions in the input block -- confirm against
    // the constructor in the .cpp file.
    std::vector<size_t> argument_column_indices;

    // Will not be initialized for a pure window function.
    // Declared `mutable`, so it can be modified through a const workspace.
    mutable AlignedBuffer aggregate_function_state;

    // Argument columns. Be careful, this is a per-block cache.
    std::vector<const IColumn *> argument_columns;
    // NOTE(review): presumably the number of the block `argument_columns` was
    // filled for; the initial max() value then acts as a "nothing cached yet"
    // marker -- confirm against the code that fills the cache.
    UInt64 cached_block_number = std::numeric_limits<UInt64>::max();
};

// One element of the sliding window of blocks kept by WindowTransform (its
// `blocks` deque): the columns of the block plus its row count.
struct WindowTransformBlock
{
    // NOTE(review): the difference between `original_input_columns` and
    // `input_columns` is not visible in this header -- see appendChunk() in
    // the .cpp file for how each of them is filled.
    Columns original_input_columns;
    // Returned by WindowTransform::inputAt().
    Columns input_columns;
    // Returned by WindowTransform::outputAt().
    MutableColumns output_columns;

    // Number of rows in this block. Row indices inside the block are expected
    // to be in [0, rows).
    size_t rows = 0;
};

// Position of a row in the stream of blocks processed by WindowTransform.
struct RowNumber
{
    // Number of the block. WindowTransform converts it into an index of its
    // `blocks` deque by subtracting `first_block_number`.
    UInt64 block = 0;
    // Index of the row inside that block.
    UInt64 row = 0;

    // Defaulted comparison: members are compared in declaration order, i.e.
    // first by `block`, then by `row`. This also provides ==, <, <=, etc.
    auto operator <=>(const RowNumber &) const = default;
};


/* Computes several window functions that share the same window. The input must
 * be sorted by PARTITION BY (in any order), then by ORDER BY.
 * We need to track the following pointers:
 * 1) boundaries of partition -- rows that compare equal w/PARTITION BY.
 * 2) current row for which we will compute the window functions.
 * 3) boundaries of the frame for this row.
 * Both the peer group and the frame are inside the partition, but can have any
 * position relative to each other.
 * All pointers only move forward. For partition boundaries, this is ensured by
 * the order of input data. This property also trivially holds for the ROWS and
 * GROUPS frames. For the RANGE frame, the proof requires the additional fact
 * that the ranges are specified in terms of (the single) ORDER BY column.
 *
 * `final` is so that the isCancelled() is devirtualized, we call it every row.
 */
class WindowTransform final : public IProcessor
{
public:
    WindowTransform(
            const Block & input_header_,
            const Block & output_header_,
            const WindowDescription & window_description_,
            const std::vector<WindowFunctionDescription> &
                functions);

    ~WindowTransform() override;

    String getName() const override
    {
        return "WindowTransform";
    }

    static Block transformHeader(Block header, const ExpressionActionsPtr & expression);

    /* (former) Implementation of ISimpleTransform.
     */
    void appendChunk(Chunk & chunk) /*override*/;

    /* Implementation of IProcessor;
     */
    Status prepare() override;
    void work() override;

    /* Implementation details.
     */
    void advancePartitionEnd();

    bool arePeers(const RowNumber & x, const RowNumber & y) const;

    void advanceFrameStartRowsOffset();
    void advanceFrameStartRangeOffset();
    void advanceFrameStart();

    void advanceFrameEndRowsOffset();
    void advanceFrameEndCurrentRow();
    void advanceFrameEndUnbounded();
    void advanceFrameEnd();
    void advanceFrameEndRangeOffset();

    void updateAggregationState();
    void writeOutCurrentRow();

    Columns & inputAt(const RowNumber & x)
    {
        assert(x.block >= first_block_number);
        assert(x.block - first_block_number < blocks.size());
        return blocks[x.block - first_block_number].input_columns;
    }

    const Columns & inputAt(const RowNumber & x) const
    {
        return const_cast<WindowTransform *>(this)->inputAt(x);
    }

    auto & blockAt(const UInt64 block_number)
    {
        assert(block_number >= first_block_number);
        assert(block_number - first_block_number < blocks.size());
        return blocks[block_number - first_block_number];
    }

    const auto & blockAt(const UInt64 block_number) const
    {
        return const_cast<WindowTransform *>(this)->blockAt(block_number);
    }

    auto & blockAt(const RowNumber & x)
    {
        return blockAt(x.block);
    }

    const auto & blockAt(const RowNumber & x) const
    {
        return const_cast<WindowTransform *>(this)->blockAt(x);
    }

    size_t blockRowsNumber(const RowNumber & x) const
    {
        return blockAt(x).rows;
    }

    MutableColumns & outputAt(const RowNumber & x)
    {
        assert(x.block >= first_block_number);
        assert(x.block - first_block_number < blocks.size());
        return blocks[x.block - first_block_number].output_columns;
    }

    void advanceRowNumber(RowNumber & x) const
    {
        assert(x.block >= first_block_number);
        assert(x.block - first_block_number < blocks.size());

        const auto block_rows = blockAt(x).rows;
        assert(x.row < block_rows);

        ++x.row;
        if (x.row < block_rows)
        {
            return;
        }

        x.row = 0;
        ++x.block;
    }

    RowNumber nextRowNumber(const RowNumber & x) const
    {
        RowNumber result = x;
        advanceRowNumber(result);
        return result;
    }

    void retreatRowNumber(RowNumber & x) const
    {
#ifndef NDEBUG
        auto original_x = x;
#endif

        if (x.row > 0)
        {
            --x.row;
            return;
        }

        --x.block;
        assert(x.block >= first_block_number);
        assert(x.block < first_block_number + blocks.size());
        assert(blockAt(x).rows > 0);
        x.row = blockAt(x).rows - 1;

#ifndef NDEBUG
        auto advanced_retreated_x = x;
        advanceRowNumber(advanced_retreated_x);
        assert(advanced_retreated_x == original_x);
#endif
    }

    RowNumber prevRowNumber(const RowNumber & x) const
    {
        RowNumber result = x;
        retreatRowNumber(result);
        return result;
    }

    auto moveRowNumber(const RowNumber & original_row_number, Int64 offset) const;
    auto moveRowNumberNoCheck(const RowNumber & original_row_number, Int64 offset) const;

    void assertValid(const RowNumber & x) const
    {
        assert(x.block >= first_block_number);
        if (x.block == first_block_number + blocks.size())
            assert(x.row == 0);
        else
            assert(x.row < blockRowsNumber(x));
    }

    RowNumber blocksEnd() const
    {
        return RowNumber{first_block_number + blocks.size(), 0};
    }

    RowNumber blocksBegin() const
    {
        return RowNumber{first_block_number, 0};
    }

    /* Data (formerly) inherited from ISimpleTransform, needed for the
     * implementation of the IProcessor interface.
     */
    InputPort & input;
    OutputPort & output;

    bool has_input = false;
    bool input_is_finished = false;
    Port::Data input_data;
    bool has_output = false;
    Port::Data output_data;

    /* Data for window transform itself.
     */
    Block input_header;

    WindowDescription window_description;

    // Indices of the PARTITION BY columns in block.
    std::vector<size_t> partition_by_indices;
    // Indices of the ORDER BY columns in block;
    std::vector<size_t> order_by_indices;

    // Per-window-function scratch spaces.
    std::vector<WindowFunctionWorkspace> workspaces;

    // FIXME Reset it when the partition changes. We only save the temporary
    // states in it (probably?).
    std::unique_ptr<Arena> arena;

    // A sliding window of blocks we currently need. We add the input blocks as
    // they arrive, and discard the blocks we don't need anymore. The blocks
    // have an always-incrementing index. The index of the first block is in
    // `first_block_number`.
    std::deque<WindowTransformBlock> blocks;
    UInt64 first_block_number = 0;
    // The next block we are going to pass to the consumer.
    UInt64 next_output_block_number = 0;
    // The first row for which we still haven't calculated the window functions.
    // Used to determine which resulting blocks we can pass to the consumer.
    RowNumber first_not_ready_row;

    // Boundaries of the current partition.
    // partition_start doesn't point to a valid block, because we want to drop
    // the blocks early to save memory. We still have to track it so that we can
    // cut off a PRECEDING frame at the partition start.
    // The `partition_end` is past-the-end, as usual. When
    // partition_ended = false, it still haven't ended, and partition_end is the
    // next row to check.
    RowNumber partition_start;
    RowNumber partition_end;
    bool partition_ended = false;

    // The row for which we are now computing the window functions.
    RowNumber current_row;
    // The start of current peer group, needed for CURRENT ROW frame start.
    // For ROWS frame, always equal to the current row, and for RANGE and GROUP
    // frames may be earlier.
    RowNumber peer_group_start;

    // Row and group numbers in partition for calculating rank() and friends.
    UInt64 current_row_number = 1;
    UInt64 peer_group_start_row_number = 1;
    UInt64 peer_group_number = 1;

    // The frame is [frame_start, frame_end) if frame_ended && frame_started,
    // and unknown otherwise. Note that when we move to the next row, both the
    // frame_start and the frame_end may jump forward by an unknown amount of
    // blocks, e.g. if we use a RANGE frame. This means that sometimes we don't
    // know neither frame_end nor frame_start.
    // We update the states of the window functions after we find the final frame
    // boundaries.
    // After we have found the final boundaries of the frame, we can immediately
    // output the result for the current row, without waiting for more data.
    RowNumber frame_start;
    RowNumber frame_end;
    bool frame_ended = false;
    bool frame_started = false;

    // The previous frame boundaries that correspond to the current state of the
    // aggregate function. We use them to determine how to update the aggregation
    // state after we find the new frame.
    RowNumber prev_frame_start;
    RowNumber prev_frame_end;

    // Comparison function for RANGE OFFSET frames. We choose the appropriate
    // overload once, based on the type of the ORDER BY column. Choosing it for
    // each row would be slow.
    std::function<int(
        const IColumn * compared_column, size_t compared_row,
        const IColumn * reference_column, size_t reference_row,
        const Field & offset,
        bool offset_is_preceding)> compare_values_with_offset;
};

}

#pragma clang diagnostic pop