#pragma once

#include <Storages/MergeTree/MergeTreeDataPartWriterOnDisk.h>

namespace DB
{

/// Writes data part in compact format.
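/// In contrast to the wide format, all columns are written into a single data file,
/// granule by granule, and share a single marks file; this format is typically used for small parts.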
class MergeTreeDataPartWriterCompact : public MergeTreeDataPartWriterOnDisk
{
public:
    MergeTreeDataPartWriterCompact(
        const MergeTreeMutableDataPartPtr & data_part,
        const NamesAndTypesList & columns_list,
        const StorageMetadataPtr & metadata_snapshot_,
        const std::vector<MergeTreeIndexPtr> & indices_to_recalc,
        const String & marks_file_extension,
        const CompressionCodecPtr & default_codec,
        const MergeTreeWriterSettings & settings,
        const MergeTreeIndexGranularity & index_granularity);
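
    /// Accumulates the incoming rows in columns_buffer so that complete granules of at least
    /// index_granularity rows can be written at once (see ColumnsBuffer below).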
    void write(const Block & block, const IColumn::Permutation * permutation) override;

    void fillChecksums(IMergeTreeDataPart::Checksums & checksums) override;
    void finish(bool sync) override;

private:
    /// Finish serialization of the data. Flush rows in buffer to disk, compute checksums.
    void fillDataChecksums(IMergeTreeDataPart::Checksums & checksums);
    void finishDataSerialization(bool sync);

    void fillIndexGranularity(size_t index_granularity_for_block, size_t rows_in_block) override;

    /// Write a block of rows into the .bin file and marks into the .mrk files.
    void writeDataBlock(const Block & block, const Granules & granules);

    /// Write a block of rows into the .bin file and marks into the .mrk files, the primary index into the .idx file
    /// and skip indices into their corresponding files.
    void writeDataBlockPrimaryIndexAndSkipIndices(const Block & block, const Granules & granules);

    void addToChecksums(MergeTreeDataPartChecksums & checksums);

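    /// Creates (or reuses) a compressed stream for each of the column's substreams according to its
    /// effective codec and registers it in compressed_streams (see the stream members below).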
    void addStreams(const NameAndTypePair & column, const ASTPtr & effective_codec_desc);

    Block header;

    /** Simplified SquashingTransform. The original one isn't suitable in this case,
      * as it can return a smaller block from the buffer without merging it with a larger block
      * if the latter is already big enough.
      * But for compact parts we must guarantee that the written block is larger than or equal to index_granularity.
      */
    class ColumnsBuffer
    {
    public:
        void add(MutableColumns && columns);
        size_t size() const;
        Columns releaseColumns();
    private:
        MutableColumns accumulated_columns;
    };

    ColumnsBuffer columns_buffer;

    /// hashing_buf -> compressed_buf -> plain_hashing -> plain_file
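    /// I.e. each column's data goes through one of the per-codec CompressedStreams below
    /// and ends up in the single shared plain_file.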
    std::unique_ptr<WriteBufferFromFileBase> plain_file;
    HashingWriteBuffer plain_hashing;

    /// Compressed stream which allows writing with a specific codec.
    struct CompressedStream
    {
        CompressedWriteBuffer compressed_buf;
        HashingWriteBuffer hashing_buf;

        CompressedStream(WriteBuffer & buf, const CompressionCodecPtr & codec)
            : compressed_buf(buf, codec)
            , hashing_buf(compressed_buf) {}
    };

    using CompressedStreamPtr = std::shared_ptr<CompressedStream>;

    /// A compressed stream is created for every distinct codec. All streams write to
    /// a single file on disk.
    std::unordered_map<UInt64, CompressedStreamPtr> streams_by_codec;

    /// Stream for each column's substream path (see addStreams).
    std::unordered_map<String, CompressedStreamPtr> compressed_streams;

    /// If marks are uncompressed, the data is written to 'marks_file_hashing' for hash calculation and then to 'marks_file'.
    std::unique_ptr<WriteBufferFromFileBase> marks_file;
    std::unique_ptr<HashingWriteBuffer> marks_file_hashing;

    /// If marks are compressed, the data is written to 'marks_source_hashing' for hash calculation,
    /// then to 'marks_compressor' for compression,
    /// then to 'marks_file_hashing' for calculation of the hash of the compressed data,
    /// and finally to 'marks_file'.
    std::unique_ptr<CompressedWriteBuffer> marks_compressor;
    std::unique_ptr<HashingWriteBuffer> marks_source_hashing;
};
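
/// A rough sketch of the typical call sequence (the writer is normally driven by MergedBlockOutputStream):
///
///     MergeTreeDataPartWriterCompact writer(...);
///     writer.write(block, permutation);   /// once per block of rows
///     writer.fillChecksums(checksums);    /// flush the remaining buffered rows and fill data checksums
///     writer.finish(sync);                /// finalize the files ('sync' requests an fsync)
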
}