aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/Processors/Merges/Algorithms/IMergingAlgorithmWithDelayedChunk.cpp
blob: 681709248405af43ff5dda7ea449153b3d456d49 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
#include <Processors/Merges/Algorithms/IMergingAlgorithmWithDelayedChunk.h>


namespace DB
{

IMergingAlgorithmWithDelayedChunk::IMergingAlgorithmWithDelayedChunk(Block header_, size_t num_inputs, SortDescription description_)
    : description(std::move(description_)), header(std::move(header_)), current_inputs(num_inputs), cursors(num_inputs)
{
}

void IMergingAlgorithmWithDelayedChunk::initializeQueue(Inputs inputs)
{
    current_inputs = std::move(inputs);

    for (size_t source_num = 0; source_num < current_inputs.size(); ++source_num)
    {
        if (!current_inputs[source_num].chunk)
            continue;

        cursors[source_num] = SortCursorImpl(
            header, current_inputs[source_num].chunk.getColumns(), description, source_num, current_inputs[source_num].permutation);
    }

    queue = SortingQueue<SortCursor>(cursors);
}

void IMergingAlgorithmWithDelayedChunk::updateCursor(Input & input, size_t source_num)
{
    auto & current_input = current_inputs[source_num];

    /// Extend lifetime of last chunk.
    last_chunk.swap(current_input.chunk);
    last_chunk_sort_columns = std::move(cursors[source_num].sort_columns);

    current_input.swap(input);
    cursors[source_num].reset(current_input.chunk.getColumns(), header, current_input.permutation);

    queue.push(cursors[source_num]);
}

}