aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authora-romanov <Anton.Romanov@ydb.tech>2022-09-16 14:21:07 +0300
committera-romanov <Anton.Romanov@ydb.tech>2022-09-16 14:21:07 +0300
commitf6c4135c52c0ab15e44686ea64c139b868c81f46 (patch)
treeff793b083defdeff19853eb70755e00217337d55
parenta2adf32166121a2e9e6b4113f4c05fdc703d01ca (diff)
downloadydb-f6c4135c52c0ab15e44686ea64c139b868c81f46.tar.gz
Add gzip compression.
-rw-r--r--ydb/library/yql/providers/common/provider/yql_provider.cpp2
-rw-r--r--ydb/library/yql/providers/s3/compressors/factory.cpp3
-rw-r--r--ydb/library/yql/providers/s3/compressors/gz.cpp77
-rw-r--r--ydb/library/yql/providers/s3/compressors/gz.h3
-rw-r--r--ydb/library/yql/providers/s3/compressors/lz4io.cpp7
-rw-r--r--ydb/library/yql/providers/s3/compressors/output_queue_impl.h31
6 files changed, 114 insertions, 9 deletions
diff --git a/ydb/library/yql/providers/common/provider/yql_provider.cpp b/ydb/library/yql/providers/common/provider/yql_provider.cpp
index dcfd8fa145..5eef033cc8 100644
--- a/ydb/library/yql/providers/common/provider/yql_provider.cpp
+++ b/ydb/library/yql/providers/common/provider/yql_provider.cpp
@@ -50,7 +50,7 @@ namespace {
"xz"sv
};
constexpr std::array<std::string_view, 1> CompressionsForOutput = {
- "lz4"sv
+ "gzip"sv
};
constexpr std::array<std::string_view, 10> IntervalUnits = {
"MICROSECONDS"sv,
diff --git a/ydb/library/yql/providers/s3/compressors/factory.cpp b/ydb/library/yql/providers/s3/compressors/factory.cpp
index 2aa02a46bc..c0eedd0f5f 100644
--- a/ydb/library/yql/providers/s3/compressors/factory.cpp
+++ b/ydb/library/yql/providers/s3/compressors/factory.cpp
@@ -30,6 +30,9 @@ IOutputQueue::TPtr MakeCompressorQueue(const std::string_view& compression) {
if ("lz4" == compression)
return NLz4::MakeCompressor();
+ if ("gzip" == compression)
+ return NGz::MakeCompressor();
+
if (compression.empty())
return std::make_unique<TOutputQueue<5_MB>>();
diff --git a/ydb/library/yql/providers/s3/compressors/gz.cpp b/ydb/library/yql/providers/s3/compressors/gz.cpp
index 12ea6ceec2..2b6f893a09 100644
--- a/ydb/library/yql/providers/s3/compressors/gz.cpp
+++ b/ydb/library/yql/providers/s3/compressors/gz.cpp
@@ -2,6 +2,7 @@
#include <util/generic/size_literals.h>
#include <ydb/library/yql/utils/yql_panic.h>
+#include "output_queue_impl.h"
namespace NYql {
@@ -61,6 +62,82 @@ bool TReadBuffer::nextImpl() {
}
}
+namespace {
+
+class TCompressor : public TOutputQueue<> {
+public:
+ TCompressor(int level) {
+ Zero(Z_);
+ YQL_ENSURE(deflateInit2(&Z_, level, Z_DEFLATED, 16 | MAX_WBITS, MAX_MEM_LEVEL, Z_DEFAULT_STRATEGY) == Z_OK, "Can not init deflate engine.");
+ }
+
+ ~TCompressor() {
+ deflateEnd(&Z_);
+ }
+private:
+ void Push(TString&& item) override {
+ InputQueue.Push(std::move(item));
+ DoCompression();
+ }
+
+ void Seal() override {
+ InputQueue.Seal();
+ DoCompression();
+ }
+
+ size_t Size() const override {
+ return TOutputQueue::Size() + InputQueue.Size();
+ }
+
+ bool Empty() const override {
+ return TOutputQueue::Empty() && InputQueue.Empty();
+ }
+
+ size_t Volume() const override {
+ return TOutputQueue::Volume() + InputQueue.Volume();
+ }
+
+ void DoCompression() {
+ while (true) {
+ const auto& pop = InputQueue.Pop();
+ const bool done = InputQueue.IsSealed() && InputQueue.Empty();
+ if (pop.empty() && !done)
+ break;
+
+ Z_.next_in = const_cast<unsigned char*>(reinterpret_cast<const unsigned char*>(pop.data()));
+ Z_.avail_in = pop.size();
+
+ if (const auto bound = deflateBound(&Z_, Z_.avail_in); bound > OutputBufferSize)
+ OutputBuffer = std::make_unique<char[]>(OutputBufferSize = bound);
+
+ Z_.next_out = reinterpret_cast<unsigned char*>(OutputBuffer.get());
+ Z_.avail_out = OutputBufferSize;
+
+ const auto code = deflate(&Z_, done ? Z_FINISH : Z_BLOCK);
+ YQL_ENSURE((done ? Z_STREAM_END : Z_OK) == code, "code: " << code << ", error: " << GetErrMsg(Z_));
+
+ if (const auto size = OutputBufferSize - Z_.avail_out)
+ TOutputQueue::Push(TString(OutputBuffer.get(), size));
+
+ if (done)
+ return TOutputQueue::Seal();
+ };
+ }
+
+ std::size_t OutputBufferSize = 0ULL;
+ std::unique_ptr<char[]> OutputBuffer;
+
+ z_stream Z_;
+
+ TOutputQueue<6_MB, 6_MB> InputQueue;
+};
+
+}
+
+IOutputQueue::TPtr MakeCompressor(std::optional<int> cLevel) {
+ return std::make_unique<TCompressor>(cLevel.value_or(Z_DEFAULT_COMPRESSION));
+}
+
}
}
diff --git a/ydb/library/yql/providers/s3/compressors/gz.h b/ydb/library/yql/providers/s3/compressors/gz.h
index e73bb7cb16..1532cccc97 100644
--- a/ydb/library/yql/providers/s3/compressors/gz.h
+++ b/ydb/library/yql/providers/s3/compressors/gz.h
@@ -2,6 +2,7 @@
#include <ydb/library/yql/udfs/common/clickhouse/client/src/IO/ReadBuffer.h>
#include <zlib.h>
+#include "output_queue.h"
namespace NYql {
@@ -20,6 +21,8 @@ private:
z_stream Z_;
};
+IOutputQueue::TPtr MakeCompressor(std::optional<int> cLevel = {});
+
}
}
diff --git a/ydb/library/yql/providers/s3/compressors/lz4io.cpp b/ydb/library/yql/providers/s3/compressors/lz4io.cpp
index e0677045ef..cd5558f3d4 100644
--- a/ydb/library/yql/providers/s3/compressors/lz4io.cpp
+++ b/ydb/library/yql/providers/s3/compressors/lz4io.cpp
@@ -165,7 +165,9 @@ size_t TReadBuffer::DecompressLegacy() {
return size_t(decodeSize);
}
-class TCompressor : public TOutputQueue<5_MB> {
+namespace {
+
+class TCompressor : public TOutputQueue<> {
static constexpr size_t BlockSize = 4_MB;
public:
TCompressor(int level)
@@ -176,6 +178,7 @@ public:
Prefs.frameInfo.blockMode = LZ4F_blockMode_t(1);
Prefs.frameInfo.blockSizeID = LZ4F_blockSizeID_t(7);
Prefs.frameInfo.blockChecksumFlag = LZ4F_blockChecksum_t(0);
+ Prefs.frameInfo.contentSize = 0ULL;
Prefs.frameInfo.contentChecksumFlag = LZ4F_contentChecksum_t(1);
Prefs.favorDecSpeed = 0;
@@ -255,6 +258,8 @@ private:
bool IsFirstBlock = true;
};
+}
+
IOutputQueue::TPtr MakeCompressor(std::optional<int> cLevel) {
return std::make_unique<TCompressor>(cLevel.value_or(LZ4HC_CLEVEL_DEFAULT));
}
diff --git a/ydb/library/yql/providers/s3/compressors/output_queue_impl.h b/ydb/library/yql/providers/s3/compressors/output_queue_impl.h
index 42da3683e9..e4b4b19ef8 100644
--- a/ydb/library/yql/providers/s3/compressors/output_queue_impl.h
+++ b/ydb/library/yql/providers/s3/compressors/output_queue_impl.h
@@ -9,24 +9,41 @@
namespace NYql {
-template <size_t MinItemSize = 5_MB, size_t MaxItemSize = 0ULL>
+// https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html
+constexpr size_t S3PartUploadMinSize = 5_MB;
+
+template <size_t MinItemSize = S3PartUploadMinSize, size_t MaxItemSize = 0ULL>
class TOutputQueue : public IOutputQueue {
public:
TOutputQueue() = default;
void Push(TString&& item) override {
YQL_ENSURE(!Sealed, "Queue is sealed.");
- if (!Queue.empty() && Queue.back().size() < MinItemSize)
- if constexpr (MaxItemSize > 0ULL)
+ if (!Queue.empty() && Queue.back().size() < MinItemSize) {
+ if constexpr (MaxItemSize > 0ULL) {
if (Queue.back().size() + item.size() > MaxItemSize) {
Queue.back().append(item.substr(0U, MaxItemSize - Queue.back().size()));
- Queue.emplace_back(item.substr(item.size() + Queue.back().size() - MaxItemSize));
- } else
+ item = item.substr(item.size() + Queue.back().size() - MaxItemSize);
+ } else {
Queue.back().append(std::move(item));
- else
+ item.clear();
+ }
+ } else {
Queue.back().append(std::move(item));
- else
+ item.clear();
+ }
+ }
+
+ if (!item.empty()) {
+ if constexpr (MaxItemSize > 0ULL) {
+ while (item.size() > MaxItemSize) {
+ Queue.emplace_back(item.substr(0U, MaxItemSize));
+ item = item.substr(MaxItemSize);
+ }
+ }
+
Queue.emplace_back(std::move(item));
+ }
}
TString Pop() override {