about | summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author a-romanov <Anton.Romanov@ydb.tech> 2022-10-14 15:15:00 +0300
committer a-romanov <Anton.Romanov@ydb.tech> 2022-10-14 15:15:00 +0300
commit 11cf1c4aa90b19d0a23b3a645ade2d53e4cadb53 (patch)
tree ed30b5c0727c67e6219b2f0b5bf45f6274fdf8c5
parent bf1aa215365fbfa8f2cf2c12ae472f265b2cbbd8 (diff)
download ydb-11cf1c4aa90b19d0a23b3a645ade2d53e4cadb53.tar.gz
Fix lz4 decompression.
-rw-r--r-- ydb/library/yql/providers/common/provider/yql_provider.cpp 17
-rw-r--r-- ydb/library/yql/providers/s3/compressors/lz4io.cpp 17
2 files changed, 13 insertions, 21 deletions
diff --git a/ydb/library/yql/providers/common/provider/yql_provider.cpp b/ydb/library/yql/providers/common/provider/yql_provider.cpp
index dbb0c4b730..aa99dcc44a 100644
--- a/ydb/library/yql/providers/common/provider/yql_provider.cpp
+++ b/ydb/library/yql/providers/common/provider/yql_provider.cpp
@@ -41,7 +41,7 @@ namespace {
"json_each_row"sv,
"parquet"sv
};
- constexpr std::array<std::string_view, 6> CompressionsForInput = {
+ constexpr std::array<std::string_view, 6> Compressions = {
"gzip"sv,
"zstd"sv,
"lz4"sv,
@@ -49,13 +49,6 @@ namespace {
"bzip2"sv,
"xz"sv
};
- constexpr std::array<std::string_view, 5> CompressionsForOutput = {
- "gzip"sv,
- "brotli"sv,
- "zstd"sv,
- "bzip2"sv,
- "xz"sv
- };
constexpr std::array<std::string_view, 10> IntervalUnits = {
"MICROSECONDS"sv,
"MILLISECONDS"sv,
@@ -1106,20 +1099,20 @@ void WriteStatistics(NYson::TYsonWriter& writer, bool totalOnly, const THashMap<
}
bool ValidateCompressionForInput(std::string_view compression, TExprContext& ctx) {
- if (compression.empty() || IsIn(CompressionsForInput, compression)) {
+ if (compression.empty() || IsIn(Compressions, compression)) {
return true;
}
ctx.AddError(TIssue(TStringBuilder() << "Unknown compression: " << compression
- << ". Use one of: " << JoinSeq(", ", CompressionsForInput)));
+ << ". Use one of: " << JoinSeq(", ", Compressions)));
return false;
}
bool ValidateCompressionForOutput(std::string_view compression, TExprContext& ctx) {
- if (compression.empty() || IsIn(CompressionsForOutput, compression)) {
+ if (compression.empty() || IsIn(Compressions, compression)) {
return true;
}
ctx.AddError(TIssue(TStringBuilder() << "Unknown compression: " << compression
- << ". Use one of: " << JoinSeq(", ", CompressionsForOutput)));
+ << ". Use one of: " << JoinSeq(", ", Compressions)));
return false;
}
diff --git a/ydb/library/yql/providers/s3/compressors/lz4io.cpp b/ydb/library/yql/providers/s3/compressors/lz4io.cpp
index cd5558f3d4..70bdfb87b2 100644
--- a/ydb/library/yql/providers/s3/compressors/lz4io.cpp
+++ b/ydb/library/yql/providers/s3/compressors/lz4io.cpp
@@ -20,6 +20,7 @@ constexpr ui32 Lz4ioMagicNumber = 0x184D2204U;
constexpr ui32 LegacyMagicNumber = 0x184C2102U;
constexpr size_t LegacyBlockSize = 8_MB;
+constexpr size_t FrameMaxBlockSize = 4_MB;
void WriteLE32 (void* p, ui32 value32)
{
@@ -39,8 +40,6 @@ ui32 ReadLE32 (const void* s) {
return value32;
}
-constexpr size_t BufferSize = 64_KB;
-
EStreamType CheckMagic(const void* data) {
switch (ReadLE32(data)) {
case Lz4ioMagicNumber:
@@ -68,8 +67,8 @@ TReadBuffer::TReadBuffer(NDB::ReadBuffer& source)
const auto errorCode = LZ4F_createDecompressionContext(&Ctx, LZ4F_VERSION);
YQL_ENSURE(!LZ4F_isError(errorCode), "Can't create LZ4F context resource: " << LZ4F_getErrorName(errorCode));
- InBuffer.resize(BufferSize);
- OutBuffer.resize(BufferSize);
+ InBuffer.resize(64_KB);
+ OutBuffer.resize(FrameMaxBlockSize);
size_t inSize = MagicNumberSize;
size_t outSize = 0ULL;
@@ -113,8 +112,9 @@ bool TReadBuffer::nextImpl() {
}
size_t TReadBuffer::DecompressFrame() {
- if (NextToLoad > InBuffer.size())
- NextToLoad = InBuffer.size();
+ if (NextToLoad > InBuffer.size()) {
+ InBuffer.resize(NextToLoad);
+ }
if (Pos >= Remaining) {
for (auto toRead = NextToLoad; toRead > 0U;) {
@@ -168,10 +168,9 @@ size_t TReadBuffer::DecompressLegacy() {
namespace {
class TCompressor : public TOutputQueue<> {
-static constexpr size_t BlockSize = 4_MB;
public:
TCompressor(int level)
- : OutputBufferSize(LZ4F_compressFrameBound(BlockSize, nullptr)), OutputBuffer(std::make_unique<char[]>(OutputBufferSize))
+ : OutputBufferSize(LZ4F_compressFrameBound(FrameMaxBlockSize, nullptr)), OutputBuffer(std::make_unique<char[]>(OutputBufferSize))
{
Prefs.autoFlush = 1;
Prefs.compressionLevel = level;
@@ -254,7 +253,7 @@ private:
LZ4F_preferences_t Prefs;
LZ4F_compressionContext_t Ctx;
- TOutputQueue<BlockSize, BlockSize> InputQueue;
+ TOutputQueue<FrameMaxBlockSize, FrameMaxBlockSize> InputQueue;
bool IsFirstBlock = true;
};