aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/Processors/Merges/Algorithms/VersionedCollapsingAlgorithm.h
blob: 578100f080d8c1832086ede3490ac9f60044892c (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
#pragma once
#include <Processors/Merges/Algorithms/IMergingAlgorithmWithSharedChunks.h>
#include <Processors/Merges/Algorithms/MergedData.h>
#include <Processors/Merges/Algorithms/FixedSizeDequeWithGaps.h>
#include <Processors/Transforms/ColumnGathererTransform.h>
#include <queue>

namespace DB
{

/** Merges several sorted input streams into a single sorted output.
  *
  * Rows are grouped by the sorting key. The sorting key includes a specially
  * specified version column. Within each group of consecutive rows that share an
  * identical key, every pair of consecutive rows with opposite signs is merged.
  */
class VersionedCollapsingAlgorithm final : public IMergingAlgorithmWithSharedChunks
{
public:
    /// No separate version column argument is taken: the version column is
    /// already part of the primary key (and therefore of `description_`).
    ///
    /// `sign_column_` names the column holding each row's sign.
    /// `out_row_sources_buf_` is optional (may be null). NOTE(review): presumably
    /// it receives per-row source information for vertical merges; confirm
    /// in the .cpp.
    VersionedCollapsingAlgorithm(
        const Block & header,
        size_t num_inputs,
        SortDescription description_,
        const String & sign_column_,
        size_t max_block_size_rows,
        size_t max_block_size_bytes,
        WriteBuffer * out_row_sources_buf_ = nullptr,
        bool use_average_block_sizes = false);

    /// Implementation of the base-class merge hook.
    Status merge() override;

private:
    /// Helpers used while emitting rows. NOTE(review): their exact semantics
    /// (how gaps and skipped rows are accounted for) live in the .cpp.
    void insertGap(size_t gap_size);
    void insertRow(size_t skip_rows, const RowRef & row);

    /// Data members. Keep this order: the constructor's initializer list may
    /// rely on earlier members (e.g. `max_rows_in_queue`) being set first.

    /// Accumulates the output rows.
    MergedData merged_data;

    /// Index of the sign column inside the header.
    size_t sign_column_number{0};

    /// Capacity bound for the queue of pending rows; fixed at construction.
    const size_t max_rows_in_queue;

    /// Pending rows that share the same primary key and the same sign.
    FixedSizeDequeWithGaps<RowRef> current_keys;

    /// Sign shared by the rows currently held in `current_keys`
    /// (presumably 0 when the queue is empty — confirm in the .cpp).
    Int8 sign_in_queue{0};

    /// Source parts of the rows that carry the current primary key.
    std::queue<RowSourcePart> current_row_sources;
};

}