aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/IO/ZstdInflatingReadBuffer.cpp
blob: 2b663ec714528adc208ee0228b9f47c6f48b5c77 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
#include <IO/ZstdInflatingReadBuffer.h>
#include <IO/WithFileName.h>
#include <zstd_errors.h>


namespace DB
{
namespace ErrorCodes
{
    extern const int ZSTD_DECODER_FAILED;
}

/// Create a streaming zstd decompressor that reads compressed bytes from `in_`.
///
/// @param in_                 Source of compressed data; moved into CompressedReadBufferWrapper.
/// @param buf_size            Forwarded to CompressedReadBufferWrapper (size of the buffer that
///                            nextImpl() decompresses into).
/// @param existing_memory     Forwarded to CompressedReadBufferWrapper.
/// @param alignment           Forwarded to CompressedReadBufferWrapper.
/// @param zstd_window_log_max Value passed to zstd as ZSTD_d_windowLogMax, i.e. the largest
///                            window (as a power of two) the decoder will accept.
/// @throws Exception(ZSTD_DECODER_FAILED) if the zstd context cannot be created or configured.
ZstdInflatingReadBuffer::ZstdInflatingReadBuffer(std::unique_ptr<ReadBuffer> in_, size_t buf_size, char * existing_memory, size_t alignment, int zstd_window_log_max)
    : CompressedReadBufferWrapper(std::move(in_), buf_size, existing_memory, alignment)
{
    dctx = ZSTD_createDCtx();
    input = {nullptr, 0, 0};
    output = {nullptr, 0, 0};

    if (dctx == nullptr)
    {
        throw Exception(ErrorCodes::ZSTD_DECODER_FAILED, "zstd_stream_decoder init failed: zstd version: {}", ZSTD_VERSION_STRING);
    }

    size_t ret = ZSTD_DCtx_setParameter(dctx, ZSTD_d_windowLogMax, zstd_window_log_max);
    if (ZSTD_isError(ret))
    {
        /// The destructor is not invoked when a constructor throws, so the context created
        /// above must be released here, otherwise it leaks.
        ZSTD_freeDCtx(dctx);
        dctx = nullptr;
        throw Exception(ErrorCodes::ZSTD_DECODER_FAILED, "zstd_stream_decoder init failed: {}", ZSTD_getErrorName(ret));
    }
}

/// Releases the zstd decompression context created in the constructor.
ZstdInflatingReadBuffer::~ZstdInflatingReadBuffer()
{
    /// ZSTD_freeDCtx tolerates a null context; its status result carries nothing useful here.
    static_cast<void>(ZSTD_freeDCtx(dctx));
}

/// Decompress the next chunk of data into working_buffer.
///
/// Compressed bytes are taken from the wrapped buffer `in` and fed to ZSTD_decompressStream,
/// which writes into internal_buffer. The loop repeats until at least one decompressed byte
/// is produced or `in` reaches EOF.
///
/// @return true if working_buffer holds data; false once EOF was reached and nothing is left.
/// @throws Exception(ZSTD_DECODER_FAILED) if zstd reports a decoding error.
bool ZstdInflatingReadBuffer::nextImpl()
{
    do
    {
        // If it is known that end of file was reached, return false
        if (eof_flag)
            return false;

        /// If all previously supplied input was consumed, get the next part from `in`.
        if (input.pos >= input.size)
        {
            in->nextIfAtEnd();
            input.src = reinterpret_cast<unsigned char *>(in->position());
            input.pos = 0;
            input.size = in->buffer().end() - in->position();
        }

        /// fill output
        output.dst = reinterpret_cast<unsigned char *>(internal_buffer.begin());
        output.size = internal_buffer.size();
        output.pos = 0;

        /// Decompress data and check errors.
        size_t ret = ZSTD_decompressStream(dctx, &output, &input);
        const auto error_code = ZSTD_getErrorCode(ret);
        if (error_code)
        {
            throw Exception(
                ErrorCodes::ZSTD_DECODER_FAILED,
                "ZSTD stream decoding failed: error '{}'{}; ZSTD version: {}{}",
                ZSTD_getErrorName(ret),
                /// Compare the decoded error code, not `ret`: zstd returns errors as negated codes
                /// cast to size_t, so `ret` itself never equals an error enum constant.
                error_code == ZSTD_error_frameParameter_windowTooLarge
                    ? ". You can increase the maximum window size with the 'zstd_window_log_max' setting in ClickHouse. Example: 'SET zstd_window_log_max = 31'"
                    : "",
                ZSTD_VERSION_STRING,
                getExceptionEntryWithFileName(*in));
        }

        /// Check that something has changed after decompress (input or output position)
        assert(in->eof() || output.pos > 0 || in->position() < in->buffer().end() - (input.size - input.pos));

        /// Move `in`'s position right past the bytes zstd consumed. input.size was computed as
        /// `end - position` at refill time, so counting back from the end is correct even when
        /// the position was not at the start of `in`'s buffer when the input was refilled.
        in->position() = in->buffer().end() - (input.size - input.pos);
        working_buffer.resize(output.pos);

        /// If end of file is reached, fill eof variable and return true if there is some data in buffer, otherwise return false
        if (in->eof())
        {
            eof_flag = true;
            return !working_buffer.empty();
        }
        /// It is possible, that input buffer is not at eof yet, but nothing was decompressed in current iteration.
        /// But there are cases, when such behaviour is not allowed - i.e. if input buffer is not eof, then
        /// it has to be guaranteed that working_buffer is not empty. So if it is empty, continue.
    } while (output.pos == 0);

    return true;
}

}