diff options
author | Олег <[email protected]> | 2024-09-22 00:01:13 +0300 |
---|---|---|
committer | GitHub <[email protected]> | 2024-09-22 00:01:13 +0300 |
commit | eec992f3c98e5c2a213a3f0ee09f47db6027df14 (patch) | |
tree | 66561fd569e8eb8c9fd14a735b67c9cc923ef2f6 | |
parent | 34e573cbc641b74d8321cb43981c1558dff28c95 (diff) |
Fix generating too much data in memory for ydb workload import command (#9601)
-rw-r--r-- | ydb/public/lib/ydb_cli/commands/ydb_workload_import.cpp | 41 | ||||
-rw-r--r-- | ydb/public/lib/ydb_cli/commands/ydb_workload_import.h | 2 |
2 files changed, 19 insertions, 24 deletions
diff --git a/ydb/public/lib/ydb_cli/commands/ydb_workload_import.cpp b/ydb/public/lib/ydb_cli/commands/ydb_workload_import.cpp index b0241f497f5..225ad93ecc6 100644 --- a/ydb/public/lib/ydb_cli/commands/ydb_workload_import.cpp +++ b/ydb/public/lib/ydb_cli/commands/ydb_workload_import.cpp @@ -44,7 +44,7 @@ TWorkloadCommandImport::TUploadCommand::TUploadCommand(NYdbWorkload::TWorkloadPa int TWorkloadCommandImport::TUploadCommand::DoRun(NYdbWorkload::IWorkloadQueryGenerator& /*workloadGen*/, TConfig& /*config*/) { auto dataGeneratorList = Initializer->GetBulkInitialData(); AtomicSet(ErrorsCount, 0); - InFlightSemaphore = NThreading::TAsyncSemaphore::Make(UploadParams.MaxInFlight); + InFlightSemaphore = MakeHolder<TFastSemaphore>(UploadParams.MaxInFlight); for (auto dataGen : dataGeneratorList) { TThreadPoolParams params; params.SetCatching(false); @@ -97,36 +97,31 @@ TAsyncStatus TWorkloadCommandImport::TUploadCommand::SendDataPortion(NYdbWorkloa } void TWorkloadCommandImport::TUploadCommand::ProcessDataGenerator(std::shared_ptr<NYdbWorkload::IBulkDataGenerator> dataGen) noexcept try { - TDeque<NThreading::TFuture<void>> sendings; + TAtomic counter = 0; for (auto portions = dataGen->GenerateDataPortion(); !portions.empty() && !AtomicGet(ErrorsCount); portions = dataGen->GenerateDataPortion()) { for (const auto& data: portions) { - sendings.emplace_back( - InFlightSemaphore->AcquireAsync().Apply([this, data](const auto& sem) { - auto ar = MakeAtomicShared<NThreading::TAsyncSemaphore::TAutoRelease>(sem.GetValueSync()->MakeAutoRelease()); - return SendDataPortion(data).Apply( - [ar, data, this](const TAsyncStatus& result) { - const auto& res = result.GetValueSync(); - data->SetSendResult(res); - auto guard = Guard(Lock); - if (!res.IsSuccess()) { - Cerr << "Bulk upset to " << data->GetTable() << " failed, " << res.GetStatus() << ", " << res.GetIssues().ToString() << Endl; - AtomicIncrement(ErrorsCount); - } else { - Bar->AddProgress(data->GetSize()); - } - }); + AtomicIncrement(counter); + SendDataPortion(data).Apply( + [data, this, &counter, g = MakeAtomicShared<TGuard<TFastSemaphore>>(*InFlightSemaphore)](const TAsyncStatus& result) { + const auto& res = result.GetValueSync(); + data->SetSendResult(res); + auto guard = Guard(Lock); + if (!res.IsSuccess()) { + Cerr << "Bulk upset to " << data->GetTable() << " failed, " << res.GetStatus() << ", " << res.GetIssues().ToString() << Endl; + AtomicIncrement(ErrorsCount); + } else { + Bar->AddProgress(data->GetSize()); } - ) - ); - while(sendings.size() > UploadParams.MaxInFlight) { - sendings.pop_front(); - } + AtomicDecrement(counter); + }); } if (AtomicGet(ErrorsCount)) { break; } } - NThreading::WaitAll(sendings).GetValueSync(); + while(AtomicGet(counter) > 0) { + Sleep(TDuration::MilliSeconds(100)); + } } catch (...) { auto g = Guard(Lock); Cerr << "Fill table " << dataGen->GetName() << " failed: " << CurrentExceptionMessage() << ", backtrace: "; diff --git a/ydb/public/lib/ydb_cli/commands/ydb_workload_import.h b/ydb/public/lib/ydb_cli/commands/ydb_workload_import.h index 0dafa6af1a2..8bf49facc66 100644 --- a/ydb/public/lib/ydb_cli/commands/ydb_workload_import.h +++ b/ydb/public/lib/ydb_cli/commands/ydb_workload_import.h @@ -35,7 +35,7 @@ private: NYdbWorkload::TWorkloadDataInitializer::TPtr Initializer; THolder<TProgressBar> Bar; TAdaptiveLock Lock; - NThreading::TAsyncSemaphore::TPtr InFlightSemaphore; + THolder<TFastSemaphore> InFlightSemaphore; TAtomic ErrorsCount; }; |