diff options
author | mdartemenko <mdartemenko@yandex-team.com> | 2022-08-23 19:03:30 +0300 |
---|---|---|
committer | mdartemenko <mdartemenko@yandex-team.com> | 2022-08-23 19:03:30 +0300 |
commit | 6f53b0cf4c2e1f7ada88e3481a872b9a9c63aebc (patch) | |
tree | 3069943b8f4a19e9c63fe350d6f420b2f81efa44 | |
parent | 800c8a7ec361b0d6de29be680d69a640c3ce9519 (diff) | |
download | ydb-6f53b0cf4c2e1f7ada88e3481a872b9a9c63aebc.tar.gz |
Key-Value Workload for KQP benchmarking
Supports 2 main operations:
- insert (a, b)
- select (a, b) where a = val
-rw-r--r-- | ydb/core/blobstorage/testload/test_load_kqp.cpp | 42 | ||||
-rw-r--r-- | ydb/core/protos/blobstorage.proto | 6 | ||||
-rw-r--r-- | ydb/library/workload/CMakeLists.txt | 1 | ||||
-rw-r--r-- | ydb/library/workload/kv_workload.cpp | 129 | ||||
-rw-r--r-- | ydb/library/workload/kv_workload.h | 60 | ||||
-rw-r--r-- | ydb/library/workload/stock_workload.cpp | 4 | ||||
-rw-r--r-- | ydb/library/workload/stock_workload.h | 12 | ||||
-rw-r--r-- | ydb/library/workload/workload_factory.cpp | 3 | ||||
-rw-r--r-- | ydb/library/workload/workload_factory.h | 1 | ||||
-rw-r--r-- | ydb/library/workload/workload_query_generator.h | 20 | ||||
-rw-r--r-- | ydb/public/lib/ydb_cli/commands/CMakeLists.txt | 1 | ||||
-rw-r--r-- | ydb/public/lib/ydb_cli/commands/kv_workload.cpp | 185 | ||||
-rw-r--r-- | ydb/public/lib/ydb_cli/commands/kv_workload.h | 65 | ||||
-rw-r--r-- | ydb/public/lib/ydb_cli/commands/ydb_workload.cpp | 2 |
14 files changed, 513 insertions, 18 deletions
diff --git a/ydb/core/blobstorage/testload/test_load_kqp.cpp b/ydb/core/blobstorage/testload/test_load_kqp.cpp index 1f8e0519639..bb4ad82ff76 100644 --- a/ydb/core/blobstorage/testload/test_load_kqp.cpp +++ b/ydb/core/blobstorage/testload/test_load_kqp.cpp @@ -15,6 +15,7 @@ #include <ydb/library/workload/workload_factory.h> #include <ydb/library/workload/stock_workload.h> +#include <ydb/library/workload/kv_workload.h> #include <ydb/public/lib/operation_id/operation_id.h> #include <ydb/public/sdk/cpp/client/ydb_params/params.h> @@ -218,7 +219,7 @@ private: request->Record.MutableRequest()->MutableTxControl()->mutable_begin_tx()->mutable_serializable_read_write(); request->Record.MutableRequest()->MutableTxControl()->set_commit_tx(true); - request->Record.MutableRequest()->SetCollectStats(Ydb::Table::QueryStatsCollection_Mode::QueryStatsCollection_Mode_STATS_COLLECTION_FULL); + request->Record.MutableRequest()->SetCollectStats(Ydb::Table::QueryStatsCollection_Mode::QueryStatsCollection_Mode_STATS_COLLECTION_BASIC); NKikimrMiniKQL::TParams params; ConvertYdbParamsToMiniKQLParams(query_params, params); @@ -348,6 +349,7 @@ public: NYdbWorkload::TWorkloadFactory factory; if (cmd.Workload_case() == NKikimrBlobStorage::TEvTestLoadRequest_TKqpLoadStart::WorkloadCase::kStock) { + WorkloadClass = NYdbWorkload::EWorkload::STOCK; NYdbWorkload::TStockWorkloadParams params; params.PartitionsByLoad = cmd.GetStock().GetPartitionsByLoad(); params.OrderCount = cmd.GetStock().GetOrderCount(); @@ -357,6 +359,15 @@ public: params.DbPath = WorkingDir; params.MinPartitions = UniformPartitionsCount; WorkloadQueryGen = factory.GetWorkloadQueryGenerator(NYdbWorkload::EWorkload::STOCK, ¶ms); + } else if (cmd.Workload_case() == NKikimrBlobStorage::TEvTestLoadRequest_TKqpLoadStart::WorkloadCase::kKv) { + WorkloadClass = NYdbWorkload::EWorkload::KV; + NYdbWorkload::TKvWorkloadParams params; + params.InitRowCount = cmd.GetKv().GetInitRowCount(); + params.PartitionsByLoad = cmd.GetKv().GetPartitionsByLoad(); + params.MaxFirstKey = cmd.GetKv().GetMaxFirstKey(); + params.MinPartitions = UniformPartitionsCount; + params.DbPath = WorkingDir; + WorkloadQueryGen = factory.GetWorkloadQueryGenerator(NYdbWorkload::EWorkload::KV, ¶ms); } else { return; } @@ -379,6 +390,26 @@ public: LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " TKqpWriterTestLoadActor Bootstrap called"); Become(&TKqpWriterTestLoadActor::StateStart); + if (WorkloadClass == NYdbWorkload::EWorkload::STOCK) { + NYdbWorkload::TStockWorkloadParams* params = static_cast<NYdbWorkload::TStockWorkloadParams*>(WorkloadQueryGen->GetParams()); + LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " Starting load actor with workload STOCK, Params: {" + << "PartitionsByLoad: " << params->PartitionsByLoad << " " + << "OrderCount: " << params->OrderCount << " " + << "ProductCount: " << params->ProductCount << " " + << "Quantity: " << params->Quantity << " " + << "Limit: " << params->Limit << " " + << "DbPath: " << params->DbPath << " " + << "MinPartitions: " << params->MinPartitions); + } else if (WorkloadClass == NYdbWorkload::EWorkload::KV) { + NYdbWorkload::TKvWorkloadParams* params = static_cast<NYdbWorkload::TKvWorkloadParams*>(WorkloadQueryGen->GetParams()); + LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " Starting load actor with workload KV, Params: {" + << "InitRowCount: " << params->InitRowCount << " " + << "PartitionsByLoad: " << params->PartitionsByLoad << " " + << "MaxFirstKey: " << params->MaxFirstKey << " " + << "MinPartitions: " << params->MinPartitions << " " + << "DbPath: " << params->DbPath); + } + LOG_INFO_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " Schedule PoisonPill"); ctx.Schedule(TDuration::Seconds(DurationSeconds * 2), new TEvents::TEvPoisonPill); @@ -573,7 +604,7 @@ private: auto q = std::move(InitData.front()); InitData.pop_front(); - LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Worker Tag# " << Tag + LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " Creating request for init query, need to exec: " << InitData.size() + 1); TString query_text = TString(q.Query); @@ -581,7 +612,7 @@ private: auto request = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>(); - LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Worker Tag# " << Tag << " using session: " << TableSession); + LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " using session: " << TableSession); request->Record.MutableRequest()->SetSessionId(TableSession); request->Record.MutableRequest()->SetKeepSession(true); @@ -595,14 +626,14 @@ private: request->Record.MutableRequest()->MutableTxControl()->mutable_begin_tx()->mutable_serializable_read_write(); request->Record.MutableRequest()->MutableTxControl()->set_commit_tx(true); - request->Record.MutableRequest()->SetCollectStats(Ydb::Table::QueryStatsCollection_Mode::QueryStatsCollection_Mode_STATS_COLLECTION_FULL); + request->Record.MutableRequest()->SetCollectStats(Ydb::Table::QueryStatsCollection_Mode::QueryStatsCollection_Mode_STATS_COLLECTION_BASIC); NKikimrMiniKQL::TParams params; ConvertYdbParamsToMiniKQLParams(query_params, params); request->Record.MutableRequest()->MutableParameters()->Swap(¶ms); auto kqp_proxy = NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()); - LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Worker Tag# " << Tag + LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " sending init query to proxy: " + kqp_proxy.ToString()); ctx.Send(kqp_proxy, request.Release()); @@ -734,6 +765,7 @@ private: ui64 UniformPartitionsCount; bool DeleteTableOnFinish; ui32 NumOfSessions; + NYdbWorkload::EWorkload WorkloadClass; NYdbWorkload::TQueryInfoList InitData; diff --git a/ydb/core/protos/blobstorage.proto b/ydb/core/protos/blobstorage.proto index 09e80b0debd..c7aebd4ef7e 100644 --- a/ydb/core/protos/blobstorage.proto +++ b/ydb/core/protos/blobstorage.proto @@ -1544,6 +1544,11 @@ message TEvTestLoadRequest { optional uint64 Limit = 4 [default = 10]; optional bool PartitionsByLoad = 5 [default = true]; } + message TKvWorkload { + optional uint64 InitRowCount = 1 [default = 1000]; + optional bool PartitionsByLoad = 2 [default = true]; + optional uint64 MaxFirstKey = 3 [default = 5000]; + } optional uint64 Tag = 1; optional uint32 DurationSeconds = 2; optional uint32 WindowDuration = 3; @@ -1554,6 +1559,7 @@ message TEvTestLoadRequest { optional uint32 WorkloadType = 8; oneof Workload { TStockWorkload Stock = 9; + TKvWorkload Kv = 10; } } diff --git a/ydb/library/workload/CMakeLists.txt b/ydb/library/workload/CMakeLists.txt index 452733ea629..d0b61750d2d 100644 --- a/ydb/library/workload/CMakeLists.txt +++ b/ydb/library/workload/CMakeLists.txt @@ -15,5 +15,6 @@ target_link_libraries(ydb-library-workload PUBLIC ) target_sources(ydb-library-workload PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/workload/stock_workload.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/workload/kv_workload.cpp ${CMAKE_SOURCE_DIR}/ydb/library/workload/workload_factory.cpp ) diff --git a/ydb/library/workload/kv_workload.cpp b/ydb/library/workload/kv_workload.cpp new file mode 100644 index 00000000000..b36b8cd2f30 --- /dev/null +++ b/ydb/library/workload/kv_workload.cpp @@ -0,0 +1,129 @@ +#include "kv_workload.h" + +#include <util/datetime/base.h> + +#include <cmath> +#include <iomanip> +#include <string> +#include <thread> +#include <random> + +namespace NYdbWorkload { + +TKvWorkloadGenerator::TKvWorkloadGenerator(const TKvWorkloadParams* params) + : DbPath(params->DbPath) + , Params(*params) + , Rd() + , Gen(Rd()) + , UniformDistGen(0, Params.MaxFirstKey) +{ + Gen.seed(Now().MicroSeconds()); +} + +TKvWorkloadParams* TKvWorkloadGenerator::GetParams() { + return &Params; +} + +std::string TKvWorkloadGenerator::GetDDLQueries() const { + std::string partsNum = std::to_string(Params.MinPartitions); + std::string KvPartitionsDdl = ""; + + if (Params.PartitionsByLoad) { + KvPartitionsDdl = "WITH (AUTO_PARTITIONING_BY_LOAD = ENABLED, AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = " + + partsNum + ", UNIFORM_PARTITIONS = " + partsNum + ", AUTO_PARTITIONING_MAX_PARTITIONS_COUNT = 1000)"; + } else { + KvPartitionsDdl = "WITH (AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = " + + partsNum + ", UNIFORM_PARTITIONS = " + partsNum + ", AUTO_PARTITIONING_MAX_PARTITIONS_COUNT = 1000)"; + } + + static const char TablesDdl[] = R"(--!syntax_v1 + CREATE TABLE `%s/kv_test`(a Uint64, b Uint64, PRIMARY KEY(a, b)) %s; + )"; + + char buf[sizeof(TablesDdl) + sizeof(KvPartitionsDdl) + 8192*3]; // 32*256 for DbPath + int res = std::sprintf(buf, TablesDdl, + DbPath.c_str(), KvPartitionsDdl.c_str() + ); + + if (res < 0) { + return ""; + } + + return buf; +} + +TQueryInfoList TKvWorkloadGenerator::GetWorkload(int type) { + switch (static_cast<EType>(type)) { + case EType::UpsertRandom: + return UpsertRandom(); + case EType::SelectRandom: + return SelectRandom(); + default: + return TQueryInfoList(); + } +} + +TQueryInfoList TKvWorkloadGenerator::UpsertRandom() { + std::string query = R"( + --!syntax_v1 + DECLARE $a AS Uint64; + DECLARE $b AS Uint64; + + UPSERT INTO `kv_test` (a, b) VALUES ($a, $b); + )"; + + ui64 a = UniformDistGen(Gen); + ui64 b = Gen(); + + NYdb::TParamsBuilder paramsBuilder; + auto params = paramsBuilder + .AddParam("$a") + .Uint64(a) + .Build() + .AddParam("$b") + .Uint64(b) + .Build() + .Build(); + + return TQueryInfoList(1, TQueryInfo(query, std::move(params))); +} + +TQueryInfoList TKvWorkloadGenerator::SelectRandom() { + std::string query = R"( + --!syntax_v1 + DECLARE $a AS Uint64; + + SELECT * FROM `kv_test` WHERE a = $a + )"; + + ui64 a = UniformDistGen(Gen); + + NYdb::TParamsBuilder paramsBuilder; + auto params = paramsBuilder + .AddParam("$a") + .Uint64(a) + .Build() + .Build(); + + return TQueryInfoList(1, TQueryInfo(query, std::move(params))); +} + +TQueryInfoList TKvWorkloadGenerator::GetInitialData() { + TQueryInfoList res; + for (size_t i = 0; i < Params.InitRowCount; ++i) { + auto queryInfos = UpsertRandom(); + res.insert(res.end(), queryInfos.begin(), queryInfos.end()); + } + + return res; +} + +std::string TKvWorkloadGenerator::GetCleanDDLQueries() const { + std::string query = R"( + DROP TABLE `kv_test`; + )"; + + return query; +} + +}
\ No newline at end of file diff --git a/ydb/library/workload/kv_workload.h b/ydb/library/workload/kv_workload.h new file mode 100644 index 00000000000..b681d092ebd --- /dev/null +++ b/ydb/library/workload/kv_workload.h @@ -0,0 +1,60 @@ +#pragma once + +#include "workload_query_generator.h" + +#include <cctype> +#include <random> + +namespace NYdbWorkload { + +struct TKvWorkloadParams : public TWorkloadParams { + ui64 MinPartitions = 1; + ui64 InitRowCount = 1000; + ui64 MaxFirstKey = 5000; + bool PartitionsByLoad = true; +}; + +class TKvWorkloadGenerator : public IWorkloadQueryGenerator { +public: + + static TKvWorkloadGenerator* New(const TKvWorkloadParams* params) { + if (!validateDbPath(params->DbPath)) { + throw yexception() << "Invalid path to database." << Endl; + } + return new TKvWorkloadGenerator(params); + } + + virtual ~TKvWorkloadGenerator() {} + + std::string GetDDLQueries() const override; + + TQueryInfoList GetInitialData() override; + + std::string GetCleanDDLQueries() const override; + + TQueryInfoList GetWorkload(int type) override; + + TKvWorkloadParams* GetParams() override; + + enum class EType { + UpsertRandom, + SelectRandom + }; + +private: + TQueryInfoList UpsertRandom(); + TQueryInfoList SelectRandom(); + + TKvWorkloadGenerator(const TKvWorkloadParams* params); + + TQueryInfo FillKvData() const; + + std::string DbPath; + TKvWorkloadParams Params; + + std::random_device Rd; + std::mt19937_64 Gen; + std::uniform_int_distribution<ui64> UniformDistGen; +}; + +} // namespace NYdbWorkload
\ No newline at end of file diff --git a/ydb/library/workload/stock_workload.cpp b/ydb/library/workload/stock_workload.cpp index 28d70fa44c5..f46e16b40c5 100644 --- a/ydb/library/workload/stock_workload.cpp +++ b/ydb/library/workload/stock_workload.cpp @@ -31,6 +31,10 @@ TStockWorkloadGenerator::TStockWorkloadGenerator(const TStockWorkloadParams* par Gen.seed(Now().MicroSeconds()); } +TStockWorkloadParams* TStockWorkloadGenerator::GetParams() { + return &Params; +} + std::string TStockWorkloadGenerator::GetDDLQueries() const { std::string StockPartitionsDdl = ""; std::string OrdersPartitionsDdl = "WITH (READ_REPLICAS_SETTINGS = \"per_az:1\")"; diff --git a/ydb/library/workload/stock_workload.h b/ydb/library/workload/stock_workload.h index aaac35c4451..b99edc81c7c 100644 --- a/ydb/library/workload/stock_workload.h +++ b/ydb/library/workload/stock_workload.h @@ -37,6 +37,8 @@ public: TQueryInfoList GetWorkload(int type) override; + TStockWorkloadParams* GetParams() override; + enum class EType { InsertRandomOrder, SubmitRandomOrder, @@ -65,16 +67,6 @@ private: TStockWorkloadGenerator(const TStockWorkloadParams* params); - static bool validateDbPath(const std::string& path) { - for (size_t i = 0; i < path.size(); ++i) { - char c = path[i]; - if (!std::isalnum(c) && c != '/' && c != '_' && c != '-') { - return false; - } - } - return true; - } - TQueryInfo FillStockData() const; std::string DbPath; diff --git a/ydb/library/workload/workload_factory.cpp b/ydb/library/workload/workload_factory.cpp index 823da997dc8..1b34bc3838c 100644 --- a/ydb/library/workload/workload_factory.cpp +++ b/ydb/library/workload/workload_factory.cpp @@ -1,6 +1,7 @@ #include "workload_factory.h" #include "stock_workload.h" +#include "kv_workload.h" namespace NYdbWorkload { @@ -12,6 +13,8 @@ namespace NYdbWorkload { if (type == EWorkload::STOCK) { return std::shared_ptr<TStockWorkloadGenerator>(TStockWorkloadGenerator::New(static_cast<const TStockWorkloadParams*>(params))); + } else if (type == EWorkload::KV) { + return std::shared_ptr<TKvWorkloadGenerator>(TKvWorkloadGenerator::New(static_cast<const TKvWorkloadParams*>(params))); } throw yexception() << "Unknown workload"; diff --git a/ydb/library/workload/workload_factory.h b/ydb/library/workload/workload_factory.h index 53bd764a1ad..a786fae2c4f 100644 --- a/ydb/library/workload/workload_factory.h +++ b/ydb/library/workload/workload_factory.h @@ -8,6 +8,7 @@ namespace NYdbWorkload { enum class EWorkload { STOCK, + KV, }; class TWorkloadFactory { diff --git a/ydb/library/workload/workload_query_generator.h b/ydb/library/workload/workload_query_generator.h index f48d3960246..7e5291f2ba9 100644 --- a/ydb/library/workload/workload_query_generator.h +++ b/ydb/library/workload/workload_query_generator.h @@ -25,6 +25,10 @@ struct TQueryInfo { using TQueryInfoList = std::list<TQueryInfo>; +struct TWorkloadParams { + std::string DbPath; +}; + class IWorkloadQueryGenerator { public: virtual ~IWorkloadQueryGenerator() {} @@ -34,10 +38,20 @@ public: virtual std::string GetCleanDDLQueries() const = 0; virtual TQueryInfoList GetWorkload(int type) = 0; -}; -struct TWorkloadParams { - std::string DbPath; + virtual TWorkloadParams* GetParams() = 0; + +protected: + static bool validateDbPath(const std::string& path) { + for (size_t i = 0; i < path.size(); ++i) { + char c = path[i]; + if (!std::isalnum(c) && c != '/' && c != '_' && c != '-') { + return false; + } + } + return true; + } + }; } // namespace NYdbWorkload diff --git a/ydb/public/lib/ydb_cli/commands/CMakeLists.txt b/ydb/public/lib/ydb_cli/commands/CMakeLists.txt index 5892a1d144a..fe8e547a3ae 100644 --- a/ydb/public/lib/ydb_cli/commands/CMakeLists.txt +++ b/ydb/public/lib/ydb_cli/commands/CMakeLists.txt @@ -36,6 +36,7 @@ target_link_libraries(clicommands PUBLIC ) target_sources(clicommands PRIVATE ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/stock_workload.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/kv_workload.cpp ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/ydb_command.cpp ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/ydb_profile.cpp ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/ydb_root_common.cpp diff --git a/ydb/public/lib/ydb_cli/commands/kv_workload.cpp b/ydb/public/lib/ydb_cli/commands/kv_workload.cpp new file mode 100644 index 00000000000..1a0efbc07a8 --- /dev/null +++ b/ydb/public/lib/ydb_cli/commands/kv_workload.cpp @@ -0,0 +1,185 @@ +#include "kv_workload.h" + +#include <ydb/library/workload/kv_workload.h> +#include <ydb/library/workload/workload_factory.h> +#include <ydb/public/lib/ydb_cli/commands/ydb_common.h> + +namespace NYdb::NConsoleClient { + +TCommandKv::TCommandKv() + : TClientCommandTree("kv", {}, "YDB kv workload") +{ + AddCommand(std::make_unique<TCommandKvInit>()); + AddCommand(std::make_unique<TCommandKvClean>()); + AddCommand(std::make_unique<TCommandKvRun>()); +} + +TCommandKvInit::TCommandKvInit() + : TWorkloadCommand("init", {}, "Create and initialize tables for workload") + , InitRowCount(1000) + , MinPartitions(1) + , MaxFirstKey(5000) + , PartitionsByLoad(true) +{} + +void TCommandKvInit::Config(TConfig& config) { + TYdbCommand::Config(config); + + config.SetFreeArgsNum(0); + + config.Opts->AddLongOption("rows-cnt", "count of rows need to Insert while table initialization") + .DefaultValue(1000).StoreResult(&InitRowCount); + config.Opts->AddLongOption("min-partitions", "Minimum partitions for tables.") + .DefaultValue(40).StoreResult(&MinPartitions); + config.Opts->AddLongOption("auto-partition", "Enable auto partitioning by load.") + .DefaultValue(true).StoreResult(&PartitionsByLoad); + config.Opts->AddLongOption("max-first-key", "maximum value of first primary key") + .DefaultValue(5000).StoreResult(&MaxFirstKey); +} + +void TCommandKvInit::Parse(TConfig& config) { + TClientCommand::Parse(config); +} + +int TCommandKvInit::Run(TConfig& config) { + Driver = std::make_unique<NYdb::TDriver>(CreateDriver(config)); + TableClient = std::make_unique<NTable::TTableClient>(*Driver); + NYdbWorkload::TKvWorkloadParams params; + params.DbPath = config.Database; + params.InitRowCount = InitRowCount; + params.MinPartitions = MinPartitions; + params.PartitionsByLoad = PartitionsByLoad; + params.MaxFirstKey = MaxFirstKey; + + NYdbWorkload::TWorkloadFactory factory; + auto workloadGen = factory.GetWorkloadQueryGenerator(NYdbWorkload::EWorkload::KV, ¶ms); + + auto session = GetSession(); + auto result = session.ExecuteSchemeQuery(workloadGen->GetDDLQueries()).GetValueSync(); + ThrowOnError(result); + + auto queryInfoList = workloadGen->GetInitialData(); + for (auto queryInfo : queryInfoList) { + auto prepareResult = session.PrepareDataQuery(queryInfo.Query.c_str()).GetValueSync(); + if (!prepareResult.IsSuccess()) { + Cerr << "Prepare failed: " << prepareResult.GetIssues().ToString() << Endl + << "Query:\n" << queryInfo.Query << Endl; + return EXIT_FAILURE; + } + + auto dataQuery = prepareResult.GetQuery(); + auto result = dataQuery.Execute(NYdb::NTable::TTxControl::BeginTx(NYdb::NTable::TTxSettings::SerializableRW()).CommitTx(), + std::move(queryInfo.Params)).GetValueSync(); + if (!result.IsSuccess()) { + Cerr << "Query execution failed: " << result.GetIssues().ToString() << Endl + << "Query:\n" << queryInfo.Query << Endl; + return EXIT_FAILURE; + } + } + + return EXIT_SUCCESS; +} + + +TCommandKvClean::TCommandKvClean() + : TWorkloadCommand("clean", {}, "drop tables created in init phase") {} + +void TCommandKvClean::Config(TConfig& config) { + TWorkloadCommand::Config(config); + config.SetFreeArgsNum(0); +} + +void TCommandKvClean::Parse(TConfig& config) { + TClientCommand::Parse(config); +} + +int TCommandKvClean::Run(TConfig& config) { + Driver = std::make_unique<NYdb::TDriver>(CreateDriver(config)); + TableClient = std::make_unique<NTable::TTableClient>(*Driver); + NYdbWorkload::TKvWorkloadParams params; + params.DbPath = config.Database; + + NYdbWorkload::TWorkloadFactory factory; + auto workloadGen = factory.GetWorkloadQueryGenerator(NYdbWorkload::EWorkload::KV, ¶ms); + + auto session = GetSession(); + + auto query = workloadGen->GetCleanDDLQueries(); + TStatus result(EStatus::SUCCESS, NYql::TIssues()); + result = session.ExecuteSchemeQuery(TString(query)).GetValueSync(); + + if (!result.IsSuccess()) { + Cerr << "Query execution failed: " << result.GetIssues().ToString() << Endl + << "Query:\n" << query << Endl; + return EXIT_FAILURE; + } + + return EXIT_SUCCESS; +} + +TCommandKvRun::TCommandKvRun() + : TClientCommandTree("run", {}, "Run YDB KV workload") +{ + AddCommand(std::make_unique<TCommandKvRunUpsertRandom>()); + AddCommand(std::make_unique<TCommandKvRunSelectRandom>()); +} + +TCommandKvRunUpsertRandom::TCommandKvRunUpsertRandom() + : TWorkloadCommand("upsert", {}, "upsert random pairs (a, b) into table") +{} + +void TCommandKvRunUpsertRandom::Config(TConfig& config) { + TWorkloadCommand::Config(config); + config.SetFreeArgsNum(0); + + config.Opts->AddLongOption("max-first-key", "maximum value of first primary key") + .DefaultValue(5000).StoreResult(&MaxFirstKey); +} + +void TCommandKvRunUpsertRandom::Parse(TConfig& config) { + TClientCommand::Parse(config); +} + +int TCommandKvRunUpsertRandom::Run(TConfig& config) { + PrepareForRun(config); + + NYdbWorkload::TKvWorkloadParams params; + params.DbPath = config.Database; + params.MaxFirstKey = MaxFirstKey; + + NYdbWorkload::TWorkloadFactory factory; + auto workloadGen = factory.GetWorkloadQueryGenerator(NYdbWorkload::EWorkload::KV, ¶ms); + + return RunWorkload(workloadGen, static_cast<int>(NYdbWorkload::TKvWorkloadGenerator::EType::UpsertRandom)); +} + +TCommandKvRunSelectRandom::TCommandKvRunSelectRandom() + : TWorkloadCommand("select", {}, "select row by exactly matching of a") +{} + +void TCommandKvRunSelectRandom::Config(TConfig& config) { + TWorkloadCommand::Config(config); + config.SetFreeArgsNum(0); + + config.Opts->AddLongOption("max-first-key", "maximum value of first primary key") + .DefaultValue(5000).StoreResult(&MaxFirstKey); +} + +void TCommandKvRunSelectRandom::Parse(TConfig& config) { + TClientCommand::Parse(config); +} + +int TCommandKvRunSelectRandom::Run(TConfig& config) { + PrepareForRun(config); + + NYdbWorkload::TKvWorkloadParams params; + params.DbPath = config.Database; + params.MaxFirstKey = MaxFirstKey; + + NYdbWorkload::TWorkloadFactory factory; + auto workloadGen = factory.GetWorkloadQueryGenerator(NYdbWorkload::EWorkload::KV, ¶ms); + + return RunWorkload(workloadGen, static_cast<int>(NYdbWorkload::TKvWorkloadGenerator::EType::SelectRandom)); +} + +} // namespace NYdb::NConsoleClient { diff --git a/ydb/public/lib/ydb_cli/commands/kv_workload.h b/ydb/public/lib/ydb_cli/commands/kv_workload.h new file mode 100644 index 00000000000..dc7b4fe01ba --- /dev/null +++ b/ydb/public/lib/ydb_cli/commands/kv_workload.h @@ -0,0 +1,65 @@ +#pragma once + +#include "ydb/public/lib/ydb_cli/commands/ydb_workload.h" + +namespace NYdb { +namespace NConsoleClient { + +class TCommandKv : public TClientCommandTree { +public: + TCommandKv(); +}; + +class TCommandKvInit : public TWorkloadCommand { +public: + TCommandKvInit(); + virtual void Config(TConfig& config) override; + virtual void Parse(TConfig& config) override; + virtual int Run(TConfig& config) override; + +private: + ui64 InitRowCount; + ui64 MinPartitions; + ui64 MaxFirstKey; + bool PartitionsByLoad; +}; + +class TCommandKvClean : public TWorkloadCommand { +public: + TCommandKvClean(); + virtual void Config(TConfig& config) override; + virtual void Parse(TConfig& config) override; + virtual int Run(TConfig& config) override; +}; + +class TCommandKvRun : public TClientCommandTree { +public: + TCommandKvRun(); +}; + +class TCommandKvRunUpsertRandom : public TWorkloadCommand { +public: + TCommandKvRunUpsertRandom(); + virtual void Config(TConfig& config) override; + virtual void Parse(TConfig& config) override; + virtual int Run(TConfig& config) override; + +private: + ui64 MaxFirstKey; + +}; + +class TCommandKvRunSelectRandom : public TWorkloadCommand { +public: + TCommandKvRunSelectRandom(); + virtual void Config(TConfig& config) override; + virtual void Parse(TConfig& config) override; + virtual int Run(TConfig& config) override; + +private: + ui64 MaxFirstKey; + +}; + +} +} diff --git a/ydb/public/lib/ydb_cli/commands/ydb_workload.cpp b/ydb/public/lib/ydb_cli/commands/ydb_workload.cpp index 3c567c3bdf4..89c31c8406d 100644 --- a/ydb/public/lib/ydb_cli/commands/ydb_workload.cpp +++ b/ydb/public/lib/ydb_cli/commands/ydb_workload.cpp @@ -1,6 +1,7 @@ #include "ydb_workload.h" #include "stock_workload.h" +#include "kv_workload.h" #include <ydb/library/workload/workload_factory.h> #include <ydb/public/lib/ydb_cli/commands/ydb_common.h> @@ -34,6 +35,7 @@ TCommandWorkload::TCommandWorkload() : TClientCommandTree("workload", {}, "YDB workload service") { AddCommand(std::make_unique<TCommandStock>()); + AddCommand(std::make_unique<TCommandKv>()); } TWorkloadCommand::TWorkloadCommand(const TString& name, const std::initializer_list<TString>& aliases, const TString& description) |