diff options
author | hor911 <hor911@ydb.tech> | 2023-03-06 18:32:24 +0300 |
---|---|---|
committer | hor911 <hor911@ydb.tech> | 2023-03-06 18:32:24 +0300 |
commit | b78205b7cd86eb78e7be560fe47a2ff48e35a9ed (patch) | |
tree | 7ccca1bff183a6f2c28d086ed8fda08e62615835 | |
parent | 1b9d5978a4c93d0049fce37a2d341b80017eb514 (diff) | |
download | ydb-b78205b7cd86eb78e7be560fe47a2ff48e35a9ed.tar.gz |
Fix compression issues (lz4 deadlock + unknown conversions handling)
LZ4 в конструкторе пытается считать свое сигнатуру. Поэтому загрузку надо начинать до вызова конструктора иначе наступает deadlock
-rw-r--r-- | ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp | 9 |
1 files changed, 5 insertions, 4 deletions
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp index 795dbdc7b7..3a021f1ec9 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp @@ -1270,14 +1270,15 @@ public: std::unique_ptr<NDB::ReadBuffer> decompressorBuffer; NDB::ReadBuffer* buffer = coroBuffer.get(); + // lz4 decompressor reads signature in ctor, w/o actual data it will be deadlocked + DownloadStart(RetryStuff, GetActorSystem(), SelfActorId, ParentActorId, PathIndex, HttpInflightSize); + if (ReadSpec->Compression) { decompressorBuffer = MakeDecompressor(*buffer, ReadSpec->Compression); - YQL_ENSURE(buffer, "Unsupported " << ReadSpec->Compression << " compression."); + YQL_ENSURE(decompressorBuffer, "Unsupported " << ReadSpec->Compression << " compression."); buffer = decompressorBuffer.get(); } - DownloadStart(RetryStuff, GetActorSystem(), SelfActorId, ParentActorId, PathIndex, HttpInflightSize); - auto stream = std::make_unique<NDB::InputStreamFromInputFormat>( NDB::FormatFactory::instance().getInputFormat( ReadSpec->Format, *buffer, NDB::Block(ReadSpec->CHColumns), nullptr, ReadActorFactoryCfg.RowsInBatch, ReadSpec->Settings @@ -1303,7 +1304,7 @@ public: if (ReadSpec->Compression) { decompressorBuffer = MakeDecompressor(*buffer, ReadSpec->Compression); - YQL_ENSURE(buffer, "Unsupported " << ReadSpec->Compression << " compression."); + YQL_ENSURE(decompressorBuffer, "Unsupported " << ReadSpec->Compression << " compression."); buffer = decompressorBuffer.get(); } |