diff options
author | a-romanov <Anton.Romanov@ydb.tech> | 2022-10-06 15:57:01 +0300 |
---|---|---|
committer | a-romanov <Anton.Romanov@ydb.tech> | 2022-10-06 15:57:01 +0300 |
commit | 822465ea046a777f769e7178d09011f4de30ed1a (patch) | |
tree | 30aace840f7018bb62a543c11b9e78d2a53dfaac | |
parent | e0afdbbbbcbb295e9a4db98e3131b68ce2fef0eb (diff) | |
download | ydb-822465ea046a777f769e7178d09011f4de30ed1a.tar.gz |
+ xz zstd
8 files changed, 224 insertions, 14 deletions
diff --git a/ydb/library/yql/providers/common/provider/yql_provider.cpp b/ydb/library/yql/providers/common/provider/yql_provider.cpp index 5eef033cc8..8fa68a664c 100644 --- a/ydb/library/yql/providers/common/provider/yql_provider.cpp +++ b/ydb/library/yql/providers/common/provider/yql_provider.cpp @@ -49,8 +49,10 @@ namespace { "bzip2"sv, "xz"sv }; - constexpr std::array<std::string_view, 1> CompressionsForOutput = { - "gzip"sv + constexpr std::array<std::string_view, 3> CompressionsForOutput = { + "gzip"sv, + "zstd"sv, + "xz"sv }; constexpr std::array<std::string_view, 10> IntervalUnits = { "MICROSECONDS"sv, diff --git a/ydb/library/yql/providers/s3/compressors/factory.cpp b/ydb/library/yql/providers/s3/compressors/factory.cpp index c0eedd0f5f..1e5f5689e0 100644 --- a/ydb/library/yql/providers/s3/compressors/factory.cpp +++ b/ydb/library/yql/providers/s3/compressors/factory.cpp @@ -29,7 +29,10 @@ std::unique_ptr<NDB::ReadBuffer> MakeDecompressor(NDB::ReadBuffer& input, const IOutputQueue::TPtr MakeCompressorQueue(const std::string_view& compression) { if ("lz4" == compression) return NLz4::MakeCompressor(); - + if ("zstd" == compression) + return NZstd::MakeCompressor(); + if ("xz" == compression) + return NXz::MakeCompressor(); if ("gzip" == compression) return NGz::MakeCompressor(); diff --git a/ydb/library/yql/providers/s3/compressors/gz.cpp b/ydb/library/yql/providers/s3/compressors/gz.cpp index 2b6f893a09..b887b00aab 100644 --- a/ydb/library/yql/providers/s3/compressors/gz.cpp +++ b/ydb/library/yql/providers/s3/compressors/gz.cpp @@ -98,7 +98,7 @@ private: } void DoCompression() { - while (true) { + while (!InputQueue.Empty()) { const auto& pop = InputQueue.Pop(); const bool done = InputQueue.IsSealed() && InputQueue.Empty(); if (pop.empty() && !done) @@ -129,7 +129,7 @@ private: z_stream Z_; - TOutputQueue<6_MB, 6_MB> InputQueue; + TOutputQueue<0> InputQueue; }; } diff --git a/ydb/library/yql/providers/s3/compressors/output_queue_impl.h b/ydb/library/yql/providers/s3/compressors/output_queue_impl.h index e4b4b19ef8..f0a88b7a7a 100644 --- a/ydb/library/yql/providers/s3/compressors/output_queue_impl.h +++ b/ydb/library/yql/providers/s3/compressors/output_queue_impl.h @@ -19,18 +19,20 @@ public: void Push(TString&& item) override { YQL_ENSURE(!Sealed, "Queue is sealed."); - if (!Queue.empty() && Queue.back().size() < MinItemSize) { - if constexpr (MaxItemSize > 0ULL) { - if (Queue.back().size() + item.size() > MaxItemSize) { - Queue.back().append(item.substr(0U, MaxItemSize - Queue.back().size())); - item = item.substr(item.size() + Queue.back().size() - MaxItemSize); + if constexpr (MinItemSize > 0ULL) { + if (!Queue.empty() && Queue.back().size() < MinItemSize) { + if constexpr (MaxItemSize > 0ULL) { + if (Queue.back().size() + item.size() > MaxItemSize) { + Queue.back().append(item.substr(0U, MaxItemSize - Queue.back().size())); + item = item.substr(item.size() + Queue.back().size() - MaxItemSize); + } else { + Queue.back().append(std::move(item)); + item.clear(); + } } else { Queue.back().append(std::move(item)); item.clear(); } - } else { - Queue.back().append(std::move(item)); - item.clear(); } } @@ -47,7 +49,7 @@ public: } TString Pop() override { - if (Queue.empty() || !Sealed && Queue.front().size() < MinItemSize) + if (Queue.empty() || 1U == Queue.size() && !Sealed) return {}; auto out = std::move(Queue.front()); diff --git a/ydb/library/yql/providers/s3/compressors/xz.cpp b/ydb/library/yql/providers/s3/compressors/xz.cpp index 33e92a8767..a4ce6318aa 100644 --- a/ydb/library/yql/providers/s3/compressors/xz.cpp +++ b/ydb/library/yql/providers/s3/compressors/xz.cpp @@ -2,6 +2,7 @@ #include <util/generic/size_literals.h> #include <ydb/library/yql/utils/yql_panic.h> +#include "output_queue_impl.h" namespace NYql { @@ -84,6 +85,118 @@ bool TReadBuffer::nextImpl() { } } +namespace { + +class TCompressor : public TOutputQueue<> { +public: + TCompressor(int level) : Strm_(LZMA_STREAM_INIT) { + + + // options for further compression + lzma_options_lzma opt_lzma2; + if (lzma_lzma_preset(&opt_lzma2, level)) + throw yexception() << "lzma preset failed: lzma version: " << LZMA_VERSION_STRING; + + lzma_filter filters[] = { + {.id = LZMA_FILTER_X86, .options = nullptr}, + {.id = LZMA_FILTER_LZMA2, .options = &opt_lzma2}, + {.id = LZMA_VLI_UNKNOWN, .options = nullptr}, + }; + + switch (const lzma_ret ret = lzma_stream_encoder(&Strm_, filters, LZMA_CHECK_CRC64)) { + case LZMA_OK: + return; + case LZMA_MEM_ERROR: + throw yexception() << "Memory allocation failed."; + case LZMA_OPTIONS_ERROR: + throw yexception() << "Unsupported decompressor flags."; + default: + throw yexception() << "Unknown error << " << int(ret) << ", possibly a bug."; + } + } + + ~TCompressor() { + lzma_end(&Strm_); + } +private: + void Push(TString&& item) override { + InputQueue.Push(std::move(item)); + DoCompression(); + } + + void Seal() override { + InputQueue.Seal(); + DoCompression(); + } + + size_t Size() const override { + return TOutputQueue::Size() + InputQueue.Size(); + } + + bool Empty() const override { + return TOutputQueue::Empty() && InputQueue.Empty(); + } + + size_t Volume() const override { + return TOutputQueue::Volume() + InputQueue.Volume(); + } + + void DoCompression() { + while (!InputQueue.Empty()) { + const auto& pop = InputQueue.Pop(); + const bool done = InputQueue.IsSealed() && InputQueue.Empty(); + if (pop.empty() && !done) + break; + + if (const auto bound = lzma_stream_buffer_bound(pop.size()); bound > OutputBufferSize) + OutputBuffer = std::make_unique<char[]>(OutputBufferSize = bound); + + Strm_.next_in = const_cast<unsigned char*>(reinterpret_cast<const unsigned char*>(pop.data())); + Strm_.avail_in = pop.size(); + + Strm_.next_out = reinterpret_cast<unsigned char*>(OutputBuffer.get()); + Strm_.avail_out = OutputBufferSize; + + const lzma_ret ret = lzma_code(&Strm_, done ? LZMA_FINISH : LZMA_RUN); + + if (const auto size = OutputBufferSize - Strm_.avail_out) + TOutputQueue::Push(TString(OutputBuffer.get(), size)); + + switch (ret) { + case LZMA_OK: + continue; + case LZMA_STREAM_END: + return TOutputQueue::Seal(); + case LZMA_MEM_ERROR: + throw yexception() << "Memory allocation failed."; + case LZMA_FORMAT_ERROR: + throw yexception() << "The input is not in the .xz format."; + case LZMA_OPTIONS_ERROR: + throw yexception() << "Unsupported compression options."; + case LZMA_DATA_ERROR: + throw yexception() << "Compressed file is corrupt."; + case LZMA_BUF_ERROR: + throw yexception() << "Compressed file is truncated or otherwise corrupt."; + default: + throw yexception() << "Unknown error " << int(ret) << ", possibly a bug."; + } + }; + } + + std::size_t OutputBufferSize = 0ULL; + std::unique_ptr<char[]> OutputBuffer; + + lzma_stream Strm_; + + TOutputQueue<0> InputQueue; +}; + +} + +IOutputQueue::TPtr MakeCompressor(std::optional<int> cLevel) { + return std::make_unique<TCompressor>(cLevel.value_or(LZMA_PRESET_DEFAULT)); +} + } } diff --git a/ydb/library/yql/providers/s3/compressors/xz.h b/ydb/library/yql/providers/s3/compressors/xz.h index df38e4bff4..a8302a165c 100644 --- a/ydb/library/yql/providers/s3/compressors/xz.h +++ b/ydb/library/yql/providers/s3/compressors/xz.h @@ -2,6 +2,7 @@ #include <ydb/library/yql/udfs/common/clickhouse/client/src/IO/ReadBuffer.h> #include <contrib/libs/lzma/liblzma/api/lzma.h> +#include "output_queue.h" namespace NYql { @@ -23,6 +24,8 @@ private: bool IsOutFinished_ = false; }; +IOutputQueue::TPtr MakeCompressor(std::optional<int> cLevel = {}); + } } diff --git a/ydb/library/yql/providers/s3/compressors/zstd.cpp b/ydb/library/yql/providers/s3/compressors/zstd.cpp index 8bcd0dd0f5..213b68e530 100644 --- a/ydb/library/yql/providers/s3/compressors/zstd.cpp +++ b/ydb/library/yql/providers/s3/compressors/zstd.cpp @@ -2,6 +2,7 @@ #include <util/generic/size_literals.h> #include <ydb/library/yql/utils/yql_panic.h> +#include "output_queue_impl.h" namespace NYql { @@ -54,6 +55,89 @@ bool TReadBuffer::nextImpl() { } } +namespace { + +class TCompressor : public TOutputQueue<> { +public: + TCompressor(int level) + : ZCtx_(::ZSTD_createCStream()) + { + const auto ret = ::ZSTD_initCStream(ZCtx_, level); + YQL_ENSURE(!::ZSTD_isError(ret), "code: " << ret << ", error: " << ::ZSTD_getErrorName(ret)); + } + + ~TCompressor() { + ::ZSTD_freeCStream(ZCtx_); + } +private: + void Push(TString&& item) override { + InputQueue.Push(std::move(item)); + DoCompression(); + } + + void Seal() override { + InputQueue.Seal(); + DoCompression(); + } + + size_t Size() const override { + return TOutputQueue::Size() + InputQueue.Size(); + } + + bool Empty() const override { + return TOutputQueue::Empty() && InputQueue.Empty(); + } + + size_t Volume() const override { + return TOutputQueue::Volume() + InputQueue.Volume(); + } + + void DoCompression() { + while (!InputQueue.Empty()) { + const auto& pop = InputQueue.Pop(); + const bool done = InputQueue.IsSealed() && InputQueue.Empty(); + if (pop.empty() && !done) + break; + + if (const auto bound = ZSTD_compressBound(pop.size()); bound > OutputBufferSize) + OutputBuffer = std::make_unique<char[]>(OutputBufferSize = bound); + + if (IsFirstBlock && done) { + const auto size = ::ZSTD_compress2(ZCtx_, OutputBuffer.get(), OutputBufferSize, pop.data(), pop.size()); + YQL_ENSURE(!::ZSTD_isError(size), "code: " << size << ", error: " << ::ZSTD_getErrorName(size)); + if (size) + TOutputQueue::Push(TString(OutputBuffer.get(), size)); + } else { + ::ZSTD_inBuffer zIn{pop.data(), pop.size(), 0ULL}; + ::ZSTD_outBuffer zOut{OutputBuffer.get(), OutputBufferSize, 0ULL}; + const auto code = ::ZSTD_compressStream2(ZCtx_, &zOut, &zIn, done ? ZSTD_e_end : ZSTD_e_continue); + YQL_ENSURE(!::ZSTD_isError(code), "code: " << code << ", error: " << ::ZSTD_getErrorName(code)); + + if (zOut.pos) + TOutputQueue::Push(TString(OutputBuffer.get(), zOut.pos)); + } + + IsFirstBlock = false; + if (done) + return TOutputQueue::Seal(); + }; + } + + std::size_t OutputBufferSize = 0ULL; + std::unique_ptr<char[]> OutputBuffer; + + ::ZSTD_CStream *const ZCtx_; + + TOutputQueue<0> InputQueue; + bool IsFirstBlock = true; +}; + +} + +IOutputQueue::TPtr MakeCompressor(std::optional<int> cLevel) { + return std::make_unique<TCompressor>(cLevel.value_or(ZSTD_defaultCLevel())); +} + } } diff --git a/ydb/library/yql/providers/s3/compressors/zstd.h b/ydb/library/yql/providers/s3/compressors/zstd.h index 64cce44bd0..1f72dcc034 100644 --- a/ydb/library/yql/providers/s3/compressors/zstd.h +++ b/ydb/library/yql/providers/s3/compressors/zstd.h @@ -3,6 +3,7 @@ #include <ydb/library/yql/udfs/common/clickhouse/client/src/IO/ReadBuffer.h> #define ZSTD_STATIC_LINKING_ONLY #include <contrib/libs/zstd/include/zstd.h> +#include "output_queue.h" namespace NYql { @@ -22,6 +23,8 @@ private: bool Finished_ = false; }; +IOutputQueue::TPtr MakeCompressor(std::optional<int> cLevel = {}); + } } |