diff options
author | a-romanov <Anton.Romanov@ydb.tech> | 2022-10-07 12:08:29 +0300 |
---|---|---|
committer | a-romanov <Anton.Romanov@ydb.tech> | 2022-10-07 12:08:29 +0300 |
commit | 67090ca0f04c5c866c38abf31b98de061025c8d4 (patch) | |
tree | 45ca9b79656e47001923767bce675a1491cd3a66 | |
parent | fd64585de261a8b2057fc53be5ca400b20bbe476 (diff) | |
download | ydb-67090ca0f04c5c866c38abf31b98de061025c8d4.tar.gz |
+ brotli
4 files changed, 95 insertions, 1 deletions
diff --git a/ydb/library/yql/providers/common/provider/yql_provider.cpp b/ydb/library/yql/providers/common/provider/yql_provider.cpp
index 8fa68a664c..5fe4578d83 100644
--- a/ydb/library/yql/providers/common/provider/yql_provider.cpp
+++ b/ydb/library/yql/providers/common/provider/yql_provider.cpp
@@ -49,8 +49,9 @@ namespace {
         "bzip2"sv,
         "xz"sv
     };
-    constexpr std::array<std::string_view, 3> CompressionsForOutput = {
+    constexpr std::array<std::string_view, 4> CompressionsForOutput = {
         "gzip"sv,
+        "brotli"sv,
         "zstd"sv,
         "xz"sv
     };
diff --git a/ydb/library/yql/providers/s3/compressors/brotli.cpp b/ydb/library/yql/providers/s3/compressors/brotli.cpp
index c88bc86657..d510d885df 100644
--- a/ydb/library/yql/providers/s3/compressors/brotli.cpp
+++ b/ydb/library/yql/providers/s3/compressors/brotli.cpp
@@ -1,7 +1,9 @@
 #include "brotli.h"
+#include "output_queue_impl.h"
 
 #include <util/generic/size_literals.h>
 #include <ydb/library/yql/utils/yql_panic.h>
+#include <contrib/libs/brotli/include/brotli/encode.h>
 
 namespace NYql {
 
@@ -102,6 +104,109 @@ void TReadBuffer::FreeDecoder() {
     BrotliDecoderDestroyInstance(DecoderState_);
 }
 
+namespace {
+
+// Brotli compressing queue: raw chunks are staged in InputQueue, run through the encoder
+// by DoCompression() and published via the TOutputQueue base.
+class TCompressor : public TOutputQueue<> {
+public:
+    // Creates and configures the encoder: the given quality, default window and default mode.
+    // A failed YQL_ENSURE check reports an encoder that cannot be created or configured.
+    explicit TCompressor(int quality)
+        : EncoderState_(BrotliEncoderCreateInstance(&TAllocator::Allocate, &TAllocator::Deallocate, nullptr), &BrotliEncoderDestroyInstance), Quality_(quality) {
+        YQL_ENSURE(EncoderState_ != nullptr, "Brotli encoder initialization failed.");
+        YQL_ENSURE(BrotliEncoderSetParameter(EncoderState_.get(), BROTLI_PARAM_QUALITY, Quality_), "Failed to set quality: " << Quality_);
+        YQL_ENSURE(BrotliEncoderSetParameter(EncoderState_.get(), BROTLI_PARAM_LGWIN, BROTLI_DEFAULT_WINDOW), "Failed to set window bits.");
+        YQL_ENSURE(BrotliEncoderSetParameter(EncoderState_.get(), BROTLI_PARAM_MODE, BROTLI_DEFAULT_MODE), "Failed to set mode.");
+    }
+private:
+    // Stages one more raw chunk and compresses whatever is available.
+    void Push(TString&& item) override {
+        InputQueue.Push(std::move(item));
+        DoCompression();
+    }
+
+    // Seals the input queue and runs DoCompression() on what is left.
+    void Seal() override {
+        InputQueue.Seal();
+        DoCompression();
+    }
+
+    size_t Size() const override {
+        return TOutputQueue::Size() + InputQueue.Size();
+    }
+
+    bool Empty() const override {
+        return TOutputQueue::Empty() && InputQueue.Empty();
+    }
+
+    size_t Volume() const override {
+        return TOutputQueue::Volume() + InputQueue.Volume();
+    }
+
+    // Feeds every staged chunk to the encoder and pushes the produced bytes to the output queue.
+    // A chunk that is both the first and the last one is compressed in one shot;
+    // everything else goes through the streaming encoder.
+    void DoCompression() {
+        while (!InputQueue.Empty()) {
+            const auto& pop = InputQueue.Pop();
+            const bool done = InputQueue.IsSealed() && InputQueue.Empty();
+            if (pop.empty() && !done)
+                break;
+
+            size_t input_size = pop.size();
+            auto input_data = reinterpret_cast<const uint8_t*>(pop.data());
+            // The buffer only ever grows, so it is reused across chunks.
+            if (const auto bound = BrotliEncoderMaxCompressedSize(input_size); bound > OutputBufferSize)
+                OutputBuffer = std::make_unique<char[]>(OutputBufferSize = bound);
+
+            if (IsFirstBlock && done) {
+                auto output_data = reinterpret_cast<uint8_t*>(OutputBuffer.get());
+                auto output_size = OutputBufferSize;
+                YQL_ENSURE(BrotliEncoderCompress(Quality_, BROTLI_DEFAULT_WINDOW, BROTLI_DEFAULT_MODE, input_size, input_data, &output_size, output_data), "Encode failed.");
+                if (output_size)
+                    TOutputQueue::Push(TString(OutputBuffer.get(), output_size));
+            } else {
+                const auto operation = done ? BROTLI_OPERATION_FINISH : BROTLI_OPERATION_PROCESS;
+                // The encoder buffers input internally, so a single call may neither consume the whole chunk
+                // nor emit everything it holds: pending output can exceed the bound computed for this chunk alone.
+                // Keep calling with the same operation and input until the input is consumed and nothing is pending.
+                do {
+                    auto output_data = reinterpret_cast<uint8_t*>(OutputBuffer.get());
+                    auto output_size = OutputBufferSize;
+                    size_t total_size = 0ULL;
+                    YQL_ENSURE(BrotliEncoderCompressStream(EncoderState_.get(), operation, &input_size, &input_data, &output_size, &output_data, &total_size), "Encode failed.");
+                    if (const auto size = OutputBufferSize - output_size)
+                        TOutputQueue::Push(TString(OutputBuffer.get(), size));
+                } while (input_size || BrotliEncoderHasMoreOutput(EncoderState_.get()));
+                YQL_ENSURE(!done || BrotliEncoderIsFinished(EncoderState_.get()), "Brotli stream is not finished.");
+            }
+
+            IsFirstBlock = false;
+            if (done) {
+                TOutputQueue::Seal();
+                return;
+            }
+        }
+    }
+
+    std::size_t OutputBufferSize = 0ULL;
+    std::unique_ptr<char[]> OutputBuffer;
+
+    // Owns the encoder instance; it is destroyed even when a check in the constructor fails.
+    const std::unique_ptr<BrotliEncoderState, decltype(&BrotliEncoderDestroyInstance)> EncoderState_;
+    const int Quality_;
+
+    TOutputQueue<0> InputQueue;
+    bool IsFirstBlock = true;
+};
+
+}
+
+IOutputQueue::TPtr MakeCompressor(std::optional<int> cLevel) {
+    return std::make_unique<TCompressor>(cLevel.value_or(BROTLI_DEFAULT_QUALITY));
+}
+
 }
 
 }
