aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authora-romanov <a-romanov@yandex-team.ru>2022-06-06 13:50:07 +0300
committera-romanov <a-romanov@yandex-team.ru>2022-06-06 13:50:07 +0300
commitfd815f4c2cd2485f30c8ab9633c05a9d69ce9703 (patch)
tree832d94e8841fa2034b59ff0aa071b3405775f060
parent89158f6d13da51cf2ce9c11de9f595a20074b068 (diff)
downloadydb-fd815f4c2cd2485f30c8ab9633c05a9d69ce9703.tar.gz
YQ-1037 + bzip2 zstd
ref:d543e8e932a51b24783b5770f98451903cf35408
-rw-r--r--ydb/library/yql/providers/s3/compressors/CMakeLists.linux.txt3
-rw-r--r--ydb/library/yql/providers/s3/compressors/bzip2.cpp65
-rw-r--r--ydb/library/yql/providers/s3/compressors/bzip2.h29
-rw-r--r--ydb/library/yql/providers/s3/compressors/factory.cpp6
-rw-r--r--ydb/library/yql/providers/s3/compressors/zstd.cpp59
-rw-r--r--ydb/library/yql/providers/s3/compressors/zstd.h27
6 files changed, 189 insertions, 0 deletions
diff --git a/ydb/library/yql/providers/s3/compressors/CMakeLists.linux.txt b/ydb/library/yql/providers/s3/compressors/CMakeLists.linux.txt
index 193bf2039a..4443fe654c 100644
--- a/ydb/library/yql/providers/s3/compressors/CMakeLists.linux.txt
+++ b/ydb/library/yql/providers/s3/compressors/CMakeLists.linux.txt
@@ -22,10 +22,13 @@ target_link_libraries(providers-s3-compressors PUBLIC
contrib-libs-fmt
libs-poco-Util
libs-brotli-dec
+ contrib-libs-libbz2
contrib-libs-lz4
)
target_sources(providers-s3-compressors PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/compressors/brotli.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/compressors/bzip2.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/compressors/factory.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/compressors/lz4io.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/compressors/zstd.cpp
)
diff --git a/ydb/library/yql/providers/s3/compressors/bzip2.cpp b/ydb/library/yql/providers/s3/compressors/bzip2.cpp
new file mode 100644
index 0000000000..072afce0ac
--- /dev/null
+++ b/ydb/library/yql/providers/s3/compressors/bzip2.cpp
@@ -0,0 +1,65 @@
+#include "bzip2.h"
+
+#include <util/generic/size_literals.h>
+#include <ydb/library/yql/utils/yql_panic.h>
+
+namespace NYql {
+
+namespace NBzip2 {
+
+TReadBuffer::TReadBuffer(NDB::ReadBuffer& source)
+ : NDB::ReadBuffer(nullptr, 0ULL), Source_(source)
+{
+ InBuffer.resize(8_KB);
+ OutBuffer.resize(64_KB);
+ Zero(BzStream_);
+ InitDecoder();
+}
+
+TReadBuffer::~TReadBuffer() {
+ FreeDecoder();
+}
+
+void TReadBuffer::InitDecoder() {
+ YQL_ENSURE(BZ2_bzDecompressInit(&BzStream_, 0, 0) == BZ_OK, "Can not init bzip engine.");
+}
+
+void TReadBuffer::FreeDecoder() {
+ BZ2_bzDecompressEnd(&BzStream_);
+}
+
+bool TReadBuffer::nextImpl() {
+ BzStream_.next_out = OutBuffer.data();
+ BzStream_.avail_out = OutBuffer.size();
+
+ while (true) {
+ if (!BzStream_.avail_in) {
+ BzStream_.next_in = InBuffer.data();
+ BzStream_.avail_in = Source_.read(InBuffer.data(), InBuffer.size());
+ if (!BzStream_.avail_in) {
+ set(nullptr, 0ULL);
+ return false;
+ }
+ }
+
+ switch (BZ2_bzDecompress(&BzStream_)) {
+ case BZ_STREAM_END:
+ FreeDecoder();
+ InitDecoder();
+ [[fallthrough]];
+ case BZ_OK:
+ if (const auto processed = OutBuffer.size() - BzStream_.avail_out) {
+ working_buffer = Buffer(OutBuffer.data(), OutBuffer.data() + processed);
+ return true;
+ }
+
+ break;
+ default:
+ ythrow yexception() << "bzip error";
+ }
+ }
+}
+
+}
+
+}
diff --git a/ydb/library/yql/providers/s3/compressors/bzip2.h b/ydb/library/yql/providers/s3/compressors/bzip2.h
new file mode 100644
index 0000000000..41d8f74bce
--- /dev/null
+++ b/ydb/library/yql/providers/s3/compressors/bzip2.h
@@ -0,0 +1,29 @@
+#pragma once
+
+#include <ydb/library/yql/udfs/common/clickhouse/client/src/IO/ReadBuffer.h>
+#include <contrib/libs/libbz2/bzlib.h>
+
+namespace NYql {
+
+namespace NBzip2 {
+
+class TReadBuffer : public NDB::ReadBuffer {
+public:
+ TReadBuffer(NDB::ReadBuffer& source);
+ ~TReadBuffer();
+private:
+ bool nextImpl() final;
+
+ NDB::ReadBuffer& Source_;
+ std::vector<char> InBuffer, OutBuffer;
+
+ bz_stream BzStream_;
+
+ void InitDecoder();
+ void FreeDecoder();
+};
+
+}
+
+}
+
diff --git a/ydb/library/yql/providers/s3/compressors/factory.cpp b/ydb/library/yql/providers/s3/compressors/factory.cpp
index 48e2259930..772f420392 100644
--- a/ydb/library/yql/providers/s3/compressors/factory.cpp
+++ b/ydb/library/yql/providers/s3/compressors/factory.cpp
@@ -1,6 +1,8 @@
#include "factory.h"
#include "lz4io.h"
#include "brotli.h"
+#include "zstd.h"
+#include "bzip2.h"
namespace NYql {
@@ -9,6 +11,10 @@ std::unique_ptr<NDB::ReadBuffer> MakeDecompressor(NDB::ReadBuffer& input, const
return std::make_unique<NLz4::TReadBuffer>(input);
if ("brotli" == compression)
return std::make_unique<NBrotli::TReadBuffer>(input);
+ if ("zstd" == compression)
+ return std::make_unique<NZstd::TReadBuffer>(input);
+ if ("bzip2" == compression)
+ return std::make_unique<NBzip2::TReadBuffer>(input);
return nullptr;
}
diff --git a/ydb/library/yql/providers/s3/compressors/zstd.cpp b/ydb/library/yql/providers/s3/compressors/zstd.cpp
new file mode 100644
index 0000000000..8bcd0dd0f5
--- /dev/null
+++ b/ydb/library/yql/providers/s3/compressors/zstd.cpp
@@ -0,0 +1,59 @@
+#include "zstd.h"
+
+#include <util/generic/size_literals.h>
+#include <ydb/library/yql/utils/yql_panic.h>
+
+namespace NYql {
+
+namespace NZstd {
+
+TReadBuffer::TReadBuffer(NDB::ReadBuffer& source)
+ : NDB::ReadBuffer(nullptr, 0ULL), Source_(source), ZCtx_(::ZSTD_createDStream())
+{
+ InBuffer.resize(8_KB);
+ OutBuffer.resize(64_KB);
+ Offset_ = InBuffer.size();
+}
+
+TReadBuffer::~TReadBuffer() {
+ ::ZSTD_freeDStream(ZCtx_);
+}
+
+bool TReadBuffer::nextImpl() {
+ ::ZSTD_inBuffer zIn{InBuffer.data(), InBuffer.size(), Offset_};
+ ::ZSTD_outBuffer zOut{OutBuffer.data(), OutBuffer.size(), 0ULL};
+
+ size_t returnCode = 0ULL;
+ if (!Finished_) do {
+ if (zIn.pos == zIn.size) {
+ zIn.size = Source_.read(InBuffer.data(), InBuffer.size());
+
+ zIn.pos = Offset_ = 0;
+ if (!zIn.size) {
+ // end of stream, need to check that there is no uncompleted blocks
+ YQL_ENSURE(!returnCode, "Incomplete block.");
+ Finished_ = true;
+ break;
+ }
+ }
+ returnCode = ::ZSTD_decompressStream(ZCtx_, &zOut, &zIn);
+ YQL_ENSURE(!::ZSTD_isError(returnCode), "Decompress failed: " << ::ZSTD_getErrorName(returnCode));
+ if (!returnCode) {
+ // The frame is over, prepare to (maybe) start a new frame
+ ::ZSTD_initDStream(ZCtx_);
+ }
+ } while (zOut.pos != zOut.size);
+ Offset_ = zIn.pos;
+
+ if (zOut.pos) {
+ working_buffer = Buffer(OutBuffer.data(), OutBuffer.data() + zOut.pos);
+ return true;
+ } else {
+ set(nullptr, 0ULL);
+ return false;
+ }
+}
+
+}
+
+}
diff --git a/ydb/library/yql/providers/s3/compressors/zstd.h b/ydb/library/yql/providers/s3/compressors/zstd.h
new file mode 100644
index 0000000000..64cce44bd0
--- /dev/null
+++ b/ydb/library/yql/providers/s3/compressors/zstd.h
@@ -0,0 +1,27 @@
+#pragma once
+
+#include <ydb/library/yql/udfs/common/clickhouse/client/src/IO/ReadBuffer.h>
+#define ZSTD_STATIC_LINKING_ONLY
+#include <contrib/libs/zstd/include/zstd.h>
+
+namespace NYql {
+
+namespace NZstd {
+
+class TReadBuffer : public NDB::ReadBuffer {
+public:
+ TReadBuffer(NDB::ReadBuffer& source);
+ ~TReadBuffer();
+private:
+ bool nextImpl() final;
+
+ NDB::ReadBuffer& Source_;
+ std::vector<char> InBuffer, OutBuffer;
+ ::ZSTD_DStream *const ZCtx_;
+ size_t Offset_;
+ bool Finished_ = false;
+};
+
+}
+
+}