diff options
| author | Олег <[email protected]> | 2024-11-25 12:18:15 +0300 |
|---|---|---|
| committer | GitHub <[email protected]> | 2024-11-25 12:18:15 +0300 |
| commit | 8f553e4107044ad017fb0660656596d8cf163a04 (patch) | |
| tree | d35d722047c92caa2a39bbfb86028d7031d0e73b | |
| parent | b07b2ad5759bec15e8dd38b321c4fb468177214e (diff) | |
Set position to state only if all data portions succesfully send (#11912)
| -rw-r--r-- | ydb/public/lib/ydb_cli/commands/ydb_workload_import.cpp | 38 | ||||
| -rw-r--r-- | ydb/tests/olap/load/test_tpch.py | 6 |
2 files changed, 31 insertions, 13 deletions
diff --git a/ydb/public/lib/ydb_cli/commands/ydb_workload_import.cpp b/ydb/public/lib/ydb_cli/commands/ydb_workload_import.cpp index 97a4e178e99..504715a83e6 100644 --- a/ydb/public/lib/ydb_cli/commands/ydb_workload_import.cpp +++ b/ydb/public/lib/ydb_cli/commands/ydb_workload_import.cpp @@ -161,22 +161,34 @@ private: void TWorkloadCommandImport::TUploadCommand::ProcessDataGenerator(std::shared_ptr<NYdbWorkload::IBulkDataGenerator> dataGen) noexcept try { TAtomic counter = 0; for (auto portions = dataGen->GenerateDataPortion(); !portions.empty() && !AtomicGet(ErrorsCount); portions = dataGen->GenerateDataPortion()) { + TVector<TAsyncStatus> sendingResults; for (const auto& data: portions) { AtomicIncrement(counter); - Writer->WriteDataPortion(data).Apply( - [data, this, &counter, g = MakeAtomicShared<TGuard<TFastSemaphore>>(*InFlightSemaphore)](const TAsyncStatus& result) { - const auto& res = result.GetValueSync(); - data->SetSendResult(res); - auto guard = Guard(Lock); - if (!res.IsSuccess()) { - Cerr << "Bulk upset to " << data->GetTable() << " failed, " << res.GetStatus() << ", " << res.GetIssues().ToString() << Endl; - AtomicIncrement(ErrorsCount); - } else if (data->GetSize()) { - Bar->AddProgress(data->GetSize()); - } - AtomicDecrement(counter); - }); + sendingResults.emplace_back(Writer->WriteDataPortion(data).Apply([&counter, g = MakeAtomicShared<TGuard<TFastSemaphore>>(*InFlightSemaphore)](const TAsyncStatus& result) { + AtomicDecrement(counter); + return result.GetValueSync(); + })); } + NThreading::WaitAll(sendingResults).Apply([this, sendingResults, portions](const NThreading::TFuture<void>&) { + bool success = true; + for (size_t i = 0; i < portions.size(); ++i) { + const auto& data = portions[i]; + const auto& res = sendingResults[i].GetValueSync(); + auto guard = Guard(Lock); + if (!res.IsSuccess()) { + Cerr << "Bulk upset to " << data->GetTable() << " failed, " << res.GetStatus() << ", " << res.GetIssues().ToString() << Endl; + AtomicIncrement(ErrorsCount); + success = false; + } else if (data->GetSize()) { + Bar->AddProgress(data->GetSize()); + } + } + if (success) { + for (size_t i = 0; i < portions.size(); ++i) { + portions[i]->SetSendResult(sendingResults[i].GetValueSync()); + } + } + }); if (AtomicGet(ErrorsCount)) { break; } diff --git a/ydb/tests/olap/load/test_tpch.py b/ydb/tests/olap/load/test_tpch.py index dc3442e1336..3282dc83619 100644 --- a/ydb/tests/olap/load/test_tpch.py +++ b/ydb/tests/olap/load/test_tpch.py @@ -71,12 +71,18 @@ class TestTpch100(TpchSuiteBase): class TestTpch1000(TpchSuiteBase): + tables_size: dict[str, int] = { + 'lineitem': 5999989709, + } scale: int = 1000 check_canonical: bool = False timeout = max(TpchSuiteBase.timeout, 3600.) class TestTpch10000(TpchSuiteBase): + tables_size: dict[str, int] = { + 'lineitem': 59999994267, + } scale: int = 10000 iterations: int = 2 check_canonical: bool = False |
