diff options
| author | vitalyisaev <[email protected]> | 2023-11-14 09:58:56 +0300 |
|---|---|---|
| committer | vitalyisaev <[email protected]> | 2023-11-14 10:20:20 +0300 |
| commit | c2b2dfd9827a400a8495e172a56343462e3ceb82 (patch) | |
| tree | cd4e4f597d01bede4c82dffeb2d780d0a9046bd0 /contrib/clickhouse/src/Compression/CompressionCodecMultiple.cpp | |
| parent | d4ae8f119e67808cb0cf776ba6e0cf95296f2df7 (diff) | |
YQ Connector: move tests from yql to ydb (OSS)
Перенос папки с тестами на Коннектор из папки yql в папку ydb (синхронизируется с github).
Diffstat (limited to 'contrib/clickhouse/src/Compression/CompressionCodecMultiple.cpp')
| -rw-r--r-- | contrib/clickhouse/src/Compression/CompressionCodecMultiple.cpp | 139 |
1 files changed, 139 insertions, 0 deletions
diff --git a/contrib/clickhouse/src/Compression/CompressionCodecMultiple.cpp b/contrib/clickhouse/src/Compression/CompressionCodecMultiple.cpp new file mode 100644 index 00000000000..dba67749e4d --- /dev/null +++ b/contrib/clickhouse/src/Compression/CompressionCodecMultiple.cpp @@ -0,0 +1,139 @@ +#include <Compression/CompressionCodecMultiple.h> +#include <Compression/CompressionInfo.h> +#include <Common/PODArray.h> +#include <base/unaligned.h> +#include <Compression/CompressionFactory.h> +#include <Parsers/ASTExpressionList.h> +#include <Parsers/ASTFunction.h> +#include <IO/WriteHelpers.h> +#include <IO/WriteBufferFromString.h> +#include <IO/Operators.h> +#include <base/hex.h> + + +namespace DB +{ + + +namespace ErrorCodes +{ + extern const int CORRUPTED_DATA; +} + +CompressionCodecMultiple::CompressionCodecMultiple(Codecs codecs_) + : codecs(codecs_) +{ + ASTs arguments; + for (const auto & codec : codecs) + arguments.push_back(codec->getCodecDesc()); + /// Special case, codec doesn't have name and contain list of codecs. + setCodecDescription("", arguments); +} + +uint8_t CompressionCodecMultiple::getMethodByte() const +{ + return static_cast<uint8_t>(CompressionMethodByte::Multiple); +} + +void CompressionCodecMultiple::updateHash(SipHash & hash) const +{ + for (const auto & codec : codecs) + codec->updateHash(hash); +} + +UInt32 CompressionCodecMultiple::getMaxCompressedDataSize(UInt32 uncompressed_size) const +{ + UInt32 compressed_size = uncompressed_size; + for (const auto & codec : codecs) + compressed_size = codec->getCompressedReserveSize(compressed_size); + + /// TotalCodecs ByteForEachCodec data + return static_cast<UInt32>(sizeof(UInt8) + codecs.size() + compressed_size); +} + +UInt32 CompressionCodecMultiple::doCompressData(const char * source, UInt32 source_size, char * dest) const +{ + PODArray<char> compressed_buf; + PODArray<char> uncompressed_buf(source, source + source_size); + + dest[0] = static_cast<UInt8>(codecs.size()); + + size_t codecs_byte_pos = 1; + for (size_t idx = 0; idx < codecs.size(); ++idx, ++codecs_byte_pos) + { + const auto codec = codecs[idx]; + dest[codecs_byte_pos] = codec->getMethodByte(); + compressed_buf.resize(codec->getCompressedReserveSize(source_size)); + + UInt32 size_compressed = codec->compress(uncompressed_buf.data(), source_size, compressed_buf.data()); + + uncompressed_buf.swap(compressed_buf); + source_size = size_compressed; + } + + memcpy(&dest[1 + codecs.size()], uncompressed_buf.data(), source_size); + + return static_cast<UInt32>(1 + codecs.size() + source_size); +} + +void CompressionCodecMultiple::doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 decompressed_size) const +{ + if (source_size < 1 || !source[0]) + throw Exception(ErrorCodes::CORRUPTED_DATA, "Wrong compression methods list"); + + UInt8 compression_methods_size = source[0]; + + PODArray<char> compressed_buf(&source[compression_methods_size + 1], &source[source_size]); + PODArray<char> uncompressed_buf; + /// Insert all data into compressed buf + source_size -= (compression_methods_size + 1); + + for (int idx = compression_methods_size - 1; idx >= 0; --idx) + { + UInt8 compression_method = source[idx + 1]; + const auto codec = CompressionCodecFactory::instance().get(compression_method); + auto additional_size_at_the_end_of_buffer = codec->getAdditionalSizeAtTheEndOfBuffer(); + + compressed_buf.resize(compressed_buf.size() + additional_size_at_the_end_of_buffer); + UInt32 uncompressed_size = ICompressionCodec::readDecompressedBlockSize(compressed_buf.data()); + + if (idx == 0 && uncompressed_size != decompressed_size) + throw Exception(ErrorCodes::CORRUPTED_DATA, "Wrong final decompressed size in codec Multiple, got {}, expected {}", + uncompressed_size, decompressed_size); + + uncompressed_buf.resize(uncompressed_size + additional_size_at_the_end_of_buffer); + codec->decompress(compressed_buf.data(), source_size, uncompressed_buf.data()); + uncompressed_buf.swap(compressed_buf); + source_size = uncompressed_size; + } + + memcpy(dest, compressed_buf.data(), decompressed_size); +} + +std::vector<uint8_t> CompressionCodecMultiple::getCodecsBytesFromData(const char * source) +{ + std::vector<uint8_t> result; + uint8_t compression_methods_size = source[0]; + for (size_t i = 0; i < compression_methods_size; ++i) + result.push_back(source[1 + i]); + return result; +} + +bool CompressionCodecMultiple::isCompression() const +{ + for (const auto & codec : codecs) + if (codec->isCompression()) + return true; + return false; +} + + +void registerCodecMultiple(CompressionCodecFactory & factory) +{ + factory.registerSimpleCompressionCodec("Multiple", static_cast<UInt8>(CompressionMethodByte::Multiple), [&] () + { + return std::make_shared<CompressionCodecMultiple>(); + }); +} + +} |
