1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
|
#include <Compression/ICompressionCodec.h>
#include <Compression/CompressionInfo.h>
#include <Compression/CompressionFactory.h>
#include <zstd.h>
#include <Parsers/IAST.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTFunction.h>
#include <Common/typeid_cast.h>
#include <IO/WriteHelpers.h>
#include <IO/WriteBuffer.h>
#include <IO/BufferWithOwnMemory.h>
namespace DB
{
/// Compression codec that delegates to libzstd.
///
/// Two modes are available, selected by the constructor used:
///  - level only: plain ZSTD compression;
///  - level + window log: ZSTD with long-distance matching enabled.
class CompressionCodecZSTD : public ICompressionCodec
{
public:
    /// Level used by the codec factory when "ZSTD" is requested without arguments.
    static constexpr auto ZSTD_DEFAULT_LEVEL = 1;
    /// Default window log. NOTE(review): not referenced in the visible part of this file.
    static constexpr auto ZSTD_DEFAULT_LOG_WINDOW = 24;

    /// Creates a codec with the given compression level; long-distance matching stays disabled.
    explicit CompressionCodecZSTD(int level_);

    /// Creates a codec with the given compression level and long-distance matching enabled.
    /// A zero window log lets libzstd pick its default window size.
    CompressionCodecZSTD(int level_, int window_log_);

    /// Returns CompressionMethodByte::ZSTD as a raw byte.
    uint8_t getMethodByte() const override;

    /// Returns ZSTD_compressBound() for the given uncompressed size.
    UInt32 getMaxCompressedDataSize(UInt32 uncompressed_size) const override;

    /// Updates `hash` with the tree hash of the codec description.
    void updateHash(SipHash & hash) const override;

protected:
    /// Compresses `source_size` bytes from `source` into `dest`; returns the compressed size.
    /// Throws CANNOT_COMPRESS when libzstd reports an error.
    UInt32 doCompressData(const char * source, UInt32 source_size, char * dest) const override;

    /// Decompresses `source_size` bytes from `source` into `dest`, which has room for `uncompressed_size` bytes.
    /// Throws CANNOT_DECOMPRESS when libzstd reports an error.
    void doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const override;

