aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/Interpreters/SortedBlocksWriter.h
blob: db8ed860207a38ea355b2663f86107735a0452ac (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
#pragma once

#include <mutex>
#include <condition_variable>

#include <Common/filesystemHelpers.h>
#include <Core/Block.h>
#include <Core/SortDescription.h>
#include <QueryPipeline/Pipe.h>
#include <QueryPipeline/SizeLimits.h>
#include <Disks/TemporaryFileOnDisk.h>

namespace DB
{

class TableJoin;
class MergeJoinCursor;
struct MergeJoinEqualRange;

class Pipe;

class IVolume;
using VolumePtr = std::shared_ptr<IVolume>;

/// Collects blocks and turns them into temporary files on `volume`, which can later be merged back.
/// NOTE(review): the non-inline methods are implemented in the corresponding .cpp, which is not visible
/// here; their comments below describe only what the declarations show and hedge everything else.
///
/// Lifetime: `size_limits`, `sort_description` and `codec` are stored as const references, so the objects
/// passed to the constructor must outlive this writer.
struct SortedBlocksWriter
{
    /// Owning handle for one temporary file on disk.
    using TmpFilePtr = TemporaryFileOnDiskHolder;
    /// A sequence of temporary-file handles.
    using SortedFiles = std::vector<TmpFilePtr>;

    /// A list of blocks together with running totals of their rows and bytes.
    struct Blocks
    {
        BlocksList blocks;
        size_t row_count = 0;
        size_t bytes = 0;

        /// True when no block is stored (the counters are not consulted).
        bool empty() const { return blocks.empty(); }

        /// Adds the size of `block` to the totals, then appends it to `blocks`.
        void insert(Block && block)
        {
            /// Count before moving: `block` is in a moved-from state after emplace_back.
            countBlockSize(block);
            blocks.emplace_back(std::move(block));
        }

        /// Adds the rows and bytes of `block` to the totals without storing the block.
        void countBlockSize(const Block & block)
        {
            row_count += block.rows();
            bytes += block.bytes();
        }

        /// Drops all stored blocks and resets both counters to zero.
        void clear()
        {
            blocks.clear();
            row_count = 0;
            bytes = 0;
        }
    };

    /// Return type of premerge(): a set of temporary files plus a pipe.
    /// NOTE(review): presumably `pipe` reads from `files`, so `files` must stay alive while the pipe is consumed — confirm in the .cpp.
    struct PremergedFiles
    {
        SortedFiles files;
        Pipe pipe;
    };

    /// NOTE(review): how this constant is used is defined in the .cpp (not visible here).
    static constexpr size_t num_streams = 2;

    /// NOTE(review): presumably used by insert()/flush bookkeeping to coordinate concurrent callers — confirm in the .cpp.
    std::mutex insert_mutex;
    std::condition_variable flush_condvar;
    const SizeLimits & size_limits;             /// Reference member: the referenced object must outlive the writer.
    VolumePtr volume;
    Block sample_block;                          /// Copied from the constructor argument.
    const SortDescription & sort_description;   /// Reference member: the referenced object must outlive the writer.
    Blocks inserted_blocks;
    const size_t rows_in_block;
    const size_t num_files_for_merge;
    const String & codec;                        /// Reference member: the referenced string must outlive the writer.
    SortedFiles sorted_files;
    size_t row_count_in_flush = 0;
    size_t bytes_in_flush = 0;
    size_t flush_number = 0;
    size_t flush_inflight = 0;

    /// @param size_limits_         stored by reference (see lifetime note on the struct)
    /// @param volume_              volume handle; taken by value and moved into `volume`
    /// @param sample_block_        copied into `sample_block`
    /// @param description          stored by reference as `sort_description`
    /// @param rows_in_block_       stored as `rows_in_block`
    /// @param num_files_to_merge_  stored as `num_files_for_merge`
    /// @param codec_               stored by reference as `codec`
    SortedBlocksWriter(const SizeLimits & size_limits_, VolumePtr volume_, const Block & sample_block_,
                       const SortDescription & description, size_t rows_in_block_, size_t num_files_to_merge_, const String & codec_)
        : size_limits(size_limits_)
        /// `volume_` is a by-value sink: moving avoids a second atomic refcount increment/decrement.
        , volume(std::move(volume_))
        , sample_block(sample_block_)
        , sort_description(description)
        , rows_in_block(rows_in_block_)
        , num_files_for_merge(num_files_to_merge_)
        , codec(codec_)
    {}

    /// Passes `blocks.blocks` to flush() and appends the returned temporary-file handle to `sorted_files`.
    /// Note: this does not lock `insert_mutex`; the caller is responsible for any needed synchronization.
    void addBlocks(const Blocks & blocks)
    {
        sorted_files.emplace_back(flush(blocks.blocks));
    }

    /// NOTE(review): presumably the location used for temporary files — confirm in the .cpp.
    String getPath() const;
    /// Returns a Pipe for the given temporary file (presumably reading its contents back — confirm in the .cpp).
    Pipe streamFromFile(const TmpFilePtr & file) const;

    /// Takes ownership of `block`. NOTE(review): buffering/spilling policy is implemented in the .cpp.
    void insert(Block && block);
    /// Produces a temporary-file handle from `blocks` (presumably by writing them out — confirm in the .cpp).
    TmpFilePtr flush(const BlocksList & blocks) const;
    /// Returns a PremergedFiles result; see the NOTE on PremergedFiles.
    PremergedFiles premerge();
    /// Returns the resulting files; `callback` receives blocks as const references and defaults to a no-op.
    SortedFiles finishMerge(std::function<void(const Block &)> callback = [](const Block &){});
};


/// Holds a `buffer` of blocks together with the byte counters `current_bytes` and `max_bytes`.
/// NOTE(review): exchange() and mergeBlocks() are implemented in the .cpp (not visible here); the
/// descriptions below are inferred from their declarations and should be confirmed there.
///
/// Lifetime: `sort_description` is stored as a const reference, so the object passed to the
/// constructor must outlive this buffer.
class SortedBlocksBuffer
{
public:
    /// @param sort_description_  stored by reference as `sort_description`
    /// @param max_bytes_         stored as `max_bytes`
    SortedBlocksBuffer(const SortDescription & sort_description_, size_t max_bytes_)
        : max_bytes(max_bytes_)
        , sort_description(sort_description_)
    {}

    /// Takes ownership of `block` and returns a Block in exchange.
    /// NOTE(review): presumably accumulates blocks until `max_bytes` is reached and then hands back
    /// the buffered data — confirm in the .cpp.
    Block exchange(Block && block);

private:
    /// NOTE(review): presumably guards `buffer` and `current_bytes` inside exchange().
    std::mutex mutex;
    size_t max_bytes;
    /// The buffer starts out empty, so no bytes are accounted for initially.
    size_t current_bytes = 0;
    Blocks buffer;
    const SortDescription & sort_description;

    /// NOTE(review): presumably merges the given blocks according to `sort_description`.
    Block mergeBlocks(Blocks &&) const;
};

}