aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/Processors/Merges/Algorithms/IMergingAlgorithmWithSharedChunks.h
blob: 58c445b6ac4d9c24114c953d8bf1295e3a481ae3 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
#pragma once
#include <Processors/Merges/Algorithms/IMergingAlgorithm.h>
#include <Processors/Merges/Algorithms/RowRef.h>
#include <Core/SortDescription.h>

namespace DB
{

/// Base class for merging algorithms that store each input's current chunk as a
/// detail::SharedChunkPtr and hand that pointer to row references (see setRowRef).
/// Implements initialize() and consume() of IMergingAlgorithm; the remaining parts
/// of the interface are left to derived classes.
class IMergingAlgorithmWithSharedChunks : public IMergingAlgorithm
{
public:
    /// Defined out of line.
    /// NOTE(review): judging by the names, `num_inputs` is the number of merged inputs
    /// and `max_row_refs` bounds how many chunks the allocator keeps alive at once -
    /// confirm against the constructor definition.
    IMergingAlgorithmWithSharedChunks(
        Block header_, size_t num_inputs, SortDescription description_, WriteBuffer * out_row_sources_buf_, size_t max_row_refs);

    void initialize(Inputs inputs) override;
    void consume(Input & input, size_t source_num) override;

private:
    Block header;
    SortDescription description;

    /// The allocator must be destroyed after the chunks stored in `sources`.
    /// Members are destroyed in reverse declaration order, so `chunk_allocator`
    /// has to stay declared before `sources` (and before `cursors`).
    detail::SharedChunkAllocator chunk_allocator;

    SortCursorImpls cursors;

protected:

    /// Per-input state.
    struct Source
    {
        /// Chunk currently held for this input.
        detail::SharedChunkPtr chunk;
        /// Explicitly initialized so that a default-initialized Source never carries an indeterminate flag.
        /// NOTE(review): presumably tells the algorithm not to emit the last row of `chunk` - confirm in consume().
        bool skip_last_row = false;
    };

    /// Sources currently being merged, indexed by input number.
    using Sources = std::vector<Source>;
    Sources sources;

    /// Queue of sort cursors, available to derived algorithms.
    SortingQueue<SortCursor> queue;

    /// Used in Vertical merge algorithm to gather non-PK/non-index columns (on next step)
    /// If it is not nullptr then it should be populated during execution
    WriteBuffer * out_row_sources_buf = nullptr;

    using RowRef = detail::RowRefWithOwnedChunk;

    /// Binds `row` to `cursor` together with the chunk stored for the input
    /// whose number is `cursor.impl->order`.
    void setRowRef(RowRef & row, SortCursor & cursor)
    {
        row.set(cursor, sources[cursor.impl->order].chunk);
    }

    /// Returns the `skip_last_row` flag of the given input.
    /// `input_number` is not range-checked; it must be a valid index into `sources`.
    bool skipLastRowFor(size_t input_number) const
    {
        return sources[input_number].skip_last_row;
    }
};

}