blob: 58c445b6ac4d9c24114c953d8bf1295e3a481ae3 (
plain) (
blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
|
#pragma once
#include <Processors/Merges/Algorithms/IMergingAlgorithm.h>
#include <Processors/Merges/Algorithms/RowRef.h>
#include <Core/SortDescription.h>
namespace DB
{
/// Base class for merging algorithms that keep, per input, a shared-ownership
/// handle (detail::SharedChunkPtr) to the chunk currently being merged, so that
/// row references (RowRef) can hold on to the chunk they point into.
///
/// The constructor, initialize() and consume() are defined out of line; see the
/// corresponding .cpp for their exact behaviour.
class IMergingAlgorithmWithSharedChunks : public IMergingAlgorithm
{
public:
    /// NOTE(review): parameter semantics are inferred from names and the members
    /// below; confirm against the definition.
    ///  - header_, description_     : presumably stored in `header` / `description`.
    ///  - num_inputs                : presumably the number of merged inputs (size of `sources`).
    ///  - out_row_sources_buf_      : may be nullptr; see `out_row_sources_buf`.
    ///  - max_row_refs              : presumably bounds how many row references
    ///                                (and thus shared chunks) can be alive at once.
    IMergingAlgorithmWithSharedChunks(
        Block header_, size_t num_inputs, SortDescription description_, WriteBuffer * out_row_sources_buf_, size_t max_row_refs);

    /// IMergingAlgorithm interface.
    void initialize(Inputs inputs) override;
    void consume(Input & input, size_t source_num) override;

private:
    Block header;
    SortDescription description;

    /// The allocator must be destroyed after every SharedChunkPtr held in `sources`.
    /// Members are destroyed in reverse declaration order, so this member has to
    /// stay declared BEFORE `sources`. Do not reorder.
    detail::SharedChunkAllocator chunk_allocator;

    SortCursorImpls cursors;

protected:
    /// Per-input state.
    struct Source
    {
        /// Chunk of this input that is currently being merged.
        detail::SharedChunkPtr chunk;

        /// Value reported by skipLastRowFor() for this input.
        /// Initialized explicitly so that a default-constructed Source never
        /// carries an indeterminate flag.
        bool skip_last_row = false;
    };

    /// Sources currently being merged.
    using Sources = std::vector<Source>;
    Sources sources;

    SortingQueue<SortCursor> queue;

    /// Used in Vertical merge algorithm to gather non-PK/non-index columns (on next step)
    /// If it is not nullptr then it should be populated during execution
    WriteBuffer * out_row_sources_buf = nullptr;

    using RowRef = detail::RowRefWithOwnedChunk;

    /// Points `row` at the cursor's current position, handing it the chunk of the
    /// source the cursor belongs to (looked up by `cursor.impl->order`).
    void setRowRef(RowRef & row, SortCursor & cursor) { row.set(cursor, sources[cursor.impl->order].chunk); }

    /// Returns the `skip_last_row` flag of the given input.
    /// `input_number` must be a valid index into `sources`; it is not range-checked.
    bool skipLastRowFor(size_t input_number) const { return sources[input_number].skip_last_row; }
};
}
|