diff options
author | a-romanov <a-romanov@yandex-team.ru> | 2022-06-06 13:50:07 +0300 |
---|---|---|
committer | a-romanov <a-romanov@yandex-team.ru> | 2022-06-06 13:50:07 +0300 |
commit | fd815f4c2cd2485f30c8ab9633c05a9d69ce9703 (patch) | |
tree | 832d94e8841fa2034b59ff0aa071b3405775f060 | |
parent | 89158f6d13da51cf2ce9c11de9f595a20074b068 (diff) | |
download | ydb-fd815f4c2cd2485f30c8ab9633c05a9d69ce9703.tar.gz |
YQ-1037 + bzip2 zstd
ref:d543e8e932a51b24783b5770f98451903cf35408
6 files changed, 189 insertions, 0 deletions
diff --git a/ydb/library/yql/providers/s3/compressors/CMakeLists.linux.txt b/ydb/library/yql/providers/s3/compressors/CMakeLists.linux.txt index 193bf2039ab..4443fe654c2 100644 --- a/ydb/library/yql/providers/s3/compressors/CMakeLists.linux.txt +++ b/ydb/library/yql/providers/s3/compressors/CMakeLists.linux.txt @@ -22,10 +22,13 @@ target_link_libraries(providers-s3-compressors PUBLIC contrib-libs-fmt libs-poco-Util libs-brotli-dec + contrib-libs-libbz2 contrib-libs-lz4 ) target_sources(providers-s3-compressors PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/compressors/brotli.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/compressors/bzip2.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/compressors/factory.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/compressors/lz4io.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/compressors/zstd.cpp ) diff --git a/ydb/library/yql/providers/s3/compressors/bzip2.cpp b/ydb/library/yql/providers/s3/compressors/bzip2.cpp new file mode 100644 index 00000000000..072afce0ac6 --- /dev/null +++ b/ydb/library/yql/providers/s3/compressors/bzip2.cpp @@ -0,0 +1,65 @@ +#include "bzip2.h" + +#include <util/generic/size_literals.h> +#include <ydb/library/yql/utils/yql_panic.h> + +namespace NYql { + +namespace NBzip2 { + +TReadBuffer::TReadBuffer(NDB::ReadBuffer& source) + : NDB::ReadBuffer(nullptr, 0ULL), Source_(source) +{ + InBuffer.resize(8_KB); + OutBuffer.resize(64_KB); + Zero(BzStream_); + InitDecoder(); +} + +TReadBuffer::~TReadBuffer() { + FreeDecoder(); +} + +void TReadBuffer::InitDecoder() { + YQL_ENSURE(BZ2_bzDecompressInit(&BzStream_, 0, 0) == BZ_OK, "Can not init bzip engine."); +} + +void TReadBuffer::FreeDecoder() { + BZ2_bzDecompressEnd(&BzStream_); +} + +bool TReadBuffer::nextImpl() { + BzStream_.next_out = OutBuffer.data(); + BzStream_.avail_out = OutBuffer.size(); + + while (true) { + if (!BzStream_.avail_in) { + BzStream_.next_in = InBuffer.data(); + BzStream_.avail_in = Source_.read(InBuffer.data(), InBuffer.size()); + if (!BzStream_.avail_in) { + set(nullptr, 0ULL); + return false; + } + } + + switch (BZ2_bzDecompress(&BzStream_)) { + case BZ_STREAM_END: + FreeDecoder(); + InitDecoder(); + [[fallthrough]]; + case BZ_OK: + if (const auto processed = OutBuffer.size() - BzStream_.avail_out) { + working_buffer = Buffer(OutBuffer.data(), OutBuffer.data() + processed); + return true; + } + + break; + default: + ythrow yexception() << "bzip error"; + } + } +} + +} + +} diff --git a/ydb/library/yql/providers/s3/compressors/bzip2.h b/ydb/library/yql/providers/s3/compressors/bzip2.h new file mode 100644 index 00000000000..41d8f74bce1 --- /dev/null +++ b/ydb/library/yql/providers/s3/compressors/bzip2.h @@ -0,0 +1,29 @@ +#pragma once + +#include <ydb/library/yql/udfs/common/clickhouse/client/src/IO/ReadBuffer.h> +#include <contrib/libs/libbz2/bzlib.h> + +namespace NYql { + +namespace NBzip2 { + +class TReadBuffer : public NDB::ReadBuffer { +public: + TReadBuffer(NDB::ReadBuffer& source); + ~TReadBuffer(); +private: + bool nextImpl() final; + + NDB::ReadBuffer& Source_; + std::vector<char> InBuffer, OutBuffer; + + bz_stream BzStream_; + + void InitDecoder(); + void FreeDecoder(); +}; + +} + +} + diff --git a/ydb/library/yql/providers/s3/compressors/factory.cpp b/ydb/library/yql/providers/s3/compressors/factory.cpp index 48e22599309..772f4203922 100644 --- a/ydb/library/yql/providers/s3/compressors/factory.cpp +++ b/ydb/library/yql/providers/s3/compressors/factory.cpp @@ -1,6 +1,8 @@ #include "factory.h" #include "lz4io.h" #include "brotli.h" +#include "zstd.h" +#include "bzip2.h" namespace NYql { @@ -9,6 +11,10 @@ std::unique_ptr<NDB::ReadBuffer> MakeDecompressor(NDB::ReadBuffer& input, const return std::make_unique<NLz4::TReadBuffer>(input); if ("brotli" == compression) return std::make_unique<NBrotli::TReadBuffer>(input); + if ("zstd" == compression) + return std::make_unique<NZstd::TReadBuffer>(input); + if ("bzip2" == compression) + return std::make_unique<NBzip2::TReadBuffer>(input); return nullptr; } diff --git a/ydb/library/yql/providers/s3/compressors/zstd.cpp b/ydb/library/yql/providers/s3/compressors/zstd.cpp new file mode 100644 index 00000000000..8bcd0dd0f57 --- /dev/null +++ b/ydb/library/yql/providers/s3/compressors/zstd.cpp @@ -0,0 +1,59 @@ +#include "zstd.h" + +#include <util/generic/size_literals.h> +#include <ydb/library/yql/utils/yql_panic.h> + +namespace NYql { + +namespace NZstd { + +TReadBuffer::TReadBuffer(NDB::ReadBuffer& source) + : NDB::ReadBuffer(nullptr, 0ULL), Source_(source), ZCtx_(::ZSTD_createDStream()) +{ + InBuffer.resize(8_KB); + OutBuffer.resize(64_KB); + Offset_ = InBuffer.size(); +} + +TReadBuffer::~TReadBuffer() { + ::ZSTD_freeDStream(ZCtx_); +} + +bool TReadBuffer::nextImpl() { + ::ZSTD_inBuffer zIn{InBuffer.data(), InBuffer.size(), Offset_}; + ::ZSTD_outBuffer zOut{OutBuffer.data(), OutBuffer.size(), 0ULL}; + + size_t returnCode = 0ULL; + if (!Finished_) do { + if (zIn.pos == zIn.size) { + zIn.size = Source_.read(InBuffer.data(), InBuffer.size()); + + zIn.pos = Offset_ = 0; + if (!zIn.size) { + // end of stream, need to check that there is no uncompleted blocks + YQL_ENSURE(!returnCode, "Incomplete block."); + Finished_ = true; + break; + } + } + returnCode = ::ZSTD_decompressStream(ZCtx_, &zOut, &zIn); + YQL_ENSURE(!::ZSTD_isError(returnCode), "Decompress failed: " << ::ZSTD_getErrorName(returnCode)); + if (!returnCode) { + // The frame is over, prepare to (maybe) start a new frame + ::ZSTD_initDStream(ZCtx_); + } + } while (zOut.pos != zOut.size); + Offset_ = zIn.pos; + + if (zOut.pos) { + working_buffer = Buffer(OutBuffer.data(), OutBuffer.data() + zOut.pos); + return true; + } else { + set(nullptr, 0ULL); + return false; + } +} + +} + +} diff --git a/ydb/library/yql/providers/s3/compressors/zstd.h b/ydb/library/yql/providers/s3/compressors/zstd.h new file mode 100644 index 00000000000..64cce44bd09 --- /dev/null +++ b/ydb/library/yql/providers/s3/compressors/zstd.h @@ -0,0 +1,27 @@ +#pragma once + +#include <ydb/library/yql/udfs/common/clickhouse/client/src/IO/ReadBuffer.h> +#define ZSTD_STATIC_LINKING_ONLY +#include <contrib/libs/zstd/include/zstd.h> + +namespace NYql { + +namespace NZstd { + +class TReadBuffer : public NDB::ReadBuffer { +public: + TReadBuffer(NDB::ReadBuffer& source); + ~TReadBuffer(); +private: + bool nextImpl() final; + + NDB::ReadBuffer& Source_; + std::vector<char> InBuffer, OutBuffer; + ::ZSTD_DStream *const ZCtx_; + size_t Offset_; + bool Finished_ = false; +}; + +} + +} |