diff options
author | va-kuznecov <va-kuznecov@ydb.tech> | 2022-11-22 18:48:11 +0300 |
---|---|---|
committer | va-kuznecov <va-kuznecov@ydb.tech> | 2022-11-22 18:48:11 +0300 |
commit | 95a38446087c270cb43e6f648dc1ec4bf1953365 (patch) | |
tree | 3465c16fd1d0c545459bc32b960b7db53de81d5e | |
parent | b9de8664fdf674a1d4214dc126ed0747ea682f82 (diff) | |
download | ydb-95a38446087c270cb43e6f648dc1ec4bf1953365.tar.gz |
Parallelize requests in workload UT
-rw-r--r-- | ydb/core/kqp/ut/kqp_workload_ut.cpp | 53 |
1 files changed, 31 insertions, 22 deletions
diff --git a/ydb/core/kqp/ut/kqp_workload_ut.cpp b/ydb/core/kqp/ut/kqp_workload_ut.cpp index aa95ecbd2a..030a928ae8 100644 --- a/ydb/core/kqp/ut/kqp_workload_ut.cpp +++ b/ydb/core/kqp/ut/kqp_workload_ut.cpp @@ -6,6 +6,8 @@ #include <ydb/library/workload/stock_workload.h> #include <ydb/library/workload/kv_workload.h> +#include <library/cpp/threading/local_executor/local_executor.h> + namespace NKikimr::NKqp { using namespace NYdb; @@ -19,27 +21,26 @@ void Test(NYdbWorkload::EWorkload workloadType) { auto db = kikimr.GetTableClient(); auto session = db.CreateSession().GetValueSync().GetSession(); - NYdbWorkload::TWorkloadFactory factory; - std::shared_ptr<NYdbWorkload::IWorkloadQueryGenerator> workloadQueryGen; + std::unique_ptr<NYdbWorkload::TWorkloadParams> params; if (workloadType == NYdbWorkload::EWorkload::STOCK) { - NYdbWorkload::TStockWorkloadParams params; - params.ProductCount = 100; - params.Quantity = 1000; - params.OrderCount = 100; - params.Limit = 10; - params.MinPartitions = 40; - params.PartitionsByLoad = true; - params.DbPath = "/Root"; - workloadQueryGen = factory.GetWorkloadQueryGenerator(workloadType, ¶ms); + auto stockParams = std::make_unique<NYdbWorkload::TStockWorkloadParams>(); + stockParams->ProductCount = 100; + stockParams->Quantity = 1000; + stockParams->OrderCount = 100; + stockParams->Limit = 10; + stockParams->MinPartitions = 40; + stockParams->PartitionsByLoad = true; + params = std::move(stockParams); } else if (workloadType == NYdbWorkload::EWorkload::KV) { - NYdbWorkload::TKvWorkloadParams params; - params.DbPath = "/Root"; - workloadQueryGen = factory.GetWorkloadQueryGenerator(workloadType, ¶ms); + params = std::make_unique<NYdbWorkload::TKvWorkloadParams>(); } else { UNIT_ASSERT(false); } + UNIT_ASSERT(params); + params->DbPath = "/Root"; + auto workloadQueryGen = NYdbWorkload::TWorkloadFactory().GetWorkloadQueryGenerator(workloadType, params.get()); auto result = session.ExecuteSchemeQuery(workloadQueryGen->GetDDLQueries()).GetValueSync(); UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); @@ -56,15 +57,23 @@ void Test(NYdbWorkload::EWorkload workloadType) { maxType = static_cast<int>(NYdbWorkload::TKvWorkloadGenerator::EType::MaxType); } for (int type = 0; type < maxType; ++type) { - TTimer t; - for (size_t i = 0; i < REPEATS; ++i) { - queriesList = workloadQueryGen->GetWorkload(type); - for (const auto& queryInfo : queriesList) { - auto result = session.ExecuteDataQuery(TString(queryInfo.Query), - TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(), queryInfo.Params).ExtractValueSync(); - UNIT_ASSERT_C(result.IsSuccess() || result.GetStatus() == NYdb::EStatus::PRECONDITION_FAILED, result.GetIssues().ToString()); + size_t InFlight = 10; + NPar::LocalExecutor().RunAdditionalThreads(InFlight); + NPar::LocalExecutor().ExecRange([&db, type, ¶ms, workloadType](int /*id*/) { + TTimer t; + auto workloadQueryGen = NYdbWorkload::TWorkloadFactory().GetWorkloadQueryGenerator(workloadType, params.get()); + auto session = db.CreateSession().GetValueSync().GetSession(); + for (size_t i = 0; i < REPEATS; ++i) { + auto queriesList = workloadQueryGen->GetWorkload(type); + for (const auto& queryInfo : queriesList) { + auto result = session.ExecuteDataQuery(TString(queryInfo.Query), + TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(), queryInfo.Params).ExtractValueSync(); + UNIT_ASSERT_C(result.IsSuccess() || result.GetStatus() == NYdb::EStatus::PRECONDITION_FAILED + || result.GetStatus() == NYdb::EStatus::ABORTED, result.GetIssues().ToString() + << " status: " << int(result.GetStatus())); + } } - } + }, 0, InFlight, NPar::TLocalExecutor::WAIT_COMPLETE | NPar::TLocalExecutor::MED_PRIORITY); } result = session.ExecuteSchemeQuery(workloadQueryGen->GetCleanDDLQueries()).GetValueSync(); UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); |