1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
|
#pragma once
#include <mutex>
#include <condition_variable>
#include <Common/filesystemHelpers.h>
#include <Core/Block.h>
#include <Core/SortDescription.h>
#include <QueryPipeline/Pipe.h>
#include <QueryPipeline/SizeLimits.h>
#include <Disks/TemporaryFileOnDisk.h>
namespace DB
{
/// Forward declarations.
/// NOTE(review): TableJoin, MergeJoinCursor and MergeJoinEqualRange are not referenced in the part of
/// this file visible here, and Pipe is additionally provided by <QueryPipeline/Pipe.h> — confirm
/// whether these declarations are still needed.
class TableJoin;
class MergeJoinCursor;
struct MergeJoinEqualRange;
class Pipe;
class IVolume;
/// Shared-ownership handle to a volume; IVolume itself stays incomplete in this header.
using VolumePtr = std::shared_ptr<IVolume>;
/// Turns lists of blocks into temporary files (see flush()) and keeps the resulting file handles
/// in `sorted_files`.
///
/// Only the inline members are defined in this header. Everything declared without a body
/// (getPath, streamFromFile, insert, flush, premerge, finishMerge) is implemented elsewhere.
///
/// Lifetime: `size_limits`, `sort_description` and `codec` are stored as references, so the
/// objects passed to the constructor must outlive this writer.
struct SortedBlocksWriter
{
    using TmpFilePtr = TemporaryFileOnDiskHolder;
    using SortedFiles = std::vector<TmpFilePtr>;

    /// A list of blocks together with running totals of their rows and bytes.
    struct Blocks
    {
        BlocksList blocks;
        size_t row_count = 0;
        size_t bytes = 0;

        bool empty() const { return blocks.empty(); }

        /// Takes ownership of `block` and adds its rows/bytes to the totals.
        void insert(Block && block)
        {
            /// Sizes are read before the block is moved away.
            countBlockSize(block);
            blocks.emplace_back(std::move(block));
        }

        /// Adds the rows/bytes of `block` to the totals without storing the block itself.
        void countBlockSize(const Block & block)
        {
            row_count += block.rows();
            bytes += block.bytes();
        }

        /// Drops all stored blocks and resets both totals to zero.
        void clear()
        {
            blocks.clear();
            row_count = 0;
            bytes = 0;
        }
    };

    /// Result type of premerge(): a set of files plus a pipe.
    struct PremergedFiles
    {
        SortedFiles files;
        Pipe pipe;
    };

    /// NOTE(review): not used by the inline code in this header; presumably the number of
    /// streams used while merging — confirm in the implementation.
    static constexpr size_t num_streams = 2;

    /// NOTE(review): neither is touched by the inline code here; presumably they coordinate
    /// insert() with flushes that are still in progress — confirm in the implementation.
    std::mutex insert_mutex;
    std::condition_variable flush_condvar;

    const SizeLimits & size_limits; /// Reference: the caller keeps ownership.
    VolumePtr volume;
    Block sample_block;
    const SortDescription & sort_description; /// Reference: the caller keeps ownership.
    Blocks inserted_blocks;
    const size_t rows_in_block;
    const size_t num_files_for_merge;
    const String & codec; /// Reference: the caller keeps ownership.
    SortedFiles sorted_files;

    /// Bookkeeping counters; they are only initialised here and updated outside this header.
    size_t row_count_in_flush = 0;
    size_t bytes_in_flush = 0;
    size_t flush_number = 0;
    size_t flush_inflight = 0;

    /// Stores the arguments. `size_limits_`, `description` and `codec_` are bound by reference
    /// (see the lifetime note on the struct); `sample_block_` is copied.
    SortedBlocksWriter(const SizeLimits & size_limits_, VolumePtr volume_, const Block & sample_block_,
        const SortDescription & description, size_t rows_in_block_, size_t num_files_to_merge_, const String & codec_)
        : size_limits(size_limits_)
        /// `volume_` is a by-value sink parameter: move it instead of paying for a second
        /// shared_ptr copy (an extra atomic ref-count increment/decrement).
        , volume(std::move(volume_))
        , sample_block(sample_block_)
        , sort_description(description)
        , rows_in_block(rows_in_block_)
        , num_files_for_merge(num_files_to_merge_)
        , codec(codec_)
    {
    }

    /// Passes `blocks.blocks` to flush() and appends the returned file to `sorted_files`.
    /// This function does not lock `insert_mutex` itself.
    void addBlocks(const Blocks & blocks)
    {
        sorted_files.emplace_back(flush(blocks.blocks));
    }

    String getPath() const;
    Pipe streamFromFile(const TmpFilePtr & file) const;

    void insert(Block && block);
    TmpFilePtr flush(const BlocksList & blocks) const;
    PremergedFiles premerge();

    /// The default `callback` is a no-op.
    SortedFiles finishMerge(std::function<void(const Block &)> callback = [](const Block &){});
};
/// Holds a collection of blocks (`buffer`) together with a byte limit and a sort description.
///
/// exchange() and mergeBlocks() are only declared here; their behaviour is defined elsewhere.
///
/// Lifetime: `sort_description` is kept as a reference, so the object given to the constructor
/// must outlive this buffer.
class SortedBlocksBuffer
{
public:
    /// Remembers the sort description (by reference) and the byte limit; the byte counter starts at zero.
    SortedBlocksBuffer(const SortDescription & sort_description_, size_t max_bytes_)
        : max_bytes(max_bytes_)
        , sort_description(sort_description_)
    {
    }

    /// NOTE(review): consumes `block` and returns a block; the exact buffering policy lives in the
    /// implementation — confirm there.
    Block exchange(Block && block);

private:
    Block mergeBlocks(Blocks &&) const;

    /// NOTE(review): presumably protects the mutable state below — confirm in the implementation.
    std::mutex mutex;

    size_t max_bytes;
    size_t current_bytes = 0;

    /// `Blocks` here is the namespace-level type, not SortedBlocksWriter::Blocks.
    Blocks buffer;

    const SortDescription & sort_description;
};
}
|