summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author: Олег <[email protected]> 2024-09-22 00:01:13 +0300
committer: GitHub <[email protected]> 2024-09-22 00:01:13 +0300
commit: eec992f3c98e5c2a213a3f0ee09f47db6027df14 (patch)
tree: 66561fd569e8eb8c9fd14a735b67c9cc923ef2f6
parent: 34e573cbc641b74d8321cb43981c1558dff28c95 (diff)
Fix generating too much data in memory for ydb workload import command (#9601)
-rw-r--r-- ydb/public/lib/ydb_cli/commands/ydb_workload_import.cpp 41
-rw-r--r-- ydb/public/lib/ydb_cli/commands/ydb_workload_import.h 2
2 files changed, 19 insertions, 24 deletions
diff --git a/ydb/public/lib/ydb_cli/commands/ydb_workload_import.cpp b/ydb/public/lib/ydb_cli/commands/ydb_workload_import.cpp
index b0241f497f5..225ad93ecc6 100644
--- a/ydb/public/lib/ydb_cli/commands/ydb_workload_import.cpp
+++ b/ydb/public/lib/ydb_cli/commands/ydb_workload_import.cpp
@@ -44,7 +44,7 @@ TWorkloadCommandImport::TUploadCommand::TUploadCommand(NYdbWorkload::TWorkloadPa
int TWorkloadCommandImport::TUploadCommand::DoRun(NYdbWorkload::IWorkloadQueryGenerator& /*workloadGen*/, TConfig& /*config*/) {
auto dataGeneratorList = Initializer->GetBulkInitialData();
AtomicSet(ErrorsCount, 0);
- InFlightSemaphore = NThreading::TAsyncSemaphore::Make(UploadParams.MaxInFlight);
+ InFlightSemaphore = MakeHolder<TFastSemaphore>(UploadParams.MaxInFlight);
for (auto dataGen : dataGeneratorList) {
TThreadPoolParams params;
params.SetCatching(false);
@@ -97,36 +97,31 @@ TAsyncStatus TWorkloadCommandImport::TUploadCommand::SendDataPortion(NYdbWorkloa
}
void TWorkloadCommandImport::TUploadCommand::ProcessDataGenerator(std::shared_ptr<NYdbWorkload::IBulkDataGenerator> dataGen) noexcept try {
- TDeque<NThreading::TFuture<void>> sendings;
+ TAtomic counter = 0;
for (auto portions = dataGen->GenerateDataPortion(); !portions.empty() && !AtomicGet(ErrorsCount); portions = dataGen->GenerateDataPortion()) {
for (const auto& data: portions) {
- sendings.emplace_back(
- InFlightSemaphore->AcquireAsync().Apply([this, data](const auto& sem) {
- auto ar = MakeAtomicShared<NThreading::TAsyncSemaphore::TAutoRelease>(sem.GetValueSync()->MakeAutoRelease());
- return SendDataPortion(data).Apply(
- [ar, data, this](const TAsyncStatus& result) {
- const auto& res = result.GetValueSync();
- data->SetSendResult(res);
- auto guard = Guard(Lock);
- if (!res.IsSuccess()) {
- Cerr << "Bulk upset to " << data->GetTable() << " failed, " << res.GetStatus() << ", " << res.GetIssues().ToString() << Endl;
- AtomicIncrement(ErrorsCount);
- } else {
- Bar->AddProgress(data->GetSize());
- }
- });
+ AtomicIncrement(counter);
+ SendDataPortion(data).Apply(
+ [data, this, &counter, g = MakeAtomicShared<TGuard<TFastSemaphore>>(*InFlightSemaphore)](const TAsyncStatus& result) {
+ const auto& res = result.GetValueSync();
+ data->SetSendResult(res);
+ auto guard = Guard(Lock);
+ if (!res.IsSuccess()) {
+ Cerr << "Bulk upset to " << data->GetTable() << " failed, " << res.GetStatus() << ", " << res.GetIssues().ToString() << Endl;
+ AtomicIncrement(ErrorsCount);
+ } else {
+ Bar->AddProgress(data->GetSize());
}
- )
- );
- while(sendings.size() > UploadParams.MaxInFlight) {
- sendings.pop_front();
- }
+ AtomicDecrement(counter);
+ });
}
if (AtomicGet(ErrorsCount)) {
break;
}
}
- NThreading::WaitAll(sendings).GetValueSync();
+ while(AtomicGet(counter) > 0) {
+ Sleep(TDuration::MilliSeconds(100));
+ }
} catch (...) {
auto g = Guard(Lock);
Cerr << "Fill table " << dataGen->GetName() << " failed: " << CurrentExceptionMessage() << ", backtrace: ";
diff --git a/ydb/public/lib/ydb_cli/commands/ydb_workload_import.h b/ydb/public/lib/ydb_cli/commands/ydb_workload_import.h
index 0dafa6af1a2..8bf49facc66 100644
--- a/ydb/public/lib/ydb_cli/commands/ydb_workload_import.h
+++ b/ydb/public/lib/ydb_cli/commands/ydb_workload_import.h
@@ -35,7 +35,7 @@ private:
NYdbWorkload::TWorkloadDataInitializer::TPtr Initializer;
THolder<TProgressBar> Bar;
TAdaptiveLock Lock;
- NThreading::TAsyncSemaphore::TPtr InFlightSemaphore;
+ THolder<TFastSemaphore> InFlightSemaphore;
TAtomic ErrorsCount;
};