aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/IO/HadoopSnappyReadBuffer.h
blob: bcc438489d0ca36771b539f0f0aa0639afa474cd (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
#pragma once

#include "clickhouse_config.h"

#if USE_SNAPPY

#include <memory>
#include <IO/ReadBuffer.h>
#include <IO/CompressedReadBufferWrapper.h>

namespace DB
{


/*
 * Hadoop-snappy format is one of the compression formats based on Snappy used in Hadoop. It uses its own framing format as follows:
 * 1. A compressed file consists of one or more blocks.
 * 2. A block consists of uncompressed length (big endian 4 byte integer) and one or more subblocks.
 * 3. A subblock consists of compressed length (big endian 4 byte integer) and raw compressed data.
 *
 * HadoopSnappyDecoder implements the decompression of data compressed with hadoop-snappy format.
 */
class HadoopSnappyDecoder
{
public:
    /// Result code of a decoding step. The numeric values are pinned explicitly.
    enum class Status : int
    {
        OK = 0,
        INVALID_INPUT = 1,
        BUFFER_TOO_SMALL = 2,
        NEEDS_MORE_INPUT = 3,
        TOO_LARGE_COMPRESSED_BLOCK = 4,
    };

    HadoopSnappyDecoder() = default;
    ~HadoopSnappyDecoder() = default;

    /// Decoding entry point; the definition lives out of line.
    /// Both the input cursor (avail_in / next_in) and the output cursor (avail_out / next_out)
    /// are passed by pointer, so the implementation is able to update them for the caller.
    /// NOTE(review): presumably the cursors are advanced by the amount consumed / produced -
    /// confirm against the implementation.
    Status readBlock(size_t * avail_in, const char ** next_in, size_t * avail_out, char ** next_out);

    /// Puts the length bookkeeping back to the values given by the in-class initializers below.
    /// Neither `result` nor the bytes already stored in `buffer` are touched.
    void reset()
    {
        buffer_length = total_uncompressed_length = 0;
        block_length = compressed_length = -1;
    }

    /// Publicly writable status slot, starts as OK. Nothing in this header assigns it.
    /// NOTE(review): presumably the owner stores the latest readBlock() return value here -
    /// verify against the caller.
    Status result{Status::OK};

private:
    /// Helpers below are only declared here; their behaviour is defined by the out-of-line
    /// implementation, so the notes are limited to what the signatures show.

    inline bool checkBufferLength(int max) const;
    inline static bool checkAvailIn(size_t avail_in, int min);

    /// Takes the input cursor by pointer (presumably to stash pending input into `buffer` - confirm).
    inline void copyToBuffer(size_t * avail_in, const char ** next_in);

    /// Length readers. Per the framing description, lengths are big endian 4 byte integers.
    inline static uint32_t readLength(const char * in);
    inline Status readLength(size_t * avail_in, const char ** next_in, int * length);
    inline Status readBlockLength(size_t * avail_in, const char ** next_in);
    inline Status readCompressedLength(size_t * avail_in, const char ** next_in);
    inline Status readCompressedData(size_t * avail_in, const char ** next_in, size_t * avail_out, char ** next_out);

    /// Zero-initialized scratch storage and the number of bytes of it currently in use (starts at 0).
    char buffer[DBMS_DEFAULT_BUFFER_SIZE] = {};
    int buffer_length{0};

    /// Length bookkeeping; -1 is the initial / post-reset() value of the two length fields.
    int block_length{-1};
    int compressed_length{-1};
    int total_uncompressed_length{0};
};

/// HadoopSnappyReadBuffer implements read buffer for data compressed with hadoop-snappy format.
class HadoopSnappyReadBuffer : public CompressedReadBufferWrapper
{
public:
    using Status = HadoopSnappyDecoder::Status;

    /// Returns the enumerator name of `status` as a string (e.g. Status::OK -> "OK").
    /// Every enumerator is handled by the switch; UNREACHABLE() only guards against a value
    /// outside the enumeration.
    inline static String statusToString(Status status)
    {
        switch (status)
        {
            case Status::OK:
                return "OK";
            case Status::INVALID_INPUT:
                return "INVALID_INPUT";
            case Status::BUFFER_TOO_SMALL:
                return "BUFFER_TOO_SMALL";
            case Status::NEEDS_MORE_INPUT:
                return "NEEDS_MORE_INPUT";
            case Status::TOO_LARGE_COMPRESSED_BLOCK:
                return "TOO_LARGE_COMPRESSED_BLOCK";
        }
        UNREACHABLE();
    }

    /// Takes ownership of the source buffer `in_`.
    /// `buf_size` defaults to DBMS_DEFAULT_BUFFER_SIZE; `existing_memory` and `alignment`
    /// default to nullptr and 0.
    /// NOTE(review): the last three arguments are presumably forwarded to the base class - confirm
    /// in the out-of-line definition.
    explicit HadoopSnappyReadBuffer(
        std::unique_ptr<ReadBuffer> in_,
        size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
        char * existing_memory = nullptr,
        size_t alignment = 0);

    ~HadoopSnappyReadBuffer() override;

private:
    bool nextImpl() override;

    std::unique_ptr<HadoopSnappyDecoder> decoder;

    /// The members below get default member initializers so that they never hold indeterminate
    /// values, whatever the out-of-line constructor does; a constructor mem-initializer for the
    /// same member still takes precedence.

    /// Input cursor. NOTE(review): presumably the bytes left / next byte of compressed input - confirm.
    size_t in_available = 0;
    const char * in_data = nullptr;

    /// Output cursor. NOTE(review): presumably the free space / write position for decompressed data - confirm.
    size_t out_capacity = 0;
    char * out_data = nullptr;

    /// End-of-stream flag; starts out false.
    bool eof = false;
};

}
#endif