diff options
author | a-romanov <a-romanov@yandex-team.ru> | 2022-06-03 15:17:54 +0300 |
---|---|---|
committer | a-romanov <a-romanov@yandex-team.ru> | 2022-06-03 15:17:54 +0300 |
commit | 950f1f0bb785be2b41cf4916ca94a3d24f29fdda (patch) | |
tree | 41a02aa37059bc2de9ce8bc4e14323f84c63376b | |
parent | b2c13c5016730c435d1108087ee23caf5ae37024 (diff) | |
download | ydb-950f1f0bb785be2b41cf4916ca94a3d24f29fdda.tar.gz |
YQ-1037 lz4 support draft.
ref:13a14e17f32616cc493f4af5d7d93769582128e6
-rw-r--r-- | CMakeLists.darwin.txt | 1 | ||||
-rw-r--r-- | CMakeLists.linux.txt | 1 | ||||
-rw-r--r-- | ydb/library/yql/providers/s3/actors/CMakeLists.linux.txt | 1 | ||||
-rw-r--r-- | ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp | 19 | ||||
-rw-r--r-- | ydb/library/yql/providers/s3/compressors/CMakeLists.darwin.txt | 14 | ||||
-rw-r--r-- | ydb/library/yql/providers/s3/compressors/CMakeLists.linux.txt | 29 | ||||
-rw-r--r-- | ydb/library/yql/providers/s3/compressors/CMakeLists.txt | 13 | ||||
-rw-r--r-- | ydb/library/yql/providers/s3/compressors/factory.cpp | 14 | ||||
-rw-r--r-- | ydb/library/yql/providers/s3/compressors/factory.h | 9 | ||||
-rw-r--r-- | ydb/library/yql/providers/s3/compressors/lz4io.cpp | 174 | ||||
-rw-r--r-- | ydb/library/yql/providers/s3/compressors/lz4io.h | 38 |
11 files changed, 306 insertions, 7 deletions
diff --git a/CMakeLists.darwin.txt b/CMakeLists.darwin.txt index f7ebb70ba15..526e6e77beb 100644 --- a/CMakeLists.darwin.txt +++ b/CMakeLists.darwin.txt @@ -1216,6 +1216,7 @@ add_subdirectory(ydb/library/yql/parser/pg_catalog/ut) add_subdirectory(ydb/library/yql/parser/lexer_common/ut) add_subdirectory(ydb/library/yql/providers/common/schema) add_subdirectory(ydb/library/yql/providers/common/schema/skiff) +add_subdirectory(ydb/library/yql/providers/s3/compressors) add_subdirectory(ydb/library/yql/providers/function/common) add_subdirectory(ydb/library/yql/providers/function/expr_nodes) add_subdirectory(ydb/library/yql/providers/function/gateway) diff --git a/CMakeLists.linux.txt b/CMakeLists.linux.txt index 061c06825c8..225a01df147 100644 --- a/CMakeLists.linux.txt +++ b/CMakeLists.linux.txt @@ -906,6 +906,7 @@ add_subdirectory(contrib/libs/expat) add_subdirectory(contrib/libs/poco/Foundation) add_subdirectory(contrib/libs/poco/JSON) add_subdirectory(contrib/libs/poco/XML) +add_subdirectory(ydb/library/yql/providers/s3/compressors) add_subdirectory(ydb/library/yql/udfs/common/clickhouse/client) add_subdirectory(ydb/library/yql/public/udf/support) add_subdirectory(contrib/restricted/boost/libs/program_options) diff --git a/ydb/library/yql/providers/s3/actors/CMakeLists.linux.txt b/ydb/library/yql/providers/s3/actors/CMakeLists.linux.txt index 1ca4231aa67..5127a2fcbc2 100644 --- a/ydb/library/yql/providers/s3/actors/CMakeLists.linux.txt +++ b/ydb/library/yql/providers/s3/actors/CMakeLists.linux.txt @@ -28,6 +28,7 @@ target_link_libraries(providers-s3-actors PUBLIC dq-actors-compute providers-common-http_gateway providers-s3-proto + providers-s3-compressors clickhouse_client_udf ) target_sources(providers-s3-actors PRIVATE diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp index 3c47e5d9632..ee5c34cc129 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp @@ -30,6 +30,7 @@ #include <ydb/library/yql/providers/common/schema/mkql/yql_mkql_schema.h> #include <ydb/library/yql/utils/yql_panic.h> +#include <ydb/library/yql/providers/s3/compressors/factory.h> #include <ydb/library/yql/providers/s3/proto/range.pb.h> #include <ydb/library/yql/providers/common/provider/yql_provider_names.h> @@ -329,7 +330,9 @@ public: private: void Run() final try { TReadBufferFromStream buffer(this); - NDB::InputStreamFromInputFormat stream(NDB::FormatFactory::instance().getInputFormat(ReadSpec->Format, buffer, NDB::Block(ReadSpec->Columns), nullptr, 1_MB, ReadSpec->Settings)); + const auto decompress(MakeDecompressor(buffer, ReadSpec->Compression)); + YQL_ENSURE(ReadSpec->Compression.empty() == !decompress, "Unsupported " <<ReadSpec->Compression << " compression."); + NDB::InputStreamFromInputFormat stream(NDB::FormatFactory::instance().getInputFormat(ReadSpec->Format, decompress ? *decompress : buffer, NDB::Block(ReadSpec->Columns), nullptr, 1_MB, ReadSpec->Settings)); while (auto block = stream.read()) Send(SourceActorId, new TEvPrivate::TEvNextBlock(block)); @@ -653,14 +656,16 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor( readSpec->Compression = it->second; #define SUPPORTED_FLAGS(xx) \ - xx(skip_unknown_fields) \ - xx(import_nested_json) \ - xx(with_names_use_header) \ - xx(null_as_default) \ + xx(skip_unknown_fields, true) \ + xx(import_nested_json, true) \ + xx(with_names_use_header, true) \ + xx(null_as_default, true) \ -#define SET_FLAG(flag) \ +#define SET_FLAG(flag, def) \ if (const auto it = settings.find(#flag); settings.cend() != it) \ - readSpec->Settings.flag = FromString<bool>(it->second); + readSpec->Settings.flag = FromString<bool>(it->second); \ + else \ + readSpec->Settings.flag = def; SUPPORTED_FLAGS(SET_FLAG) diff --git a/ydb/library/yql/providers/s3/compressors/CMakeLists.darwin.txt b/ydb/library/yql/providers/s3/compressors/CMakeLists.darwin.txt new file mode 100644 index 00000000000..c3190798097 --- /dev/null +++ b/ydb/library/yql/providers/s3/compressors/CMakeLists.darwin.txt @@ -0,0 +1,14 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(providers-s3-compressors INTERFACE) +target_link_libraries(providers-s3-compressors INTERFACE + contrib-libs-cxxsupp + yutil +) diff --git a/ydb/library/yql/providers/s3/compressors/CMakeLists.linux.txt b/ydb/library/yql/providers/s3/compressors/CMakeLists.linux.txt new file mode 100644 index 00000000000..3dffdb211fd --- /dev/null +++ b/ydb/library/yql/providers/s3/compressors/CMakeLists.linux.txt @@ -0,0 +1,29 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(providers-s3-compressors) +target_compile_options(providers-s3-compressors PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_include_directories(providers-s3-compressors PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/udfs/common/clickhouse/client/base + ${CMAKE_SOURCE_DIR}/ydb/library/yql/udfs/common/clickhouse/client/base/pcg-random + ${CMAKE_SOURCE_DIR}/ydb/library/yql/udfs/common/clickhouse/client/src +) +target_link_libraries(providers-s3-compressors PUBLIC + contrib-libs-cxxsupp + yutil + contrib-libs-fmt + libs-poco-Util + contrib-libs-lz4 +) +target_sources(providers-s3-compressors PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/compressors/factory.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/compressors/lz4io.cpp +) diff --git a/ydb/library/yql/providers/s3/compressors/CMakeLists.txt b/ydb/library/yql/providers/s3/compressors/CMakeLists.txt new file mode 100644 index 00000000000..a681d385f3e --- /dev/null +++ b/ydb/library/yql/providers/s3/compressors/CMakeLists.txt @@ -0,0 +1,13 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +if (APPLE) + include(CMakeLists.darwin.txt) +elseif (UNIX) + include(CMakeLists.linux.txt) +endif() diff --git a/ydb/library/yql/providers/s3/compressors/factory.cpp b/ydb/library/yql/providers/s3/compressors/factory.cpp new file mode 100644 index 00000000000..7a4961ac924 --- /dev/null +++ b/ydb/library/yql/providers/s3/compressors/factory.cpp @@ -0,0 +1,14 @@ +#include "factory.h" +#include "lz4io.h" + +namespace NYql { + +std::unique_ptr<NDB::ReadBuffer> MakeDecompressor(NDB::ReadBuffer& input, const std::string_view& compression) { + if ("lz4" == compression) + return std::make_unique<NLz4::TReadBuffer>(input); + + return nullptr; +} + +} + diff --git a/ydb/library/yql/providers/s3/compressors/factory.h b/ydb/library/yql/providers/s3/compressors/factory.h new file mode 100644 index 00000000000..8555c107c78 --- /dev/null +++ b/ydb/library/yql/providers/s3/compressors/factory.h @@ -0,0 +1,9 @@ +#pragma once + +#include <ydb/library/yql/udfs/common/clickhouse/client/src/IO/ReadBuffer.h> + +namespace NYql { + +std::unique_ptr<NDB::ReadBuffer> MakeDecompressor(NDB::ReadBuffer& input, const std::string_view& compression); + +} diff --git a/ydb/library/yql/providers/s3/compressors/lz4io.cpp b/ydb/library/yql/providers/s3/compressors/lz4io.cpp new file mode 100644 index 00000000000..0d197ee970c --- /dev/null +++ b/ydb/library/yql/providers/s3/compressors/lz4io.cpp @@ -0,0 +1,174 @@ +#include "lz4io.h" + +#include <util/generic/scope.h> +#include <util/generic/size_literals.h> +#include <util/stream/output.h> +#include <util/stream/input.h> +#include <util/string/builder.h> + +#include <contrib/libs/lz4/lz4.h> +#include <contrib/libs/lz4/lz4hc.h> + +#include <ydb/library/yql/utils/yql_panic.h> +#include <ydb/library/yql/public/udf/udf_allocator.h> +#include <ydb/library/yql/public/udf/udf_terminator.h> + +namespace NYql { + +namespace NLz4 { + +namespace { + +constexpr ui32 MagicNumberSize = 4U; +constexpr ui32 Lz4ioMagicNumber = 0x184D2204U; +constexpr ui32 LegacyMagicNumber = 0x184C2102U; + +constexpr size_t LegacyBlockSize = 8_MB; + +void WriteLE32 (void* p, ui32 value32) +{ + const auto dstPtr = static_cast<unsigned char*>(p); + dstPtr[0] = (unsigned char)value32; + dstPtr[1] = (unsigned char)(value32 >> 8U); + dstPtr[2] = (unsigned char)(value32 >> 16U); + dstPtr[3] = (unsigned char)(value32 >> 24U); +} + +ui32 ReadLE32 (const void* s) { + const auto srcPtr = static_cast<const unsigned char*>(s); + ui32 value32 = srcPtr[0]; + value32 += (ui32)srcPtr[1] << 8U; + value32 += (ui32)srcPtr[2] << 16U; + value32 += (ui32)srcPtr[3] << 24U; + return value32; +} + +constexpr size_t BufferSize = 64_KB; + +EStreamType CheckMagic(const void* data) { + switch (ReadLE32(data)) { + case Lz4ioMagicNumber: + return EStreamType::Frame; + case LegacyMagicNumber: + return EStreamType::Legacy; + default: + return EStreamType::Unknown; + } +} + +EStreamType CheckMagic(NDB::ReadBuffer& input) { + char data[4u]; + YQL_ENSURE(input.read(data, sizeof(data)) == sizeof(data), "Buffer too small."); + return CheckMagic(data); +} + +} + +TReadBuffer::TReadBuffer(NDB::ReadBuffer& source) + : NDB::ReadBuffer(nullptr, 0ULL), Source(source), StreamType(CheckMagic(Source)), Pos(0ULL), Remaining(0ULL) +{ + YQL_ENSURE(StreamType != EStreamType::Unknown, "Wrong magic."); + if (StreamType == EStreamType::Frame) { + const auto errorCode = LZ4F_createDecompressionContext(&Ctx, LZ4F_VERSION); + YQL_ENSURE(!LZ4F_isError(errorCode), "Can't create LZ4F context resource: " << LZ4F_getErrorName(errorCode)); + + InBuffer.resize(BufferSize); + OutBuffer.resize(BufferSize); + + size_t inSize = MagicNumberSize; + size_t outSize = 0ULL; + WriteLE32(InBuffer.data(), Lz4ioMagicNumber); + + NextToLoad = LZ4F_decompress_usingDict(Ctx, OutBuffer.data(), &outSize, InBuffer.data(), &inSize, nullptr, 0ULL, nullptr); + YQL_ENSURE(!LZ4F_isError(NextToLoad), "Header error: " << LZ4F_getErrorName(NextToLoad)); + } +} + +TReadBuffer::~TReadBuffer() +{ + if (StreamType == EStreamType::Frame) { + LZ4F_freeDecompressionContext(Ctx); + } +} + +bool TReadBuffer::nextImpl() { + switch (StreamType) { + case EStreamType::Frame: + if (!NextToLoad) + return false; + + if (const auto size = DecompressFrame()) { + working_buffer = Buffer(OutBuffer.data(), OutBuffer.data() + size); + return true; + } + break; + case EStreamType::Legacy: + if (const auto size = DecompressLegacy()) { + working_buffer = Buffer(OutBuffer.data(), OutBuffer.data() + size); + return true; + } + break; + default: + break; + } + + set(nullptr, 0ULL); + return false; +} + +size_t TReadBuffer::DecompressFrame() { + if (NextToLoad > InBuffer.size()) + NextToLoad = InBuffer.size(); + + if (Pos >= Remaining) { + for (auto toRead = NextToLoad; toRead > 0U;) { + const auto sizeCheck = Source.read(InBuffer.data() + NextToLoad - toRead, toRead); + YQL_ENSURE(sizeCheck > 0U && sizeCheck <= toRead, "Cannot access compressed block."); + toRead -= sizeCheck; + } + + Pos = 0ULL; + Remaining = NextToLoad; + } + + if (Pos < Remaining) { + auto decodedBytes = OutBuffer.size(); + NextToLoad = LZ4F_decompress_usingDict(Ctx, OutBuffer.data(), &decodedBytes, InBuffer.data() + Pos, &Remaining, nullptr, 0ULL, nullptr); + YQL_ENSURE(!LZ4F_isError(NextToLoad), "Decompression error: " << LZ4F_getErrorName(NextToLoad)); + Pos += Remaining; + + if (decodedBytes) + return decodedBytes; + } + + return 0ULL; +} + +size_t TReadBuffer::DecompressLegacy() { + InBuffer.resize(LZ4_compressBound(LegacyBlockSize)); + OutBuffer.resize(LegacyBlockSize); + + unsigned int blockSize = 0U; + + if (const auto sizeCheck = Source.read(InBuffer.data(), 4U)) { + YQL_ENSURE(sizeCheck == 4U, "Cannot access block size."); + blockSize = ReadLE32(InBuffer.data()); + YQL_ENSURE(blockSize <= LZ4_COMPRESSBOUND(LegacyBlockSize), "Block size out of bounds."); + } else + return 0ULL; + + for (auto toRead = blockSize; toRead > 0U;) { + const auto sizeCheck = Source.read(InBuffer.data() + blockSize - toRead, toRead); + YQL_ENSURE(sizeCheck > 0U && sizeCheck <= toRead, "Cannot access compressed block."); + toRead -= sizeCheck; + } + + const auto decodeSize = LZ4_decompress_safe(InBuffer.data(), OutBuffer.data(), (int)blockSize, LegacyBlockSize); + + YQL_ENSURE(decodeSize >= 0, "Corrupted input detected."); + return size_t(decodeSize); +} + +} + +} diff --git a/ydb/library/yql/providers/s3/compressors/lz4io.h b/ydb/library/yql/providers/s3/compressors/lz4io.h new file mode 100644 index 00000000000..36203485eb0 --- /dev/null +++ b/ydb/library/yql/providers/s3/compressors/lz4io.h @@ -0,0 +1,38 @@ +#pragma once + +#include <ydb/library/yql/udfs/common/clickhouse/client/src/IO/ReadBuffer.h> +#define LZ4F_STATIC_LINKING_ONLY +#include <contrib/libs/lz4/lz4frame.h> + +namespace NYql { + +namespace NLz4 { + +enum class EStreamType { + Unknown = 0, + Frame, + Legacy +}; + +class TReadBuffer : public NDB::ReadBuffer { +public: + TReadBuffer(NDB::ReadBuffer& source); + ~TReadBuffer(); +private: + bool nextImpl() final; + + size_t DecompressFrame(); + size_t DecompressLegacy(); + + NDB::ReadBuffer& Source; + const EStreamType StreamType; + std::vector<char> InBuffer, OutBuffer; + + LZ4F_decompressionContext_t Ctx; + LZ4F_errorCode_t NextToLoad; + size_t Pos, Remaining; +}; + +} + +} |