diff options
author | mdartemenko <mdartemenko@yandex-team.com> | 2022-08-30 17:42:47 +0300 |
---|---|---|
committer | mdartemenko <mdartemenko@yandex-team.com> | 2022-08-30 17:42:47 +0300 |
commit | b39c849cda9decf6c19b135df25f21b5cf0d0d5b (patch) | |
tree | 7c66544c9538c58469c92d184b2764432ba004dc | |
parent | f79f740e5b92e2c8a10c9c74383e6b47e63a3ef3 (diff) | |
download | ydb-b39c849cda9decf6c19b135df25f21b5cf0d0d5b.tar.gz |
add MORE workloads for KQP
Adds 2 Workloads for KQP:
1) BigString -> generates kv pairs with big string as second field
2) MultiColumn -> generates multi column table with FetchCnt not null columns
-rw-r--r-- | ydb/core/blobstorage/testload/test_load_kqp.cpp | 132 | ||||
-rw-r--r-- | ydb/core/protos/blobstorage.proto | 4 | ||||
-rw-r--r-- | ydb/library/workload/kv_workload.cpp | 116 | ||||
-rw-r--r-- | ydb/library/workload/kv_workload.h | 7 | ||||
-rw-r--r-- | ydb/library/workload/workload_factory.cpp | 2 | ||||
-rw-r--r-- | ydb/public/lib/ydb_cli/commands/kv_workload.cpp | 58 | ||||
-rw-r--r-- | ydb/public/lib/ydb_cli/commands/kv_workload.h | 5 | ||||
-rw-r--r-- | ydb/public/lib/ydb_cli/commands/stock_workload.cpp | 39 | ||||
-rw-r--r-- | ydb/public/lib/ydb_cli/commands/ydb_workload.cpp | 45 | ||||
-rw-r--r-- | ydb/public/lib/ydb_cli/commands/ydb_workload.h | 6 |
10 files changed, 223 insertions, 191 deletions
diff --git a/ydb/core/blobstorage/testload/test_load_kqp.cpp b/ydb/core/blobstorage/testload/test_load_kqp.cpp index bb4ad82ff7..1efe502282 100644 --- a/ydb/core/blobstorage/testload/test_load_kqp.cpp +++ b/ydb/core/blobstorage/testload/test_load_kqp.cpp @@ -45,7 +45,7 @@ public: : WindowHist(60000, 2) , WindowErrors(window_errors) { - WindowHist.Add(hist); + WindowHist.Add(hist); } void Add(const MonitoringData& other) { @@ -63,7 +63,7 @@ struct TEvKqpWorkerResponse : TEventLocal<TEvKqpWorkerResponse, EvKqpWorkerRespo public: TEvKqpWorkerResponse(const NHdr::THistogram& hist, ui64 window_errors, ui64 phase, ui64 worker_tag) : Data(hist, window_errors) - , Phase(phase) + , Phase(phase) , WorkerTag(worker_tag) {} public: @@ -89,16 +89,16 @@ void ConvertYdbParamsToMiniKQLParams(const NYdb::TParams& input, NKikimrMiniKQL: class TKqpLoadWorker : public TActorBootstrapped<TKqpLoadWorker> { public: TKqpLoadWorker(TActorId parent, - TString working_dir, - std::shared_ptr<NYdbWorkload::IWorkloadQueryGenerator> workload_query_gen, - ui64 workload_type, - ui64 parentTag, + TString working_dir, + std::shared_ptr<NYdbWorkload::IWorkloadQueryGenerator> workload_query_gen, + ui64 workload_type, + ui64 parentTag, ui64 workerTag, ui64 durationSeconds, ui64 windowDuration, - ui64 windowCount, + ui64 windowCount, NMonitoring::TDynamicCounters::TCounterPtr transactions, - NMonitoring::TDynamicCounters::TCounterPtr transactionsBytesWritten) + NMonitoring::TDynamicCounters::TCounterPtr transactionsBytesWritten) : Parent(std::move(parent)) , WorkingDir(std::move(working_dir)) , WorkloadQueryGen(workload_query_gen) @@ -114,7 +114,7 @@ public: void Bootstrap(const TActorContext& ctx) { LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Worker Tag# " << ParentTag << "." << WorkerTag << " TKqpLoadWorker Bootstrap called"); - + ctx.Schedule(TDuration::Seconds(DurationSeconds), new TEvents::TEvPoisonPill); ctx.Schedule(TDuration::Seconds(WindowDuration), new TEvUpdateMonitoring); @@ -139,7 +139,7 @@ private: if (Phase < WindowCount) { SendMonitoringEvent(ctx); } - + CloseSession(ctx); Die(ctx); } @@ -149,9 +149,9 @@ private: auto ev = MakeHolder<NKqp::TEvKqp::TEvCloseSessionRequest>(); ev->Record.MutableRequest()->SetSessionId(WorkerSession); - + auto kqp_proxy = NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()); - LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Worker Tag# " << ParentTag << "." << WorkerTag + LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Worker Tag# " << ParentTag << "." << WorkerTag << " sending session close query to proxy: " + kqp_proxy.ToString()); ctx.Send(kqp_proxy, ev.Release()); @@ -166,11 +166,11 @@ private: auto ev = MakeHolder<NKqp::TEvKqp::TEvCreateSessionRequest>(); ev->Record.MutableRequest()->SetDatabase(WorkingDir); - + auto kqp_proxy = NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()); - LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Worker Tag# " << ParentTag << "." << WorkerTag + LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Worker Tag# " << ParentTag << "." << WorkerTag << " sending event for session creation to proxy: " << kqp_proxy.ToString()); - + Send(kqp_proxy, ev.Release()); } @@ -182,7 +182,7 @@ private: LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Worker Tag# " << ParentTag << "." << WorkerTag << " Session is created: " + WorkerSession); CreateDataQuery(ctx); } else { - LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Worker Tag# " << ParentTag << "." << WorkerTag + LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Worker Tag# " << ParentTag << "." << WorkerTag << " Session creation failed: " + ev->Get()->ToString()); } } @@ -195,7 +195,7 @@ private: auto q = std::move(queries.front()); queries.pop_front(); - LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Worker Tag# " << ParentTag << "." << WorkerTag + LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Worker Tag# " << ParentTag << "." << WorkerTag << " query type: " << WorkloadType << ", params size: " << q.Params.GetValues().size()); Transactions->Inc(); @@ -224,13 +224,13 @@ private: NKikimrMiniKQL::TParams params; ConvertYdbParamsToMiniKQLParams(query_params, params); request->Record.MutableRequest()->MutableParameters()->Swap(¶ms); - + auto kqp_proxy = NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()); - LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Worker Tag# " << ParentTag << "." << WorkerTag + LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Worker Tag# " << ParentTag << "." << WorkerTag << " sending data query to proxy: " + kqp_proxy.ToString()); ctx.Send(kqp_proxy, request.Release()); - + } void HandleResponse(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx) { @@ -243,7 +243,7 @@ private: TransactionsBytesWritten->Add(response.GetResponse().GetQueryStats().ByteSize()); WindowHist.RecordValue(response.GetResponse().GetQueryStats().GetDurationUs()); } else { - LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Worker Tag# " << ParentTag << "." << WorkerTag + LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Worker Tag# " << ParentTag << "." << WorkerTag << " data request status: Fail, Issue: " + ev->Get()->ToString()); ++WindowErrors; } @@ -258,22 +258,22 @@ private: // monitoring void HandleWindowTimer(TEvUpdateMonitoring::TPtr& /*ev*/, const TActorContext& ctx) { - LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Worker Tag# " << ParentTag << "." << WorkerTag + LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Worker Tag# " << ParentTag << "." << WorkerTag << " handle TEvUpdateMonitoring, Phase: " << Phase); - + SendMonitoringEvent(ctx); if (Phase < WindowCount) { - LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Worker Tag# " << ParentTag << "." << WorkerTag - << " reschedule TEvUpdateMonitoring, Phase: " << Phase); + LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Worker Tag# " << ParentTag << "." << WorkerTag + << " reschedule TEvUpdateMonitoring, Phase: " << Phase); ctx.Schedule(TDuration::Seconds(WindowDuration), new TEvUpdateMonitoring); } } private: - + // common - + void SendMonitoringEvent(const TActorContext& ctx) { auto ev = MakeHolder<TEvKqpWorkerResponse>(WindowHist, WindowErrors, Phase, WorkerTag); @@ -319,9 +319,9 @@ public: } TKqpWriterTestLoadActor(const NKikimrBlobStorage::TEvTestLoadRequest::TKqpLoadStart& cmd, - const TActorId& parent, - const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters, - ui64 index, + const TActorId& parent, + const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters, + ui64 index, ui64 tag) : Parent(parent) , Tag(tag) @@ -345,7 +345,7 @@ public: for (size_t i = 0; i < WindowCount; ++i) { Chunk.push_back(std::make_unique<MonitoringData>()); } - + NYdbWorkload::TWorkloadFactory factory; if (cmd.Workload_case() == NKikimrBlobStorage::TEvTestLoadRequest_TKqpLoadStart::WorkloadCase::kStock) { @@ -365,18 +365,20 @@ public: params.InitRowCount = cmd.GetKv().GetInitRowCount(); params.PartitionsByLoad = cmd.GetKv().GetPartitionsByLoad(); params.MaxFirstKey = cmd.GetKv().GetMaxFirstKey(); + params.StringLen = cmd.GetKv().GetStringLen(); + params.ColumnsCnt = cmd.GetKv().GetColumnsCnt(); params.MinPartitions = UniformPartitionsCount; params.DbPath = WorkingDir; WorkloadQueryGen = factory.GetWorkloadQueryGenerator(NYdbWorkload::EWorkload::KV, ¶ms); } else { return; } - + Y_ASSERT(WorkloadQueryGen.get() != nullptr); Y_ASSERT(DurationSeconds > DelayBeforeMeasurements.Seconds()); // Monitoring initialization - + LoadCounters = counters->GetSubgroup("tag", Sprintf("%" PRIu64, tag)); Transactions = LoadCounters->GetCounter("Transactions", true); TransactionsBytesWritten = LoadCounters->GetCounter("TransactionsBytesWritten", true); @@ -389,7 +391,7 @@ public: void Bootstrap(const TActorContext& ctx) { LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " TKqpWriterTestLoadActor Bootstrap called"); Become(&TKqpWriterTestLoadActor::StateStart); - + if (WorkloadClass == NYdbWorkload::EWorkload::STOCK) { NYdbWorkload::TStockWorkloadParams* params = static_cast<NYdbWorkload::TStockWorkloadParams*>(WorkloadQueryGen->GetParams()); LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " Starting load actor with workload STOCK, Params: {" @@ -407,6 +409,8 @@ public: << "PartitionsByLoad: " << params->PartitionsByLoad << " " << "MaxFirstKey: " << params->MaxFirstKey << " " << "MinPartitions: " << params->MinPartitions << " " + << "StringLen: " << params->StringLen << " " + << "ColumnsCnt: " << params->ColumnsCnt << " " << "DbPath: " << params->DbPath); } @@ -457,14 +461,14 @@ private: void DropTables(const TActorContext& ctx) { LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " creating event for tables drop"); - + auto ev = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>(); ev->Record.MutableRequest()->SetDatabase(WorkingDir); ev->Record.MutableRequest()->SetSessionId(TableSession); ev->Record.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE); ev->Record.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_SQL_DDL); ev->Record.MutableRequest()->SetQuery(WorkloadQueryGen->GetCleanDDLQueries()); - + auto kqp_proxy = NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()); LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " sending drop tables query to proxy: " + kqp_proxy.ToString()); @@ -483,12 +487,12 @@ private: DeathReport(ctx); } - void DeathReport(const TActorContext& ctx) { + void DeathReport(const TActorContext& ctx) { CloseSession(ctx); TIntrusivePtr<TLoadReport> Report(new TLoadReport()); Report->Duration = TDuration::Seconds(DurationSeconds); - + ctx.Send(Parent, new TEvTestLoadFinished(Tag, Report, "OK called StartDeathProcess")); Die(ctx); } @@ -498,8 +502,8 @@ private: // monitoring void HandleMonitoring(TEvKqpWorkerResponse::TPtr& ev, const TActorContext& ctx) { const auto& response = ev->Get(); - - LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " got monitoring response from worker Tag# " << response->WorkerTag + + LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " got monitoring response from worker Tag# " << response->WorkerTag << " Phase: " << response->Phase << " Min: " << response->Data.WindowHist.GetMin() << " Max: " << response->Data.WindowHist.GetMax() @@ -517,7 +521,7 @@ private: void SendNewRowToParent(const TActorContext& ctx) { Phase += 1; - LOG_INFO_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag + LOG_INFO_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " total: Phase: " << Phase << " -> " << Total->WindowHist.GetTotalCount() << " | " << Total->WindowHist.GetTotalCount() / (WindowDuration * std::max(ui64(1), Phase) * 1.0) << " | " @@ -527,7 +531,7 @@ private: << Total->WindowHist.GetValueAtPercentile(99.0) / (WindowDuration * 1000.0) << " | " << Total->WindowHist.GetMax() / (WindowDuration * 1000.0) ); - + if (Phase >= WindowCount) { StartDeathProcess(ctx); } @@ -542,10 +546,10 @@ private: auto ev = MakeHolder<NKqp::TEvKqp::TEvCreateSessionRequest>(); ev->Record.MutableRequest()->SetDatabase(WorkingDir); - + auto kqp_proxy = NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()); LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " sending event for session creation to proxy: " << kqp_proxy.ToString()); - + Send(kqp_proxy, ev.Release()); } @@ -563,14 +567,14 @@ private: void CreateTables(const TActorContext& ctx) { LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " creating event for tables creation"); - + auto ev = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>(); ev->Record.MutableRequest()->SetDatabase(WorkingDir); ev->Record.MutableRequest()->SetSessionId(TableSession); ev->Record.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE); ev->Record.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_SQL_DDL); ev->Record.MutableRequest()->SetQuery(WorkloadQueryGen->GetDDLQueries()); - + auto kqp_proxy = NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()); LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " sending ddl query to proxy: " + kqp_proxy.ToString()); @@ -580,14 +584,14 @@ private: void HandleCreateTableResponse(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx) { auto& response = ev->Get()->Record.GetRef(); - Become(&TKqpWriterTestLoadActor::StateMain); - if (response.GetYdbStatus() == Ydb::StatusIds_StatusCode_SUCCESS) { - LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " tables are created"); + Become(&TKqpWriterTestLoadActor::StateMain); + LOG_INFO_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " tables are created"); InitData = WorkloadQueryGen->GetInitialData(); InsertInitData(ctx); } else { - LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " tables creation failed: " + ev->Get()->ToString()); + LOG_INFO_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " tables creation failed: " + ev->Get()->ToString()); + CreateTables(ctx); } } @@ -604,7 +608,7 @@ private: auto q = std::move(InitData.front()); InitData.pop_front(); - LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag + LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " Creating request for init query, need to exec: " << InitData.size() + 1); TString query_text = TString(q.Query); @@ -631,7 +635,7 @@ private: NKikimrMiniKQL::TParams params; ConvertYdbParamsToMiniKQLParams(query_params, params); request->Record.MutableRequest()->MutableParameters()->Swap(¶ms); - + auto kqp_proxy = NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()); LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " sending init query to proxy: " + kqp_proxy.ToString()); @@ -699,7 +703,7 @@ private: TABLED() { str << Total->WindowHist.GetMax() / (WindowDuration * 1000.0); }; } for (size_t i = Phase; i >= 1; --i) { - TABLER() { + TABLER() { TABLED() { str << i; }; TABLED() { str << Chunk[i - 1]->WindowHist.GetTotalCount(); }; TABLED() { str << Chunk[i - 1]->WindowHist.GetTotalCount() / (WindowDuration * 1.0); }; @@ -727,16 +731,16 @@ private: void InitWorkers(const TActorContext& ctx) { for (ui64 i = 0; i < NumOfSessions; ++i) { auto* worker = new TKqpLoadWorker( - SelfId(), - WorkingDir, - WorkloadQueryGen, - WorkloadType, - Tag, - i, - DurationSeconds, - WindowDuration, - WindowCount, - Transactions, + SelfId(), + WorkingDir, + WorkloadQueryGen, + WorkloadType, + Tag, + i, + DurationSeconds, + WindowDuration, + WindowCount, + Transactions, TransactionsBytesWritten); Workers.push_back(ctx.Register(worker)); } @@ -747,7 +751,7 @@ private: auto ev = MakeHolder<NKqp::TEvKqp::TEvCloseSessionRequest>(); ev->Record.MutableRequest()->SetSessionId(TableSession); - + auto kqp_proxy = NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()); LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " sending session close query to proxy: " + kqp_proxy.ToString()); diff --git a/ydb/core/protos/blobstorage.proto b/ydb/core/protos/blobstorage.proto index 72f4f32e17..4209b1dd2f 100644 --- a/ydb/core/protos/blobstorage.proto +++ b/ydb/core/protos/blobstorage.proto @@ -1544,13 +1544,15 @@ message TEvTestLoadRequest { optional uint64 ProductCount = 1 [default = 100]; optional uint64 Quantity = 2 [default = 1000]; optional uint64 OrderCount = 3 [default = 100]; - optional uint64 Limit = 4 [default = 10]; + optional uint64 Limit = 4 [default = 10]; optional bool PartitionsByLoad = 5 [default = true]; } message TKvWorkload { optional uint64 InitRowCount = 1 [default = 1000]; optional bool PartitionsByLoad = 2 [default = true]; optional uint64 MaxFirstKey = 3 [default = 5000]; + optional uint64 StringLen = 4 [default = 8]; + optional uint64 ColumnsCnt = 5 [default = 2]; } optional uint64 Tag = 1; optional uint32 DurationSeconds = 2; diff --git a/ydb/library/workload/kv_workload.cpp b/ydb/library/workload/kv_workload.cpp index b36b8cd2f3..49786e1b92 100644 --- a/ydb/library/workload/kv_workload.cpp +++ b/ydb/library/workload/kv_workload.cpp @@ -2,20 +2,24 @@ #include <util/datetime/base.h> +#include <ydb/core/util/lz4_data_generator.h> + #include <cmath> #include <iomanip> #include <string> #include <thread> #include <random> +#include <sstream> namespace NYdbWorkload { TKvWorkloadGenerator::TKvWorkloadGenerator(const TKvWorkloadParams* params) : DbPath(params->DbPath) , Params(*params) + , BigString(NKikimr::GenDataForLZ4(Params.StringLen)) , Rd() , Gen(Rd()) - , UniformDistGen(0, Params.MaxFirstKey) + , KeyUniformDistGen(0, Params.MaxFirstKey) { Gen.seed(Now().MicroSeconds()); } @@ -26,30 +30,26 @@ TKvWorkloadParams* TKvWorkloadGenerator::GetParams() { std::string TKvWorkloadGenerator::GetDDLQueries() const { std::string partsNum = std::to_string(Params.MinPartitions); - std::string KvPartitionsDdl = ""; - if (Params.PartitionsByLoad) { - KvPartitionsDdl = "WITH (AUTO_PARTITIONING_BY_LOAD = ENABLED, AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = " + - partsNum + ", UNIFORM_PARTITIONS = " + partsNum + ", AUTO_PARTITIONING_MAX_PARTITIONS_COUNT = 1000)"; - } else { - KvPartitionsDdl = "WITH (AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = " + - partsNum + ", UNIFORM_PARTITIONS = " + partsNum + ", AUTO_PARTITIONING_MAX_PARTITIONS_COUNT = 1000)"; - } + std::stringstream ss; - static const char TablesDdl[] = R"(--!syntax_v1 - CREATE TABLE `%s/kv_test`(a Uint64, b Uint64, PRIMARY KEY(a, b)) %s; - )"; + ss << "--!syntax_v1\n"; + ss << "CREATE TABLE `" << DbPath << "/kv_test`(c0 Uint64, "; - char buf[sizeof(TablesDdl) + sizeof(KvPartitionsDdl) + 8192*3]; // 32*256 for DbPath - int res = std::sprintf(buf, TablesDdl, - DbPath.c_str(), KvPartitionsDdl.c_str() - ); + for (size_t i = 1; i < Params.ColumnsCnt; ++i) { + ss << "c" << i << " " << "String, "; + } + + ss << "PRIMARY KEY(c0)) WITH ("; - if (res < 0) { - return ""; + if (Params.PartitionsByLoad) { + ss << "AUTO_PARTITIONING_BY_LOAD = ENABLED, "; } - return buf; + ss << "AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = " << partsNum << ", " + << "UNIFORM_PARTITIONS = " << partsNum << ", AUTO_PARTITIONING_MAX_PARTITIONS_COUNT = 1000)"; + + return ss.str(); } TQueryInfoList TKvWorkloadGenerator::GetWorkload(int type) { @@ -64,48 +64,70 @@ TQueryInfoList TKvWorkloadGenerator::GetWorkload(int type) { } TQueryInfoList TKvWorkloadGenerator::UpsertRandom() { - std::string query = R"( - --!syntax_v1 - DECLARE $a AS Uint64; - DECLARE $b AS Uint64; + std::stringstream ss; - UPSERT INTO `kv_test` (a, b) VALUES ($a, $b); - )"; + NYdb::TParamsBuilder paramsBuilder; - ui64 a = UniformDistGen(Gen); - ui64 b = Gen(); + ss << "--!syntax_v1\n"; + ss << "DECLARE $c0 AS Uint64;\n"; + paramsBuilder.AddParam("$c0").Uint64(KeyUniformDistGen(Gen)).Build(); - NYdb::TParamsBuilder paramsBuilder; - auto params = paramsBuilder - .AddParam("$a") - .Uint64(a) - .Build() - .AddParam("$b") - .Uint64(b) - .Build() - .Build(); + for (size_t col = 1; col < Params.ColumnsCnt; ++col) { + ss << "DECLARE $c" << col << " AS String;\n"; + paramsBuilder.AddParam("$c" + std::to_string(col)).String(BigString).Build(); + } + + ss << "UPSERT INTO `kv_test`("; + + for (size_t col = 0; col < Params.ColumnsCnt; ++col) { + ss << "c" << col; + if (col + 1 < Params.ColumnsCnt) { + ss << ", "; + } + } + + ss << ") VALUES ("; + + for (size_t col = 0; col < Params.ColumnsCnt; ++col) { + ss << "$c" << col; + if (col + 1 < Params.ColumnsCnt) { + ss << ", "; + } + } + + ss << ")"; - return TQueryInfoList(1, TQueryInfo(query, std::move(params))); + auto params = paramsBuilder.Build(); + + return TQueryInfoList(1, TQueryInfo(ss.str(), std::move(params))); } TQueryInfoList TKvWorkloadGenerator::SelectRandom() { - std::string query = R"( - --!syntax_v1 - DECLARE $a AS Uint64; + std::stringstream ss; + + ss << "--!syntax_v1\n"; + ss << "DECLARE $c0 AS Uint64;\n"; + ss << "SELECT "; + for (size_t col = 0; col < Params.ColumnsCnt; ++col) { + ss << "c" << col; + if (col + 1 < Params.ColumnsCnt) { + ss << ","; + } + ss << " "; + } - SELECT * FROM `kv_test` WHERE a = $a - )"; + ss << "FROM `kv_test` WHERE c0 = $c0"; - ui64 a = UniformDistGen(Gen); + ui64 x = KeyUniformDistGen(Gen); NYdb::TParamsBuilder paramsBuilder; auto params = paramsBuilder - .AddParam("$a") - .Uint64(a) + .AddParam("$c0") + .Uint64(x) .Build() .Build(); - return TQueryInfoList(1, TQueryInfo(query, std::move(params))); + return TQueryInfoList(1, TQueryInfo(ss.str(), std::move(params))); } TQueryInfoList TKvWorkloadGenerator::GetInitialData() { @@ -114,7 +136,7 @@ TQueryInfoList TKvWorkloadGenerator::GetInitialData() { auto queryInfos = UpsertRandom(); res.insert(res.end(), queryInfos.begin(), queryInfos.end()); } - + return res; } diff --git a/ydb/library/workload/kv_workload.h b/ydb/library/workload/kv_workload.h index b681d092eb..b5e492cfdd 100644 --- a/ydb/library/workload/kv_workload.h +++ b/ydb/library/workload/kv_workload.h @@ -11,6 +11,8 @@ struct TKvWorkloadParams : public TWorkloadParams { ui64 MinPartitions = 1; ui64 InitRowCount = 1000; ui64 MaxFirstKey = 5000; + ui64 StringLen = 8; + ui64 ColumnsCnt = 2; bool PartitionsByLoad = true; }; @@ -44,17 +46,18 @@ public: private: TQueryInfoList UpsertRandom(); TQueryInfoList SelectRandom(); - + TKvWorkloadGenerator(const TKvWorkloadParams* params); TQueryInfo FillKvData() const; std::string DbPath; TKvWorkloadParams Params; + TString BigString; std::random_device Rd; std::mt19937_64 Gen; - std::uniform_int_distribution<ui64> UniformDistGen; + std::uniform_int_distribution<ui64> KeyUniformDistGen; }; } // namespace NYdbWorkload
\ No newline at end of file diff --git a/ydb/library/workload/workload_factory.cpp b/ydb/library/workload/workload_factory.cpp index 1b34bc3838..11038e697f 100644 --- a/ydb/library/workload/workload_factory.cpp +++ b/ydb/library/workload/workload_factory.cpp @@ -5,7 +5,7 @@ namespace NYdbWorkload { - std::shared_ptr<IWorkloadQueryGenerator> TWorkloadFactory::GetWorkloadQueryGenerator(const EWorkload& type , const TWorkloadParams* params) + std::shared_ptr<IWorkloadQueryGenerator> TWorkloadFactory::GetWorkloadQueryGenerator(const EWorkload& type , const TWorkloadParams* params) { if (!params) { throw yexception() << "Params not specified"; diff --git a/ydb/public/lib/ydb_cli/commands/kv_workload.cpp b/ydb/public/lib/ydb_cli/commands/kv_workload.cpp index 1a0efbc07a..c8bff4f05c 100644 --- a/ydb/public/lib/ydb_cli/commands/kv_workload.cpp +++ b/ydb/public/lib/ydb_cli/commands/kv_workload.cpp @@ -19,7 +19,9 @@ TCommandKvInit::TCommandKvInit() , InitRowCount(1000) , MinPartitions(1) , MaxFirstKey(5000) - , PartitionsByLoad(true) + , StringLen(8) + , ColumnsCnt(2) + , PartitionsByLoad(true) {} void TCommandKvInit::Config(TConfig& config) { @@ -35,6 +37,10 @@ void TCommandKvInit::Config(TConfig& config) { .DefaultValue(true).StoreResult(&PartitionsByLoad); config.Opts->AddLongOption("max-first-key", "maximum value of first primary key") .DefaultValue(5000).StoreResult(&MaxFirstKey); + config.Opts->AddLongOption("len", "String len") + .DefaultValue(8).StoreResult(&StringLen); + config.Opts->AddLongOption("cols", "Number of columns") + .DefaultValue(2).StoreResult(&ColumnsCnt); } void TCommandKvInit::Parse(TConfig& config) { @@ -50,34 +56,13 @@ int TCommandKvInit::Run(TConfig& config) { params.MinPartitions = MinPartitions; params.PartitionsByLoad = PartitionsByLoad; params.MaxFirstKey = MaxFirstKey; + params.StringLen = StringLen; + params.ColumnsCnt = ColumnsCnt; NYdbWorkload::TWorkloadFactory factory; auto workloadGen = factory.GetWorkloadQueryGenerator(NYdbWorkload::EWorkload::KV, ¶ms); - auto session = GetSession(); - auto result = session.ExecuteSchemeQuery(workloadGen->GetDDLQueries()).GetValueSync(); - ThrowOnError(result); - - auto queryInfoList = workloadGen->GetInitialData(); - for (auto queryInfo : queryInfoList) { - auto prepareResult = session.PrepareDataQuery(queryInfo.Query.c_str()).GetValueSync(); - if (!prepareResult.IsSuccess()) { - Cerr << "Prepare failed: " << prepareResult.GetIssues().ToString() << Endl - << "Query:\n" << queryInfo.Query << Endl; - return EXIT_FAILURE; - } - - auto dataQuery = prepareResult.GetQuery(); - auto result = dataQuery.Execute(NYdb::NTable::TTxControl::BeginTx(NYdb::NTable::TTxSettings::SerializableRW()).CommitTx(), - std::move(queryInfo.Params)).GetValueSync(); - if (!result.IsSuccess()) { - Cerr << "Query execution failed: " << result.GetIssues().ToString() << Endl - << "Query:\n" << queryInfo.Query << Endl; - return EXIT_FAILURE; - } - } - - return EXIT_SUCCESS; + return InitTables(workloadGen); } @@ -102,19 +87,7 @@ int TCommandKvClean::Run(TConfig& config) { NYdbWorkload::TWorkloadFactory factory; auto workloadGen = factory.GetWorkloadQueryGenerator(NYdbWorkload::EWorkload::KV, ¶ms); - auto session = GetSession(); - - auto query = workloadGen->GetCleanDDLQueries(); - TStatus result(EStatus::SUCCESS, NYql::TIssues()); - result = session.ExecuteSchemeQuery(TString(query)).GetValueSync(); - - if (!result.IsSuccess()) { - Cerr << "Query execution failed: " << result.GetIssues().ToString() << Endl - << "Query:\n" << query << Endl; - return EXIT_FAILURE; - } - - return EXIT_SUCCESS; + return CleanTables(workloadGen); } TCommandKvRun::TCommandKvRun() @@ -134,6 +107,10 @@ void TCommandKvRunUpsertRandom::Config(TConfig& config) { config.Opts->AddLongOption("max-first-key", "maximum value of first primary key") .DefaultValue(5000).StoreResult(&MaxFirstKey); + config.Opts->AddLongOption("len", "String len") + .DefaultValue(8).StoreResult(&StringLen); + config.Opts->AddLongOption("cols", "Number of columns") + .DefaultValue(2).StoreResult(&ColumnsCnt); } void TCommandKvRunUpsertRandom::Parse(TConfig& config) { @@ -146,6 +123,8 @@ int TCommandKvRunUpsertRandom::Run(TConfig& config) { NYdbWorkload::TKvWorkloadParams params; params.DbPath = config.Database; params.MaxFirstKey = MaxFirstKey; + params.StringLen = StringLen; + params.ColumnsCnt = ColumnsCnt; NYdbWorkload::TWorkloadFactory factory; auto workloadGen = factory.GetWorkloadQueryGenerator(NYdbWorkload::EWorkload::KV, ¶ms); @@ -163,6 +142,8 @@ void TCommandKvRunSelectRandom::Config(TConfig& config) { config.Opts->AddLongOption("max-first-key", "maximum value of first primary key") .DefaultValue(5000).StoreResult(&MaxFirstKey); + config.Opts->AddLongOption("cols", "Number of columns") + .DefaultValue(2).StoreResult(&ColumnsCnt); } void TCommandKvRunSelectRandom::Parse(TConfig& config) { @@ -175,6 +156,7 @@ int TCommandKvRunSelectRandom::Run(TConfig& config) { NYdbWorkload::TKvWorkloadParams params; params.DbPath = config.Database; params.MaxFirstKey = MaxFirstKey; + params.ColumnsCnt = ColumnsCnt; NYdbWorkload::TWorkloadFactory factory; auto workloadGen = factory.GetWorkloadQueryGenerator(NYdbWorkload::EWorkload::KV, ¶ms); diff --git a/ydb/public/lib/ydb_cli/commands/kv_workload.h b/ydb/public/lib/ydb_cli/commands/kv_workload.h index dc7b4fe01b..b3e5559831 100644 --- a/ydb/public/lib/ydb_cli/commands/kv_workload.h +++ b/ydb/public/lib/ydb_cli/commands/kv_workload.h @@ -21,6 +21,8 @@ private: ui64 InitRowCount; ui64 MinPartitions; ui64 MaxFirstKey; + ui64 StringLen; + ui64 ColumnsCnt; bool PartitionsByLoad; }; @@ -46,6 +48,8 @@ public: private: ui64 MaxFirstKey; + ui64 StringLen; + ui64 ColumnsCnt; }; @@ -58,6 +62,7 @@ public: private: ui64 MaxFirstKey; + ui64 ColumnsCnt; }; diff --git a/ydb/public/lib/ydb_cli/commands/stock_workload.cpp b/ydb/public/lib/ydb_cli/commands/stock_workload.cpp index 240625924e..dae5ec87af 100644 --- a/ydb/public/lib/ydb_cli/commands/stock_workload.cpp +++ b/ydb/public/lib/ydb_cli/commands/stock_workload.cpp @@ -67,30 +67,7 @@ int TCommandStockInit::Run(TConfig& config) { NYdbWorkload::TWorkloadFactory factory; auto workloadGen = factory.GetWorkloadQueryGenerator(NYdbWorkload::EWorkload::STOCK, ¶ms); - auto session = GetSession(); - auto result = session.ExecuteSchemeQuery(workloadGen->GetDDLQueries()).GetValueSync(); - ThrowOnError(result); - - auto queryInfoList = workloadGen->GetInitialData(); - for (auto queryInfo : queryInfoList) { - auto prepareResult = session.PrepareDataQuery(queryInfo.Query.c_str()).GetValueSync(); - if (!prepareResult.IsSuccess()) { - Cerr << "Prepare failed: " << prepareResult.GetIssues().ToString() << Endl - << "Query:\n" << queryInfo.Query << Endl; - return EXIT_FAILURE; - } - - auto dataQuery = prepareResult.GetQuery(); - auto result = dataQuery.Execute(NYdb::NTable::TTxControl::BeginTx(NYdb::NTable::TTxSettings::SerializableRW()).CommitTx(), - std::move(queryInfo.Params)).GetValueSync(); - if (!result.IsSuccess()) { - Cerr << "Query execution failed: " << result.GetIssues().ToString() << Endl - << "Query:\n" << queryInfo.Query << Endl; - return EXIT_FAILURE; - } - } - - return EXIT_SUCCESS; + return InitTables(workloadGen); } TCommandStockClean::TCommandStockClean() @@ -114,19 +91,7 @@ int TCommandStockClean::Run(TConfig& config) { NYdbWorkload::TWorkloadFactory factory; auto workloadGen = factory.GetWorkloadQueryGenerator(NYdbWorkload::EWorkload::STOCK, ¶ms); - auto session = GetSession(); - - auto query = workloadGen->GetCleanDDLQueries(); - TStatus result(EStatus::SUCCESS, NYql::TIssues()); - result = session.ExecuteSchemeQuery(TString(query)).GetValueSync(); - - if (!result.IsSuccess()) { - Cerr << "Query execution failed: " << result.GetIssues().ToString() << Endl - << "Query:\n" << query << Endl; - return EXIT_FAILURE; - } - - return EXIT_SUCCESS; + return CleanTables(workloadGen); } TCommandStockRun::TCommandStockRun() diff --git a/ydb/public/lib/ydb_cli/commands/ydb_workload.cpp b/ydb/public/lib/ydb_cli/commands/ydb_workload.cpp index 89c31c8406..559fe8171e 100644 --- a/ydb/public/lib/ydb_cli/commands/ydb_workload.cpp +++ b/ydb/public/lib/ydb_cli/commands/ydb_workload.cpp @@ -83,7 +83,7 @@ void TWorkloadCommand::PrepareForRun(TConfig& config) { .SetDatabase(config.Database) .SetBalancingPolicy(EBalancingPolicy::UseAllNodes) .SetCredentialsProviderFactory(config.CredentialsGetter(config)); - + if (config.EnableSsl) { driverConfig.UseSecureConnection(config.CaCerts); } @@ -214,4 +214,47 @@ void TWorkloadCommand::PrintWindowStats(int windowIt) { } } +int TWorkloadCommand::InitTables(std::shared_ptr<NYdbWorkload::IWorkloadQueryGenerator> workloadGen) { + auto session = GetSession(); + auto result = session.ExecuteSchemeQuery(workloadGen->GetDDLQueries()).GetValueSync(); + ThrowOnError(result); + + auto queryInfoList = workloadGen->GetInitialData(); + for (auto queryInfo : queryInfoList) { + auto prepareResult = session.PrepareDataQuery(queryInfo.Query.c_str()).GetValueSync(); + if (!prepareResult.IsSuccess()) { + Cerr << "Prepare failed: " << prepareResult.GetIssues().ToString() << Endl + << "Query:\n" << queryInfo.Query << Endl; + return EXIT_FAILURE; + } + + auto dataQuery = prepareResult.GetQuery(); + auto result = dataQuery.Execute(NYdb::NTable::TTxControl::BeginTx(NYdb::NTable::TTxSettings::SerializableRW()).CommitTx(), + std::move(queryInfo.Params)).GetValueSync(); + if (!result.IsSuccess()) { + Cerr << "Query execution failed: " << result.GetIssues().ToString() << Endl + << "Query:\n" << queryInfo.Query << Endl; + return EXIT_FAILURE; + } + } + + return EXIT_SUCCESS; +} + +int TWorkloadCommand::CleanTables(std::shared_ptr<NYdbWorkload::IWorkloadQueryGenerator> workloadGen) { + auto session = GetSession(); + + auto query = workloadGen->GetCleanDDLQueries(); + TStatus result(EStatus::SUCCESS, NYql::TIssues()); + result = session.ExecuteSchemeQuery(TString(query)).GetValueSync(); + + if (!result.IsSuccess()) { + Cerr << "Query execution failed: " << result.GetIssues().ToString() << Endl + << "Query:\n" << query << Endl; + return EXIT_FAILURE; + } + + return EXIT_SUCCESS; +} + } // namespace NYdb::NConsoleClient diff --git a/ydb/public/lib/ydb_cli/commands/ydb_workload.h b/ydb/public/lib/ydb_cli/commands/ydb_workload.h index 07221c56a6..6ea3abce25 100644 --- a/ydb/public/lib/ydb_cli/commands/ydb_workload.h +++ b/ydb/public/lib/ydb_cli/commands/ydb_workload.h @@ -66,6 +66,12 @@ protected: std::atomic_uint64_t WindowRetryCount; std::atomic_uint64_t TotalErrors; std::atomic_uint64_t WindowErrors; + +protected: + int InitTables(std::shared_ptr<NYdbWorkload::IWorkloadQueryGenerator> workloadGen); + + int CleanTables(std::shared_ptr<NYdbWorkload::IWorkloadQueryGenerator> workloadGen); + }; } |