diff --git a/ydb/library/yql/providers/s3/compressors/brotli.h b/ydb/library/yql/providers/s3/compressors/brotli.h
index 54b3894b8b..eb92778971 100644
--- a/ydb/library/yql/providers/s3/compressors/brotli.h
+++ b/ydb/library/yql/providers/s3/compressors/brotli.h
@@ -2,6 +2,7 @@
 
 #include <ydb/library/yql/udfs/common/clickhouse/client/src/IO/ReadBuffer.h>
 #include <contrib/libs/brotli/include/brotli/decode.h>
+#include "output_queue.h"
 
 namespace NYql {
 
@@ -27,6 +28,8 @@ private:
     void FreeDecoder();
 };
 
+IOutputQueue::TPtr MakeCompressor(std::optional<int> cLevel = {});
+
 }
 
 }
diff --git a/ydb/library/yql/providers/s3/compressors/factory.cpp b/ydb/library/yql/providers/s3/compressors/factory.cpp
index 1e5f5689e0..7c9ac6a61f 100644
--- a/ydb/library/yql/providers/s3/compressors/factory.cpp
+++ b/ydb/library/yql/providers/s3/compressors/factory.cpp
@@ -29,6 +29,8 @@ std::unique_ptr<NDB::ReadBuffer> MakeDecompressor(NDB::ReadBuffer& input, const
 IOutputQueue::TPtr MakeCompressorQueue(const std::string_view& compression) {
     if ("lz4" == compression)
         return NLz4::MakeCompressor();
+    if ("brotli" == compression)
+        return NBrotli::MakeCompressor();
     if ("zstd" == compression)
         return NZstd::MakeCompressor();
     if ("xz" == compression)