diff options
author | a-romanov <Anton.Romanov@ydb.tech> | 2022-10-14 15:15:00 +0300 |
---|---|---|
committer | a-romanov <Anton.Romanov@ydb.tech> | 2022-10-14 15:15:00 +0300 |
commit | 11cf1c4aa90b19d0a23b3a645ade2d53e4cadb53 (patch) | |
tree | ed30b5c0727c67e6219b2f0b5bf45f6274fdf8c5 | |
parent | bf1aa215365fbfa8f2cf2c12ae472f265b2cbbd8 (diff) | |
download | ydb-11cf1c4aa90b19d0a23b3a645ade2d53e4cadb53.tar.gz |
Fix lz4 decompression.
-rw-r--r-- | ydb/library/yql/providers/common/provider/yql_provider.cpp | 17 | ||||
-rw-r--r-- | ydb/library/yql/providers/s3/compressors/lz4io.cpp | 17 |
2 files changed, 13 insertions, 21 deletions
diff --git a/ydb/library/yql/providers/common/provider/yql_provider.cpp b/ydb/library/yql/providers/common/provider/yql_provider.cpp index dbb0c4b730..aa99dcc44a 100644 --- a/ydb/library/yql/providers/common/provider/yql_provider.cpp +++ b/ydb/library/yql/providers/common/provider/yql_provider.cpp @@ -41,7 +41,7 @@ namespace { "json_each_row"sv, "parquet"sv }; - constexpr std::array<std::string_view, 6> CompressionsForInput = { + constexpr std::array<std::string_view, 6> Compressions = { "gzip"sv, "zstd"sv, "lz4"sv, @@ -49,13 +49,6 @@ namespace { "bzip2"sv, "xz"sv }; - constexpr std::array<std::string_view, 5> CompressionsForOutput = { - "gzip"sv, - "brotli"sv, - "zstd"sv, - "bzip2"sv, - "xz"sv - }; constexpr std::array<std::string_view, 10> IntervalUnits = { "MICROSECONDS"sv, "MILLISECONDS"sv, @@ -1106,20 +1099,20 @@ void WriteStatistics(NYson::TYsonWriter& writer, bool totalOnly, const THashMap< } bool ValidateCompressionForInput(std::string_view compression, TExprContext& ctx) { - if (compression.empty() || IsIn(CompressionsForInput, compression)) { + if (compression.empty() || IsIn(Compressions, compression)) { return true; } ctx.AddError(TIssue(TStringBuilder() << "Unknown compression: " << compression - << ". Use one of: " << JoinSeq(", ", CompressionsForInput))); + << ". Use one of: " << JoinSeq(", ", Compressions))); return false; } bool ValidateCompressionForOutput(std::string_view compression, TExprContext& ctx) { - if (compression.empty() || IsIn(CompressionsForOutput, compression)) { + if (compression.empty() || IsIn(Compressions, compression)) { return true; } ctx.AddError(TIssue(TStringBuilder() << "Unknown compression: " << compression - << ". Use one of: " << JoinSeq(", ", CompressionsForOutput))); + << ". Use one of: " << JoinSeq(", ", Compressions))); return false; } diff --git a/ydb/library/yql/providers/s3/compressors/lz4io.cpp b/ydb/library/yql/providers/s3/compressors/lz4io.cpp index cd5558f3d4..70bdfb87b2 100644 --- a/ydb/library/yql/providers/s3/compressors/lz4io.cpp +++ b/ydb/library/yql/providers/s3/compressors/lz4io.cpp @@ -20,6 +20,7 @@ constexpr ui32 Lz4ioMagicNumber = 0x184D2204U; constexpr ui32 LegacyMagicNumber = 0x184C2102U; constexpr size_t LegacyBlockSize = 8_MB; +constexpr size_t FrameMaxBlockSize = 4_MB; void WriteLE32 (void* p, ui32 value32) { @@ -39,8 +40,6 @@ ui32 ReadLE32 (const void* s) { return value32; } -constexpr size_t BufferSize = 64_KB; - EStreamType CheckMagic(const void* data) { switch (ReadLE32(data)) { case Lz4ioMagicNumber: @@ -68,8 +67,8 @@ TReadBuffer::TReadBuffer(NDB::ReadBuffer& source) const auto errorCode = LZ4F_createDecompressionContext(&Ctx, LZ4F_VERSION); YQL_ENSURE(!LZ4F_isError(errorCode), "Can't create LZ4F context resource: " << LZ4F_getErrorName(errorCode)); - InBuffer.resize(BufferSize); - OutBuffer.resize(BufferSize); + InBuffer.resize(64_KB); + OutBuffer.resize(FrameMaxBlockSize); size_t inSize = MagicNumberSize; size_t outSize = 0ULL; @@ -113,8 +112,9 @@ bool TReadBuffer::nextImpl() { } size_t TReadBuffer::DecompressFrame() { - if (NextToLoad > InBuffer.size()) - NextToLoad = InBuffer.size(); + if (NextToLoad > InBuffer.size()) { + InBuffer.resize(NextToLoad); + } if (Pos >= Remaining) { for (auto toRead = NextToLoad; toRead > 0U;) { @@ -168,10 +168,9 @@ size_t TReadBuffer::DecompressLegacy() { namespace { class TCompressor : public TOutputQueue<> { -static constexpr size_t BlockSize = 4_MB; public: TCompressor(int level) - : OutputBufferSize(LZ4F_compressFrameBound(BlockSize, nullptr)), OutputBuffer(std::make_unique<char[]>(OutputBufferSize)) + : OutputBufferSize(LZ4F_compressFrameBound(FrameMaxBlockSize, nullptr)), OutputBuffer(std::make_unique<char[]>(OutputBufferSize)) { Prefs.autoFlush = 1; Prefs.compressionLevel = level; @@ -254,7 +253,7 @@ private: LZ4F_preferences_t Prefs; LZ4F_compressionContext_t Ctx; - TOutputQueue<BlockSize, BlockSize> InputQueue; + TOutputQueue<FrameMaxBlockSize, FrameMaxBlockSize> InputQueue; bool IsFirstBlock = true; }; |