aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authora-romanov <a-romanov@yandex-team.ru>2022-06-03 15:17:54 +0300
committera-romanov <a-romanov@yandex-team.ru>2022-06-03 15:17:54 +0300
commit950f1f0bb785be2b41cf4916ca94a3d24f29fdda (patch)
tree41a02aa37059bc2de9ce8bc4e14323f84c63376b
parentb2c13c5016730c435d1108087ee23caf5ae37024 (diff)
downloadydb-950f1f0bb785be2b41cf4916ca94a3d24f29fdda.tar.gz
YQ-1037 lz4 support draft.
ref:13a14e17f32616cc493f4af5d7d93769582128e6
-rw-r--r--CMakeLists.darwin.txt1
-rw-r--r--CMakeLists.linux.txt1
-rw-r--r--ydb/library/yql/providers/s3/actors/CMakeLists.linux.txt1
-rw-r--r--ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp19
-rw-r--r--ydb/library/yql/providers/s3/compressors/CMakeLists.darwin.txt14
-rw-r--r--ydb/library/yql/providers/s3/compressors/CMakeLists.linux.txt29
-rw-r--r--ydb/library/yql/providers/s3/compressors/CMakeLists.txt13
-rw-r--r--ydb/library/yql/providers/s3/compressors/factory.cpp14
-rw-r--r--ydb/library/yql/providers/s3/compressors/factory.h9
-rw-r--r--ydb/library/yql/providers/s3/compressors/lz4io.cpp174
-rw-r--r--ydb/library/yql/providers/s3/compressors/lz4io.h38
11 files changed, 306 insertions, 7 deletions
diff --git a/CMakeLists.darwin.txt b/CMakeLists.darwin.txt
index f7ebb70ba15..526e6e77beb 100644
--- a/CMakeLists.darwin.txt
+++ b/CMakeLists.darwin.txt
@@ -1216,6 +1216,7 @@ add_subdirectory(ydb/library/yql/parser/pg_catalog/ut)
add_subdirectory(ydb/library/yql/parser/lexer_common/ut)
add_subdirectory(ydb/library/yql/providers/common/schema)
add_subdirectory(ydb/library/yql/providers/common/schema/skiff)
+add_subdirectory(ydb/library/yql/providers/s3/compressors)
add_subdirectory(ydb/library/yql/providers/function/common)
add_subdirectory(ydb/library/yql/providers/function/expr_nodes)
add_subdirectory(ydb/library/yql/providers/function/gateway)
diff --git a/CMakeLists.linux.txt b/CMakeLists.linux.txt
index 061c06825c8..225a01df147 100644
--- a/CMakeLists.linux.txt
+++ b/CMakeLists.linux.txt
@@ -906,6 +906,7 @@ add_subdirectory(contrib/libs/expat)
add_subdirectory(contrib/libs/poco/Foundation)
add_subdirectory(contrib/libs/poco/JSON)
add_subdirectory(contrib/libs/poco/XML)
+add_subdirectory(ydb/library/yql/providers/s3/compressors)
add_subdirectory(ydb/library/yql/udfs/common/clickhouse/client)
add_subdirectory(ydb/library/yql/public/udf/support)
add_subdirectory(contrib/restricted/boost/libs/program_options)
diff --git a/ydb/library/yql/providers/s3/actors/CMakeLists.linux.txt b/ydb/library/yql/providers/s3/actors/CMakeLists.linux.txt
index 1ca4231aa67..5127a2fcbc2 100644
--- a/ydb/library/yql/providers/s3/actors/CMakeLists.linux.txt
+++ b/ydb/library/yql/providers/s3/actors/CMakeLists.linux.txt
@@ -28,6 +28,7 @@ target_link_libraries(providers-s3-actors PUBLIC
dq-actors-compute
providers-common-http_gateway
providers-s3-proto
+ providers-s3-compressors
clickhouse_client_udf
)
target_sources(providers-s3-actors PRIVATE
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
index 3c47e5d9632..ee5c34cc129 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
@@ -30,6 +30,7 @@
#include <ydb/library/yql/providers/common/schema/mkql/yql_mkql_schema.h>
#include <ydb/library/yql/utils/yql_panic.h>
+#include <ydb/library/yql/providers/s3/compressors/factory.h>
#include <ydb/library/yql/providers/s3/proto/range.pb.h>
#include <ydb/library/yql/providers/common/provider/yql_provider_names.h>
@@ -329,7 +330,9 @@ public:
private:
void Run() final try {
TReadBufferFromStream buffer(this);
- NDB::InputStreamFromInputFormat stream(NDB::FormatFactory::instance().getInputFormat(ReadSpec->Format, buffer, NDB::Block(ReadSpec->Columns), nullptr, 1_MB, ReadSpec->Settings));
+ const auto decompress(MakeDecompressor(buffer, ReadSpec->Compression));
+ YQL_ENSURE(ReadSpec->Compression.empty() == !decompress, "Unsupported " <<ReadSpec->Compression << " compression.");
+ NDB::InputStreamFromInputFormat stream(NDB::FormatFactory::instance().getInputFormat(ReadSpec->Format, decompress ? *decompress : buffer, NDB::Block(ReadSpec->Columns), nullptr, 1_MB, ReadSpec->Settings));
while (auto block = stream.read())
Send(SourceActorId, new TEvPrivate::TEvNextBlock(block));
@@ -653,14 +656,16 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor(
readSpec->Compression = it->second;
#define SUPPORTED_FLAGS(xx) \
- xx(skip_unknown_fields) \
- xx(import_nested_json) \
- xx(with_names_use_header) \
- xx(null_as_default) \
+ xx(skip_unknown_fields, true) \
+ xx(import_nested_json, true) \
+ xx(with_names_use_header, true) \
+ xx(null_as_default, true) \
-#define SET_FLAG(flag) \
+#define SET_FLAG(flag, def) \
if (const auto it = settings.find(#flag); settings.cend() != it) \
- readSpec->Settings.flag = FromString<bool>(it->second);
+ readSpec->Settings.flag = FromString<bool>(it->second); \
+ else \
+ readSpec->Settings.flag = def;
SUPPORTED_FLAGS(SET_FLAG)
diff --git a/ydb/library/yql/providers/s3/compressors/CMakeLists.darwin.txt b/ydb/library/yql/providers/s3/compressors/CMakeLists.darwin.txt
new file mode 100644
index 00000000000..c3190798097
--- /dev/null
+++ b/ydb/library/yql/providers/s3/compressors/CMakeLists.darwin.txt
@@ -0,0 +1,14 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(providers-s3-compressors INTERFACE)
+target_link_libraries(providers-s3-compressors INTERFACE
+ contrib-libs-cxxsupp
+ yutil
+)
diff --git a/ydb/library/yql/providers/s3/compressors/CMakeLists.linux.txt b/ydb/library/yql/providers/s3/compressors/CMakeLists.linux.txt
new file mode 100644
index 00000000000..3dffdb211fd
--- /dev/null
+++ b/ydb/library/yql/providers/s3/compressors/CMakeLists.linux.txt
@@ -0,0 +1,29 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(providers-s3-compressors)
+target_compile_options(providers-s3-compressors PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_include_directories(providers-s3-compressors PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/udfs/common/clickhouse/client/base
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/udfs/common/clickhouse/client/base/pcg-random
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/udfs/common/clickhouse/client/src
+)
+target_link_libraries(providers-s3-compressors PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ contrib-libs-fmt
+ libs-poco-Util
+ contrib-libs-lz4
+)
+target_sources(providers-s3-compressors PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/compressors/factory.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/compressors/lz4io.cpp
+)
diff --git a/ydb/library/yql/providers/s3/compressors/CMakeLists.txt b/ydb/library/yql/providers/s3/compressors/CMakeLists.txt
new file mode 100644
index 00000000000..a681d385f3e
--- /dev/null
+++ b/ydb/library/yql/providers/s3/compressors/CMakeLists.txt
@@ -0,0 +1,13 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (APPLE)
+ include(CMakeLists.darwin.txt)
+elseif (UNIX)
+ include(CMakeLists.linux.txt)
+endif()
diff --git a/ydb/library/yql/providers/s3/compressors/factory.cpp b/ydb/library/yql/providers/s3/compressors/factory.cpp
new file mode 100644
index 00000000000..7a4961ac924
--- /dev/null
+++ b/ydb/library/yql/providers/s3/compressors/factory.cpp
@@ -0,0 +1,14 @@
+#include "factory.h"
+#include "lz4io.h"
+
+namespace NYql {
+
+// Builds a decompressing reader on top of input for the named compression.
+// Only "lz4" is recognized; any other name (including an empty one) yields nullptr.
+std::unique_ptr<NDB::ReadBuffer> MakeDecompressor(NDB::ReadBuffer& input, const std::string_view& compression) {
+    std::unique_ptr<NDB::ReadBuffer> decompressor;
+    if (compression == "lz4")
+        decompressor = std::make_unique<NLz4::TReadBuffer>(input);
+
+    return decompressor;
+}
+
+}
+
diff --git a/ydb/library/yql/providers/s3/compressors/factory.h b/ydb/library/yql/providers/s3/compressors/factory.h
new file mode 100644
index 00000000000..8555c107c78
--- /dev/null
+++ b/ydb/library/yql/providers/s3/compressors/factory.h
@@ -0,0 +1,9 @@
+#pragma once
+
+#include <ydb/library/yql/udfs/common/clickhouse/client/src/IO/ReadBuffer.h>
+
+namespace NYql {
+
+// Returns a reader that decompresses the data read from input according to
+// the compression name, or nullptr when the name is not supported.
+// NOTE(review): the result reads through input, so input presumably has to
+// outlive it - confirm against the implementation.
+std::unique_ptr<NDB::ReadBuffer> MakeDecompressor(NDB::ReadBuffer& input, const std::string_view& compression);
+
+}
diff --git a/ydb/library/yql/providers/s3/compressors/lz4io.cpp b/ydb/library/yql/providers/s3/compressors/lz4io.cpp
new file mode 100644
index 00000000000..0d197ee970c
--- /dev/null
+++ b/ydb/library/yql/providers/s3/compressors/lz4io.cpp
@@ -0,0 +1,174 @@
+#include "lz4io.h"
+
+#include <util/generic/scope.h>
+#include <util/generic/size_literals.h>
+#include <util/stream/output.h>
+#include <util/stream/input.h>
+#include <util/string/builder.h>
+
+#include <contrib/libs/lz4/lz4.h>
+#include <contrib/libs/lz4/lz4hc.h>
+
+#include <ydb/library/yql/utils/yql_panic.h>
+#include <ydb/library/yql/public/udf/udf_allocator.h>
+#include <ydb/library/yql/public/udf/udf_terminator.h>
+
+namespace NYql {
+
+namespace NLz4 {
+
+namespace {
+
+// Number of bytes of the magic number handed to LZ4F when a frame stream is opened.
+constexpr ui32 MagicNumberSize = 4U;
+// Magic value that CheckMagic maps to EStreamType::Frame.
+constexpr ui32 Lz4ioMagicNumber = 0x184D2204U;
+// Magic value that CheckMagic maps to EStreamType::Legacy.
+constexpr ui32 LegacyMagicNumber = 0x184C2102U;
+
+// Size of the output buffer and upper bound of one decoded block in DecompressLegacy.
+constexpr size_t LegacyBlockSize = 8_MB;
+
+// Stores value32 at p as four bytes, least significant byte first.
+void WriteLE32 (void* p, ui32 value32)
+{
+    auto* const out = static_cast<unsigned char*>(p);
+    for (unsigned int i = 0U; i < 4U; ++i)
+        out[i] = static_cast<unsigned char>(value32 >> (8U * i));
+}
+
+// Assembles a 32-bit value from the four bytes at s, least significant byte first.
+ui32 ReadLE32 (const void* s) {
+    const auto* const in = static_cast<const unsigned char*>(s);
+    return static_cast<ui32>(in[0])
+        | (static_cast<ui32>(in[1]) << 8U)
+        | (static_cast<ui32>(in[2]) << 16U)
+        | (static_cast<ui32>(in[3]) << 24U);
+}
+
+// Size given to both the input and the output buffer of a frame stream.
+constexpr size_t BufferSize = 64_KB;
+
+// Classifies a stream by the little-endian magic number stored at data.
+// Anything that is neither the frame nor the legacy magic is Unknown.
+EStreamType CheckMagic(const void* data) {
+    const ui32 magic = ReadLE32(data);
+    if (Lz4ioMagicNumber == magic)
+        return EStreamType::Frame;
+    if (LegacyMagicNumber == magic)
+        return EStreamType::Legacy;
+    return EStreamType::Unknown;
+}
+
+// Consumes the first four bytes of input and classifies the stream by them.
+// The YQL_ENSURE check fails when fewer than four bytes could be read.
+EStreamType CheckMagic(NDB::ReadBuffer& input) {
+    char magic[4u];
+    const auto got = input.read(magic, sizeof(magic));
+    YQL_ENSURE(got == sizeof(magic), "Buffer too small.");
+    return CheckMagic(magic);
+}
+
+}
+
+// Reads the magic number from source and prepares decoding of the detected
+// stream type. For a frame stream an LZ4F context is created and fed the
+// magic number again (it has already been consumed from source), which leaves
+// in NextToLoad the number of bytes LZ4F wants next.
+// Fails via YQL_ENSURE on an unknown magic or when LZ4F reports an error.
+TReadBuffer::TReadBuffer(NDB::ReadBuffer& source)
+    : NDB::ReadBuffer(nullptr, 0ULL), Source(source), StreamType(CheckMagic(Source)), Ctx(nullptr), NextToLoad(0ULL), Pos(0ULL), Remaining(0ULL)
+{
+    YQL_ENSURE(StreamType != EStreamType::Unknown, "Wrong magic.");
+    if (StreamType == EStreamType::Frame) {
+        const auto errorCode = LZ4F_createDecompressionContext(&Ctx, LZ4F_VERSION);
+        YQL_ENSURE(!LZ4F_isError(errorCode), "Can't create LZ4F context resource: " << LZ4F_getErrorName(errorCode));
+
+        // The destructor does not run for an object whose constructor throws,
+        // so the freshly created context must be released here on any failure.
+        try {
+            InBuffer.resize(BufferSize);
+            OutBuffer.resize(BufferSize);
+
+            size_t inSize = MagicNumberSize;
+            size_t outSize = 0ULL;
+            WriteLE32(InBuffer.data(), Lz4ioMagicNumber);
+
+            NextToLoad = LZ4F_decompress_usingDict(Ctx, OutBuffer.data(), &outSize, InBuffer.data(), &inSize, nullptr, 0ULL, nullptr);
+            YQL_ENSURE(!LZ4F_isError(NextToLoad), "Header error: " << LZ4F_getErrorName(NextToLoad));
+        } catch (...) {
+            LZ4F_freeDecompressionContext(Ctx);
+            throw;
+        }
+    }
+}
+
+// Releases the LZ4F context; only a frame stream creates one.
+TReadBuffer::~TReadBuffer()
+{
+    if (EStreamType::Frame != StreamType)
+        return;
+
+    LZ4F_freeDecompressionContext(Ctx);
+}
+
+// Decodes the next portion of data into OutBuffer and exposes it through
+// working_buffer. Returns false when no decoded bytes were produced; a frame
+// stream whose NextToLoad is zero returns false without touching the buffers.
+bool TReadBuffer::nextImpl() {
+    size_t decoded = 0ULL;
+    if (EStreamType::Frame == StreamType) {
+        if (!NextToLoad)
+            return false;
+        decoded = DecompressFrame();
+    } else if (EStreamType::Legacy == StreamType) {
+        decoded = DecompressLegacy();
+    }
+
+    if (decoded) {
+        working_buffer = Buffer(OutBuffer.data(), OutBuffer.data() + decoded);
+        return true;
+    }
+
+    set(nullptr, 0ULL);
+    return false;
+}
+
+// Decodes frame data into OutBuffer until some output is produced or LZ4F
+// reports the end of the frame (NextToLoad == 0).
+// Returns the number of decoded bytes placed at the start of OutBuffer, or 0
+// when the frame ended without producing more output.
+// Within this function InBuffer[Pos, Remaining) is the input that has been
+// loaded from Source but not yet consumed by LZ4F.
+// Fails via YQL_ENSURE when Source runs dry or LZ4F reports an error.
+size_t TReadBuffer::DecompressFrame() {
+    while (NextToLoad) {
+        // Refill only once every loaded byte has been consumed; otherwise the
+        // input LZ4F left behind (e.g. because OutBuffer got full) would be lost.
+        if (Pos >= Remaining) {
+            // NextToLoad is a hint and may exceed the buffer; LZ4F accepts partial input.
+            const size_t toLoad = NextToLoad < InBuffer.size() ? size_t(NextToLoad) : InBuffer.size();
+            for (auto toRead = toLoad; toRead > 0U;) {
+                const auto sizeCheck = Source.read(InBuffer.data() + toLoad - toRead, toRead);
+                YQL_ENSURE(sizeCheck > 0U && sizeCheck <= toRead, "Cannot access compressed block.");
+                toRead -= sizeCheck;
+            }
+
+            Pos = 0ULL;
+            Remaining = toLoad;
+        }
+
+        // Both size arguments are in/out: on entry the room in OutBuffer and the
+        // available input, on return the bytes produced and the bytes consumed.
+        auto decodedBytes = OutBuffer.size();
+        auto consumedBytes = Remaining - Pos;
+        NextToLoad = LZ4F_decompress_usingDict(Ctx, OutBuffer.data(), &decodedBytes, InBuffer.data() + Pos, &consumedBytes, nullptr, 0ULL, nullptr);
+        YQL_ENSURE(!LZ4F_isError(NextToLoad), "Decompression error: " << LZ4F_getErrorName(NextToLoad));
+        Pos += consumedBytes;
+
+        // A call may consume only header bytes and decode nothing; that is not
+        // the end of the stream, so keep going until there is output.
+        if (decodedBytes)
+            return decodedBytes;
+    }
+
+    return 0ULL;
+}
+
+// Reads one legacy block (a 4-byte little-endian size followed by that many
+// compressed bytes) from Source and decodes it into OutBuffer.
+// Returns the decoded size, or 0 when Source yields no bytes for the size field.
+// Fails via YQL_ENSURE on a truncated or oversized block and on corrupted data.
+size_t TReadBuffer::DecompressLegacy() {
+    InBuffer.resize(LZ4_compressBound(LegacyBlockSize));
+    OutBuffer.resize(LegacyBlockSize);
+
+    const auto headerBytes = Source.read(InBuffer.data(), 4U);
+    if (!headerBytes)
+        return 0ULL;
+    YQL_ENSURE(headerBytes == 4U, "Cannot access block size.");
+
+    const unsigned int blockSize = ReadLE32(InBuffer.data());
+    YQL_ENSURE(blockSize <= LZ4_COMPRESSBOUND(LegacyBlockSize), "Block size out of bounds.");
+
+    // Source.read may deliver the block in several portions.
+    unsigned int loaded = 0U;
+    while (loaded < blockSize) {
+        const auto portion = Source.read(InBuffer.data() + loaded, blockSize - loaded);
+        YQL_ENSURE(portion > 0U && portion <= blockSize - loaded, "Cannot access compressed block.");
+        loaded += portion;
+    }
+
+    const auto decodeSize = LZ4_decompress_safe(InBuffer.data(), OutBuffer.data(), static_cast<int>(blockSize), LegacyBlockSize);
+    YQL_ENSURE(decodeSize >= 0, "Corrupted input detected.");
+
+    return static_cast<size_t>(decodeSize);
+}
+
+}
+
+}
diff --git a/ydb/library/yql/providers/s3/compressors/lz4io.h b/ydb/library/yql/providers/s3/compressors/lz4io.h
new file mode 100644
index 00000000000..36203485eb0
--- /dev/null
+++ b/ydb/library/yql/providers/s3/compressors/lz4io.h
@@ -0,0 +1,38 @@
+#pragma once
+
+#include <ydb/library/yql/udfs/common/clickhouse/client/src/IO/ReadBuffer.h>
+#define LZ4F_STATIC_LINKING_ONLY
+#include <contrib/libs/lz4/lz4frame.h>
+
+namespace NYql {
+
+namespace NLz4 {
+
+// Kind of stream, as determined from the magic number that opens it.
+enum class EStreamType {
+    Unknown = 0, // the magic number matched neither known value
+    Frame = 1,   // decoded through the LZ4F frame API
+    Legacy = 2   // decoded block by block with LZ4_decompress_safe
+};
+
+// ReadBuffer that yields the decompressed content of an LZ4 stream taken from
+// another ReadBuffer. The stream type is detected from the magic number read
+// in the constructor. The source is held by reference and is not owned.
+class TReadBuffer : public NDB::ReadBuffer {
+public:
+    // explicit: a source buffer must not silently convert into a decompressor.
+    explicit TReadBuffer(NDB::ReadBuffer& source);
+    ~TReadBuffer() override;
+
+    // The object owns a raw LZ4F context that is freed in the destructor,
+    // so a copy would free the same context twice.
+    TReadBuffer(const TReadBuffer&) = delete;
+    TReadBuffer& operator=(const TReadBuffer&) = delete;
+private:
+    // Decodes the next portion of data and publishes it as the working buffer.
+    bool nextImpl() final;
+
+    // Each returns the number of bytes decoded into OutBuffer, 0 if none.
+    size_t DecompressFrame();
+    size_t DecompressLegacy();
+
+    NDB::ReadBuffer& Source;
+    const EStreamType StreamType;
+    // Compressed input staging and decoded output storage.
+    std::vector<char> InBuffer, OutBuffer;
+
+    // LZ4F context; created only for frame streams.
+    LZ4F_decompressionContext_t Ctx = nullptr;
+    // Last value returned by LZ4F for a frame stream; zero ends the stream.
+    LZ4F_errorCode_t NextToLoad = 0;
+    // Cursor state over InBuffer used by DecompressFrame.
+    size_t Pos = 0, Remaining = 0;
+};
+
+}
+
+}