aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/Processors/Merges/Algorithms/IMergingAlgorithmWithSharedChunks.cpp
blob: 8c94a0172712fe0c88a3e7c7b7b8c274815cff5f (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
#include <Processors/Merges/Algorithms/IMergingAlgorithmWithSharedChunks.h>

namespace DB
{

/// Constructs the shared-chunk merging state.
///
/// - header_ and description_ are taken by value and moved into the corresponding members.
/// - num_inputs is used to construct both `cursors` and `sources`; initialize() and consume()
///   index these members by source number.
/// - out_row_sources_buf_ is stored as passed (pointer; this constructor does not dereference it).
/// - max_row_refs is added to num_inputs to form the argument of chunk_allocator's constructor
///   (presumably the number of chunks that may be alive at once - confirm against the allocator).
IMergingAlgorithmWithSharedChunks::IMergingAlgorithmWithSharedChunks(
    Block header_, size_t num_inputs, SortDescription description_, WriteBuffer * out_row_sources_buf_, size_t max_row_refs)
    : header(std::move(header_))
    , description(std::move(description_))
    , chunk_allocator(num_inputs + max_row_refs)
    , cursors(num_inputs)
    , sources(num_inputs)
    , out_row_sources_buf(out_row_sources_buf_)
{
    /// All state is set up in the member-initializer list above.
}

/// Rewrites the columns of `chunk` in place: every column is passed through
/// convertToFullColumnIfConst() and the resulting set is put back with the
/// row count the chunk had on entry.
static void prepareChunk(Chunk & chunk)
{
    /// Remember the row count before the columns are detached.
    const auto rows_count = chunk.getNumRows();

    auto detached_columns = chunk.detachColumns();
    for (auto & col : detached_columns)
    {
        col = col->convertToFullColumnIfConst();
    }

    chunk.setColumns(std::move(detached_columns), rows_count);
}

/// Sets up per-source state from the initial inputs and builds the sorting queue.
///
/// For every input that carries a chunk: the chunk is prepared, handed to
/// chunk_allocator, and a SortCursorImpl is created over the allocated chunk's columns.
/// Inputs whose chunk evaluates to false are skipped, so their `sources` / `cursors`
/// slots are left as they were. The queue is rebuilt from all cursors at the end.
void IMergingAlgorithmWithSharedChunks::initialize(Inputs inputs)
{
    const size_t inputs_count = inputs.size();

    for (size_t source_num = 0; source_num != inputs_count; ++source_num)
    {
        auto & input = inputs[source_num];
        if (!input.chunk)
            continue;

        prepareChunk(input.chunk);

        auto & source = sources[source_num];
        source.skip_last_row = input.skip_last_row;
        source.chunk = chunk_allocator.alloc(input.chunk);

        auto & cursor = cursors[source_num];
        cursor = SortCursorImpl(header, source.chunk->getColumns(), description, source_num, input.permutation);

        /// Copy the cursor's all_columns / sort_columns into the allocated chunk.
        source.chunk->all_columns = cursor.all_columns;
        source.chunk->sort_columns = cursor.sort_columns;
    }

    queue = SortingQueue<SortCursor>(cursors);
}

/// Accepts a new chunk for the source `source_num`.
///
/// The chunk is prepared, handed to chunk_allocator, and the existing cursor for this
/// source is reset onto the allocated chunk's columns; the cursor is then pushed to the queue.
void IMergingAlgorithmWithSharedChunks::consume(Input & input, size_t source_num)
{
    prepareChunk(input.chunk);

    auto & current_source = sources[source_num];
    current_source.skip_last_row = input.skip_last_row;
    current_source.chunk = chunk_allocator.alloc(input.chunk);

    auto & cursor = cursors[source_num];
    cursor.reset(current_source.chunk->getColumns(), header, input.permutation);

    /// Copy the cursor's all_columns / sort_columns into the allocated chunk.
    current_source.chunk->all_columns = cursor.all_columns;
    current_source.chunk->sort_columns = cursor.sort_columns;

    queue.push(cursor);
}

}