aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/Processors/Transforms/MemoryBoundMerging.h
blob: 607087fb39c56450f1d05d8b1173ff4219d7f2a3 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
#pragma once

#include <Core/SortDescription.h>
#include <Interpreters/sortBlock.h>
#include <Processors/IProcessor.h>
#include <Processors/Transforms/AggregatingTransform.h>

#include <Poco/Logger.h>

namespace DB
{

namespace ErrorCodes
{
    /// Raised for internal invariant violations (malformed chunk info, unpushable bucket).
    extern const int LOGICAL_ERROR;
}


/// Has several inputs and single output.
/// Read from inputs merged buckets with aggregated data, sort them by bucket number and block number.
/// Presumption: inputs return chunks with increasing bucket and block number, there is at most one chunk with the given bucket and block number.
class SortingAggregatedForMemoryBoundMergingTransform : public IProcessor
{
public:
    /// @param header_     Block structure shared by all inputs and the output.
    /// @param num_inputs_ Number of upstream ports to merge from.
    explicit SortingAggregatedForMemoryBoundMergingTransform(const Block & header_, size_t num_inputs_)
        : IProcessor(InputPorts(num_inputs_, header_), {header_})
        , header(header_)
        , num_inputs(num_inputs_)
        /// Start below any real chunk id so every input initially compares "behind" the merge front.
        , last_chunk_id(num_inputs, {std::numeric_limits<Int32>::min(), 0})
        , is_input_finished(num_inputs, false)
    {
    }

    String getName() const override { return "SortingAggregatedForMemoryBoundMergingTransform"; }

    Status prepare() override
    {
        auto & output = outputs.front();

        /// Downstream is gone — close everything upstream and stop.
        if (output.isFinished())
        {
            for (auto & input : inputs)
                input.close();

            return Status::Finished;
        }

        /// Downstream can't accept data right now; don't request more upstream.
        if (!output.canPush())
        {
            for (auto & input : inputs)
                input.setNotNeeded();

            return Status::PortFull;
        }

        /// Push if have chunk that is the next in order
        bool pushed_to_output = tryPushChunk();

        bool need_data = false;
        bool all_finished = true;

        /// Try read new chunk
        auto in = inputs.begin();
        for (size_t input_num = 0; input_num < num_inputs; ++input_num, ++in)
        {
            if (in->isFinished())
            {
                is_input_finished[input_num] = true;
                continue;
            }

            /// We want to keep not more than `num_inputs` chunks in memory (and there will be only a single chunk with the given (bucket_id, chunk_num)).
            const bool bucket_from_this_input_still_in_memory = chunks.contains(last_chunk_id[input_num]);
            if (bucket_from_this_input_still_in_memory)
            {
                all_finished = false;
                continue;
            }

            in->setNeeded();

            if (!in->hasData())
            {
                need_data = true;
                all_finished = false;
                continue;
            }

            auto chunk = in->pull();
            addChunk(std::move(chunk), input_num);

            if (in->isFinished())
            {
                is_input_finished[input_num] = true;
            }
            else
            {
                /// If chunk was pulled, then we need data from this port.
                need_data = true;
                all_finished = false;
            }
        }

        if (pushed_to_output)
            return Status::PortFull;

        /// A chunk pulled in this round may have become the next one in order.
        if (tryPushChunk())
            return Status::PortFull;

        if (need_data)
            return Status::NeedData;

        if (!all_finished)
            throw Exception(ErrorCodes::LOGICAL_ERROR, "SortingAggregatedForMemoryBoundMergingTransform has read bucket, but couldn't push it.");

        /// All regular buckets are flushed; the overflow chunk (if any) goes last.
        if (overflow_chunk)
        {
            output.push(std::move(overflow_chunk));
            return Status::PortFull;
        }

        output.finish();
        return Status::Finished;
    }

private:
    /// Pushes the buffered chunk with the minimal id, but only if no still-active
    /// input could deliver an even smaller id later. Returns true if a chunk was pushed.
    bool tryPushChunk()
    {
        auto & output = outputs.front();

        if (chunks.empty())
            return false;

        /// Chunk with min id
        auto it = chunks.begin();
        auto current_chunk_id = it->first;

        /// Check if it is actually next in order
        for (size_t input = 0; input < num_inputs; ++input)
            if (!is_input_finished[input] && last_chunk_id[input] < current_chunk_id)
                return false;

        output.push(std::move(it->second));
        chunks.erase(it);
        return true;
    }

    /// Buffers a pulled chunk keyed by (bucket_id, chunk_num); the overflow chunk is stored aside.
    /// Throws LOGICAL_ERROR on missing/wrong chunk info or a duplicate chunk id.
    void addChunk(Chunk chunk, size_t from_input)
    {
        if (!chunk.hasRows())
            return;

        const auto & info = chunk.getChunkInfo();
        if (!info)
            throw Exception(ErrorCodes::LOGICAL_ERROR, "Chunk info was not set for chunk in SortingAggregatedForMemoryBoundMergingTransform.");

        const auto * agg_info = typeid_cast<const AggregatedChunkInfo *>(info.get());
        if (!agg_info)
            throw Exception(
                ErrorCodes::LOGICAL_ERROR, "Chunk should have AggregatedChunkInfo in SortingAggregatedForMemoryBoundMergingTransform.");

        Int32 bucket_id = agg_info->bucket_num;
        bool is_overflows = agg_info->is_overflows;
        UInt64 chunk_num = agg_info->chunk_num;

        if (is_overflows)
            overflow_chunk = std::move(chunk);
        else
        {
            const auto chunk_id = ChunkId{bucket_id, chunk_num};
            /// Single-lookup insert (instead of contains() + operator[], which did a second
            /// lookup and default-constructed a Chunk). Insertion fails iff the id is already
            /// present, violating the "at most one chunk per (bucket, chunk_num)" precondition.
            const auto [it, inserted] = chunks.emplace(chunk_id, std::move(chunk));
            if (!inserted)
            {
                throw Exception(
                    ErrorCodes::LOGICAL_ERROR,
                    "SortingAggregatedForMemoryBoundMergingTransform already got bucket with number {}",
                    bucket_id);
            }

            last_chunk_id[from_input] = chunk_id;
        }
    }

    /// Total order of chunks: by bucket number first, then by chunk number within the bucket.
    struct ChunkId
    {
        Int32 bucket_id;
        UInt64 chunk_num;

        bool operator<(const ChunkId & other) const
        {
            return std::make_pair(bucket_id, chunk_num) < std::make_pair(other.bucket_id, other.chunk_num);
        }
    };

    Block header;
    size_t num_inputs;

    /// Per input: id of the last chunk pulled from it (sentinel minimum before the first pull).
    std::vector<ChunkId> last_chunk_id;
    std::vector<bool> is_input_finished;
    /// Buffered chunks ordered by id; holds at most `num_inputs` entries (see prepare()).
    std::map<ChunkId, Chunk> chunks;
    Chunk overflow_chunk;
};

}