aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authormdartemenko <mdartemenko@yandex-team.com>2022-08-23 19:03:30 +0300
committermdartemenko <mdartemenko@yandex-team.com>2022-08-23 19:03:30 +0300
commit6f53b0cf4c2e1f7ada88e3481a872b9a9c63aebc (patch)
tree3069943b8f4a19e9c63fe350d6f420b2f81efa44
parent800c8a7ec361b0d6de29be680d69a640c3ce9519 (diff)
downloadydb-6f53b0cf4c2e1f7ada88e3481a872b9a9c63aebc.tar.gz
Key-Value Workload for KQP benchmarking
Supports 2 main operations: - insert (a, b) - select (a, b) where a = val
-rw-r--r--ydb/core/blobstorage/testload/test_load_kqp.cpp42
-rw-r--r--ydb/core/protos/blobstorage.proto6
-rw-r--r--ydb/library/workload/CMakeLists.txt1
-rw-r--r--ydb/library/workload/kv_workload.cpp129
-rw-r--r--ydb/library/workload/kv_workload.h60
-rw-r--r--ydb/library/workload/stock_workload.cpp4
-rw-r--r--ydb/library/workload/stock_workload.h12
-rw-r--r--ydb/library/workload/workload_factory.cpp3
-rw-r--r--ydb/library/workload/workload_factory.h1
-rw-r--r--ydb/library/workload/workload_query_generator.h20
-rw-r--r--ydb/public/lib/ydb_cli/commands/CMakeLists.txt1
-rw-r--r--ydb/public/lib/ydb_cli/commands/kv_workload.cpp185
-rw-r--r--ydb/public/lib/ydb_cli/commands/kv_workload.h65
-rw-r--r--ydb/public/lib/ydb_cli/commands/ydb_workload.cpp2
14 files changed, 513 insertions, 18 deletions
diff --git a/ydb/core/blobstorage/testload/test_load_kqp.cpp b/ydb/core/blobstorage/testload/test_load_kqp.cpp
index 1f8e0519639..bb4ad82ff76 100644
--- a/ydb/core/blobstorage/testload/test_load_kqp.cpp
+++ b/ydb/core/blobstorage/testload/test_load_kqp.cpp
@@ -15,6 +15,7 @@
#include <ydb/library/workload/workload_factory.h>
#include <ydb/library/workload/stock_workload.h>
+#include <ydb/library/workload/kv_workload.h>
#include <ydb/public/lib/operation_id/operation_id.h>
#include <ydb/public/sdk/cpp/client/ydb_params/params.h>
@@ -218,7 +219,7 @@ private:
request->Record.MutableRequest()->MutableTxControl()->mutable_begin_tx()->mutable_serializable_read_write();
request->Record.MutableRequest()->MutableTxControl()->set_commit_tx(true);
- request->Record.MutableRequest()->SetCollectStats(Ydb::Table::QueryStatsCollection_Mode::QueryStatsCollection_Mode_STATS_COLLECTION_FULL);
+ request->Record.MutableRequest()->SetCollectStats(Ydb::Table::QueryStatsCollection_Mode::QueryStatsCollection_Mode_STATS_COLLECTION_BASIC);
NKikimrMiniKQL::TParams params;
ConvertYdbParamsToMiniKQLParams(query_params, params);
@@ -348,6 +349,7 @@ public:
NYdbWorkload::TWorkloadFactory factory;
if (cmd.Workload_case() == NKikimrBlobStorage::TEvTestLoadRequest_TKqpLoadStart::WorkloadCase::kStock) {
+ WorkloadClass = NYdbWorkload::EWorkload::STOCK;
NYdbWorkload::TStockWorkloadParams params;
params.PartitionsByLoad = cmd.GetStock().GetPartitionsByLoad();
params.OrderCount = cmd.GetStock().GetOrderCount();
@@ -357,6 +359,15 @@ public:
params.DbPath = WorkingDir;
params.MinPartitions = UniformPartitionsCount;
WorkloadQueryGen = factory.GetWorkloadQueryGenerator(NYdbWorkload::EWorkload::STOCK, &params);
+ } else if (cmd.Workload_case() == NKikimrBlobStorage::TEvTestLoadRequest_TKqpLoadStart::WorkloadCase::kKv) {
+ WorkloadClass = NYdbWorkload::EWorkload::KV;
+ NYdbWorkload::TKvWorkloadParams params;
+ params.InitRowCount = cmd.GetKv().GetInitRowCount();
+ params.PartitionsByLoad = cmd.GetKv().GetPartitionsByLoad();
+ params.MaxFirstKey = cmd.GetKv().GetMaxFirstKey();
+ params.MinPartitions = UniformPartitionsCount;
+ params.DbPath = WorkingDir;
+ WorkloadQueryGen = factory.GetWorkloadQueryGenerator(NYdbWorkload::EWorkload::KV, &params);
} else {
return;
}
@@ -379,6 +390,26 @@ public:
LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " TKqpWriterTestLoadActor Bootstrap called");
Become(&TKqpWriterTestLoadActor::StateStart);
+ if (WorkloadClass == NYdbWorkload::EWorkload::STOCK) {
+ NYdbWorkload::TStockWorkloadParams* params = static_cast<NYdbWorkload::TStockWorkloadParams*>(WorkloadQueryGen->GetParams());
+ LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " Starting load actor with workload STOCK, Params: {"
+ << "PartitionsByLoad: " << params->PartitionsByLoad << " "
+ << "OrderCount: " << params->OrderCount << " "
+ << "ProductCount: " << params->ProductCount << " "
+ << "Quantity: " << params->Quantity << " "
+ << "Limit: " << params->Limit << " "
+ << "DbPath: " << params->DbPath << " "
+ << "MinPartitions: " << params->MinPartitions);
+ } else if (WorkloadClass == NYdbWorkload::EWorkload::KV) {
+ NYdbWorkload::TKvWorkloadParams* params = static_cast<NYdbWorkload::TKvWorkloadParams*>(WorkloadQueryGen->GetParams());
+ LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " Starting load actor with workload KV, Params: {"
+ << "InitRowCount: " << params->InitRowCount << " "
+ << "PartitionsByLoad: " << params->PartitionsByLoad << " "
+ << "MaxFirstKey: " << params->MaxFirstKey << " "
+ << "MinPartitions: " << params->MinPartitions << " "
+ << "DbPath: " << params->DbPath);
+ }
+
LOG_INFO_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " Schedule PoisonPill");
ctx.Schedule(TDuration::Seconds(DurationSeconds * 2), new TEvents::TEvPoisonPill);
@@ -573,7 +604,7 @@ private:
auto q = std::move(InitData.front());
InitData.pop_front();
- LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Worker Tag# " << Tag
+ LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag
<< " Creating request for init query, need to exec: " << InitData.size() + 1);
TString query_text = TString(q.Query);
@@ -581,7 +612,7 @@ private:
auto request = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>();
- LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Worker Tag# " << Tag << " using session: " << TableSession);
+ LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " using session: " << TableSession);
request->Record.MutableRequest()->SetSessionId(TableSession);
request->Record.MutableRequest()->SetKeepSession(true);
@@ -595,14 +626,14 @@ private:
request->Record.MutableRequest()->MutableTxControl()->mutable_begin_tx()->mutable_serializable_read_write();
request->Record.MutableRequest()->MutableTxControl()->set_commit_tx(true);
- request->Record.MutableRequest()->SetCollectStats(Ydb::Table::QueryStatsCollection_Mode::QueryStatsCollection_Mode_STATS_COLLECTION_FULL);
+ request->Record.MutableRequest()->SetCollectStats(Ydb::Table::QueryStatsCollection_Mode::QueryStatsCollection_Mode_STATS_COLLECTION_BASIC);
NKikimrMiniKQL::TParams params;
ConvertYdbParamsToMiniKQLParams(query_params, params);
request->Record.MutableRequest()->MutableParameters()->Swap(&params);
auto kqp_proxy = NKqp::MakeKqpProxyID(ctx.SelfID.NodeId());
- LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Worker Tag# " << Tag
+ LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag
<< " sending init query to proxy: " + kqp_proxy.ToString());
ctx.Send(kqp_proxy, request.Release());
@@ -734,6 +765,7 @@ private:
ui64 UniformPartitionsCount;
bool DeleteTableOnFinish;
ui32 NumOfSessions;
+ NYdbWorkload::EWorkload WorkloadClass;
NYdbWorkload::TQueryInfoList InitData;
diff --git a/ydb/core/protos/blobstorage.proto b/ydb/core/protos/blobstorage.proto
index 09e80b0debd..c7aebd4ef7e 100644
--- a/ydb/core/protos/blobstorage.proto
+++ b/ydb/core/protos/blobstorage.proto
@@ -1544,6 +1544,11 @@ message TEvTestLoadRequest {
optional uint64 Limit = 4 [default = 10];
optional bool PartitionsByLoad = 5 [default = true];
}
+ message TKvWorkload {
+ optional uint64 InitRowCount = 1 [default = 1000];
+ optional bool PartitionsByLoad = 2 [default = true];
+ optional uint64 MaxFirstKey = 3 [default = 5000];
+ }
optional uint64 Tag = 1;
optional uint32 DurationSeconds = 2;
optional uint32 WindowDuration = 3;
@@ -1554,6 +1559,7 @@ message TEvTestLoadRequest {
optional uint32 WorkloadType = 8;
oneof Workload {
TStockWorkload Stock = 9;
+ TKvWorkload Kv = 10;
}
}
diff --git a/ydb/library/workload/CMakeLists.txt b/ydb/library/workload/CMakeLists.txt
index 452733ea629..d0b61750d2d 100644
--- a/ydb/library/workload/CMakeLists.txt
+++ b/ydb/library/workload/CMakeLists.txt
@@ -15,5 +15,6 @@ target_link_libraries(ydb-library-workload PUBLIC
)
target_sources(ydb-library-workload PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/workload/stock_workload.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/workload/kv_workload.cpp
${CMAKE_SOURCE_DIR}/ydb/library/workload/workload_factory.cpp
)
diff --git a/ydb/library/workload/kv_workload.cpp b/ydb/library/workload/kv_workload.cpp
new file mode 100644
index 00000000000..b36b8cd2f30
--- /dev/null
+++ b/ydb/library/workload/kv_workload.cpp
@@ -0,0 +1,129 @@
+#include "kv_workload.h"
+
+#include <util/datetime/base.h>
+
+#include <cmath>
+#include <iomanip>
+#include <string>
+#include <thread>
+#include <random>
+
+namespace NYdbWorkload {
+
+TKvWorkloadGenerator::TKvWorkloadGenerator(const TKvWorkloadParams* params)
+ : DbPath(params->DbPath)
+ , Params(*params)
+ , Rd()
+ , Gen(Rd())
+ , UniformDistGen(0, Params.MaxFirstKey)
+{
+ Gen.seed(Now().MicroSeconds());
+}
+
+TKvWorkloadParams* TKvWorkloadGenerator::GetParams() {
+ return &Params;
+}
+
+std::string TKvWorkloadGenerator::GetDDLQueries() const {
+ std::string partsNum = std::to_string(Params.MinPartitions);
+ std::string KvPartitionsDdl = "";
+
+ if (Params.PartitionsByLoad) {
+ KvPartitionsDdl = "WITH (AUTO_PARTITIONING_BY_LOAD = ENABLED, AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = " +
+ partsNum + ", UNIFORM_PARTITIONS = " + partsNum + ", AUTO_PARTITIONING_MAX_PARTITIONS_COUNT = 1000)";
+ } else {
+ KvPartitionsDdl = "WITH (AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = " +
+ partsNum + ", UNIFORM_PARTITIONS = " + partsNum + ", AUTO_PARTITIONING_MAX_PARTITIONS_COUNT = 1000)";
+ }
+
+ static const char TablesDdl[] = R"(--!syntax_v1
+ CREATE TABLE `%s/kv_test`(a Uint64, b Uint64, PRIMARY KEY(a, b)) %s;
+ )";
+
+ char buf[sizeof(TablesDdl) + sizeof(KvPartitionsDdl) + 8192*3]; // 32*256 for DbPath
+ int res = std::sprintf(buf, TablesDdl,
+ DbPath.c_str(), KvPartitionsDdl.c_str()
+ );
+
+ if (res < 0) {
+ return "";
+ }
+
+ return buf;
+}
+
+TQueryInfoList TKvWorkloadGenerator::GetWorkload(int type) {
+ switch (static_cast<EType>(type)) {
+ case EType::UpsertRandom:
+ return UpsertRandom();
+ case EType::SelectRandom:
+ return SelectRandom();
+ default:
+ return TQueryInfoList();
+ }
+}
+
+TQueryInfoList TKvWorkloadGenerator::UpsertRandom() {
+ std::string query = R"(
+ --!syntax_v1
+ DECLARE $a AS Uint64;
+ DECLARE $b AS Uint64;
+
+ UPSERT INTO `kv_test` (a, b) VALUES ($a, $b);
+ )";
+
+ ui64 a = UniformDistGen(Gen);
+ ui64 b = Gen();
+
+ NYdb::TParamsBuilder paramsBuilder;
+ auto params = paramsBuilder
+ .AddParam("$a")
+ .Uint64(a)
+ .Build()
+ .AddParam("$b")
+ .Uint64(b)
+ .Build()
+ .Build();
+
+ return TQueryInfoList(1, TQueryInfo(query, std::move(params)));
+}
+
+TQueryInfoList TKvWorkloadGenerator::SelectRandom() {
+ std::string query = R"(
+ --!syntax_v1
+ DECLARE $a AS Uint64;
+
+ SELECT * FROM `kv_test` WHERE a = $a
+ )";
+
+ ui64 a = UniformDistGen(Gen);
+
+ NYdb::TParamsBuilder paramsBuilder;
+ auto params = paramsBuilder
+ .AddParam("$a")
+ .Uint64(a)
+ .Build()
+ .Build();
+
+ return TQueryInfoList(1, TQueryInfo(query, std::move(params)));
+}
+
+TQueryInfoList TKvWorkloadGenerator::GetInitialData() {
+ TQueryInfoList res;
+ for (size_t i = 0; i < Params.InitRowCount; ++i) {
+ auto queryInfos = UpsertRandom();
+ res.insert(res.end(), queryInfos.begin(), queryInfos.end());
+ }
+
+ return res;
+}
+
+std::string TKvWorkloadGenerator::GetCleanDDLQueries() const {
+ std::string query = R"(
+ DROP TABLE `kv_test`;
+ )";
+
+ return query;
+}
+
+} \ No newline at end of file
diff --git a/ydb/library/workload/kv_workload.h b/ydb/library/workload/kv_workload.h
new file mode 100644
index 00000000000..b681d092ebd
--- /dev/null
+++ b/ydb/library/workload/kv_workload.h
@@ -0,0 +1,60 @@
+#pragma once
+
+#include "workload_query_generator.h"
+
+#include <cctype>
+#include <random>
+
+namespace NYdbWorkload {
+
+struct TKvWorkloadParams : public TWorkloadParams {
+ ui64 MinPartitions = 1;
+ ui64 InitRowCount = 1000;
+ ui64 MaxFirstKey = 5000;
+ bool PartitionsByLoad = true;
+};
+
+class TKvWorkloadGenerator : public IWorkloadQueryGenerator {
+public:
+
+ static TKvWorkloadGenerator* New(const TKvWorkloadParams* params) {
+ if (!validateDbPath(params->DbPath)) {
+ throw yexception() << "Invalid path to database." << Endl;
+ }
+ return new TKvWorkloadGenerator(params);
+ }
+
+ virtual ~TKvWorkloadGenerator() {}
+
+ std::string GetDDLQueries() const override;
+
+ TQueryInfoList GetInitialData() override;
+
+ std::string GetCleanDDLQueries() const override;
+
+ TQueryInfoList GetWorkload(int type) override;
+
+ TKvWorkloadParams* GetParams() override;
+
+ enum class EType {
+ UpsertRandom,
+ SelectRandom
+ };
+
+private:
+ TQueryInfoList UpsertRandom();
+ TQueryInfoList SelectRandom();
+
+ TKvWorkloadGenerator(const TKvWorkloadParams* params);
+
+ TQueryInfo FillKvData() const;
+
+ std::string DbPath;
+ TKvWorkloadParams Params;
+
+ std::random_device Rd;
+ std::mt19937_64 Gen;
+ std::uniform_int_distribution<ui64> UniformDistGen;
+};
+
+} // namespace NYdbWorkload \ No newline at end of file
diff --git a/ydb/library/workload/stock_workload.cpp b/ydb/library/workload/stock_workload.cpp
index 28d70fa44c5..f46e16b40c5 100644
--- a/ydb/library/workload/stock_workload.cpp
+++ b/ydb/library/workload/stock_workload.cpp
@@ -31,6 +31,10 @@ TStockWorkloadGenerator::TStockWorkloadGenerator(const TStockWorkloadParams* par
Gen.seed(Now().MicroSeconds());
}
+TStockWorkloadParams* TStockWorkloadGenerator::GetParams() {
+ return &Params;
+}
+
std::string TStockWorkloadGenerator::GetDDLQueries() const {
std::string StockPartitionsDdl = "";
std::string OrdersPartitionsDdl = "WITH (READ_REPLICAS_SETTINGS = \"per_az:1\")";
diff --git a/ydb/library/workload/stock_workload.h b/ydb/library/workload/stock_workload.h
index aaac35c4451..b99edc81c7c 100644
--- a/ydb/library/workload/stock_workload.h
+++ b/ydb/library/workload/stock_workload.h
@@ -37,6 +37,8 @@ public:
TQueryInfoList GetWorkload(int type) override;
+ TStockWorkloadParams* GetParams() override;
+
enum class EType {
InsertRandomOrder,
SubmitRandomOrder,
@@ -65,16 +67,6 @@ private:
TStockWorkloadGenerator(const TStockWorkloadParams* params);
- static bool validateDbPath(const std::string& path) {
- for (size_t i = 0; i < path.size(); ++i) {
- char c = path[i];
- if (!std::isalnum(c) && c != '/' && c != '_' && c != '-') {
- return false;
- }
- }
- return true;
- }
-
TQueryInfo FillStockData() const;
std::string DbPath;
diff --git a/ydb/library/workload/workload_factory.cpp b/ydb/library/workload/workload_factory.cpp
index 823da997dc8..1b34bc3838c 100644
--- a/ydb/library/workload/workload_factory.cpp
+++ b/ydb/library/workload/workload_factory.cpp
@@ -1,6 +1,7 @@
#include "workload_factory.h"
#include "stock_workload.h"
+#include "kv_workload.h"
namespace NYdbWorkload {
@@ -12,6 +13,8 @@ namespace NYdbWorkload {
if (type == EWorkload::STOCK) {
return std::shared_ptr<TStockWorkloadGenerator>(TStockWorkloadGenerator::New(static_cast<const TStockWorkloadParams*>(params)));
+ } else if (type == EWorkload::KV) {
+ return std::shared_ptr<TKvWorkloadGenerator>(TKvWorkloadGenerator::New(static_cast<const TKvWorkloadParams*>(params)));
}
throw yexception() << "Unknown workload";
diff --git a/ydb/library/workload/workload_factory.h b/ydb/library/workload/workload_factory.h
index 53bd764a1ad..a786fae2c4f 100644
--- a/ydb/library/workload/workload_factory.h
+++ b/ydb/library/workload/workload_factory.h
@@ -8,6 +8,7 @@ namespace NYdbWorkload {
enum class EWorkload {
STOCK,
+ KV,
};
class TWorkloadFactory {
diff --git a/ydb/library/workload/workload_query_generator.h b/ydb/library/workload/workload_query_generator.h
index f48d3960246..7e5291f2ba9 100644
--- a/ydb/library/workload/workload_query_generator.h
+++ b/ydb/library/workload/workload_query_generator.h
@@ -25,6 +25,10 @@ struct TQueryInfo {
using TQueryInfoList = std::list<TQueryInfo>;
+struct TWorkloadParams {
+ std::string DbPath;
+};
+
class IWorkloadQueryGenerator {
public:
virtual ~IWorkloadQueryGenerator() {}
@@ -34,10 +38,20 @@ public:
virtual std::string GetCleanDDLQueries() const = 0;
virtual TQueryInfoList GetWorkload(int type) = 0;
-};
-struct TWorkloadParams {
- std::string DbPath;
+ virtual TWorkloadParams* GetParams() = 0;
+
+protected:
+ static bool validateDbPath(const std::string& path) {
+ for (size_t i = 0; i < path.size(); ++i) {
+ char c = path[i];
+ if (!std::isalnum(c) && c != '/' && c != '_' && c != '-') {
+ return false;
+ }
+ }
+ return true;
+ }
+
};
} // namespace NYdbWorkload
diff --git a/ydb/public/lib/ydb_cli/commands/CMakeLists.txt b/ydb/public/lib/ydb_cli/commands/CMakeLists.txt
index 5892a1d144a..fe8e547a3ae 100644
--- a/ydb/public/lib/ydb_cli/commands/CMakeLists.txt
+++ b/ydb/public/lib/ydb_cli/commands/CMakeLists.txt
@@ -36,6 +36,7 @@ target_link_libraries(clicommands PUBLIC
)
target_sources(clicommands PRIVATE
${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/stock_workload.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/kv_workload.cpp
${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/ydb_command.cpp
${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/ydb_profile.cpp
${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/ydb_root_common.cpp
diff --git a/ydb/public/lib/ydb_cli/commands/kv_workload.cpp b/ydb/public/lib/ydb_cli/commands/kv_workload.cpp
new file mode 100644
index 00000000000..1a0efbc07a8
--- /dev/null
+++ b/ydb/public/lib/ydb_cli/commands/kv_workload.cpp
@@ -0,0 +1,185 @@
+#include "kv_workload.h"
+
+#include <ydb/library/workload/kv_workload.h>
+#include <ydb/library/workload/workload_factory.h>
+#include <ydb/public/lib/ydb_cli/commands/ydb_common.h>
+
+namespace NYdb::NConsoleClient {
+
+TCommandKv::TCommandKv()
+ : TClientCommandTree("kv", {}, "YDB kv workload")
+{
+ AddCommand(std::make_unique<TCommandKvInit>());
+ AddCommand(std::make_unique<TCommandKvClean>());
+ AddCommand(std::make_unique<TCommandKvRun>());
+}
+
+TCommandKvInit::TCommandKvInit()
+ : TWorkloadCommand("init", {}, "Create and initialize tables for workload")
+ , InitRowCount(1000)
+ , MinPartitions(1)
+ , MaxFirstKey(5000)
+ , PartitionsByLoad(true)
+{}
+
+void TCommandKvInit::Config(TConfig& config) {
+ TYdbCommand::Config(config);
+
+ config.SetFreeArgsNum(0);
+
+ config.Opts->AddLongOption("rows-cnt", "count of rows need to Insert while table initialization")
+ .DefaultValue(1000).StoreResult(&InitRowCount);
+ config.Opts->AddLongOption("min-partitions", "Minimum partitions for tables.")
+ .DefaultValue(40).StoreResult(&MinPartitions);
+ config.Opts->AddLongOption("auto-partition", "Enable auto partitioning by load.")
+ .DefaultValue(true).StoreResult(&PartitionsByLoad);
+ config.Opts->AddLongOption("max-first-key", "maximum value of first primary key")
+ .DefaultValue(5000).StoreResult(&MaxFirstKey);
+}
+
+void TCommandKvInit::Parse(TConfig& config) {
+ TClientCommand::Parse(config);
+}
+
+int TCommandKvInit::Run(TConfig& config) {
+ Driver = std::make_unique<NYdb::TDriver>(CreateDriver(config));
+ TableClient = std::make_unique<NTable::TTableClient>(*Driver);
+ NYdbWorkload::TKvWorkloadParams params;
+ params.DbPath = config.Database;
+ params.InitRowCount = InitRowCount;
+ params.MinPartitions = MinPartitions;
+ params.PartitionsByLoad = PartitionsByLoad;
+ params.MaxFirstKey = MaxFirstKey;
+
+ NYdbWorkload::TWorkloadFactory factory;
+ auto workloadGen = factory.GetWorkloadQueryGenerator(NYdbWorkload::EWorkload::KV, &params);
+
+ auto session = GetSession();
+ auto result = session.ExecuteSchemeQuery(workloadGen->GetDDLQueries()).GetValueSync();
+ ThrowOnError(result);
+
+ auto queryInfoList = workloadGen->GetInitialData();
+ for (auto queryInfo : queryInfoList) {
+ auto prepareResult = session.PrepareDataQuery(queryInfo.Query.c_str()).GetValueSync();
+ if (!prepareResult.IsSuccess()) {
+ Cerr << "Prepare failed: " << prepareResult.GetIssues().ToString() << Endl
+ << "Query:\n" << queryInfo.Query << Endl;
+ return EXIT_FAILURE;
+ }
+
+ auto dataQuery = prepareResult.GetQuery();
+ auto result = dataQuery.Execute(NYdb::NTable::TTxControl::BeginTx(NYdb::NTable::TTxSettings::SerializableRW()).CommitTx(),
+ std::move(queryInfo.Params)).GetValueSync();
+ if (!result.IsSuccess()) {
+ Cerr << "Query execution failed: " << result.GetIssues().ToString() << Endl
+ << "Query:\n" << queryInfo.Query << Endl;
+ return EXIT_FAILURE;
+ }
+ }
+
+ return EXIT_SUCCESS;
+}
+
+
+TCommandKvClean::TCommandKvClean()
+ : TWorkloadCommand("clean", {}, "drop tables created in init phase") {}
+
+void TCommandKvClean::Config(TConfig& config) {
+ TWorkloadCommand::Config(config);
+ config.SetFreeArgsNum(0);
+}
+
+void TCommandKvClean::Parse(TConfig& config) {
+ TClientCommand::Parse(config);
+}
+
+int TCommandKvClean::Run(TConfig& config) {
+ Driver = std::make_unique<NYdb::TDriver>(CreateDriver(config));
+ TableClient = std::make_unique<NTable::TTableClient>(*Driver);
+ NYdbWorkload::TKvWorkloadParams params;
+ params.DbPath = config.Database;
+
+ NYdbWorkload::TWorkloadFactory factory;
+ auto workloadGen = factory.GetWorkloadQueryGenerator(NYdbWorkload::EWorkload::KV, &params);
+
+ auto session = GetSession();
+
+ auto query = workloadGen->GetCleanDDLQueries();
+ TStatus result(EStatus::SUCCESS, NYql::TIssues());
+ result = session.ExecuteSchemeQuery(TString(query)).GetValueSync();
+
+ if (!result.IsSuccess()) {
+ Cerr << "Query execution failed: " << result.GetIssues().ToString() << Endl
+ << "Query:\n" << query << Endl;
+ return EXIT_FAILURE;
+ }
+
+ return EXIT_SUCCESS;
+}
+
+TCommandKvRun::TCommandKvRun()
+ : TClientCommandTree("run", {}, "Run YDB KV workload")
+{
+ AddCommand(std::make_unique<TCommandKvRunUpsertRandom>());
+ AddCommand(std::make_unique<TCommandKvRunSelectRandom>());
+}
+
+TCommandKvRunUpsertRandom::TCommandKvRunUpsertRandom()
+ : TWorkloadCommand("upsert", {}, "upsert random pairs (a, b) into table")
+{}
+
+void TCommandKvRunUpsertRandom::Config(TConfig& config) {
+ TWorkloadCommand::Config(config);
+ config.SetFreeArgsNum(0);
+
+ config.Opts->AddLongOption("max-first-key", "maximum value of first primary key")
+ .DefaultValue(5000).StoreResult(&MaxFirstKey);
+}
+
+void TCommandKvRunUpsertRandom::Parse(TConfig& config) {
+ TClientCommand::Parse(config);
+}
+
+int TCommandKvRunUpsertRandom::Run(TConfig& config) {
+ PrepareForRun(config);
+
+ NYdbWorkload::TKvWorkloadParams params;
+ params.DbPath = config.Database;
+ params.MaxFirstKey = MaxFirstKey;
+
+ NYdbWorkload::TWorkloadFactory factory;
+ auto workloadGen = factory.GetWorkloadQueryGenerator(NYdbWorkload::EWorkload::KV, &params);
+
+ return RunWorkload(workloadGen, static_cast<int>(NYdbWorkload::TKvWorkloadGenerator::EType::UpsertRandom));
+}
+
+TCommandKvRunSelectRandom::TCommandKvRunSelectRandom()
+ : TWorkloadCommand("select", {}, "select row by exactly matching of a")
+{}
+
+void TCommandKvRunSelectRandom::Config(TConfig& config) {
+ TWorkloadCommand::Config(config);
+ config.SetFreeArgsNum(0);
+
+ config.Opts->AddLongOption("max-first-key", "maximum value of first primary key")
+ .DefaultValue(5000).StoreResult(&MaxFirstKey);
+}
+
+void TCommandKvRunSelectRandom::Parse(TConfig& config) {
+ TClientCommand::Parse(config);
+}
+
+int TCommandKvRunSelectRandom::Run(TConfig& config) {
+ PrepareForRun(config);
+
+ NYdbWorkload::TKvWorkloadParams params;
+ params.DbPath = config.Database;
+ params.MaxFirstKey = MaxFirstKey;
+
+ NYdbWorkload::TWorkloadFactory factory;
+ auto workloadGen = factory.GetWorkloadQueryGenerator(NYdbWorkload::EWorkload::KV, &params);
+
+ return RunWorkload(workloadGen, static_cast<int>(NYdbWorkload::TKvWorkloadGenerator::EType::SelectRandom));
+}
+
+} // namespace NYdb::NConsoleClient {
diff --git a/ydb/public/lib/ydb_cli/commands/kv_workload.h b/ydb/public/lib/ydb_cli/commands/kv_workload.h
new file mode 100644
index 00000000000..dc7b4fe01ba
--- /dev/null
+++ b/ydb/public/lib/ydb_cli/commands/kv_workload.h
@@ -0,0 +1,65 @@
+#pragma once
+
+#include "ydb/public/lib/ydb_cli/commands/ydb_workload.h"
+
+namespace NYdb {
+namespace NConsoleClient {
+
+class TCommandKv : public TClientCommandTree {
+public:
+ TCommandKv();
+};
+
+class TCommandKvInit : public TWorkloadCommand {
+public:
+ TCommandKvInit();
+ virtual void Config(TConfig& config) override;
+ virtual void Parse(TConfig& config) override;
+ virtual int Run(TConfig& config) override;
+
+private:
+ ui64 InitRowCount;
+ ui64 MinPartitions;
+ ui64 MaxFirstKey;
+ bool PartitionsByLoad;
+};
+
+class TCommandKvClean : public TWorkloadCommand {
+public:
+ TCommandKvClean();
+ virtual void Config(TConfig& config) override;
+ virtual void Parse(TConfig& config) override;
+ virtual int Run(TConfig& config) override;
+};
+
+class TCommandKvRun : public TClientCommandTree {
+public:
+ TCommandKvRun();
+};
+
+class TCommandKvRunUpsertRandom : public TWorkloadCommand {
+public:
+ TCommandKvRunUpsertRandom();
+ virtual void Config(TConfig& config) override;
+ virtual void Parse(TConfig& config) override;
+ virtual int Run(TConfig& config) override;
+
+private:
+ ui64 MaxFirstKey;
+
+};
+
+class TCommandKvRunSelectRandom : public TWorkloadCommand {
+public:
+ TCommandKvRunSelectRandom();
+ virtual void Config(TConfig& config) override;
+ virtual void Parse(TConfig& config) override;
+ virtual int Run(TConfig& config) override;
+
+private:
+ ui64 MaxFirstKey;
+
+};
+
+}
+}
diff --git a/ydb/public/lib/ydb_cli/commands/ydb_workload.cpp b/ydb/public/lib/ydb_cli/commands/ydb_workload.cpp
index 3c567c3bdf4..89c31c8406d 100644
--- a/ydb/public/lib/ydb_cli/commands/ydb_workload.cpp
+++ b/ydb/public/lib/ydb_cli/commands/ydb_workload.cpp
@@ -1,6 +1,7 @@
#include "ydb_workload.h"
#include "stock_workload.h"
+#include "kv_workload.h"
#include <ydb/library/workload/workload_factory.h>
#include <ydb/public/lib/ydb_cli/commands/ydb_common.h>
@@ -34,6 +35,7 @@ TCommandWorkload::TCommandWorkload()
: TClientCommandTree("workload", {}, "YDB workload service")
{
AddCommand(std::make_unique<TCommandStock>());
+ AddCommand(std::make_unique<TCommandKv>());
}
TWorkloadCommand::TWorkloadCommand(const TString& name, const std::initializer_list<TString>& aliases, const TString& description)