aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorilnaz <ilnaz@ydb.tech>2022-10-26 21:06:30 +0300
committerilnaz <ilnaz@ydb.tech>2022-10-26 21:06:30 +0300
commitc943ac86caeb0daa18266c601b4c7314c2d34e81 (patch)
treeb355e84ee1b89eaab7caacbfe68b706a506dd6d8
parent7d977b6f917a50979779188b685a9201975048d6 (diff)
downloadydb-c943ac86caeb0daa18266c601b4c7314c2d34e81.tar.gz
Count rows & bytes properly
-rw-r--r--ydb/core/tx/datashard/import_s3.cpp16
-rw-r--r--ydb/core/tx/schemeshard/ut_restore.cpp60
2 files changed, 66 insertions, 10 deletions
diff --git a/ydb/core/tx/datashard/import_s3.cpp b/ydb/core/tx/datashard/import_s3.cpp
index 1fa6100667b..f3ba06180c4 100644
--- a/ydb/core/tx/datashard/import_s3.cpp
+++ b/ydb/core/tx/datashard/import_s3.cpp
@@ -478,6 +478,12 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader> {
TMemoryPool pool(256);
while (ProcessData(data, pool));
+ if (Reader->ReadyBytes()) { // has progress
+ WrittenRows += std::exchange(PendingRows, 0);
+ WrittenBytes += std::exchange(PendingBytes, 0);
+ }
+
+ UploadRows();
Reader->Confirm();
} else {
Y_FAIL("unreachable");
@@ -495,10 +501,6 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader> {
return true; // skip empty line
}
- const auto& record = RequestBuilder.GetRecord();
- WrittenRows += record->RowsSize();
-
- UploadRows();
return false;
}
@@ -515,7 +517,7 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader> {
TVector<TCell> values;
TString strError;
- if (!NFormats::TYdbDump::ParseLine(line, columnOrderTypes, pool, keys, values, strError, WrittenBytes)) {
+ if (!NFormats::TYdbDump::ParseLine(line, columnOrderTypes, pool, keys, values, strError, PendingBytes)) {
Finish(false, TStringBuilder() << strError << " on line: " << origLine);
return false;
}
@@ -527,6 +529,8 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader> {
}
RequestBuilder.AddRow(keys, values);
+ ++PendingRows;
+
return true;
}
@@ -770,6 +774,8 @@ private:
ui64 ProcessedBytes = 0;
ui64 WrittenBytes = 0;
ui64 WrittenRows = 0;
+ ui64 PendingBytes = 0;
+ ui64 PendingRows = 0;
const ui32 ReadBatchSize;
const ui64 ReadBufferSizeLimit;
diff --git a/ydb/core/tx/schemeshard/ut_restore.cpp b/ydb/core/tx/schemeshard/ut_restore.cpp
index c1e05806b3f..b71bae36885 100644
--- a/ydb/core/tx/schemeshard/ut_restore.cpp
+++ b/ydb/core/tx/schemeshard/ut_restore.cpp
@@ -449,19 +449,69 @@ Y_UNIT_TEST_SUITE(TRestoreTests) {
Y_UNIT_TEST(ShouldSucceedOnSmallBuffer) {
TTestBasicRuntime runtime;
TTestEnv env(runtime);
+ ui64 txId = 100;
+
runtime.GetAppData().ZstdBlockSizeForTest = 16;
runtime.GetAppData().DataShardConfig.SetRestoreReadBufferSizeLimit(16);
- const auto data = GenerateZstdTestData("a", 2);
- const ui32 batchSize = 1;
-
- Restore(runtime, env, R"(
+ TestCreateTable(runtime, ++txId, "/MyRoot", R"(
Name: "Table"
Columns { Name: "key" Type: "Utf8" }
Columns { Name: "value" Type: "Utf8" }
KeyColumnNames: ["key"]
- )", {data}, batchSize);
+ )");
+ env.TestWaitNotification(runtime, txId);
+
+ bool uploadResponseDropped = false;
+ runtime.SetObserverFunc([&uploadResponseDropped](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) {
+ if (ev->GetTypeRewrite() == TEvDataShard::EvUnsafeUploadRowsResponse) {
+ uploadResponseDropped = true;
+ return TTestActorRuntime::EEventAction::DROP;
+ }
+
+ return TTestActorRuntime::EEventAction::PROCESS;
+ });
+
+ TPortManager portManager;
+ THolder<TS3Mock> s3Mock;
+ const auto data = GenerateZstdTestData("a", 2);
+ const ui32 batchSize = 1;
+ RestoreNoWait(runtime, txId, portManager.GetPort(), s3Mock, {data}, batchSize);
+
+ if (!uploadResponseDropped) {
+ TDispatchOptions opts;
+ opts.FinalEvents.emplace_back([&uploadResponseDropped](IEventHandle&) -> bool {
+ return uploadResponseDropped;
+ });
+ runtime.DispatchEvents(opts);
+ }
+
+ TMaybe<NKikimrTxDataShard::TShardOpResult> result;
+ runtime.SetObserverFunc([&result](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) {
+ if (ev->GetTypeRewrite() == TEvDataShard::EvSchemaChanged) {
+ const auto& record = ev->Get<TEvDataShard::TEvSchemaChanged>()->Record;
+ if (record.HasOpResult()) {
+ result = record.GetOpResult();
+ }
+ }
+
+ return TTestActorRuntime::EEventAction::PROCESS;
+ });
+
+ RebootTablet(runtime, TTestTxConfig::FakeHiveTablets, runtime.AllocateEdgeActor());
+ if (!result) {
+ TDispatchOptions opts;
+ opts.FinalEvents.emplace_back([&result](IEventHandle&) -> bool {
+ return result.Defined();
+ });
+ runtime.DispatchEvents(opts);
+ }
+
+ UNIT_ASSERT_VALUES_EQUAL(result->GetBytesProcessed(), 16);
+ UNIT_ASSERT_VALUES_EQUAL(result->GetRowsProcessed(), 2);
+
+ env.TestWaitNotification(runtime, txId);
auto content = ReadTable(runtime, TTestTxConfig::FakeHiveTablets);
NKqp::CompareYson(data.YsonStr, content);
}