diff options
author | ilnaz <ilnaz@ydb.tech> | 2022-10-26 21:06:30 +0300 |
---|---|---|
committer | ilnaz <ilnaz@ydb.tech> | 2022-10-26 21:06:30 +0300 |
commit | c943ac86caeb0daa18266c601b4c7314c2d34e81 (patch) | |
tree | b355e84ee1b89eaab7caacbfe68b706a506dd6d8 | |
parent | 7d977b6f917a50979779188b685a9201975048d6 (diff) | |
download | ydb-c943ac86caeb0daa18266c601b4c7314c2d34e81.tar.gz |
Count rows & bytes properly
-rw-r--r-- | ydb/core/tx/datashard/import_s3.cpp | 16 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/ut_restore.cpp | 60 |
2 files changed, 66 insertions, 10 deletions
diff --git a/ydb/core/tx/datashard/import_s3.cpp b/ydb/core/tx/datashard/import_s3.cpp index 1fa6100667b..f3ba06180c4 100644 --- a/ydb/core/tx/datashard/import_s3.cpp +++ b/ydb/core/tx/datashard/import_s3.cpp @@ -478,6 +478,12 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader> { TMemoryPool pool(256); while (ProcessData(data, pool)); + if (Reader->ReadyBytes()) { // has progress + WrittenRows += std::exchange(PendingRows, 0); + WrittenBytes += std::exchange(PendingBytes, 0); + } + + UploadRows(); Reader->Confirm(); } else { Y_FAIL("unreachable"); @@ -495,10 +501,6 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader> { return true; // skip empty line } - const auto& record = RequestBuilder.GetRecord(); - WrittenRows += record->RowsSize(); - - UploadRows(); return false; } @@ -515,7 +517,7 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader> { TVector<TCell> values; TString strError; - if (!NFormats::TYdbDump::ParseLine(line, columnOrderTypes, pool, keys, values, strError, WrittenBytes)) { + if (!NFormats::TYdbDump::ParseLine(line, columnOrderTypes, pool, keys, values, strError, PendingBytes)) { Finish(false, TStringBuilder() << strError << " on line: " << origLine); return false; } @@ -527,6 +529,8 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader> { } RequestBuilder.AddRow(keys, values); + ++PendingRows; + return true; } @@ -770,6 +774,8 @@ private: ui64 ProcessedBytes = 0; ui64 WrittenBytes = 0; ui64 WrittenRows = 0; + ui64 PendingBytes = 0; + ui64 PendingRows = 0; const ui32 ReadBatchSize; const ui64 ReadBufferSizeLimit; diff --git a/ydb/core/tx/schemeshard/ut_restore.cpp b/ydb/core/tx/schemeshard/ut_restore.cpp index c1e05806b3f..b71bae36885 100644 --- a/ydb/core/tx/schemeshard/ut_restore.cpp +++ b/ydb/core/tx/schemeshard/ut_restore.cpp @@ -449,19 +449,69 @@ Y_UNIT_TEST_SUITE(TRestoreTests) { Y_UNIT_TEST(ShouldSucceedOnSmallBuffer) { TTestBasicRuntime runtime; TTestEnv env(runtime); + ui64 txId = 100; + runtime.GetAppData().ZstdBlockSizeForTest = 16; runtime.GetAppData().DataShardConfig.SetRestoreReadBufferSizeLimit(16); - const auto data = GenerateZstdTestData("a", 2); - const ui32 batchSize = 1; - - Restore(runtime, env, R"( + TestCreateTable(runtime, ++txId, "/MyRoot", R"( Name: "Table" Columns { Name: "key" Type: "Utf8" } Columns { Name: "value" Type: "Utf8" } KeyColumnNames: ["key"] - )", {data}, batchSize); + )"); + env.TestWaitNotification(runtime, txId); + + bool uploadResponseDropped = false; + runtime.SetObserverFunc([&uploadResponseDropped](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) { + if (ev->GetTypeRewrite() == TEvDataShard::EvUnsafeUploadRowsResponse) { + uploadResponseDropped = true; + return TTestActorRuntime::EEventAction::DROP; + } + + return TTestActorRuntime::EEventAction::PROCESS; + }); + + TPortManager portManager; + THolder<TS3Mock> s3Mock; + const auto data = GenerateZstdTestData("a", 2); + const ui32 batchSize = 1; + RestoreNoWait(runtime, txId, portManager.GetPort(), s3Mock, {data}, batchSize); + + if (!uploadResponseDropped) { + TDispatchOptions opts; + opts.FinalEvents.emplace_back([&uploadResponseDropped](IEventHandle&) -> bool { + return uploadResponseDropped; + }); + runtime.DispatchEvents(opts); + } + + TMaybe<NKikimrTxDataShard::TShardOpResult> result; + runtime.SetObserverFunc([&result](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) { + if (ev->GetTypeRewrite() == TEvDataShard::EvSchemaChanged) { + const auto& record = ev->Get<TEvDataShard::TEvSchemaChanged>()->Record; + if (record.HasOpResult()) { + result = record.GetOpResult(); + } + } + + return TTestActorRuntime::EEventAction::PROCESS; + }); + + RebootTablet(runtime, TTestTxConfig::FakeHiveTablets, runtime.AllocateEdgeActor()); + if (!result) { + TDispatchOptions opts; + opts.FinalEvents.emplace_back([&result](IEventHandle&) -> bool { + return result.Defined(); + }); + runtime.DispatchEvents(opts); + } + + UNIT_ASSERT_VALUES_EQUAL(result->GetBytesProcessed(), 16); + UNIT_ASSERT_VALUES_EQUAL(result->GetRowsProcessed(), 2); + + env.TestWaitNotification(runtime, txId); auto content = ReadTable(runtime, TTestTxConfig::FakeHiveTablets); NKqp::CompareYson(data.YsonStr, content); } |