diff options
| -rw-r--r-- | ydb/public/lib/ydb_cli/commands/ydb_workload_import.cpp | 72 |
1 files changed, 38 insertions, 34 deletions
diff --git a/ydb/public/lib/ydb_cli/commands/ydb_workload_import.cpp b/ydb/public/lib/ydb_cli/commands/ydb_workload_import.cpp index e1845b8945d..142410dd930 100644 --- a/ydb/public/lib/ydb_cli/commands/ydb_workload_import.cpp +++ b/ydb/public/lib/ydb_cli/commands/ydb_workload_import.cpp @@ -72,7 +72,9 @@ int TWorkloadCommandImport::TUploadCommand::DoRun(NYdbWorkload::IWorkloadQueryGe } NThreading::WaitAll(sendings).Wait(); const bool wereErrors = AtomicGet(ErrorsCount); - Cout << "Fill table " << dataGen->GetName() << " "<< (wereErrors ? "Failed" : "OK" ) << " " << Bar->GetCurProgress() << " / " << Bar->GetCapacity() << " (" << (Now() - start) << ")" << Endl; + with_lock(Lock) { + Cout << "Fill table " << dataGen->GetName() << " "<< (wereErrors ? "Failed" : "OK" ) << " " << Bar->GetCurProgress() << " / " << Bar->GetCapacity() << " (" << (Now() - start) << ")" << Endl; + } if (wereErrors) { break; } @@ -232,48 +234,50 @@ private: TAdaptiveLock Lock; }; -void TWorkloadCommandImport::TUploadCommand::ProcessDataGenerator(std::shared_ptr<NYdbWorkload::IBulkDataGenerator> dataGen) noexcept try { +void TWorkloadCommandImport::TUploadCommand::ProcessDataGenerator(std::shared_ptr<NYdbWorkload::IBulkDataGenerator> dataGen) noexcept { TAtomic counter = 0; - for (auto portions = dataGen->GenerateDataPortion(); !portions.empty() && !AtomicGet(ErrorsCount); portions = dataGen->GenerateDataPortion()) { - TVector<TAsyncStatus> sendingResults; - for (const auto& data: portions) { - AtomicIncrement(counter); - sendingResults.emplace_back(Writer->WriteDataPortion(data).Apply([&counter, g = MakeAtomicShared<TGuard<TFastSemaphore>>(*InFlightSemaphore)](const TAsyncStatus& result) { - AtomicDecrement(counter); - return result.GetValueSync(); - })); - } - NThreading::WaitAll(sendingResults).Apply([this, sendingResults, portions](const NThreading::TFuture<void>&) { - bool success = true; - for (size_t i = 0; i < portions.size(); ++i) { - const auto& data = portions[i]; - const auto& res = sendingResults[i].GetValueSync(); - auto guard = Guard(Lock); - if (!res.IsSuccess()) { - Cerr << "Bulk upset to " << data->GetTable() << " failed, " << res.GetStatus() << ", " << res.GetIssues().ToString() << Endl; - AtomicIncrement(ErrorsCount); - success = false; - } else if (data->GetSize()) { - Bar->AddProgress(data->GetSize()); - } + try { + for (auto portions = dataGen->GenerateDataPortion(); !portions.empty() && !AtomicGet(ErrorsCount); portions = dataGen->GenerateDataPortion()) { + TVector<TAsyncStatus> sendingResults; + for (const auto& data: portions) { + AtomicIncrement(counter); + sendingResults.emplace_back(Writer->WriteDataPortion(data).Apply([&counter, g = MakeAtomicShared<TGuard<TFastSemaphore>>(*InFlightSemaphore)](const TAsyncStatus& result) { + AtomicDecrement(counter); + return result.GetValueSync(); + })); } - if (success) { + NThreading::WaitAll(sendingResults).Apply([this, sendingResults, portions](const NThreading::TFuture<void>&) { + bool success = true; for (size_t i = 0; i < portions.size(); ++i) { - portions[i]->SetSendResult(sendingResults[i].GetValueSync()); + const auto& data = portions[i]; + const auto& res = sendingResults[i].GetValueSync(); + auto guard = Guard(Lock); + if (!res.IsSuccess()) { + Cerr << "Bulk upset to " << data->GetTable() << " failed, " << res.GetStatus() << ", " << res.GetIssues().ToString() << Endl; + AtomicIncrement(ErrorsCount); + success = false; + } else if (data->GetSize()) { + Bar->AddProgress(data->GetSize()); + } } + if (success) { + for (size_t i = 0; i < portions.size(); ++i) { + portions[i]->SetSendResult(sendingResults[i].GetValueSync()); + } + } + }); + if (AtomicGet(ErrorsCount)) { + break; } - }); - if (AtomicGet(ErrorsCount)) { - break; } + } catch (...) { + auto g = Guard(Lock); + Cerr << "Fill table " << dataGen->GetName() << " failed: " << CurrentExceptionMessage() << ", backtrace: "; + PrintBackTrace(); + AtomicSet(ErrorsCount, 1); } while(AtomicGet(counter) > 0) { Sleep(TDuration::MilliSeconds(100)); } -} catch (...) { - auto g = Guard(Lock); - Cerr << "Fill table " << dataGen->GetName() << " failed: " << CurrentExceptionMessage() << ", backtrace: "; - PrintBackTrace(); - AtomicSet(ErrorsCount, 1); } }
\ No newline at end of file |
