summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorОлег <[email protected]>2024-11-25 12:18:15 +0300
committerGitHub <[email protected]>2024-11-25 12:18:15 +0300
commit8f553e4107044ad017fb0660656596d8cf163a04 (patch)
treed35d722047c92caa2a39bbfb86028d7031d0e73b
parentb07b2ad5759bec15e8dd38b321c4fb468177214e (diff)
Set position to state only if all data portions succesfully send (#11912)
-rw-r--r--ydb/public/lib/ydb_cli/commands/ydb_workload_import.cpp38
-rw-r--r--ydb/tests/olap/load/test_tpch.py6
2 files changed, 31 insertions, 13 deletions
diff --git a/ydb/public/lib/ydb_cli/commands/ydb_workload_import.cpp b/ydb/public/lib/ydb_cli/commands/ydb_workload_import.cpp
index 97a4e178e99..504715a83e6 100644
--- a/ydb/public/lib/ydb_cli/commands/ydb_workload_import.cpp
+++ b/ydb/public/lib/ydb_cli/commands/ydb_workload_import.cpp
@@ -161,22 +161,34 @@ private:
void TWorkloadCommandImport::TUploadCommand::ProcessDataGenerator(std::shared_ptr<NYdbWorkload::IBulkDataGenerator> dataGen) noexcept try {
TAtomic counter = 0;
for (auto portions = dataGen->GenerateDataPortion(); !portions.empty() && !AtomicGet(ErrorsCount); portions = dataGen->GenerateDataPortion()) {
+ TVector<TAsyncStatus> sendingResults;
for (const auto& data: portions) {
AtomicIncrement(counter);
- Writer->WriteDataPortion(data).Apply(
- [data, this, &counter, g = MakeAtomicShared<TGuard<TFastSemaphore>>(*InFlightSemaphore)](const TAsyncStatus& result) {
- const auto& res = result.GetValueSync();
- data->SetSendResult(res);
- auto guard = Guard(Lock);
- if (!res.IsSuccess()) {
- Cerr << "Bulk upset to " << data->GetTable() << " failed, " << res.GetStatus() << ", " << res.GetIssues().ToString() << Endl;
- AtomicIncrement(ErrorsCount);
- } else if (data->GetSize()) {
- Bar->AddProgress(data->GetSize());
- }
- AtomicDecrement(counter);
- });
+ sendingResults.emplace_back(Writer->WriteDataPortion(data).Apply([&counter, g = MakeAtomicShared<TGuard<TFastSemaphore>>(*InFlightSemaphore)](const TAsyncStatus& result) {
+ AtomicDecrement(counter);
+ return result.GetValueSync();
+ }));
}
+ NThreading::WaitAll(sendingResults).Apply([this, sendingResults, portions](const NThreading::TFuture<void>&) {
+ bool success = true;
+ for (size_t i = 0; i < portions.size(); ++i) {
+ const auto& data = portions[i];
+ const auto& res = sendingResults[i].GetValueSync();
+ auto guard = Guard(Lock);
+ if (!res.IsSuccess()) {
+ Cerr << "Bulk upset to " << data->GetTable() << " failed, " << res.GetStatus() << ", " << res.GetIssues().ToString() << Endl;
+ AtomicIncrement(ErrorsCount);
+ success = false;
+ } else if (data->GetSize()) {
+ Bar->AddProgress(data->GetSize());
+ }
+ }
+ if (success) {
+ for (size_t i = 0; i < portions.size(); ++i) {
+ portions[i]->SetSendResult(sendingResults[i].GetValueSync());
+ }
+ }
+ });
if (AtomicGet(ErrorsCount)) {
break;
}
diff --git a/ydb/tests/olap/load/test_tpch.py b/ydb/tests/olap/load/test_tpch.py
index dc3442e1336..3282dc83619 100644
--- a/ydb/tests/olap/load/test_tpch.py
+++ b/ydb/tests/olap/load/test_tpch.py
@@ -71,12 +71,18 @@ class TestTpch100(TpchSuiteBase):
class TestTpch1000(TpchSuiteBase):
+ tables_size: dict[str, int] = {
+ 'lineitem': 5999989709,
+ }
scale: int = 1000
check_canonical: bool = False
timeout = max(TpchSuiteBase.timeout, 3600.)
class TestTpch10000(TpchSuiteBase):
+ tables_size: dict[str, int] = {
+ 'lineitem': 59999994267,
+ }
scale: int = 10000
iterations: int = 2
check_canonical: bool = False