aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/Processors/Transforms/FillingTransform.h
blob: 85da544ef2dc96e8e8e886b6ae1bb4cbef0f2cc1 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
#pragma once

#include <Core/InterpolateDescription.h>
#include <Core/SortDescription.h>
#include <Interpreters/FillingRow.h>
#include <Processors/ISimpleTransform.h>


namespace DB
{

/** Implements modifier WITH FILL of ORDER BY clause.
 *  It fills gaps in data stream by rows with missing values in columns with set WITH FILL and default values in other columns.
 *  Optionally FROM, TO and STEP values can be specified.
 */
class FillingTransform : public ISimpleTransform
{
public:
    FillingTransform(
        const Block & header_,
        const SortDescription & sort_description_,
        const SortDescription & fill_description_,
        InterpolateDescriptionPtr interpolate_description_,
        bool use_with_fill_by_sorting_prefix_);

    String getName() const override { return "FillingTransform"; }

    Status prepare() override;

    static Block transformHeader(Block header, const SortDescription & sort_description);

protected:
    void transform(Chunk & chunk) override;

private:
    using MutableColumnRawPtrs = std::vector<IColumn *>;
    void transformRange(
        const Columns & input_fill_columns,
        const Columns & input_interpolate_columns,
        const Columns & input_sort_prefix_columns,
        const Columns & input_other_columns,
        const MutableColumns & result_columns,
        const MutableColumnRawPtrs & res_fill_columns,
        const MutableColumnRawPtrs & res_interpolate_columns,
        const MutableColumnRawPtrs & res_sort_prefix_columns,
        const MutableColumnRawPtrs & res_other_columns,
        std::pair<size_t, size_t> range,
        bool new_sorting_prefix);

    void saveLastRow(const MutableColumns & result_columns);
    void interpolate(const MutableColumns & result_columns, Block & interpolate_block);

    void initColumns(
        const Columns & input_columns,
        Columns & input_fill_columns,
        Columns & input_interpolate_columns,
        Columns & input_sort_prefix_columns,
        Columns & input_other_columns,
        MutableColumns & output_columns,
        MutableColumnRawPtrs & output_fill_columns,
        MutableColumnRawPtrs & output_interpolate_columns,
        MutableColumnRawPtrs & output_sort_prefix_columns,
        MutableColumnRawPtrs & output_other_columns);

    bool generateSuffixIfNeeded(
        const MutableColumns & result_columns,
        MutableColumnRawPtrs res_fill_columns,
        MutableColumnRawPtrs res_interpolate_columns,
        MutableColumnRawPtrs res_sort_prefix_columns,
        MutableColumnRawPtrs res_other_columns);
    bool generateSuffixIfNeeded(const Columns & input_columns, MutableColumns & result_columns);

    void insertFromFillingRow(
        const MutableColumnRawPtrs & filling_columns,
        const MutableColumnRawPtrs & interpolate_columns,
        const MutableColumnRawPtrs & other_columns,
        const Block & interpolate_block);

    const SortDescription sort_description;
    const SortDescription fill_description; /// Contains only columns with WITH FILL.
    SortDescription sort_prefix;
    const InterpolateDescriptionPtr interpolate_description; /// Contains INTERPOLATE columns

    FillingRow filling_row; /// Current row, which is used to fill gaps.
    FillingRow next_row; /// Row to which we need to generate filling rows.
    bool filling_row_inserted = false;

    using Positions = std::vector<size_t>;
    Positions fill_column_positions;
    Positions interpolate_column_positions;
    Positions other_column_positions;
    Positions sort_prefix_positions;
    std::vector<std::pair<size_t, NameAndTypePair>> input_positions; /// positions in result columns required for actions
    ExpressionActionsPtr interpolate_actions;
    Columns last_row;
    Columns last_range_sort_prefix;
    bool all_chunks_processed = false;    /// flag to determine if we have already processed all chunks
    const bool use_with_fill_by_sorting_prefix;
};

class FillingNoopTransform : public ISimpleTransform
{
public:
    FillingNoopTransform(const Block & header, const SortDescription & sort_description_)
        : ISimpleTransform(header, FillingTransform::transformHeader(header, sort_description_), true)
    {
    }

    void transform(Chunk &) override {}
    String getName() const override { return "FillingNoopTransform"; }
};

}