    bool isCompression() const override { return true; }
    bool isGenericCompression() const override { return true; }

private:
    /// Compression level passed to libzstd as ZSTD_c_compressionLevel.
    const int level;
    /// True only when the codec was constructed with an explicit window log.
    const bool enable_long_range;
    /// Passed to libzstd as ZSTD_c_windowLog when long-range mode is on; 0 means "use libzstd default".
    const int window_log;
};
/// Error codes thrown by the ZSTD codec and its factory registration below.
/// They are only declared here (extern); the definitions are not part of the visible source.
namespace ErrorCodes
{
extern const int CANNOT_COMPRESS;
extern const int CANNOT_DECOMPRESS;
extern const int ILLEGAL_SYNTAX_FOR_CODEC_TYPE;
extern const int ILLEGAL_CODEC_PARAMETER;
}
/// Returns the method identifier of this codec: CompressionMethodByte::ZSTD as a raw byte.
uint8_t CompressionCodecZSTD::getMethodByte() const
{
    const auto method = CompressionMethodByte::ZSTD;
    return static_cast<uint8_t>(method);
}
/// Mixes the codec description (as returned by getCodecDesc()) into `hash`
/// by delegating to its updateTreeHash().
void CompressionCodecZSTD::updateHash(SipHash & hash) const
{
    const auto & description = getCodecDesc();
    description->updateTreeHash(hash);
}
/// Returns the bound reported by ZSTD_compressBound() for an input of
/// `uncompressed_size` bytes, narrowed to UInt32.
UInt32 CompressionCodecZSTD::getMaxCompressedDataSize(UInt32 uncompressed_size) const
{
    const size_t bound = ZSTD_compressBound(uncompressed_size);
    return static_cast<UInt32>(bound);
}
/// Compresses `source_size` bytes from `source` into `dest` with libzstd and returns the compressed size.
///
/// A fresh compression context is created for every call and released before returning or throwing.
/// `dest` is assumed to have room for at least ZSTD_compressBound(source_size) bytes
/// (the same bound getMaxCompressedDataSize() reports) — that capacity is what is passed to libzstd.
///
/// Throws CANNOT_COMPRESS if the context cannot be created or libzstd reports a compression error.
UInt32 CompressionCodecZSTD::doCompressData(const char * source, UInt32 source_size, char * dest) const
{
    ZSTD_CCtx * cctx = ZSTD_createCCtx();
    /// Context creation can fail (allocation failure); the calls below would dereference a null context.
    if (!cctx)
        throw Exception(ErrorCodes::CANNOT_COMPRESS, "Cannot compress block with ZSTD: cannot create compression context");

    ZSTD_CCtx_setParameter(cctx, ZSTD_c_compressionLevel, level);
    if (enable_long_range)
    {
        ZSTD_CCtx_setParameter(cctx, ZSTD_c_enableLongDistanceMatching, 1);
        ZSTD_CCtx_setParameter(cctx, ZSTD_c_windowLog, window_log); // NB zero window_log means "use default" for libzstd
    }

    size_t compressed_size = ZSTD_compress2(cctx, dest, ZSTD_compressBound(source_size), source, source_size);

    /// Release the context before inspecting the result so that the error path does not leak it.
    ZSTD_freeCCtx(cctx);

    if (ZSTD_isError(compressed_size))
        throw Exception(ErrorCodes::CANNOT_COMPRESS, "Cannot compress block with ZSTD: {}", std::string(ZSTD_getErrorName(compressed_size)));

    return static_cast<UInt32>(compressed_size);
}
/// Decompresses `source_size` bytes from `source` into `dest` using ZSTD_decompress(),
/// passing `uncompressed_size` as the capacity of `dest`.
/// Throws CANNOT_DECOMPRESS with libzstd's error name when the call reports an error.
void CompressionCodecZSTD::doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const
{
    const size_t result = ZSTD_decompress(dest, uncompressed_size, source, source_size);
    if (!ZSTD_isError(result))
        return;

    throw Exception(ErrorCodes::CANNOT_DECOMPRESS, "Cannot ZSTD_decompress: {}", std::string(ZSTD_getErrorName(result)));
}
/// Builds a codec with long-range mode enabled.
/// The codec description becomes "ZSTD" with two literal arguments: the level and the window log.
CompressionCodecZSTD::CompressionCodecZSTD(int level_, int window_log_)
    : level(level_)
    , enable_long_range(true)
    , window_log(window_log_)
{
    auto level_literal = std::make_shared<ASTLiteral>(static_cast<UInt64>(level));
    auto window_log_literal = std::make_shared<ASTLiteral>(static_cast<UInt64>(window_log));
    setCodecDescription("ZSTD", {level_literal, window_log_literal});
}
/// Builds a codec without long-range mode (window log is stored as 0).
/// The codec description becomes "ZSTD" with a single literal argument: the level.
CompressionCodecZSTD::CompressionCodecZSTD(int level_)
    : level(level_)
    , enable_long_range(false)
    , window_log(0)
{
    auto level_literal = std::make_shared<ASTLiteral>(static_cast<UInt64>(level));
    setCodecDescription("ZSTD", {level_literal});
}
/// Registers the "ZSTD" codec in `factory` under the method byte CompressionMethodByte::ZSTD.
///
/// The registered creator accepts zero, one or two literal arguments:
///  - no arguments: level ZSTD_DEFAULT_LEVEL;
///  - one argument: compression level, at most ZSTD_maxCLevel();
///  - two arguments: level and window log; the window log must be 0 ("use libzstd default")
///    or lie within the bounds libzstd reports for ZSTD_c_windowLog.
///
/// The creator throws ILLEGAL_SYNTAX_FOR_CODEC_TYPE for more than two arguments and
/// ILLEGAL_CODEC_PARAMETER for non-literal or out-of-range arguments.
void registerCodecZSTD(CompressionCodecFactory & factory)
{
    UInt8 method_code = static_cast<UInt8>(CompressionMethodByte::ZSTD);

    /// The creator uses no locals of this function, so it captures nothing
    /// (presumably the factory keeps it beyond this call — a by-reference capture would be a trap).
    factory.registerCompressionCodec("ZSTD", method_code, [](const ASTPtr & arguments) -> CompressionCodecPtr
    {
        int level = CompressionCodecZSTD::ZSTD_DEFAULT_LEVEL;
        if (arguments && !arguments->children.empty())
        {
            if (arguments->children.size() > 2)
                throw Exception(ErrorCodes::ILLEGAL_SYNTAX_FOR_CODEC_TYPE, "ZSTD codec must have 1 or 2 parameters, given {}",
                    arguments->children.size());

            /// Bind by reference: copying the child list would copy every shared pointer in it.
            const auto & children = arguments->children;

            const auto * literal = children[0]->as<ASTLiteral>();
            if (!literal)
                throw Exception(ErrorCodes::ILLEGAL_CODEC_PARAMETER, "ZSTD codec argument must be integer");

            /// Validate in the unsigned 64-bit domain BEFORE narrowing to int:
            /// otherwise a huge value wraps around and slips past the range check.
            const UInt64 level_value = literal->value.safeGet<UInt64>();
            if (level_value > static_cast<UInt64>(ZSTD_maxCLevel()))
            {
                throw Exception(ErrorCodes::ILLEGAL_CODEC_PARAMETER,
                    "ZSTD codec can't have level more than {}, given {}",
                    ZSTD_maxCLevel(), level_value);
            }
            level = static_cast<int>(level_value);

            if (children.size() > 1)
            {
                const auto * window_literal = children[1]->as<ASTLiteral>();
                if (!window_literal)
                    throw Exception(ErrorCodes::ILLEGAL_CODEC_PARAMETER, "ZSTD codec second argument must be integer");

                const UInt64 window_log_value = window_literal->value.safeGet<UInt64>();

                ZSTD_bounds window_log_bounds = ZSTD_cParam_getBounds(ZSTD_c_windowLog);
                if (ZSTD_isError(window_log_bounds.error))
                    throw Exception(ErrorCodes::ILLEGAL_CODEC_PARAMETER, "ZSTD windowLog parameter is not supported {}",
                        std::string(ZSTD_getErrorName(window_log_bounds.error)));

                // 0 means "use default" for libzstd
                if (window_log_value != 0)
                {
                    /// Check the upper bound on the unsigned value first; once it passes, the value
                    /// fits into int and the lower bound can be compared after narrowing.
                    /// (Assumes libzstd reports a non-negative upper bound for windowLog.)
                    const bool above_upper = window_log_value > static_cast<UInt64>(window_log_bounds.upperBound);
                    const bool below_lower = !above_upper && static_cast<int>(window_log_value) < window_log_bounds.lowerBound;
                    if (above_upper || below_lower)
                        throw Exception(ErrorCodes::ILLEGAL_CODEC_PARAMETER,
                            "ZSTD codec can't have window log more than {} and lower than {}, given {}",
                            toString(window_log_bounds.upperBound),
                            toString(window_log_bounds.lowerBound), toString(window_log_value));
                }

                return std::make_shared<CompressionCodecZSTD>(level, static_cast<int>(window_log_value));
            }
        }
        return std::make_shared<CompressionCodecZSTD>(level);
    });
}
/// Convenience factory: returns a CompressionCodecZSTD constructed with the given level only
/// (i.e. through the single-argument constructor).
CompressionCodecPtr getCompressionCodecZSTD(int level)
{
    CompressionCodecPtr codec = std::make_shared<CompressionCodecZSTD>(level);
    return codec;
}
}
|