summaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/Compression/CompressedReadBufferBase.cpp
diff options
context:
space:
mode:
authorvitalyisaev <[email protected]>2023-11-14 09:58:56 +0300
committervitalyisaev <[email protected]>2023-11-14 10:20:20 +0300
commitc2b2dfd9827a400a8495e172a56343462e3ceb82 (patch)
treecd4e4f597d01bede4c82dffeb2d780d0a9046bd0 /contrib/clickhouse/src/Compression/CompressedReadBufferBase.cpp
parentd4ae8f119e67808cb0cf776ba6e0cf95296f2df7 (diff)
YQ Connector: move tests from yql to ydb (OSS)
Перенос папки с тестами на Коннектор из папки yql в папку ydb (синхронизируется с github).
Diffstat (limited to 'contrib/clickhouse/src/Compression/CompressedReadBufferBase.cpp')
-rw-r--r--contrib/clickhouse/src/Compression/CompressedReadBufferBase.cpp331
1 files changed, 331 insertions, 0 deletions
diff --git a/contrib/clickhouse/src/Compression/CompressedReadBufferBase.cpp b/contrib/clickhouse/src/Compression/CompressedReadBufferBase.cpp
new file mode 100644
index 00000000000..dd19955d010
--- /dev/null
+++ b/contrib/clickhouse/src/Compression/CompressedReadBufferBase.cpp
@@ -0,0 +1,331 @@
+#include "CompressedReadBufferBase.h"
+
+#include <bit>
+#include <cstring>
+#include <cassert>
+#include <city.h>
+#include <Common/ProfileEvents.h>
+#include <Common/Exception.h>
+#include <base/hex.h>
+#include <Compression/ICompressionCodec.h>
+#include <Compression/CompressionFactory.h>
+#include <IO/ReadBuffer.h>
+#include <IO/ReadBufferFromMemory.h>
+#include <IO/BufferWithOwnMemory.h>
+#include <Compression/CompressionInfo.h>
+#include <IO/WriteHelpers.h>
+#include <IO/Operators.h>
+
+
+namespace ProfileEvents /// Counters are only declared here (extern); this file just increments them.
+{
+ extern const Event ReadCompressedBytes; /// Incremented per block read, by the compressed size plus sizeof(Checksum).
+ extern const Event CompressedReadBufferBlocks; /// Incremented on every call of readHeaderAndGetCodec.
+ extern const Event CompressedReadBufferBytes; /// Incremented by the decompressed size in readHeaderAndGetCodec.
+}
+
+namespace DB
+{
+
+namespace ErrorCodes /// Error codes are only declared here (extern); they are thrown by the functions below.
+{
+ extern const int TOO_LARGE_SIZE_COMPRESSED; /// Compressed size from the header exceeds DBMS_MAX_COMPRESSED_SIZE.
+ extern const int CHECKSUM_DOESNT_MATCH; /// Thrown by validateChecksum on any checksum mismatch.
+ extern const int CANNOT_DECOMPRESS; /// Method byte differs from the current codec and different codecs are not allowed.
+ extern const int CORRUPTED_DATA; /// Compressed size from the header is smaller than the header itself.
+}
+
+using Checksum = CityHash_v1_0_2::uint128; /// CityHash128 result (low64 / high64 halves); sizeof(Checksum) bytes are read ahead of every block header.
+
+
+/// Compares CityHash128 of `data[0, size)` with `expected_checksum` and returns silently when they agree.
+/// On a mismatch it always throws CHECKSUM_DOESNT_MATCH. Before throwing it tries to attribute the damage
+/// to a single flipped bit, either in the data (only attempted for blocks smaller than 1 MiB) or in the
+/// stored checksum; in both of those cases a hint about probable hardware failure is appended to the message.
+static void validateChecksum(char * data, size_t size, const Checksum expected_checksum)
+{
+    auto actual_checksum = CityHash_v1_0_2::CityHash128(data, size);
+    if (expected_checksum == actual_checksum)
+        return;
+
+    const char * hardware_failure_hint = "This is most likely due to hardware failure. "
+        "If you receive broken data over network and the error does not repeat every time, "
+        "this can be caused by bad RAM on network interface controller or bad controller itself "
+        "or bad RAM on network switches or bad CPU on network switches "
+        "(look at the logs on related network switches; note that TCP checksums don't help) "
+        "or bad RAM on host (look at dmesg or kern.log for enormous amount of EDAC errors, "
+        "ECC-related reports, Machine Check Exceptions, mcelog; note that ECC memory can fail "
+        "if the number of errors is huge) or bad CPU on host. If you read data from disk, "
+        "this can be caused by disk bit rot. This exception protects ClickHouse "
+        "from data corruption due to hardware failures.";
+
+    WriteBufferFromOwnString message;
+
+    /// TODO mess up of endianness in error message.
+    message << "Checksum doesn't match: corrupted data."
+        " Reference: " << getHexUIntLowercase(expected_checksum)
+        << ". Actual: " << getHexUIntLowercase(actual_checksum)
+        << ". Size of compressed block: " << toString(size);
+
+    /// A huge size may itself be a symptom of corruption, and the brute-force search below is expensive,
+    /// so it is only attempted for reasonably small blocks.
+    if (size < (1ULL << 20))
+    {
+        /// Work on a private copy: the source buffer must stay untouched.
+        PODArray<char> scratch(data, data + size);
+        char * candidate = scratch.data();
+
+        /// Toggle every bit in turn and see whether that alone reproduces the expected checksum.
+        for (size_t byte_index = 0; byte_index < size; ++byte_index)
+        {
+            for (size_t bit_index = 0; bit_index < 8; ++bit_index)
+            {
+                candidate[byte_index] ^= 1 << bit_index;
+
+                auto candidate_checksum = CityHash_v1_0_2::CityHash128(candidate, size);
+                if (expected_checksum == candidate_checksum)
+                {
+                    message << ". The mismatch is caused by single bit flip in data block at byte " << byte_index << ", bit " << bit_index << ". "
+                        << hardware_failure_hint;
+                    throw Exception::createDeprecated(message.str(), ErrorCodes::CHECKSUM_DOESNT_MATCH);
+                }
+
+                /// Undo the toggle before trying the next bit.
+                candidate[byte_index] ^= 1 << bit_index;
+            }
+        }
+    }
+
+    /// Otherwise the stored checksum itself may differ from the computed one in exactly one bit.
+    size_t differing_checksum_bits = std::popcount(expected_checksum.low64 ^ actual_checksum.low64)
+        + std::popcount(expected_checksum.high64 ^ actual_checksum.high64);
+
+    if (differing_checksum_bits == 1)
+        message << ". The mismatch is caused by single bit flip in checksum. "
+            << hardware_failure_hint;
+
+    throw Exception::createDeprecated(message.str(), ErrorCodes::CHECKSUM_DOESNT_MATCH);
+}
+
+/// Parses the block header located at `compressed_buffer`.
+/// Picks the codec for the method byte found there (creating it when `codec` is empty, replacing it when the
+/// method differs and `allow_different_codecs` is set, throwing CANNOT_DECOMPRESS otherwise), then stores the
+/// compressed and decompressed block sizes into the output parameters and sanity-checks the compressed size.
+static void readHeaderAndGetCodecAndSize(
+    const char * compressed_buffer,
+    UInt8 header_size,
+    CompressionCodecPtr & codec,
+    size_t & size_decompressed,
+    size_t & size_compressed_without_checksum,
+    bool allow_different_codecs)
+{
+    uint8_t method_byte = ICompressionCodec::readMethod(compressed_buffer);
+
+    const bool have_matching_codec = codec && method_byte == codec->getMethodByte();
+    if (!have_matching_codec)
+    {
+        /// An existing codec with another method byte may only be swapped out when explicitly allowed.
+        if (codec && !allow_different_codecs)
+            throw Exception(ErrorCodes::CANNOT_DECOMPRESS, "Data compressed with different methods, given method "
+                "byte 0x{}, previous method byte 0x{}",
+                getHexUIntLowercase(method_byte), getHexUIntLowercase(codec->getMethodByte()));
+
+        codec = CompressionCodecFactory::instance().get(method_byte);
+    }
+
+    size_compressed_without_checksum = ICompressionCodec::readCompressedBlockSize(compressed_buffer);
+    size_decompressed = ICompressionCodec::readDecompressedBlockSize(compressed_buffer);
+
+    /// This is for clang static analyzer.
+    assert(size_decompressed > 0);
+
+    if (size_compressed_without_checksum > DBMS_MAX_COMPRESSED_SIZE)
+        throw Exception(ErrorCodes::TOO_LARGE_SIZE_COMPRESSED, "Too large size_compressed_without_checksum: {}. "
+            "Most likely corrupted data.", size_compressed_without_checksum);
+
+    /// The compressed size includes the header, so it can never be smaller than the header.
+    if (size_compressed_without_checksum < header_size)
+        throw Exception(ErrorCodes::CORRUPTED_DATA, "Can't decompress data: "
+            "the compressed data size ({}, this should include header size) is less than the header size ({})",
+            size_compressed_without_checksum, static_cast<size_t>(header_size));
+}
+
+/// Reads the next compressed block (checksum, header and payload) from `compressed_in` and makes
+/// `compressed_buffer` point at its header. The block sizes taken from the header are returned through
+/// the output parameters, and the checksum is verified unless `disable_checksum` is set.
+/// Returns the number of bytes consumed (compressed size plus checksum size), or 0 at end of input.
+size_t CompressedReadBufferBase::readCompressedData(size_t & size_decompressed, size_t & size_compressed_without_checksum, bool always_copy)
+{
+    if (compressed_in->eof())
+        return 0;
+
+    UInt8 header_size = ICompressionCodec::getHeaderSize();
+    const size_t prefix_size = sizeof(Checksum) + header_size;
+
+    /// The checksum and the header are always copied into our own buffer first.
+    own_compressed_buffer.resize(prefix_size);
+    compressed_in->readStrict(own_compressed_buffer.data(), prefix_size);
+
+    readHeaderAndGetCodecAndSize(
+        own_compressed_buffer.data() + sizeof(Checksum),
+        header_size,
+        codec,
+        size_decompressed,
+        size_compressed_without_checksum,
+        allow_different_codecs);
+
+    auto tail_padding = codec->getAdditionalSizeAtTheEndOfBuffer();
+    const size_t payload_size = size_compressed_without_checksum - header_size;
+
+    /// Can the block be used directly from the buffer of 'compressed_in', without copying?
+    const bool block_is_in_place = !always_copy
+        && compressed_in->offset() >= prefix_size
+        && compressed_in->available() >= payload_size + tail_padding + sizeof(Checksum);
+
+    if (block_is_in_place)
+    {
+        /// Step back over the header that was just consumed, then skip the whole block.
+        char * const block_begin = compressed_in->position() - header_size;
+        compressed_buffer = block_begin;
+        compressed_in->position() = block_begin + size_compressed_without_checksum;
+    }
+    else
+    {
+        /// Grow our own buffer (the already read checksum and header stay at its start) and read the payload after them.
+        own_compressed_buffer.resize(sizeof(Checksum) + size_compressed_without_checksum + tail_padding);
+        compressed_buffer = own_compressed_buffer.data() + sizeof(Checksum);
+        compressed_in->readStrict(compressed_buffer + header_size, payload_size);
+    }
+
+    if (!disable_checksum)
+    {
+        /// The stored checksum sits at the very beginning of our own buffer in both cases.
+        Checksum stored_checksum;
+        ReadBufferFromMemory checksum_reader(own_compressed_buffer.data(), sizeof(stored_checksum));
+        readBinaryLittleEndian(stored_checksum.low64, checksum_reader);
+        readBinaryLittleEndian(stored_checksum.high64, checksum_reader);
+
+        validateChecksum(compressed_buffer, size_compressed_without_checksum, stored_checksum);
+    }
+
+    const size_t bytes_consumed = size_compressed_without_checksum + sizeof(Checksum);
+    ProfileEvents::increment(ProfileEvents::ReadCompressedBytes, bytes_consumed);
+    return bytes_consumed;
+}
+
+/// Variant of block reading for asynchronous decompression: a block is only accepted when it lies entirely
+/// inside the current buffer of 'compressed_in', so that it never has to be assembled across buffer boundaries.
+/// Returns the number of bytes consumed (compressed size plus checksum size), or 0 when no complete block
+/// is available; in the latter case, if the header was already consumed, the read position is moved back.
+size_t CompressedReadBufferBase::readCompressedDataBlockForAsynchronous(size_t & size_decompressed, size_t & size_compressed_without_checksum)
+{
+    UInt8 header_size = ICompressionCodec::getHeaderSize();
+    const size_t prefix_size = sizeof(Checksum) + header_size;
+
+    /// The checksum together with the header must be fully available in the current buffer.
+    if (compressed_in->eof() || compressed_in->available() < prefix_size)
+        return 0;
+
+    own_compressed_buffer.resize(prefix_size);
+    compressed_in->readStrict(own_compressed_buffer.data(), prefix_size);
+
+    readHeaderAndGetCodecAndSize(
+        own_compressed_buffer.data() + sizeof(Checksum),
+        header_size,
+        codec,
+        size_decompressed,
+        size_compressed_without_checksum,
+        allow_different_codecs);
+
+    auto tail_padding = codec->getAdditionalSizeAtTheEndOfBuffer();
+    const size_t payload_size = size_compressed_without_checksum - header_size;
+
+    /// If the rest of the block is not in the current buffer, give the checksum and header back and report nothing read.
+    if (compressed_in->offset() < prefix_size
+        || compressed_in->available() < payload_size + tail_padding + sizeof(Checksum))
+    {
+        compressed_in->position() -= prefix_size;
+        return 0;
+    }
+
+    /// Point at the header inside the source buffer and advance the source past the whole block.
+    char * const block_begin = compressed_in->position() - header_size;
+    compressed_buffer = block_begin;
+    compressed_in->position() = block_begin + size_compressed_without_checksum;
+
+    if (!disable_checksum)
+    {
+        Checksum stored_checksum;
+        ReadBufferFromMemory checksum_reader(own_compressed_buffer.data(), sizeof(stored_checksum));
+        readBinaryLittleEndian(stored_checksum.low64, checksum_reader);
+        readBinaryLittleEndian(stored_checksum.high64, checksum_reader);
+
+        validateChecksum(compressed_buffer, size_compressed_without_checksum, stored_checksum);
+    }
+
+    const size_t bytes_consumed = size_compressed_without_checksum + sizeof(Checksum);
+    ProfileEvents::increment(ProfileEvents::ReadCompressedBytes, bytes_consumed);
+    return bytes_consumed;
+}
+
+/// Accounts the block in profile events and makes sure `codec` corresponds to the method byte stored in the
+/// header at `compressed_buffer`: the codec is created when empty, replaced when the method differs and
+/// `allow_different_codecs` is set, and CANNOT_DECOMPRESS is thrown when it differs and replacing is not allowed.
+static void readHeaderAndGetCodec(const char * compressed_buffer, size_t size_decompressed, CompressionCodecPtr & codec, bool allow_different_codecs)
+{
+    ProfileEvents::increment(ProfileEvents::CompressedReadBufferBlocks);
+    ProfileEvents::increment(ProfileEvents::CompressedReadBufferBytes, size_decompressed);
+
+    uint8_t method_byte = ICompressionCodec::readMethod(compressed_buffer);
+
+    /// Nothing to do when the current codec already handles this method.
+    if (codec && codec->getMethodByte() == method_byte)
+        return;
+
+    if (codec && !allow_different_codecs)
+        throw Exception(ErrorCodes::CANNOT_DECOMPRESS, "Data compressed with different methods, given method "
+            "byte 0x{}, previous method byte 0x{}",
+            getHexUIntLowercase(method_byte), getHexUIntLowercase(codec->getMethodByte()));
+
+    codec = CompressionCodecFactory::instance().get(method_byte);
+}
+
+/// Decompresses the block currently referenced by `compressed_buffer` into the memory starting at `to`,
+/// selecting the codec from the block header first.
+void CompressedReadBufferBase::decompressTo(char * to, size_t size_decompressed, size_t size_compressed_without_checksum)
+{
+    readHeaderAndGetCodec(compressed_buffer, size_decompressed, codec, allow_different_codecs);
+
+    const auto source_size = static_cast<UInt32>(size_compressed_without_checksum);
+    codec->decompress(compressed_buffer, source_size, to);
+}
+
+/// Decompresses the block currently referenced by `compressed_buffer` into the buffer `to`.
+/// For the NONE codec nothing is copied: `to` is re-pointed at the payload that follows the block header.
+void CompressedReadBufferBase::decompress(BufferBase::Buffer & to, size_t size_decompressed, size_t size_compressed_without_checksum)
+{
+    readHeaderAndGetCodec(compressed_buffer, size_decompressed, codec, allow_different_codecs);
+
+    if (!codec->isNone())
+    {
+        codec->decompress(compressed_buffer, static_cast<UInt32>(size_compressed_without_checksum), to.begin());
+        return;
+    }
+
+    /// Shortcut for the NONE codec: avoid an extra memcpy by making `to` refer to
+    /// the uncompressed bytes that already exist right after the header.
+    UInt8 header_size = ICompressionCodec::getHeaderSize();
+    if (size_compressed_without_checksum < header_size)
+        throw Exception(ErrorCodes::CORRUPTED_DATA,
+            "Can't decompress data: the compressed data size ({}, this should include header size) is less than the header size ({})",
+            size_compressed_without_checksum, static_cast<size_t>(header_size));
+
+    char * const payload_begin = compressed_buffer + header_size;
+    char * const payload_end = compressed_buffer + size_compressed_without_checksum;
+    to = BufferBase::Buffer(payload_begin, payload_end);
+}
+
+/// Forwards the flush request to the current codec; does nothing while no codec has been selected yet.
+void CompressedReadBufferBase::flushAsynchronousDecompressRequests() const
+{
+    if (!codec)
+        return;
+
+    codec->flushAsynchronousDecompressRequests();
+}
+
+/// Forwards the decompression mode to the current codec; does nothing while no codec has been selected yet.
+void CompressedReadBufferBase::setDecompressMode(ICompressionCodec::CodecMode mode) const
+{
+    if (!codec)
+        return;
+
+    codec->setDecompressMode(mode);
+}
+
+/// `in` may be supplied later (lazy initialization of 'compressed_in'), but it has to be set
+/// before 'readCompressedData' is called for the first time.
+CompressedReadBufferBase::CompressedReadBufferBase(ReadBuffer * in, bool allow_different_codecs_)
+    : compressed_in(in)
+    , own_compressed_buffer(0)
+    , allow_different_codecs(allow_different_codecs_)
+{
+}
+
+
+CompressedReadBufferBase::~CompressedReadBufferBase() = default; /// Defaulted out of line, in this file, for proper destruction of a unique_ptr to a forward-declared type (the member declaration is not visible here).
+
+}