aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorva-kuznecov <va-kuznecov@ydb.tech>2022-11-22 18:48:11 +0300
committerva-kuznecov <va-kuznecov@ydb.tech>2022-11-22 18:48:11 +0300
commit95a38446087c270cb43e6f648dc1ec4bf1953365 (patch)
tree3465c16fd1d0c545459bc32b960b7db53de81d5e
parentb9de8664fdf674a1d4214dc126ed0747ea682f82 (diff)
downloadydb-95a38446087c270cb43e6f648dc1ec4bf1953365.tar.gz
Parallelize requests in workload UT
-rw-r--r--ydb/core/kqp/ut/kqp_workload_ut.cpp53
1 files changed, 31 insertions, 22 deletions
diff --git a/ydb/core/kqp/ut/kqp_workload_ut.cpp b/ydb/core/kqp/ut/kqp_workload_ut.cpp
index aa95ecbd2a..030a928ae8 100644
--- a/ydb/core/kqp/ut/kqp_workload_ut.cpp
+++ b/ydb/core/kqp/ut/kqp_workload_ut.cpp
@@ -6,6 +6,8 @@
#include <ydb/library/workload/stock_workload.h>
#include <ydb/library/workload/kv_workload.h>
+#include <library/cpp/threading/local_executor/local_executor.h>
+
namespace NKikimr::NKqp {
using namespace NYdb;
@@ -19,27 +21,26 @@ void Test(NYdbWorkload::EWorkload workloadType) {
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();
- NYdbWorkload::TWorkloadFactory factory;
- std::shared_ptr<NYdbWorkload::IWorkloadQueryGenerator> workloadQueryGen;
+ std::unique_ptr<NYdbWorkload::TWorkloadParams> params;
if (workloadType == NYdbWorkload::EWorkload::STOCK) {
- NYdbWorkload::TStockWorkloadParams params;
- params.ProductCount = 100;
- params.Quantity = 1000;
- params.OrderCount = 100;
- params.Limit = 10;
- params.MinPartitions = 40;
- params.PartitionsByLoad = true;
- params.DbPath = "/Root";
- workloadQueryGen = factory.GetWorkloadQueryGenerator(workloadType, &params);
+ auto stockParams = std::make_unique<NYdbWorkload::TStockWorkloadParams>();
+ stockParams->ProductCount = 100;
+ stockParams->Quantity = 1000;
+ stockParams->OrderCount = 100;
+ stockParams->Limit = 10;
+ stockParams->MinPartitions = 40;
+ stockParams->PartitionsByLoad = true;
+ params = std::move(stockParams);
} else if (workloadType == NYdbWorkload::EWorkload::KV) {
- NYdbWorkload::TKvWorkloadParams params;
- params.DbPath = "/Root";
- workloadQueryGen = factory.GetWorkloadQueryGenerator(workloadType, &params);
+ params = std::make_unique<NYdbWorkload::TKvWorkloadParams>();
} else {
UNIT_ASSERT(false);
}
+ UNIT_ASSERT(params);
+ params->DbPath = "/Root";
+ auto workloadQueryGen = NYdbWorkload::TWorkloadFactory().GetWorkloadQueryGenerator(workloadType, params.get());
auto result = session.ExecuteSchemeQuery(workloadQueryGen->GetDDLQueries()).GetValueSync();
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
@@ -56,15 +57,23 @@ void Test(NYdbWorkload::EWorkload workloadType) {
maxType = static_cast<int>(NYdbWorkload::TKvWorkloadGenerator::EType::MaxType);
}
for (int type = 0; type < maxType; ++type) {
- TTimer t;
- for (size_t i = 0; i < REPEATS; ++i) {
- queriesList = workloadQueryGen->GetWorkload(type);
- for (const auto& queryInfo : queriesList) {
- auto result = session.ExecuteDataQuery(TString(queryInfo.Query),
- TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(), queryInfo.Params).ExtractValueSync();
- UNIT_ASSERT_C(result.IsSuccess() || result.GetStatus() == NYdb::EStatus::PRECONDITION_FAILED, result.GetIssues().ToString());
+ size_t InFlight = 10;
+ NPar::LocalExecutor().RunAdditionalThreads(InFlight);
+ NPar::LocalExecutor().ExecRange([&db, type, &params, workloadType](int /*id*/) {
+ TTimer t;
+ auto workloadQueryGen = NYdbWorkload::TWorkloadFactory().GetWorkloadQueryGenerator(workloadType, params.get());
+ auto session = db.CreateSession().GetValueSync().GetSession();
+ for (size_t i = 0; i < REPEATS; ++i) {
+ auto queriesList = workloadQueryGen->GetWorkload(type);
+ for (const auto& queryInfo : queriesList) {
+ auto result = session.ExecuteDataQuery(TString(queryInfo.Query),
+ TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(), queryInfo.Params).ExtractValueSync();
+ UNIT_ASSERT_C(result.IsSuccess() || result.GetStatus() == NYdb::EStatus::PRECONDITION_FAILED
+ || result.GetStatus() == NYdb::EStatus::ABORTED, result.GetIssues().ToString()
+ << " status: " << int(result.GetStatus()));
+ }
}
- }
+ }, 0, InFlight, NPar::TLocalExecutor::WAIT_COMPLETE | NPar::TLocalExecutor::MED_PRIORITY);
}
result = session.ExecuteSchemeQuery(workloadQueryGen->GetCleanDDLQueries()).GetValueSync();
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());