aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authora-romanov <Anton.Romanov@ydb.tech>2022-10-07 12:08:29 +0300
committera-romanov <Anton.Romanov@ydb.tech>2022-10-07 12:08:29 +0300
commit67090ca0f04c5c866c38abf31b98de061025c8d4 (patch)
tree45ca9b79656e47001923767bce675a1491cd3a66
parentfd64585de261a8b2057fc53be5ca400b20bbe476 (diff)
downloadydb-67090ca0f04c5c866c38abf31b98de061025c8d4.tar.gz
+ brotli
-rw-r--r--ydb/library/yql/providers/common/provider/yql_provider.cpp3
-rw-r--r--ydb/library/yql/providers/s3/compressors/brotli.cpp88
-rw-r--r--ydb/library/yql/providers/s3/compressors/brotli.h3
-rw-r--r--ydb/library/yql/providers/s3/compressors/factory.cpp2
4 files changed, 95 insertions, 1 deletions
diff --git a/ydb/library/yql/providers/common/provider/yql_provider.cpp b/ydb/library/yql/providers/common/provider/yql_provider.cpp
index 8fa68a664c..5fe4578d83 100644
--- a/ydb/library/yql/providers/common/provider/yql_provider.cpp
+++ b/ydb/library/yql/providers/common/provider/yql_provider.cpp
@@ -49,8 +49,9 @@ namespace {
"bzip2"sv,
"xz"sv
};
- constexpr std::array<std::string_view, 3> CompressionsForOutput = {
+ constexpr std::array<std::string_view, 4> CompressionsForOutput = {
"gzip"sv,
+ "brotli"sv,
"zstd"sv,
"xz"sv
};
diff --git a/ydb/library/yql/providers/s3/compressors/brotli.cpp b/ydb/library/yql/providers/s3/compressors/brotli.cpp
index c88bc86657..d510d885df 100644
--- a/ydb/library/yql/providers/s3/compressors/brotli.cpp
+++ b/ydb/library/yql/providers/s3/compressors/brotli.cpp
@@ -1,7 +1,9 @@
#include "brotli.h"
+#include "output_queue_impl.h"
#include <util/generic/size_literals.h>
#include <ydb/library/yql/utils/yql_panic.h>
+#include <contrib/libs/brotli/include/brotli/encode.h>
namespace NYql {
@@ -102,6 +104,92 @@ void TReadBuffer::FreeDecoder() {
BrotliDecoderDestroyInstance(DecoderState_);
}
+namespace {
+
+// Output queue that brotli-compresses everything pushed into it.
+// Raw chunks are staged in InputQueue; compressed chunks are published through
+// the TOutputQueue<> base, which is sealed once the final chunk has been encoded.
+class TCompressor final : public TOutputQueue<> {
+public:
+    // quality: brotli quality level, passed to BROTLI_PARAM_QUALITY.
+    // Fails via YQL_ENSURE if the encoder cannot be created or configured.
+    explicit TCompressor(int quality)
+        : EncoderState_(BrotliEncoderCreateInstance(&TAllocator::Allocate, &TAllocator::Deallocate, nullptr))
+        , Quality_(quality)
+    {
+        // EncoderState_ is an owning smart pointer, so a failing check below
+        // still destroys the encoder instance instead of leaking it.
+        YQL_ENSURE(EncoderState_, "Brotli encoder initialization failed.");
+        YQL_ENSURE(BrotliEncoderSetParameter(EncoderState_.get(), BROTLI_PARAM_QUALITY, Quality_), "Failed to set quality: " << Quality_);
+        YQL_ENSURE(BrotliEncoderSetParameter(EncoderState_.get(), BROTLI_PARAM_LGWIN, BROTLI_DEFAULT_WINDOW), "Failed to set window bits.");
+        YQL_ENSURE(BrotliEncoderSetParameter(EncoderState_.get(), BROTLI_PARAM_MODE, BROTLI_DEFAULT_MODE), "Failed to set mode.");
+    }
+
+private:
+    // Releases the encoder instance owned by EncoderState_.
+    struct TEncoderDeleter {
+        void operator()(BrotliEncoderState* state) const noexcept {
+            BrotliEncoderDestroyInstance(state);
+        }
+    };
+
+    // Lower bound for the scratch buffer, so draining pending encoder output
+    // does not degrade into a stream of tiny chunks.
+    static constexpr std::size_t MinOutputBufferSize = 1ULL << 16;
+
+    // Stages a raw chunk and compresses whatever is ready.
+    void Push(TString&& item) override {
+        InputQueue.Push(std::move(item));
+        DoCompression();
+    }
+
+    // Marks the end of input and compresses whatever is ready.
+    void Seal() override {
+        InputQueue.Seal();
+        DoCompression();
+    }
+
+    // The three accessors below report compressed output plus staged input.
+    size_t Size() const override {
+        return TOutputQueue::Size() + InputQueue.Size();
+    }
+
+    bool Empty() const override {
+        return TOutputQueue::Empty() && InputQueue.Empty();
+    }
+
+    size_t Volume() const override {
+        return TOutputQueue::Volume() + InputQueue.Volume();
+    }
+
+    // Grows the scratch buffer to at least max(bound, MinOutputBufferSize) bytes.
+    void EnsureOutputBuffer(std::size_t bound) {
+        const std::size_t wanted = bound > MinOutputBufferSize ? bound : MinOutputBufferSize;
+        if (wanted > OutputBufferSize) {
+            OutputBuffer = std::make_unique<char[]>(wanted);
+            OutputBufferSize = wanted;
+        }
+    }
+
+    // Pops staged chunks and pushes their compressed form to the base queue.
+    // NOTE(review): the loop body only runs while InputQueue is non-empty; if
+    // Seal() can arrive after the queue was already drained, no FINISH would be
+    // issued here — confirm against TOutputQueue's Pop()/Empty() semantics.
+    void DoCompression() {
+        while (!InputQueue.Empty()) {
+            const auto& pop = InputQueue.Pop();
+            const bool done = InputQueue.IsSealed() && InputQueue.Empty();
+            if (pop.empty() && !done)
+                break;
+
+            size_t input_size = pop.size();
+            auto input_data = reinterpret_cast<const uint8_t*>(pop.data());
+            EnsureOutputBuffer(BrotliEncoderMaxCompressedSize(input_size));
+
+            if (IsFirstBlock && done) {
+                // The only chunk is also the last one: use the one-shot encoder.
+                auto output_size = OutputBufferSize;
+                YQL_ENSURE(BrotliEncoderCompress(Quality_, BROTLI_DEFAULT_WINDOW, BROTLI_DEFAULT_MODE, input_size, input_data, &output_size, reinterpret_cast<uint8_t*>(OutputBuffer.get())), "Encode failed.");
+                if (output_size)
+                    TOutputQueue::Push(TString(OutputBuffer.get(), output_size));
+            } else {
+                const auto operation = done ? BROTLI_OPERATION_FINISH : BROTLI_OPERATION_PROCESS;
+                // One call is not guaranteed to consume all input or to emit all
+                // pending output (data from earlier chunks may still be buffered
+                // inside the encoder), so keep calling until it is fully drained.
+                bool more = true;
+                while (more) {
+                    auto output_data = reinterpret_cast<uint8_t*>(OutputBuffer.get());
+                    auto output_size = OutputBufferSize;
+                    YQL_ENSURE(BrotliEncoderCompressStream(EncoderState_.get(), operation, &input_size, &input_data, &output_size, &output_data, nullptr), "Encode failed.");
+                    if (const auto size = OutputBufferSize - output_size)
+                        TOutputQueue::Push(TString(OutputBuffer.get(), size));
+
+                    more = input_size > 0
+                        || BrotliEncoderHasMoreOutput(EncoderState_.get())
+                        || (done && !BrotliEncoderIsFinished(EncoderState_.get()));
+                }
+            }
+
+            IsFirstBlock = false;
+            if (done) {
+                TOutputQueue::Seal();
+                return;
+            }
+        }
+    }
+
+    // Scratch buffer the encoder writes into; reused across chunks.
+    std::size_t OutputBufferSize = 0ULL;
+    std::unique_ptr<char[]> OutputBuffer;
+
+    const std::unique_ptr<BrotliEncoderState, TEncoderDeleter> EncoderState_;
+    const int Quality_;
+
+    // Raw, not yet compressed chunks.
+    TOutputQueue<0> InputQueue;
+    // True until the first chunk has been encoded; selects the one-shot path.
+    bool IsFirstBlock = true;
+};
+
+}
+// Builds a brotli-compressing output queue. When no level is supplied,
+// BROTLI_DEFAULT_QUALITY is used.
+IOutputQueue::TPtr MakeCompressor(std::optional<int> cLevel) {
+    const int quality = cLevel.has_value() ? *cLevel : BROTLI_DEFAULT_QUALITY;
+    return std::make_unique<TCompressor>(quality);
+}
+
}
}
diff --git a/ydb/library/yql/providers/s3/compressors/brotli.h b/ydb/library/yql/providers/s3/compressors/brotli.h
index 54b3894b8b..eb92778971 100644
--- a/ydb/library/yql/providers/s3/compressors/brotli.h
+++ b/ydb/library/yql/providers/s3/compressors/brotli.h
@@ -2,6 +2,7 @@
#include <ydb/library/yql/udfs/common/clickhouse/client/src/IO/ReadBuffer.h>
#include <contrib/libs/brotli/include/brotli/decode.h>
+#include "output_queue.h"
namespace NYql {
@@ -27,6 +28,8 @@ private:
void FreeDecoder();
};
+// Creates an output queue that brotli-compresses the data pushed into it.
+// cLevel is the brotli quality; when omitted, BROTLI_DEFAULT_QUALITY is used.
+IOutputQueue::TPtr MakeCompressor(std::optional<int> cLevel = {});
+
}
}
diff --git a/ydb/library/yql/providers/s3/compressors/factory.cpp b/ydb/library/yql/providers/s3/compressors/factory.cpp
index 1e5f5689e0..7c9ac6a61f 100644
--- a/ydb/library/yql/providers/s3/compressors/factory.cpp
+++ b/ydb/library/yql/providers/s3/compressors/factory.cpp
@@ -29,6 +29,8 @@ std::unique_ptr<NDB::ReadBuffer> MakeDecompressor(NDB::ReadBuffer& input, const
IOutputQueue::TPtr MakeCompressorQueue(const std::string_view& compression) {
if ("lz4" == compression)
return NLz4::MakeCompressor();
+ if ("brotli" == compression)
+ return NBrotli::MakeCompressor();
if ("zstd" == compression)
return NZstd::MakeCompressor();
if ("xz" == compression)