aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authora-romanov <Anton.Romanov@ydb.tech>2022-10-06 15:57:01 +0300
committera-romanov <Anton.Romanov@ydb.tech>2022-10-06 15:57:01 +0300
commit822465ea046a777f769e7178d09011f4de30ed1a (patch)
tree30aace840f7018bb62a543c11b9e78d2a53dfaac
parente0afdbbbbcbb295e9a4db98e3131b68ce2fef0eb (diff)
downloadydb-822465ea046a777f769e7178d09011f4de30ed1a.tar.gz
+ xz zstd
-rw-r--r--ydb/library/yql/providers/common/provider/yql_provider.cpp6
-rw-r--r--ydb/library/yql/providers/s3/compressors/factory.cpp5
-rw-r--r--ydb/library/yql/providers/s3/compressors/gz.cpp4
-rw-r--r--ydb/library/yql/providers/s3/compressors/output_queue_impl.h20
-rw-r--r--ydb/library/yql/providers/s3/compressors/xz.cpp113
-rw-r--r--ydb/library/yql/providers/s3/compressors/xz.h3
-rw-r--r--ydb/library/yql/providers/s3/compressors/zstd.cpp84
-rw-r--r--ydb/library/yql/providers/s3/compressors/zstd.h3
8 files changed, 224 insertions, 14 deletions
diff --git a/ydb/library/yql/providers/common/provider/yql_provider.cpp b/ydb/library/yql/providers/common/provider/yql_provider.cpp
index 5eef033cc8..8fa68a664c 100644
--- a/ydb/library/yql/providers/common/provider/yql_provider.cpp
+++ b/ydb/library/yql/providers/common/provider/yql_provider.cpp
@@ -49,8 +49,10 @@ namespace {
"bzip2"sv,
"xz"sv
};
- constexpr std::array<std::string_view, 1> CompressionsForOutput = {
- "gzip"sv
+ constexpr std::array<std::string_view, 3> CompressionsForOutput = {
+ "gzip"sv,
+ "zstd"sv,
+ "xz"sv
};
constexpr std::array<std::string_view, 10> IntervalUnits = {
"MICROSECONDS"sv,
diff --git a/ydb/library/yql/providers/s3/compressors/factory.cpp b/ydb/library/yql/providers/s3/compressors/factory.cpp
index c0eedd0f5f..1e5f5689e0 100644
--- a/ydb/library/yql/providers/s3/compressors/factory.cpp
+++ b/ydb/library/yql/providers/s3/compressors/factory.cpp
@@ -29,7 +29,10 @@ std::unique_ptr<NDB::ReadBuffer> MakeDecompressor(NDB::ReadBuffer& input, const
IOutputQueue::TPtr MakeCompressorQueue(const std::string_view& compression) {
if ("lz4" == compression)
return NLz4::MakeCompressor();
-
+ if ("zstd" == compression)
+ return NZstd::MakeCompressor();
+ if ("xz" == compression)
+ return NXz::MakeCompressor();
if ("gzip" == compression)
return NGz::MakeCompressor();
diff --git a/ydb/library/yql/providers/s3/compressors/gz.cpp b/ydb/library/yql/providers/s3/compressors/gz.cpp
index 2b6f893a09..b887b00aab 100644
--- a/ydb/library/yql/providers/s3/compressors/gz.cpp
+++ b/ydb/library/yql/providers/s3/compressors/gz.cpp
@@ -98,7 +98,7 @@ private:
}
void DoCompression() {
- while (true) {
+ while (!InputQueue.Empty()) {
const auto& pop = InputQueue.Pop();
const bool done = InputQueue.IsSealed() && InputQueue.Empty();
if (pop.empty() && !done)
@@ -129,7 +129,7 @@ private:
z_stream Z_;
- TOutputQueue<6_MB, 6_MB> InputQueue;
+ TOutputQueue<0> InputQueue;
};
}
diff --git a/ydb/library/yql/providers/s3/compressors/output_queue_impl.h b/ydb/library/yql/providers/s3/compressors/output_queue_impl.h
index e4b4b19ef8..f0a88b7a7a 100644
--- a/ydb/library/yql/providers/s3/compressors/output_queue_impl.h
+++ b/ydb/library/yql/providers/s3/compressors/output_queue_impl.h
@@ -19,18 +19,20 @@ public:
void Push(TString&& item) override {
YQL_ENSURE(!Sealed, "Queue is sealed.");
- if (!Queue.empty() && Queue.back().size() < MinItemSize) {
- if constexpr (MaxItemSize > 0ULL) {
- if (Queue.back().size() + item.size() > MaxItemSize) {
- Queue.back().append(item.substr(0U, MaxItemSize - Queue.back().size()));
- item = item.substr(item.size() + Queue.back().size() - MaxItemSize);
+ if constexpr (MinItemSize > 0ULL) {
+ if (!Queue.empty() && Queue.back().size() < MinItemSize) {
+ if constexpr (MaxItemSize > 0ULL) {
+ if (Queue.back().size() + item.size() > MaxItemSize) {
+ Queue.back().append(item.substr(0U, MaxItemSize - Queue.back().size()));
+ item = item.substr(item.size() + Queue.back().size() - MaxItemSize);
+ } else {
+ Queue.back().append(std::move(item));
+ item.clear();
+ }
} else {
Queue.back().append(std::move(item));
item.clear();
}
- } else {
- Queue.back().append(std::move(item));
- item.clear();
}
}
@@ -47,7 +49,7 @@ public:
}
TString Pop() override {
- if (Queue.empty() || !Sealed && Queue.front().size() < MinItemSize)
+ if (Queue.empty() || 1U == Queue.size() && !Sealed)
return {};
auto out = std::move(Queue.front());
diff --git a/ydb/library/yql/providers/s3/compressors/xz.cpp b/ydb/library/yql/providers/s3/compressors/xz.cpp
index 33e92a8767..a4ce6318aa 100644
--- a/ydb/library/yql/providers/s3/compressors/xz.cpp
+++ b/ydb/library/yql/providers/s3/compressors/xz.cpp
@@ -2,6 +2,7 @@
#include <util/generic/size_literals.h>
#include <ydb/library/yql/utils/yql_panic.h>
+#include "output_queue_impl.h"
namespace NYql {
@@ -84,6 +85,118 @@ bool TReadBuffer::nextImpl() {
}
}
+namespace {
+
+class TCompressor : public TOutputQueue<> {
+public:
+ TCompressor(int level) : Strm_(LZMA_STREAM_INIT) {
+
+
+ // options for further compression
+ lzma_options_lzma opt_lzma2;
+ if (lzma_lzma_preset(&opt_lzma2, level))
+ throw yexception() << "lzma preset failed: lzma version: " << LZMA_VERSION_STRING;
+
+ lzma_filter filters[] = {
+ {.id = LZMA_FILTER_X86, .options = nullptr},
+ {.id = LZMA_FILTER_LZMA2, .options = &opt_lzma2},
+ {.id = LZMA_VLI_UNKNOWN, .options = nullptr},
+ };
+
+ switch (const lzma_ret ret = lzma_stream_encoder(&Strm_, filters, LZMA_CHECK_CRC64)) {
+ case LZMA_OK:
+ return;
+ case LZMA_MEM_ERROR:
+ throw yexception() << "Memory allocation failed.";
+ case LZMA_OPTIONS_ERROR:
+ throw yexception() << "Unsupported decompressor flags.";
+ default:
+ throw yexception() << "Unknown error << " << int(ret) << ", possibly a bug.";
+ }
+ }
+
+ ~TCompressor() {
+ lzma_end(&Strm_);
+ }
+private:
+ void Push(TString&& item) override {
+ InputQueue.Push(std::move(item));
+ DoCompression();
+ }
+
+ void Seal() override {
+ InputQueue.Seal();
+ DoCompression();
+ }
+
+ size_t Size() const override {
+ return TOutputQueue::Size() + InputQueue.Size();
+ }
+
+ bool Empty() const override {
+ return TOutputQueue::Empty() && InputQueue.Empty();
+ }
+
+ size_t Volume() const override {
+ return TOutputQueue::Volume() + InputQueue.Volume();
+ }
+
+ void DoCompression() {
+ while (!InputQueue.Empty()) {
+ const auto& pop = InputQueue.Pop();
+ const bool done = InputQueue.IsSealed() && InputQueue.Empty();
+ if (pop.empty() && !done)
+ break;
+
+ if (const auto bound = lzma_stream_buffer_bound(pop.size()); bound > OutputBufferSize)
+ OutputBuffer = std::make_unique<char[]>(OutputBufferSize = bound);
+
+ Strm_.next_in = const_cast<unsigned char*>(reinterpret_cast<const unsigned char*>(pop.data()));
+ Strm_.avail_in = pop.size();
+
+ Strm_.next_out = reinterpret_cast<unsigned char*>(OutputBuffer.get());
+ Strm_.avail_out = OutputBufferSize;
+
+ const lzma_ret ret = lzma_code(&Strm_, done ? LZMA_FINISH : LZMA_RUN);
+
+ if (const auto size = OutputBufferSize - Strm_.avail_out)
+ TOutputQueue::Push(TString(OutputBuffer.get(), size));
+
+ switch (ret) {
+ case LZMA_OK:
+ continue;
+ case LZMA_STREAM_END:
+ return TOutputQueue::Seal();
+ case LZMA_MEM_ERROR:
+ throw yexception() << "Memory allocation failed.";
+ case LZMA_FORMAT_ERROR:
+ throw yexception() << "The input is not in the .xz format.";
+ case LZMA_OPTIONS_ERROR:
+ throw yexception() << "Unsupported compression options.";
+ case LZMA_DATA_ERROR:
+ throw yexception() << "Compressed file is corrupt.";
+ case LZMA_BUF_ERROR:
+ throw yexception() << "Compressed file is truncated or otherwise corrupt.";
+ default:
+ throw yexception() << "Unknown error " << int(ret) << ", possibly a bug.";
+ }
+ };
+ }
+
+ std::size_t OutputBufferSize = 0ULL;
+ std::unique_ptr<char[]> OutputBuffer;
+
+ lzma_stream Strm_;
+
+ TOutputQueue<0> InputQueue;
+};
+
+}
+
+IOutputQueue::TPtr MakeCompressor(std::optional<int> cLevel) {
+ return std::make_unique<TCompressor>(cLevel.value_or(LZMA_PRESET_DEFAULT));
+}
+
}
}
diff --git a/ydb/library/yql/providers/s3/compressors/xz.h b/ydb/library/yql/providers/s3/compressors/xz.h
index df38e4bff4..a8302a165c 100644
--- a/ydb/library/yql/providers/s3/compressors/xz.h
+++ b/ydb/library/yql/providers/s3/compressors/xz.h
@@ -2,6 +2,7 @@
#include <ydb/library/yql/udfs/common/clickhouse/client/src/IO/ReadBuffer.h>
#include <contrib/libs/lzma/liblzma/api/lzma.h>
+#include "output_queue.h"
namespace NYql {
@@ -23,6 +24,8 @@ private:
bool IsOutFinished_ = false;
};
+IOutputQueue::TPtr MakeCompressor(std::optional<int> cLevel = {});
+
}
}
diff --git a/ydb/library/yql/providers/s3/compressors/zstd.cpp b/ydb/library/yql/providers/s3/compressors/zstd.cpp
index 8bcd0dd0f5..213b68e530 100644
--- a/ydb/library/yql/providers/s3/compressors/zstd.cpp
+++ b/ydb/library/yql/providers/s3/compressors/zstd.cpp
@@ -2,6 +2,7 @@
#include <util/generic/size_literals.h>
#include <ydb/library/yql/utils/yql_panic.h>
+#include "output_queue_impl.h"
namespace NYql {
@@ -54,6 +55,89 @@ bool TReadBuffer::nextImpl() {
}
}
+namespace {
+
+class TCompressor : public TOutputQueue<> {
+public:
+ TCompressor(int level)
+ : ZCtx_(::ZSTD_createCStream())
+ {
+ const auto ret = ::ZSTD_initCStream(ZCtx_, level);
+ YQL_ENSURE(!::ZSTD_isError(ret), "code: " << ret << ", error: " << ::ZSTD_getErrorName(ret));
+ }
+
+ ~TCompressor() {
+ ::ZSTD_freeCStream(ZCtx_);
+ }
+private:
+ void Push(TString&& item) override {
+ InputQueue.Push(std::move(item));
+ DoCompression();
+ }
+
+ void Seal() override {
+ InputQueue.Seal();
+ DoCompression();
+ }
+
+ size_t Size() const override {
+ return TOutputQueue::Size() + InputQueue.Size();
+ }
+
+ bool Empty() const override {
+ return TOutputQueue::Empty() && InputQueue.Empty();
+ }
+
+ size_t Volume() const override {
+ return TOutputQueue::Volume() + InputQueue.Volume();
+ }
+
+ void DoCompression() {
+ while (!InputQueue.Empty()) {
+ const auto& pop = InputQueue.Pop();
+ const bool done = InputQueue.IsSealed() && InputQueue.Empty();
+ if (pop.empty() && !done)
+ break;
+
+ if (const auto bound = ZSTD_compressBound(pop.size()); bound > OutputBufferSize)
+ OutputBuffer = std::make_unique<char[]>(OutputBufferSize = bound);
+
+ if (IsFirstBlock && done) {
+ const auto size = ::ZSTD_compress2(ZCtx_, OutputBuffer.get(), OutputBufferSize, pop.data(), pop.size());
+ YQL_ENSURE(!::ZSTD_isError(size), "code: " << size << ", error: " << ::ZSTD_getErrorName(size));
+ if (size)
+ TOutputQueue::Push(TString(OutputBuffer.get(), size));
+ } else {
+ ::ZSTD_inBuffer zIn{pop.data(), pop.size(), 0ULL};
+ ::ZSTD_outBuffer zOut{OutputBuffer.get(), OutputBufferSize, 0ULL};
+ const auto code = ::ZSTD_compressStream2(ZCtx_, &zOut, &zIn, done ? ZSTD_e_end : ZSTD_e_continue);
+ YQL_ENSURE(!::ZSTD_isError(code), "code: " << code << ", error: " << ::ZSTD_getErrorName(code));
+
+ if (zOut.pos)
+ TOutputQueue::Push(TString(OutputBuffer.get(), zOut.pos));
+ }
+
+ IsFirstBlock = false;
+ if (done)
+ return TOutputQueue::Seal();
+ };
+ }
+
+ std::size_t OutputBufferSize = 0ULL;
+ std::unique_ptr<char[]> OutputBuffer;
+
+ ::ZSTD_CStream *const ZCtx_;
+
+ TOutputQueue<0> InputQueue;
+ bool IsFirstBlock = true;
+};
+
+}
+
+IOutputQueue::TPtr MakeCompressor(std::optional<int> cLevel) {
+ return std::make_unique<TCompressor>(cLevel.value_or(ZSTD_defaultCLevel()));
+}
+
}
}
diff --git a/ydb/library/yql/providers/s3/compressors/zstd.h b/ydb/library/yql/providers/s3/compressors/zstd.h
index 64cce44bd0..1f72dcc034 100644
--- a/ydb/library/yql/providers/s3/compressors/zstd.h
+++ b/ydb/library/yql/providers/s3/compressors/zstd.h
@@ -3,6 +3,7 @@
#include <ydb/library/yql/udfs/common/clickhouse/client/src/IO/ReadBuffer.h>
#define ZSTD_STATIC_LINKING_ONLY
#include <contrib/libs/zstd/include/zstd.h>
+#include "output_queue.h"
namespace NYql {
@@ -22,6 +23,8 @@ private:
bool Finished_ = false;
};
+IOutputQueue::TPtr MakeCompressor(std::optional<int> cLevel = {});
+
}
}