aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/Compression/CompressedReadBufferBase.cpp
blob: dd19955d010f9cf87d332ee770d3d93efb1ee5f0 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
#include "CompressedReadBufferBase.h"

#include <bit>
#include <cstring>
#include <cassert>
#include <city.h>
#include <Common/ProfileEvents.h>
#include <Common/Exception.h>
#include <base/hex.h>
#include <Compression/ICompressionCodec.h>
#include <Compression/CompressionFactory.h>
#include <IO/ReadBuffer.h>
#include <IO/ReadBufferFromMemory.h>
#include <IO/BufferWithOwnMemory.h>
#include <Compression/CompressionInfo.h>
#include <IO/WriteHelpers.h>
#include <IO/Operators.h>


/// Profile counters updated by this file (declared here, defined elsewhere).
namespace ProfileEvents
{
    extern const Event ReadCompressedBytes;          /// + (compressed block size + checksum size) per block read from 'compressed_in'.
    extern const Event CompressedReadBufferBlocks;   /// + 1 per block passed to decompression (see readHeaderAndGetCodec).
    extern const Event CompressedReadBufferBytes;    /// + decompressed size of each such block.
}

namespace DB
{

/// Error codes thrown from this file (declared here, defined elsewhere).
namespace ErrorCodes
{
    extern const int TOO_LARGE_SIZE_COMPRESSED;
    extern const int CHECKSUM_DOESNT_MATCH;
    extern const int CANNOT_DECOMPRESS;
    extern const int CORRUPTED_DATA;
}

/// 128-bit CityHash (v1.0.2) value that precedes the header of every compressed block in the stream.
using Checksum = CityHash_v1_0_2::uint128;


/// Validate checksum of data, and if it mismatches, find out possible reason and throw exception.
static void validateChecksum(char * data, size_t size, const Checksum expected_checksum)
{
    auto calculated_checksum = CityHash_v1_0_2::CityHash128(data, size);
    if (expected_checksum == calculated_checksum)
        return;

    WriteBufferFromOwnString message;

    /// TODO mess up of endianness in error message.
    message << "Checksum doesn't match: corrupted data."
        " Reference: " + getHexUIntLowercase(expected_checksum)
        + ". Actual: " + getHexUIntLowercase(calculated_checksum)
        + ". Size of compressed block: " + toString(size);

    const char * message_hardware_failure = "This is most likely due to hardware failure. "
                                            "If you receive broken data over network and the error does not repeat every time, "
                                            "this can be caused by bad RAM on network interface controller or bad controller itself "
                                            "or bad RAM on network switches or bad CPU on network switches "
                                            "(look at the logs on related network switches; note that TCP checksums don't help) "
                                            "or bad RAM on host (look at dmesg or kern.log for enormous amount of EDAC errors, "
                                            "ECC-related reports, Machine Check Exceptions, mcelog; note that ECC memory can fail "
                                            "if the number of errors is huge) or bad CPU on host. If you read data from disk, "
                                            "this can be caused by disk bit rot. This exception protects ClickHouse "
                                            "from data corruption due to hardware failures.";

    auto flip_bit = [](char * buf, size_t pos)
    {
        buf[pos / 8] ^= 1 << pos % 8;
    };

    /// If size is too huge, then this may be caused by corruption.
    /// And anyway this is pretty heavy, so avoid burning too much CPU here.
    if (size < (1ULL << 20))
    {
        /// We need to copy data from ReadBuffer to flip bits as ReadBuffer should be immutable
        PODArray<char> tmp_buffer(data, data + size);
        char * tmp_data = tmp_buffer.data();

        /// Check if the difference caused by single bit flip in data.
        for (size_t bit_pos = 0; bit_pos < size * 8; ++bit_pos)
        {
            flip_bit(tmp_data, bit_pos);

            auto checksum_of_data_with_flipped_bit = CityHash_v1_0_2::CityHash128(tmp_data, size);
            if (expected_checksum == checksum_of_data_with_flipped_bit)
            {
                message << ". The mismatch is caused by single bit flip in data block at byte " << (bit_pos / 8) << ", bit " << (bit_pos % 8) << ". "
                    << message_hardware_failure;
                throw Exception::createDeprecated(message.str(), ErrorCodes::CHECKSUM_DOESNT_MATCH);
            }

            flip_bit(tmp_data, bit_pos);    /// Restore
        }
    }

    /// Check if the difference caused by single bit flip in stored checksum.
    size_t difference = std::popcount(expected_checksum.low64 ^ calculated_checksum.low64)
        + std::popcount(expected_checksum.high64 ^ calculated_checksum.high64);

    if (difference == 1)
    {
        message << ". The mismatch is caused by single bit flip in checksum. "
            << message_hardware_failure;
        throw Exception::createDeprecated(message.str(), ErrorCodes::CHECKSUM_DOESNT_MATCH);
    }

    throw Exception::createDeprecated(message.str(), ErrorCodes::CHECKSUM_DOESNT_MATCH);
}

/// Parse the block header that starts at `compressed_buffer` (right after the stored checksum).
///
/// - Makes `codec` match the method byte of the header. A codec is created when none is set yet,
///   kept when its method byte matches, and replaced when it differs and `allow_different_codecs`
///   is true. A differing method with `allow_different_codecs == false` throws CANNOT_DECOMPRESS.
/// - Writes the compressed size (header included, checksum excluded) and the decompressed size
///   stored in the header into the two size out-parameters.
/// - Throws TOO_LARGE_SIZE_COMPRESSED when the compressed size exceeds DBMS_MAX_COMPRESSED_SIZE.
/// - Throws CORRUPTED_DATA when the compressed size is smaller than `header_size`.
static void readHeaderAndGetCodecAndSize(
    const char * compressed_buffer,
    UInt8 header_size,
    CompressionCodecPtr & codec,
    size_t & size_decompressed,
    size_t & size_compressed_without_checksum,
    bool allow_different_codecs)
{
    const uint8_t method = ICompressionCodec::readMethod(compressed_buffer);

    const bool codec_must_change = !codec || codec->getMethodByte() != method;
    if (codec_must_change)
    {
        /// A codec is already in use, but this block was written with another method.
        if (codec && !allow_different_codecs)
            throw Exception(ErrorCodes::CANNOT_DECOMPRESS, "Data compressed with different methods, given method "
                            "byte 0x{}, previous method byte 0x{}",
                            getHexUIntLowercase(method), getHexUIntLowercase(codec->getMethodByte()));

        codec = CompressionCodecFactory::instance().get(method);
    }

    size_compressed_without_checksum = ICompressionCodec::readCompressedBlockSize(compressed_buffer);
    size_decompressed = ICompressionCodec::readDecompressedBlockSize(compressed_buffer);

    /// This is for clang static analyzer.
    assert(size_decompressed > 0);

    /// Sanity checks: the size comes straight from the input stream, which may be corrupted.
    if (size_compressed_without_checksum > DBMS_MAX_COMPRESSED_SIZE)
        throw Exception(ErrorCodes::TOO_LARGE_SIZE_COMPRESSED, "Too large size_compressed_without_checksum: {}. "
                        "Most likely corrupted data.", size_compressed_without_checksum);

    if (size_compressed_without_checksum < header_size)
        throw Exception(ErrorCodes::CORRUPTED_DATA, "Can't decompress data: "
            "the compressed data size ({}, this should include header size) is less than the header size ({})",
            size_compressed_without_checksum, static_cast<size_t>(header_size));
}

/// Read the next compressed block from 'compressed_in' and make 'compressed_buffer' point at its header.
/// Stream layout consumed here: [checksum: sizeof(Checksum) bytes][header: header_size bytes][rest of block].
///
/// Outputs, both taken from the block header:
///  - size_decompressed: the block's decompressed size;
///  - size_compressed_without_checksum: the block's compressed size (header included, checksum excluded).
/// The checksum is validated unless 'disable_checksum' is set.
///
/// If 'always_copy' is false and the whole block is already inside the current buffer of 'compressed_in',
/// 'compressed_buffer' points directly into that buffer (no copy). Otherwise the block is copied into
/// 'own_compressed_buffer'.
/// Returns number of compressed bytes read (checksum included), or 0 if 'compressed_in' is at EOF.
size_t CompressedReadBufferBase::readCompressedData(size_t & size_decompressed, size_t & size_compressed_without_checksum, bool always_copy)
{
    if (compressed_in->eof())
        return 0;

    UInt8 header_size = ICompressionCodec::getHeaderSize();
    own_compressed_buffer.resize(header_size + sizeof(Checksum));

    /// Checksum + header always land in 'own_compressed_buffer'; the checksum is read back from its front below.
    compressed_in->readStrict(own_compressed_buffer.data(), sizeof(Checksum) + header_size);

    readHeaderAndGetCodecAndSize(
        own_compressed_buffer.data() + sizeof(Checksum),
        header_size,
        codec,
        size_decompressed,
        size_compressed_without_checksum,
        allow_different_codecs);

    auto additional_size_at_the_end_of_buffer = codec->getAdditionalSizeAtTheEndOfBuffer();

    /// Is whole compressed block located in 'compressed_in->' buffer?
    /// The offset() condition presumably ensures the checksum + header just read came entirely from the
    /// current buffer, so stepping the position back by header_size stays inside it.
    /// NOTE(review): the extra sizeof(Checksum) required by the available() condition looks like slack
    /// beyond the end of the block; its reason is not visible here, so confirm before changing it.
    if (!always_copy &&
        compressed_in->offset() >= header_size + sizeof(Checksum) &&
        compressed_in->available() >= (size_compressed_without_checksum - header_size) + additional_size_at_the_end_of_buffer + sizeof(Checksum))
    {
        /// Zero-copy path: rewind to the header, expose the block in place, then skip past it.
        compressed_in->position() -= header_size;
        compressed_buffer = compressed_in->position();
        compressed_in->position() += size_compressed_without_checksum;
    }
    else
    {
        /// Copy path: grow 'own_compressed_buffer' (the checksum + header read above are expected to survive
        /// the resize) and read the remainder of the block right after the header.
        /// 'additional_size_at_the_end_of_buffer' extra bytes are reserved at the end for the codec.
        own_compressed_buffer.resize(sizeof(Checksum) + size_compressed_without_checksum + additional_size_at_the_end_of_buffer);
        compressed_buffer = own_compressed_buffer.data() + sizeof(Checksum);
        compressed_in->readStrict(compressed_buffer + header_size, size_compressed_without_checksum - header_size);
    }

    if (!disable_checksum)
    {
        /// On both paths the stored checksum sits at the front of 'own_compressed_buffer' (little-endian halves).
        Checksum checksum;
        ReadBufferFromMemory checksum_in(own_compressed_buffer.data(), sizeof(checksum));
        readBinaryLittleEndian(checksum.low64, checksum_in);
        readBinaryLittleEndian(checksum.high64, checksum_in);

        validateChecksum(compressed_buffer, size_compressed_without_checksum, checksum);
    }

    ProfileEvents::increment(ProfileEvents::ReadCompressedBytes, size_compressed_without_checksum + sizeof(Checksum));
    return size_compressed_without_checksum + sizeof(Checksum);
}

/// Read compressed data into compressed_buffer for asynchronous decompression to avoid the situation of "read compressed block across the compressed_in".
size_t CompressedReadBufferBase::readCompressedDataBlockForAsynchronous(size_t & size_decompressed, size_t & size_compressed_without_checksum)
{
    UInt8 header_size = ICompressionCodec::getHeaderSize();
    /// Make sure the whole header located in 'compressed_in->' buffer.
    if (compressed_in->eof() || (compressed_in->available() < (header_size + sizeof(Checksum))))
        return 0;

    own_compressed_buffer.resize(header_size + sizeof(Checksum));
    compressed_in->readStrict(own_compressed_buffer.data(), sizeof(Checksum) + header_size);

    readHeaderAndGetCodecAndSize(
        own_compressed_buffer.data() + sizeof(Checksum),
        header_size,
        codec,
        size_decompressed,
        size_compressed_without_checksum,
        allow_different_codecs);

    auto additional_size_at_the_end_of_buffer = codec->getAdditionalSizeAtTheEndOfBuffer();

    /// Make sure the whole compressed block located in 'compressed_in->' buffer.
    /// Otherwise, abandon header and restore original offset of compressed_in
    if (compressed_in->offset() >= header_size + sizeof(Checksum) &&
        compressed_in->available() >= (size_compressed_without_checksum - header_size) + additional_size_at_the_end_of_buffer + sizeof(Checksum))
    {
        compressed_in->position() -= header_size;
        compressed_buffer = compressed_in->position();
        compressed_in->position() += size_compressed_without_checksum;

        if (!disable_checksum)
        {
            Checksum checksum;
            ReadBufferFromMemory checksum_in(own_compressed_buffer.data(), sizeof(checksum));
            readBinaryLittleEndian(checksum.low64, checksum_in);
            readBinaryLittleEndian(checksum.high64, checksum_in);

            validateChecksum(compressed_buffer, size_compressed_without_checksum, checksum);
        }

        ProfileEvents::increment(ProfileEvents::ReadCompressedBytes, size_compressed_without_checksum + sizeof(Checksum));
        return size_compressed_without_checksum + sizeof(Checksum);
    }
    else
    {
        compressed_in->position() -= (sizeof(Checksum) + header_size);
        return 0;
    }
}

/// Count one more block (and its decompressed size) in the profile counters. Then make `codec` match the
/// method byte of the block header at `compressed_buffer`:
///  - a codec is created when none is set;
///  - the current codec is kept when its method byte matches;
///  - the codec is replaced when the method differs and `allow_different_codecs` is true;
///  - CANNOT_DECOMPRESS is thrown when the method differs and `allow_different_codecs` is false.
static void readHeaderAndGetCodec(const char * compressed_buffer, size_t size_decompressed, CompressionCodecPtr & codec, bool allow_different_codecs)
{
    ProfileEvents::increment(ProfileEvents::CompressedReadBufferBlocks);
    ProfileEvents::increment(ProfileEvents::CompressedReadBufferBytes, size_decompressed);

    const uint8_t method = ICompressionCodec::readMethod(compressed_buffer);

    /// The codec already in use fits this block: nothing to do.
    if (codec && codec->getMethodByte() == method)
        return;

    if (codec && !allow_different_codecs)
        throw Exception(ErrorCodes::CANNOT_DECOMPRESS, "Data compressed with different methods, given method "
                        "byte 0x{}, previous method byte 0x{}",
                        getHexUIntLowercase(method), getHexUIntLowercase(codec->getMethodByte()));

    codec = CompressionCodecFactory::instance().get(method);
}

/// Decompress the block currently referenced by 'compressed_buffer' into caller-provided memory `to`.
/// The codec is (re)selected from the block header first.
void CompressedReadBufferBase::decompressTo(char * to, size_t size_decompressed, size_t size_compressed_without_checksum)
{
    readHeaderAndGetCodec(compressed_buffer, size_decompressed, codec, allow_different_codecs);

    const auto compressed_size = static_cast<UInt32>(size_compressed_without_checksum);
    codec->decompress(compressed_buffer, compressed_size, to);
}

/// Decompress the block currently referenced by 'compressed_buffer' into `to`.
/// For the NONE codec nothing is copied: `to` is re-pointed at the payload that follows the header
/// inside 'compressed_buffer'. It then refers to that memory rather than to a separate copy.
void CompressedReadBufferBase::decompress(BufferBase::Buffer & to, size_t size_decompressed, size_t size_compressed_without_checksum)
{
    readHeaderAndGetCodec(compressed_buffer, size_decompressed, codec, allow_different_codecs);

    if (!codec->isNone())
    {
        codec->decompress(compressed_buffer, static_cast<UInt32>(size_compressed_without_checksum), to.begin());
        return;
    }

    /// Shortcut for NONE codec to avoid an extra memcpy: the bytes after the header already are the
    /// uncompressed data, so we make `to` point at them directly.
    const UInt8 header_size = ICompressionCodec::getHeaderSize();
    if (size_compressed_without_checksum < header_size)
        throw Exception(ErrorCodes::CORRUPTED_DATA,
            "Can't decompress data: the compressed data size ({}, this should include header size) is less than the header size ({})",
                size_compressed_without_checksum, static_cast<size_t>(header_size));

    to = BufferBase::Buffer(compressed_buffer + header_size, compressed_buffer + size_compressed_without_checksum);
}

/// Forward to the current codec; does nothing when no codec has been selected yet.
void CompressedReadBufferBase::flushAsynchronousDecompressRequests() const
{
    if (!codec)
        return;

    codec->flushAsynchronousDecompressRequests();
}

/// Forward the decompression mode to the current codec; does nothing when no codec has been selected yet.
void CompressedReadBufferBase::setDecompressMode(ICompressionCodec::CodecMode mode) const
{
    if (!codec)
        return;

    codec->setDecompressMode(mode);
}

/// 'compressed_in' may be assigned lazily (so `in` is presumably allowed to be null here), but it must
/// be set before the first call of 'readCompressedData'.
CompressedReadBufferBase::CompressedReadBufferBase(ReadBuffer * in, bool allow_different_codecs_)
    : compressed_in(in)
    , own_compressed_buffer(0)
    , allow_different_codecs(allow_different_codecs_)
{
}


/// Defined out of line so that unique_ptr members of forward-declared types are destroyed where the types are complete.
CompressedReadBufferBase::~CompressedReadBufferBase() = default;

}