diff options
author | ilnaz <ilnaz@ydb.tech> | 2022-11-07 12:22:03 +0300 |
---|---|---|
committer | ilnaz <ilnaz@ydb.tech> | 2022-11-07 12:22:03 +0300 |
commit | 53f587219b1281a60091e6dba560ff3d7e0a1310 (patch) | |
tree | de2e76f6404320bc2663ae870bb8e71345a92b6f | |
parent | eb936463c91a94f63f99d26dda2063fdd4ccda11 (diff) | |
download | ydb-53f587219b1281a60091e6dba560ff3d7e0a1310.tar.gz |
Do not decompress the whole portion at once (if possible)
-rw-r--r-- | ydb/core/tx/datashard/import_s3.cpp | 125 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/ut_restore.cpp | 25 |
2 files changed, 98 insertions, 52 deletions
diff --git a/ydb/core/tx/datashard/import_s3.cpp b/ydb/core/tx/datashard/import_s3.cpp index f3ba06180c4..ba4f23c09d0 100644 --- a/ydb/core/tx/datashard/import_s3.cpp +++ b/ydb/core/tx/datashard/import_s3.cpp @@ -54,7 +54,7 @@ using namespace Aws; class TS3Downloader: public TActorBootstrapped<TS3Downloader> { class IReadController { public: - enum EFeed { + enum EDataStatus { READY_DATA = 0, NOT_ENOUGH_DATA = 1, ERROR = 2, @@ -62,11 +62,10 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader> { public: virtual ~IReadController() = default; - // Returns status or error - virtual EFeed Feed(TString&& portion, TString& error) = 0; - // Returns view to internal buffer with ready data after successful Feed() - virtual TStringBuf GetReadyData() const = 0; - // Clear internal buffer & makes it ready for another Feed() + virtual void Feed(TString&& portion) = 0; + // Returns data (view to internal buffer) or error + virtual EDataStatus TryGetData(TStringBuf& data, TString& error) = 0; + // Clear internal buffer & makes it ready for another Feed() and TryGetData() virtual void Confirm() = 0; virtual ui64 PendingBytes() const = 0; virtual ui64 ReadyBytes() const = 0; @@ -78,6 +77,8 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader> { : RangeSize(rangeSize) , BufferSizeLimit(bufferSizeLimit) { + // able to contain at least one range + Buffer.Reserve(RangeSize); } std::pair<ui64, ui64> NextRange(ui64 contentLength, ui64 processedBytes) const { @@ -90,8 +91,8 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader> { } protected: - bool CheckBufferSize(size_t size, TString& reason) const { - if ((size + RangeSize) >= BufferSizeLimit) { + bool CanIncreaseBuffer(size_t size, size_t delta, TString& reason) const { + if ((size + delta) >= BufferSizeLimit) { reason = "reached buffer size limit"; return false; } @@ -99,6 +100,10 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader> { return true; } + bool CanRequestNextRange(size_t size, TString& reason) const { + return CanIncreaseBuffer(size, RangeSize, reason); + } + TStringBuf AsStringBuf(size_t size) const { return TStringBuf(Buffer.Data(), size); } @@ -116,14 +121,16 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader> { public: using TReadController::TReadController; - EFeed Feed(TString&& portion, TString& error) override { + void Feed(TString&& portion) override { + Buffer.Append(portion.data(), portion.size()); + } + + EDataStatus TryGetData(TStringBuf& data, TString& error) override { Y_VERIFY(Pos == 0); - Buffer.Append(portion.data(), portion.size()); const ui64 pos = AsStringBuf(Buffer.Size()).rfind('\n'); - if (TString::npos == pos) { - if (!CheckBufferSize(Buffer.Size(), error)) { + if (!CanRequestNextRange(Buffer.Size(), error)) { return ERROR; } else { return NOT_ENOUGH_DATA; @@ -131,11 +138,9 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader> { } Pos = pos + 1 /* \n */; - return READY_DATA; - } + data = AsStringBuf(Pos); - TStringBuf GetReadyData() const override { - return AsStringBuf(Pos); + return READY_DATA; } void Confirm() override { @@ -167,13 +172,21 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader> { , Context(ZSTD_createDCtx()) { Reset(); + // able to contain at least one block + // take effect if RangeSize < BlockSize + Buffer.Reserve(AppData()->ZstdBlockSizeForTest.GetOrElse(ZSTD_BLOCKSIZE_MAX)); } - EFeed Feed(TString&& portion, TString& error) override { + void Feed(TString&& portion) override { + Y_VERIFY(Portion.Empty()); + Portion.Assign(portion.data(), portion.size()); + } + + EDataStatus TryGetData(TStringBuf& data, TString& error) override { Y_VERIFY(ReadyInputBytes == 0 && ReadyOutputPos == 0); - auto input = ZSTD_inBuffer{portion.data(), portion.size(), 0}; - while (input.pos < input.size) { + auto input = ZSTD_inBuffer{Portion.Data(), Portion.Size(), 0}; + while (!ReadyOutputPos) { PendingInputBytes -= input.pos; // dec before decompress auto output = ZSTD_outBuffer{Buffer.Data(), Buffer.Capacity(), Buffer.Size()}; @@ -184,18 +197,11 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader> { return ERROR; } - if (output.pos == output.size) { - if (!CheckBufferSize(Buffer.Capacity(), error)) { - return ERROR; - } - - Buffer.Reserve(Buffer.Capacity() + AppData()->ZstdBlockSizeForTest.GetOrElse(ZSTD_BLOCKSIZE_MAX)); - } - PendingInputBytes += input.pos; // inc after decompress Buffer.Proceed(output.pos); if (res == 0) { + // end of frame if (AsStringBuf(Buffer.Size()).back() != '\n') { error = "cannot find new line symbol"; return ERROR; @@ -205,35 +211,48 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader> { ReadyOutputPos = Buffer.Size(); Reset(); } else { + // try to find complete row const ui64 pos = AsStringBuf(Buffer.Size()).rfind('\n'); if (TString::npos != pos) { ReadyOutputPos = pos + 1 /* \n */; } } + + if (input.pos >= input.size) { + // end of input + break; + } + + if (!ReadyOutputPos && output.pos == output.size) { + const auto blockSize = AppData()->ZstdBlockSizeForTest.GetOrElse(ZSTD_BLOCKSIZE_MAX); + if (CanIncreaseBuffer(Buffer.Size(), blockSize, error)) { + Buffer.Reserve(Buffer.Size() + blockSize); + } else { + return ERROR; + } + } } + Portion.ChopHead(input.pos); + if (!ReadyOutputPos) { - if (!CheckBufferSize(Buffer.Size(), error)) { + if (!CanRequestNextRange(Buffer.Size(), error)) { return ERROR; } else { return NOT_ENOUGH_DATA; } } + data = AsStringBuf(ReadyOutputPos); return READY_DATA; } - TStringBuf GetReadyData() const override { - return AsStringBuf(ReadyOutputPos); - } - void Confirm() override { Buffer.ChopHead(ReadyOutputPos); + ReadyOutputPos = 0; PendingInputBytes -= ReadyInputBytes; ReadyInputBytes = 0; - - ReadyOutputPos = 0; } ui64 PendingBytes() const override { @@ -246,6 +265,7 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader> { private: THolder<::ZSTD_DCtx, DestroyZCtx> Context; + TBuffer Portion; ui64 PendingInputBytes = 0; ui64 ReadyInputBytes = 0; ui64 ReadyOutputPos = 0; @@ -428,8 +448,7 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader> { return Finish(); } - GetObject(Settings.GetDataKey(DataFormat, CompressionCodec), - Reader->NextRange(ContentLength, ProcessedBytes)); + Process(); } void Handle(TEvExternalStorage::TEvGetObjectResponse::TPtr& ev) { @@ -452,8 +471,15 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader> { << ", content-length# " << ContentLength << ", body-size# " << msg.Body.size()); + Reader->Feed(std::move(msg.Body)); + Process(); + } + + void Process() { + TStringBuf data; TString error; - switch (Reader->Feed(std::move(msg.Body), error)) { + + switch (Reader->TryGetData(data, error)) { case TReadController::READY_DATA: break; @@ -471,23 +497,18 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader> { << ": " << error); } - if (auto data = Reader->GetReadyData()) { - ProcessedBytes += Reader->ReadyBytes(); - RequestBuilder.New(TableInfo, Scheme); + RequestBuilder.New(TableInfo, Scheme); + TMemoryPool pool(256); + while (ProcessData(data, pool)); - TMemoryPool pool(256); - while (ProcessData(data, pool)); - - if (Reader->ReadyBytes()) { // has progress - WrittenRows += std::exchange(PendingRows, 0); - WrittenBytes += std::exchange(PendingBytes, 0); - } - - UploadRows(); - Reader->Confirm(); - } else { - Y_FAIL("unreachable"); + if (const auto processed = Reader->ReadyBytes()) { // has progress + ProcessedBytes += processed; + WrittenBytes += std::exchange(PendingBytes, 0); + WrittenRows += std::exchange(PendingRows, 0); } + + Reader->Confirm(); + UploadRows(); } bool ProcessData(TStringBuf& data, TMemoryPool& pool) { diff --git a/ydb/core/tx/schemeshard/ut_restore.cpp b/ydb/core/tx/schemeshard/ut_restore.cpp index b71bae36885..e454425f541 100644 --- a/ydb/core/tx/schemeshard/ut_restore.cpp +++ b/ydb/core/tx/schemeshard/ut_restore.cpp @@ -516,6 +516,31 @@ Y_UNIT_TEST_SUITE(TRestoreTests) { NKqp::CompareYson(data.YsonStr, content); } + Y_UNIT_TEST(ShouldNotDecompressEntirePortionAtOnce) { + TTestBasicRuntime runtime; + TTestEnv env(runtime); + runtime.GetAppData().ZstdBlockSizeForTest = 113; // one row + + ui32 uploadRowsCount = 0; + runtime.SetObserverFunc([&uploadRowsCount](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) { + uploadRowsCount += ui32(ev->GetTypeRewrite() == TEvDataShard::EvUnsafeUploadRowsResponse); + return TTestActorRuntime::EEventAction::PROCESS; + }); + + const auto data = GenerateZstdTestData(TString(100, 'a'), 2); // 2 rows, 1 row = 113b + // ensure that one decompressed row is bigger than entire compressed file + UNIT_ASSERT(data.Data.size() < *runtime.GetAppData().ZstdBlockSizeForTest); + + Restore(runtime, env, R"( + Name: "Table" + Columns { Name: "key" Type: "Utf8" } + Columns { Name: "value" Type: "Utf8" } + KeyColumnNames: ["key"] + )", {data}, data.Data.size()); + + UNIT_ASSERT_VALUES_EQUAL(uploadRowsCount, 2); + } + Y_UNIT_TEST_WITH_COMPRESSION(ShouldExpandBuffer) { TTestBasicRuntime runtime; |