1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
|
#pragma once
#include "clickhouse_config.h"
#if USE_SNAPPY
#include <memory>
#include <IO/ReadBuffer.h>
#include <IO/CompressedReadBufferWrapper.h>
namespace DB
{
/*
* Hadoop-snappy format is one of the compression formats based on Snappy used in Hadoop. It uses its own framing format as follows:
* 1. A compressed file consists of one or more blocks.
* 2. A block consists of uncompressed length (big endian 4 byte integer) and one or more subblocks.
* 3. A subblock consists of compressed length (big endian 4 byte integer) and raw compressed data.
*
* HadoopSnappyDecoder implements the decompression of data compressed with hadoop-snappy format.
*/
/// Stateful decoder for the hadoop-snappy framing format described above.
/// Designed for streaming use: input may arrive in arbitrary-sized pieces, and a
/// block/subblock header split across pieces is stashed internally (see `buffer`)
/// until enough bytes are available. Definitions of the non-inline members live
/// in the corresponding .cpp file.
class HadoopSnappyDecoder
{
public:
    /// Outcome of a decoding step. NEEDS_MORE_INPUT means "call again with more
    /// data"; the remaining non-OK values are terminal errors.
    enum class Status : int
    {
        OK = 0,
        INVALID_INPUT = 1,
        BUFFER_TOO_SMALL = 2,
        NEEDS_MORE_INPUT = 3,
        TOO_LARGE_COMPRESSED_BLOCK = 4,
    };

    HadoopSnappyDecoder() = default;
    ~HadoopSnappyDecoder() = default;

    /// Decode one block from the input stream into the output buffer.
    /// The parameter naming follows the zlib streaming convention — presumably
    /// *next_in / *next_out are advanced and *avail_in / *avail_out decremented
    /// as bytes are consumed/produced (definition is in the .cpp; confirm there).
    Status readBlock(size_t * avail_in, const char ** next_in, size_t * avail_out, char ** next_out);

    /// Drop all per-block state so decoding can restart on a fresh block/stream.
    inline void reset()
    {
        buffer_length = 0;
        block_length = -1;          /// -1 appears to mean "length header not read yet" — see readBlockLength
        compressed_length = -1;     /// same sentinel for the subblock's compressed length
        total_uncompressed_length = 0;
    }

    /// Status of the most recent operation; starts as OK. Presumably updated by
    /// readBlock so callers can inspect it after the fact.
    Status result = Status::OK;

private:
    inline bool checkBufferLength(int max) const;
    inline static bool checkAvailIn(size_t avail_in, int min);
    inline void copyToBuffer(size_t * avail_in, const char ** next_in);
    /// Read a big-endian 4-byte length (per the framing format described above).
    inline static uint32_t readLength(const char * in);
    inline Status readLength(size_t * avail_in, const char ** next_in, int * length);
    inline Status readBlockLength(size_t * avail_in, const char ** next_in);
    inline Status readCompressedLength(size_t * avail_in, const char ** next_in);
    inline Status readCompressedData(size_t * avail_in, const char ** next_in, size_t * avail_out, char ** next_out);

    /// Staging area for input that spans readBlock calls (filled via copyToBuffer).
    char buffer[DBMS_DEFAULT_BUFFER_SIZE] = {0};
    /// Number of valid bytes currently held in `buffer`.
    int buffer_length = 0;

    /// Uncompressed length of the current block; -1 until its header is read.
    int block_length = -1;
    /// Compressed length of the current subblock; -1 until its header is read.
    int compressed_length = -1;
    /// Bytes decompressed so far for the current block (compared against block_length, presumably).
    int total_uncompressed_length = 0;
};
/// HadoopSnappyReadBuffer implements read buffer for data compressed with hadoop-snappy format.
/// It wraps an arbitrary compressed ReadBuffer and feeds its contents through
/// HadoopSnappyDecoder; nextImpl (defined in the .cpp) produces decompressed bytes.
class HadoopSnappyReadBuffer : public CompressedReadBufferWrapper
{
public:
    using Status = HadoopSnappyDecoder::Status;

    /// Human-readable name of a decoder status — presumably used when composing
    /// error messages in the .cpp.
    inline static String statusToString(Status status)
    {
        switch (status)
        {
            case Status::OK:
                return "OK";
            case Status::INVALID_INPUT:
                return "INVALID_INPUT";
            case Status::BUFFER_TOO_SMALL:
                return "BUFFER_TOO_SMALL";
            case Status::NEEDS_MORE_INPUT:
                return "NEEDS_MORE_INPUT";
            case Status::TOO_LARGE_COMPRESSED_BLOCK:
                return "TOO_LARGE_COMPRESSED_BLOCK";
        }
        /// Every enumerator is handled above; reaching here means a corrupted Status value.
        UNREACHABLE();
    }

    /// @param in_              underlying buffer with hadoop-snappy compressed data (ownership taken)
    /// @param buf_size         size of the internal decompressed buffer
    /// @param existing_memory  optional caller-provided memory for the buffer
    /// @param alignment        alignment requirement for the internal buffer
    explicit HadoopSnappyReadBuffer(
        std::unique_ptr<ReadBuffer> in_,
        size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
        char * existing_memory = nullptr,
        size_t alignment = 0);

    ~HadoopSnappyReadBuffer() override;

private:
    bool nextImpl() override;

    std::unique_ptr<HadoopSnappyDecoder> decoder;

    /// Window over the not-yet-consumed compressed input — presumably carried
    /// across nextImpl calls; verify against the .cpp.
    size_t in_available;
    const char * in_data;

    /// Destination window for the current decompression step.
    size_t out_capacity;
    char * out_data;

    /// Set once the end of the compressed stream has been reached (see nextImpl).
    bool eof;
};
}
#endif
|