diff options
author | a-romanov <Anton.Romanov@ydb.tech> | 2022-09-16 14:21:07 +0300 |
---|---|---|
committer | a-romanov <Anton.Romanov@ydb.tech> | 2022-09-16 14:21:07 +0300 |
commit | f6c4135c52c0ab15e44686ea64c139b868c81f46 (patch) | |
tree | ff793b083defdeff19853eb70755e00217337d55 | |
parent | a2adf32166121a2e9e6b4113f4c05fdc703d01ca (diff) | |
download | ydb-f6c4135c52c0ab15e44686ea64c139b868c81f46.tar.gz |
Add gzip compression.
6 files changed, 114 insertions, 9 deletions
diff --git a/ydb/library/yql/providers/common/provider/yql_provider.cpp b/ydb/library/yql/providers/common/provider/yql_provider.cpp index dcfd8fa145..5eef033cc8 100644 --- a/ydb/library/yql/providers/common/provider/yql_provider.cpp +++ b/ydb/library/yql/providers/common/provider/yql_provider.cpp @@ -50,7 +50,7 @@ namespace { "xz"sv }; constexpr std::array<std::string_view, 1> CompressionsForOutput = { - "lz4"sv + "gzip"sv }; constexpr std::array<std::string_view, 10> IntervalUnits = { "MICROSECONDS"sv, diff --git a/ydb/library/yql/providers/s3/compressors/factory.cpp b/ydb/library/yql/providers/s3/compressors/factory.cpp index 2aa02a46bc..c0eedd0f5f 100644 --- a/ydb/library/yql/providers/s3/compressors/factory.cpp +++ b/ydb/library/yql/providers/s3/compressors/factory.cpp @@ -30,6 +30,9 @@ IOutputQueue::TPtr MakeCompressorQueue(const std::string_view& compression) { if ("lz4" == compression) return NLz4::MakeCompressor(); + if ("gzip" == compression) + return NGz::MakeCompressor(); + if (compression.empty()) return std::make_unique<TOutputQueue<5_MB>>(); diff --git a/ydb/library/yql/providers/s3/compressors/gz.cpp b/ydb/library/yql/providers/s3/compressors/gz.cpp index 12ea6ceec2..2b6f893a09 100644 --- a/ydb/library/yql/providers/s3/compressors/gz.cpp +++ b/ydb/library/yql/providers/s3/compressors/gz.cpp @@ -2,6 +2,7 @@ #include <util/generic/size_literals.h> #include <ydb/library/yql/utils/yql_panic.h> +#include "output_queue_impl.h" namespace NYql { @@ -61,6 +62,82 @@ bool TReadBuffer::nextImpl() { } } +namespace { + +class TCompressor : public TOutputQueue<> { +public: + TCompressor(int level) { + Zero(Z_); + YQL_ENSURE(deflateInit2(&Z_, level, Z_DEFLATED, 16 | MAX_WBITS, MAX_MEM_LEVEL, Z_DEFAULT_STRATEGY) == Z_OK, "Can not init deflate engine."); + } + + ~TCompressor() { + deflateEnd(&Z_); + } +private: + void Push(TString&& item) override { + InputQueue.Push(std::move(item)); + DoCompression(); + } + + void Seal() override { + InputQueue.Seal(); + DoCompression(); + } + + size_t Size() const override { + return TOutputQueue::Size() + InputQueue.Size(); + } + + bool Empty() const override { + return TOutputQueue::Empty() && InputQueue.Empty(); + } + + size_t Volume() const override { + return TOutputQueue::Volume() + InputQueue.Volume(); + } + + void DoCompression() { + while (true) { + const auto& pop = InputQueue.Pop(); + const bool done = InputQueue.IsSealed() && InputQueue.Empty(); + if (pop.empty() && !done) + break; + + Z_.next_in = const_cast<unsigned char*>(reinterpret_cast<const unsigned char*>(pop.data())); + Z_.avail_in = pop.size(); + + if (const auto bound = deflateBound(&Z_, Z_.avail_in); bound > OutputBufferSize) + OutputBuffer = std::make_unique<char[]>(OutputBufferSize = bound); + + Z_.next_out = reinterpret_cast<unsigned char*>(OutputBuffer.get()); + Z_.avail_out = OutputBufferSize; + + const auto code = deflate(&Z_, done ? Z_FINISH : Z_BLOCK); + YQL_ENSURE((done ? Z_STREAM_END : Z_OK) == code, "code: " << code << ", error: " << GetErrMsg(Z_)); + + if (const auto size = OutputBufferSize - Z_.avail_out) + TOutputQueue::Push(TString(OutputBuffer.get(), size)); + + if (done) + return TOutputQueue::Seal(); + }; + } + + std::size_t OutputBufferSize = 0ULL; + std::unique_ptr<char[]> OutputBuffer; + + z_stream Z_; + + TOutputQueue<6_MB, 6_MB> InputQueue; +}; + +} + +IOutputQueue::TPtr MakeCompressor(std::optional<int> cLevel) { + return std::make_unique<TCompressor>(cLevel.value_or(Z_DEFAULT_COMPRESSION)); +} + } } diff --git a/ydb/library/yql/providers/s3/compressors/gz.h b/ydb/library/yql/providers/s3/compressors/gz.h index e73bb7cb16..1532cccc97 100644 --- a/ydb/library/yql/providers/s3/compressors/gz.h +++ b/ydb/library/yql/providers/s3/compressors/gz.h @@ -2,6 +2,7 @@ #include <ydb/library/yql/udfs/common/clickhouse/client/src/IO/ReadBuffer.h> #include <zlib.h> +#include "output_queue.h" namespace NYql { @@ -20,6 +21,8 @@ private: z_stream Z_; }; +IOutputQueue::TPtr MakeCompressor(std::optional<int> cLevel = {}); + } } diff --git a/ydb/library/yql/providers/s3/compressors/lz4io.cpp b/ydb/library/yql/providers/s3/compressors/lz4io.cpp index e0677045ef..cd5558f3d4 100644 --- a/ydb/library/yql/providers/s3/compressors/lz4io.cpp +++ b/ydb/library/yql/providers/s3/compressors/lz4io.cpp @@ -165,7 +165,9 @@ size_t TReadBuffer::DecompressLegacy() { return size_t(decodeSize); } -class TCompressor : public TOutputQueue<5_MB> { +namespace { + +class TCompressor : public TOutputQueue<> { static constexpr size_t BlockSize = 4_MB; public: TCompressor(int level) @@ -176,6 +178,7 @@ public: Prefs.frameInfo.blockMode = LZ4F_blockMode_t(1); Prefs.frameInfo.blockSizeID = LZ4F_blockSizeID_t(7); Prefs.frameInfo.blockChecksumFlag = LZ4F_blockChecksum_t(0); + Prefs.frameInfo.contentSize = 0ULL; Prefs.frameInfo.contentChecksumFlag = LZ4F_contentChecksum_t(1); Prefs.favorDecSpeed = 0; @@ -255,6 +258,8 @@ private: bool IsFirstBlock = true; }; +} + IOutputQueue::TPtr MakeCompressor(std::optional<int> cLevel) { return std::make_unique<TCompressor>(cLevel.value_or(LZ4HC_CLEVEL_DEFAULT)); } diff --git a/ydb/library/yql/providers/s3/compressors/output_queue_impl.h b/ydb/library/yql/providers/s3/compressors/output_queue_impl.h index 42da3683e9..e4b4b19ef8 100644 --- a/ydb/library/yql/providers/s3/compressors/output_queue_impl.h +++ b/ydb/library/yql/providers/s3/compressors/output_queue_impl.h @@ -9,24 +9,41 @@ namespace NYql { -template <size_t MinItemSize = 5_MB, size_t MaxItemSize = 0ULL> +// https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html +constexpr size_t S3PartUploadMinSize = 5_MB; + +template <size_t MinItemSize = S3PartUploadMinSize, size_t MaxItemSize = 0ULL> class TOutputQueue : public IOutputQueue { public: TOutputQueue() = default; void Push(TString&& item) override { YQL_ENSURE(!Sealed, "Queue is sealed."); - if (!Queue.empty() && Queue.back().size() < MinItemSize) - if constexpr (MaxItemSize > 0ULL) + if (!Queue.empty() && Queue.back().size() < MinItemSize) { + if constexpr (MaxItemSize > 0ULL) { if (Queue.back().size() + item.size() > MaxItemSize) { Queue.back().append(item.substr(0U, MaxItemSize - Queue.back().size())); - Queue.emplace_back(item.substr(item.size() + Queue.back().size() - MaxItemSize)); - } else + item = item.substr(item.size() + Queue.back().size() - MaxItemSize); + } else { Queue.back().append(std::move(item)); - else + item.clear(); + } + } else { Queue.back().append(std::move(item)); - else + item.clear(); + } + } + + if (!item.empty()) { + if constexpr (MaxItemSize > 0ULL) { + while (item.size() > MaxItemSize) { + Queue.emplace_back(item.substr(0U, MaxItemSize)); + item = item.substr(MaxItemSize); + } + } + Queue.emplace_back(std::move(item)); + } } TString Pop() override { |