summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--ydb/public/lib/ydb_cli/commands/ydb_workload_import.cpp72
1 files changed, 38 insertions, 34 deletions
diff --git a/ydb/public/lib/ydb_cli/commands/ydb_workload_import.cpp b/ydb/public/lib/ydb_cli/commands/ydb_workload_import.cpp
index e1845b8945d..142410dd930 100644
--- a/ydb/public/lib/ydb_cli/commands/ydb_workload_import.cpp
+++ b/ydb/public/lib/ydb_cli/commands/ydb_workload_import.cpp
@@ -72,7 +72,9 @@ int TWorkloadCommandImport::TUploadCommand::DoRun(NYdbWorkload::IWorkloadQueryGe
}
NThreading::WaitAll(sendings).Wait();
const bool wereErrors = AtomicGet(ErrorsCount);
- Cout << "Fill table " << dataGen->GetName() << " "<< (wereErrors ? "Failed" : "OK" ) << " " << Bar->GetCurProgress() << " / " << Bar->GetCapacity() << " (" << (Now() - start) << ")" << Endl;
+ with_lock(Lock) {
+ Cout << "Fill table " << dataGen->GetName() << " "<< (wereErrors ? "Failed" : "OK" ) << " " << Bar->GetCurProgress() << " / " << Bar->GetCapacity() << " (" << (Now() - start) << ")" << Endl;
+ }
if (wereErrors) {
break;
}
@@ -232,48 +234,50 @@ private:
TAdaptiveLock Lock;
};
-void TWorkloadCommandImport::TUploadCommand::ProcessDataGenerator(std::shared_ptr<NYdbWorkload::IBulkDataGenerator> dataGen) noexcept try {
+void TWorkloadCommandImport::TUploadCommand::ProcessDataGenerator(std::shared_ptr<NYdbWorkload::IBulkDataGenerator> dataGen) noexcept {
TAtomic counter = 0;
- for (auto portions = dataGen->GenerateDataPortion(); !portions.empty() && !AtomicGet(ErrorsCount); portions = dataGen->GenerateDataPortion()) {
- TVector<TAsyncStatus> sendingResults;
- for (const auto& data: portions) {
- AtomicIncrement(counter);
- sendingResults.emplace_back(Writer->WriteDataPortion(data).Apply([&counter, g = MakeAtomicShared<TGuard<TFastSemaphore>>(*InFlightSemaphore)](const TAsyncStatus& result) {
- AtomicDecrement(counter);
- return result.GetValueSync();
- }));
- }
- NThreading::WaitAll(sendingResults).Apply([this, sendingResults, portions](const NThreading::TFuture<void>&) {
- bool success = true;
- for (size_t i = 0; i < portions.size(); ++i) {
- const auto& data = portions[i];
- const auto& res = sendingResults[i].GetValueSync();
- auto guard = Guard(Lock);
- if (!res.IsSuccess()) {
- Cerr << "Bulk upset to " << data->GetTable() << " failed, " << res.GetStatus() << ", " << res.GetIssues().ToString() << Endl;
- AtomicIncrement(ErrorsCount);
- success = false;
- } else if (data->GetSize()) {
- Bar->AddProgress(data->GetSize());
- }
+ try {
+ for (auto portions = dataGen->GenerateDataPortion(); !portions.empty() && !AtomicGet(ErrorsCount); portions = dataGen->GenerateDataPortion()) {
+ TVector<TAsyncStatus> sendingResults;
+ for (const auto& data: portions) {
+ AtomicIncrement(counter);
+ sendingResults.emplace_back(Writer->WriteDataPortion(data).Apply([&counter, g = MakeAtomicShared<TGuard<TFastSemaphore>>(*InFlightSemaphore)](const TAsyncStatus& result) {
+ AtomicDecrement(counter);
+ return result.GetValueSync();
+ }));
}
- if (success) {
+ NThreading::WaitAll(sendingResults).Apply([this, sendingResults, portions](const NThreading::TFuture<void>&) {
+ bool success = true;
for (size_t i = 0; i < portions.size(); ++i) {
- portions[i]->SetSendResult(sendingResults[i].GetValueSync());
+ const auto& data = portions[i];
+ const auto& res = sendingResults[i].GetValueSync();
+ auto guard = Guard(Lock);
+ if (!res.IsSuccess()) {
+ Cerr << "Bulk upset to " << data->GetTable() << " failed, " << res.GetStatus() << ", " << res.GetIssues().ToString() << Endl;
+ AtomicIncrement(ErrorsCount);
+ success = false;
+ } else if (data->GetSize()) {
+ Bar->AddProgress(data->GetSize());
+ }
}
+ if (success) {
+ for (size_t i = 0; i < portions.size(); ++i) {
+ portions[i]->SetSendResult(sendingResults[i].GetValueSync());
+ }
+ }
+ });
+ if (AtomicGet(ErrorsCount)) {
+ break;
}
- });
- if (AtomicGet(ErrorsCount)) {
- break;
}
+ } catch (...) {
+ auto g = Guard(Lock);
+ Cerr << "Fill table " << dataGen->GetName() << " failed: " << CurrentExceptionMessage() << ", backtrace: ";
+ PrintBackTrace();
+ AtomicSet(ErrorsCount, 1);
}
while(AtomicGet(counter) > 0) {
Sleep(TDuration::MilliSeconds(100));
}
-} catch (...) {
- auto g = Guard(Lock);
- Cerr << "Fill table " << dataGen->GetName() << " failed: " << CurrentExceptionMessage() << ", backtrace: ";
- PrintBackTrace();
- AtomicSet(ErrorsCount, 1);
}
} \ No newline at end of file