aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authormdartemenko <mdartemenko@yandex-team.com>2022-08-30 17:42:47 +0300
committermdartemenko <mdartemenko@yandex-team.com>2022-08-30 17:42:47 +0300
commitb39c849cda9decf6c19b135df25f21b5cf0d0d5b (patch)
tree7c66544c9538c58469c92d184b2764432ba004dc
parentf79f740e5b92e2c8a10c9c74383e6b47e63a3ef3 (diff)
downloadydb-b39c849cda9decf6c19b135df25f21b5cf0d0d5b.tar.gz
add MORE workloads for KQP
Adds 2 Workloads for KQP: 1) BigString -> generates kv pairs with big string as second field 2) MultiColumn -> generates multi column table with FetchCnt not null columns
-rw-r--r--ydb/core/blobstorage/testload/test_load_kqp.cpp132
-rw-r--r--ydb/core/protos/blobstorage.proto4
-rw-r--r--ydb/library/workload/kv_workload.cpp116
-rw-r--r--ydb/library/workload/kv_workload.h7
-rw-r--r--ydb/library/workload/workload_factory.cpp2
-rw-r--r--ydb/public/lib/ydb_cli/commands/kv_workload.cpp58
-rw-r--r--ydb/public/lib/ydb_cli/commands/kv_workload.h5
-rw-r--r--ydb/public/lib/ydb_cli/commands/stock_workload.cpp39
-rw-r--r--ydb/public/lib/ydb_cli/commands/ydb_workload.cpp45
-rw-r--r--ydb/public/lib/ydb_cli/commands/ydb_workload.h6
10 files changed, 223 insertions, 191 deletions
diff --git a/ydb/core/blobstorage/testload/test_load_kqp.cpp b/ydb/core/blobstorage/testload/test_load_kqp.cpp
index bb4ad82ff7..1efe502282 100644
--- a/ydb/core/blobstorage/testload/test_load_kqp.cpp
+++ b/ydb/core/blobstorage/testload/test_load_kqp.cpp
@@ -45,7 +45,7 @@ public:
: WindowHist(60000, 2)
, WindowErrors(window_errors)
{
- WindowHist.Add(hist);
+ WindowHist.Add(hist);
}
void Add(const MonitoringData& other) {
@@ -63,7 +63,7 @@ struct TEvKqpWorkerResponse : TEventLocal<TEvKqpWorkerResponse, EvKqpWorkerRespo
public:
TEvKqpWorkerResponse(const NHdr::THistogram& hist, ui64 window_errors, ui64 phase, ui64 worker_tag)
: Data(hist, window_errors)
- , Phase(phase)
+ , Phase(phase)
, WorkerTag(worker_tag) {}
public:
@@ -89,16 +89,16 @@ void ConvertYdbParamsToMiniKQLParams(const NYdb::TParams& input, NKikimrMiniKQL:
class TKqpLoadWorker : public TActorBootstrapped<TKqpLoadWorker> {
public:
TKqpLoadWorker(TActorId parent,
- TString working_dir,
- std::shared_ptr<NYdbWorkload::IWorkloadQueryGenerator> workload_query_gen,
- ui64 workload_type,
- ui64 parentTag,
+ TString working_dir,
+ std::shared_ptr<NYdbWorkload::IWorkloadQueryGenerator> workload_query_gen,
+ ui64 workload_type,
+ ui64 parentTag,
ui64 workerTag,
ui64 durationSeconds,
ui64 windowDuration,
- ui64 windowCount,
+ ui64 windowCount,
NMonitoring::TDynamicCounters::TCounterPtr transactions,
- NMonitoring::TDynamicCounters::TCounterPtr transactionsBytesWritten)
+ NMonitoring::TDynamicCounters::TCounterPtr transactionsBytesWritten)
: Parent(std::move(parent))
, WorkingDir(std::move(working_dir))
, WorkloadQueryGen(workload_query_gen)
@@ -114,7 +114,7 @@ public:
void Bootstrap(const TActorContext& ctx) {
LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Worker Tag# " << ParentTag << "." << WorkerTag << " TKqpLoadWorker Bootstrap called");
-
+
ctx.Schedule(TDuration::Seconds(DurationSeconds), new TEvents::TEvPoisonPill);
ctx.Schedule(TDuration::Seconds(WindowDuration), new TEvUpdateMonitoring);
@@ -139,7 +139,7 @@ private:
if (Phase < WindowCount) {
SendMonitoringEvent(ctx);
}
-
+
CloseSession(ctx);
Die(ctx);
}
@@ -149,9 +149,9 @@ private:
auto ev = MakeHolder<NKqp::TEvKqp::TEvCloseSessionRequest>();
ev->Record.MutableRequest()->SetSessionId(WorkerSession);
-
+
auto kqp_proxy = NKqp::MakeKqpProxyID(ctx.SelfID.NodeId());
- LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Worker Tag# " << ParentTag << "." << WorkerTag
+ LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Worker Tag# " << ParentTag << "." << WorkerTag
<< " sending session close query to proxy: " + kqp_proxy.ToString());
ctx.Send(kqp_proxy, ev.Release());
@@ -166,11 +166,11 @@ private:
auto ev = MakeHolder<NKqp::TEvKqp::TEvCreateSessionRequest>();
ev->Record.MutableRequest()->SetDatabase(WorkingDir);
-
+
auto kqp_proxy = NKqp::MakeKqpProxyID(ctx.SelfID.NodeId());
- LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Worker Tag# " << ParentTag << "." << WorkerTag
+ LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Worker Tag# " << ParentTag << "." << WorkerTag
<< " sending event for session creation to proxy: " << kqp_proxy.ToString());
-
+
Send(kqp_proxy, ev.Release());
}
@@ -182,7 +182,7 @@ private:
LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Worker Tag# " << ParentTag << "." << WorkerTag << " Session is created: " + WorkerSession);
CreateDataQuery(ctx);
} else {
- LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Worker Tag# " << ParentTag << "." << WorkerTag
+ LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Worker Tag# " << ParentTag << "." << WorkerTag
<< " Session creation failed: " + ev->Get()->ToString());
}
}
@@ -195,7 +195,7 @@ private:
auto q = std::move(queries.front());
queries.pop_front();
- LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Worker Tag# " << ParentTag << "." << WorkerTag
+ LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Worker Tag# " << ParentTag << "." << WorkerTag
<< " query type: " << WorkloadType << ", params size: " << q.Params.GetValues().size());
Transactions->Inc();
@@ -224,13 +224,13 @@ private:
NKikimrMiniKQL::TParams params;
ConvertYdbParamsToMiniKQLParams(query_params, params);
request->Record.MutableRequest()->MutableParameters()->Swap(&params);
-
+
auto kqp_proxy = NKqp::MakeKqpProxyID(ctx.SelfID.NodeId());
- LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Worker Tag# " << ParentTag << "." << WorkerTag
+ LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Worker Tag# " << ParentTag << "." << WorkerTag
<< " sending data query to proxy: " + kqp_proxy.ToString());
ctx.Send(kqp_proxy, request.Release());
-
+
}
void HandleResponse(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx) {
@@ -243,7 +243,7 @@ private:
TransactionsBytesWritten->Add(response.GetResponse().GetQueryStats().ByteSize());
WindowHist.RecordValue(response.GetResponse().GetQueryStats().GetDurationUs());
} else {
- LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Worker Tag# " << ParentTag << "." << WorkerTag
+ LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Worker Tag# " << ParentTag << "." << WorkerTag
<< " data request status: Fail, Issue: " + ev->Get()->ToString());
++WindowErrors;
}
@@ -258,22 +258,22 @@ private:
// monitoring
void HandleWindowTimer(TEvUpdateMonitoring::TPtr& /*ev*/, const TActorContext& ctx) {
- LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Worker Tag# " << ParentTag << "." << WorkerTag
+ LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Worker Tag# " << ParentTag << "." << WorkerTag
<< " handle TEvUpdateMonitoring, Phase: " << Phase);
-
+
SendMonitoringEvent(ctx);
if (Phase < WindowCount) {
- LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Worker Tag# " << ParentTag << "." << WorkerTag
- << " reschedule TEvUpdateMonitoring, Phase: " << Phase);
+ LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Worker Tag# " << ParentTag << "." << WorkerTag
+ << " reschedule TEvUpdateMonitoring, Phase: " << Phase);
ctx.Schedule(TDuration::Seconds(WindowDuration), new TEvUpdateMonitoring);
}
}
private:
-
+
// common
-
+
void SendMonitoringEvent(const TActorContext& ctx) {
auto ev = MakeHolder<TEvKqpWorkerResponse>(WindowHist, WindowErrors, Phase, WorkerTag);
@@ -319,9 +319,9 @@ public:
}
TKqpWriterTestLoadActor(const NKikimrBlobStorage::TEvTestLoadRequest::TKqpLoadStart& cmd,
- const TActorId& parent,
- const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters,
- ui64 index,
+ const TActorId& parent,
+ const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters,
+ ui64 index,
ui64 tag)
: Parent(parent)
, Tag(tag)
@@ -345,7 +345,7 @@ public:
for (size_t i = 0; i < WindowCount; ++i) {
Chunk.push_back(std::make_unique<MonitoringData>());
}
-
+
NYdbWorkload::TWorkloadFactory factory;
if (cmd.Workload_case() == NKikimrBlobStorage::TEvTestLoadRequest_TKqpLoadStart::WorkloadCase::kStock) {
@@ -365,18 +365,20 @@ public:
params.InitRowCount = cmd.GetKv().GetInitRowCount();
params.PartitionsByLoad = cmd.GetKv().GetPartitionsByLoad();
params.MaxFirstKey = cmd.GetKv().GetMaxFirstKey();
+ params.StringLen = cmd.GetKv().GetStringLen();
+ params.ColumnsCnt = cmd.GetKv().GetColumnsCnt();
params.MinPartitions = UniformPartitionsCount;
params.DbPath = WorkingDir;
WorkloadQueryGen = factory.GetWorkloadQueryGenerator(NYdbWorkload::EWorkload::KV, &params);
} else {
return;
}
-
+
Y_ASSERT(WorkloadQueryGen.get() != nullptr);
Y_ASSERT(DurationSeconds > DelayBeforeMeasurements.Seconds());
// Monitoring initialization
-
+
LoadCounters = counters->GetSubgroup("tag", Sprintf("%" PRIu64, tag));
Transactions = LoadCounters->GetCounter("Transactions", true);
TransactionsBytesWritten = LoadCounters->GetCounter("TransactionsBytesWritten", true);
@@ -389,7 +391,7 @@ public:
void Bootstrap(const TActorContext& ctx) {
LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " TKqpWriterTestLoadActor Bootstrap called");
Become(&TKqpWriterTestLoadActor::StateStart);
-
+
if (WorkloadClass == NYdbWorkload::EWorkload::STOCK) {
NYdbWorkload::TStockWorkloadParams* params = static_cast<NYdbWorkload::TStockWorkloadParams*>(WorkloadQueryGen->GetParams());
LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " Starting load actor with workload STOCK, Params: {"
@@ -407,6 +409,8 @@ public:
<< "PartitionsByLoad: " << params->PartitionsByLoad << " "
<< "MaxFirstKey: " << params->MaxFirstKey << " "
<< "MinPartitions: " << params->MinPartitions << " "
+ << "StringLen: " << params->StringLen << " "
+ << "ColumnsCnt: " << params->ColumnsCnt << " "
<< "DbPath: " << params->DbPath);
}
@@ -457,14 +461,14 @@ private:
void DropTables(const TActorContext& ctx) {
LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " creating event for tables drop");
-
+
auto ev = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>();
ev->Record.MutableRequest()->SetDatabase(WorkingDir);
ev->Record.MutableRequest()->SetSessionId(TableSession);
ev->Record.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE);
ev->Record.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_SQL_DDL);
ev->Record.MutableRequest()->SetQuery(WorkloadQueryGen->GetCleanDDLQueries());
-
+
auto kqp_proxy = NKqp::MakeKqpProxyID(ctx.SelfID.NodeId());
LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " sending drop tables query to proxy: " + kqp_proxy.ToString());
@@ -483,12 +487,12 @@ private:
DeathReport(ctx);
}
- void DeathReport(const TActorContext& ctx) {
+ void DeathReport(const TActorContext& ctx) {
CloseSession(ctx);
TIntrusivePtr<TLoadReport> Report(new TLoadReport());
Report->Duration = TDuration::Seconds(DurationSeconds);
-
+
ctx.Send(Parent, new TEvTestLoadFinished(Tag, Report, "OK called StartDeathProcess"));
Die(ctx);
}
@@ -498,8 +502,8 @@ private:
// monitoring
void HandleMonitoring(TEvKqpWorkerResponse::TPtr& ev, const TActorContext& ctx) {
const auto& response = ev->Get();
-
- LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " got monitoring response from worker Tag# " << response->WorkerTag
+
+ LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " got monitoring response from worker Tag# " << response->WorkerTag
<< " Phase: " << response->Phase
<< " Min: " << response->Data.WindowHist.GetMin()
<< " Max: " << response->Data.WindowHist.GetMax()
@@ -517,7 +521,7 @@ private:
void SendNewRowToParent(const TActorContext& ctx) {
Phase += 1;
- LOG_INFO_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag
+ LOG_INFO_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag
<< " total: Phase: " << Phase << " -> "
<< Total->WindowHist.GetTotalCount() << " | "
<< Total->WindowHist.GetTotalCount() / (WindowDuration * std::max(ui64(1), Phase) * 1.0) << " | "
@@ -527,7 +531,7 @@ private:
<< Total->WindowHist.GetValueAtPercentile(99.0) / (WindowDuration * 1000.0) << " | "
<< Total->WindowHist.GetMax() / (WindowDuration * 1000.0)
);
-
+
if (Phase >= WindowCount) {
StartDeathProcess(ctx);
}
@@ -542,10 +546,10 @@ private:
auto ev = MakeHolder<NKqp::TEvKqp::TEvCreateSessionRequest>();
ev->Record.MutableRequest()->SetDatabase(WorkingDir);
-
+
auto kqp_proxy = NKqp::MakeKqpProxyID(ctx.SelfID.NodeId());
LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " sending event for session creation to proxy: " << kqp_proxy.ToString());
-
+
Send(kqp_proxy, ev.Release());
}
@@ -563,14 +567,14 @@ private:
void CreateTables(const TActorContext& ctx) {
LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " creating event for tables creation");
-
+
auto ev = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>();
ev->Record.MutableRequest()->SetDatabase(WorkingDir);
ev->Record.MutableRequest()->SetSessionId(TableSession);
ev->Record.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE);
ev->Record.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_SQL_DDL);
ev->Record.MutableRequest()->SetQuery(WorkloadQueryGen->GetDDLQueries());
-
+
auto kqp_proxy = NKqp::MakeKqpProxyID(ctx.SelfID.NodeId());
LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " sending ddl query to proxy: " + kqp_proxy.ToString());
@@ -580,14 +584,14 @@ private:
void HandleCreateTableResponse(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx) {
auto& response = ev->Get()->Record.GetRef();
- Become(&TKqpWriterTestLoadActor::StateMain);
-
if (response.GetYdbStatus() == Ydb::StatusIds_StatusCode_SUCCESS) {
- LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " tables are created");
+ Become(&TKqpWriterTestLoadActor::StateMain);
+ LOG_INFO_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " tables are created");
InitData = WorkloadQueryGen->GetInitialData();
InsertInitData(ctx);
} else {
- LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " tables creation failed: " + ev->Get()->ToString());
+ LOG_INFO_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " tables creation failed: " + ev->Get()->ToString());
+ CreateTables(ctx);
}
}
@@ -604,7 +608,7 @@ private:
auto q = std::move(InitData.front());
InitData.pop_front();
- LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag
+ LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag
<< " Creating request for init query, need to exec: " << InitData.size() + 1);
TString query_text = TString(q.Query);
@@ -631,7 +635,7 @@ private:
NKikimrMiniKQL::TParams params;
ConvertYdbParamsToMiniKQLParams(query_params, params);
request->Record.MutableRequest()->MutableParameters()->Swap(&params);
-
+
auto kqp_proxy = NKqp::MakeKqpProxyID(ctx.SelfID.NodeId());
LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag
<< " sending init query to proxy: " + kqp_proxy.ToString());
@@ -699,7 +703,7 @@ private:
TABLED() { str << Total->WindowHist.GetMax() / (WindowDuration * 1000.0); };
}
for (size_t i = Phase; i >= 1; --i) {
- TABLER() {
+ TABLER() {
TABLED() { str << i; };
TABLED() { str << Chunk[i - 1]->WindowHist.GetTotalCount(); };
TABLED() { str << Chunk[i - 1]->WindowHist.GetTotalCount() / (WindowDuration * 1.0); };
@@ -727,16 +731,16 @@ private:
void InitWorkers(const TActorContext& ctx) {
for (ui64 i = 0; i < NumOfSessions; ++i) {
auto* worker = new TKqpLoadWorker(
- SelfId(),
- WorkingDir,
- WorkloadQueryGen,
- WorkloadType,
- Tag,
- i,
- DurationSeconds,
- WindowDuration,
- WindowCount,
- Transactions,
+ SelfId(),
+ WorkingDir,
+ WorkloadQueryGen,
+ WorkloadType,
+ Tag,
+ i,
+ DurationSeconds,
+ WindowDuration,
+ WindowCount,
+ Transactions,
TransactionsBytesWritten);
Workers.push_back(ctx.Register(worker));
}
@@ -747,7 +751,7 @@ private:
auto ev = MakeHolder<NKqp::TEvKqp::TEvCloseSessionRequest>();
ev->Record.MutableRequest()->SetSessionId(TableSession);
-
+
auto kqp_proxy = NKqp::MakeKqpProxyID(ctx.SelfID.NodeId());
LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " sending session close query to proxy: " + kqp_proxy.ToString());
diff --git a/ydb/core/protos/blobstorage.proto b/ydb/core/protos/blobstorage.proto
index 72f4f32e17..4209b1dd2f 100644
--- a/ydb/core/protos/blobstorage.proto
+++ b/ydb/core/protos/blobstorage.proto
@@ -1544,13 +1544,15 @@ message TEvTestLoadRequest {
optional uint64 ProductCount = 1 [default = 100];
optional uint64 Quantity = 2 [default = 1000];
optional uint64 OrderCount = 3 [default = 100];
- optional uint64 Limit = 4 [default = 10];
+ optional uint64 Limit = 4 [default = 10];
optional bool PartitionsByLoad = 5 [default = true];
}
message TKvWorkload {
optional uint64 InitRowCount = 1 [default = 1000];
optional bool PartitionsByLoad = 2 [default = true];
optional uint64 MaxFirstKey = 3 [default = 5000];
+ optional uint64 StringLen = 4 [default = 8];
+ optional uint64 ColumnsCnt = 5 [default = 2];
}
optional uint64 Tag = 1;
optional uint32 DurationSeconds = 2;
diff --git a/ydb/library/workload/kv_workload.cpp b/ydb/library/workload/kv_workload.cpp
index b36b8cd2f3..49786e1b92 100644
--- a/ydb/library/workload/kv_workload.cpp
+++ b/ydb/library/workload/kv_workload.cpp
@@ -2,20 +2,24 @@
#include <util/datetime/base.h>
+#include <ydb/core/util/lz4_data_generator.h>
+
#include <cmath>
#include <iomanip>
#include <string>
#include <thread>
#include <random>
+#include <sstream>
namespace NYdbWorkload {
TKvWorkloadGenerator::TKvWorkloadGenerator(const TKvWorkloadParams* params)
: DbPath(params->DbPath)
, Params(*params)
+ , BigString(NKikimr::GenDataForLZ4(Params.StringLen))
, Rd()
, Gen(Rd())
- , UniformDistGen(0, Params.MaxFirstKey)
+ , KeyUniformDistGen(0, Params.MaxFirstKey)
{
Gen.seed(Now().MicroSeconds());
}
@@ -26,30 +30,26 @@ TKvWorkloadParams* TKvWorkloadGenerator::GetParams() {
std::string TKvWorkloadGenerator::GetDDLQueries() const {
std::string partsNum = std::to_string(Params.MinPartitions);
- std::string KvPartitionsDdl = "";
- if (Params.PartitionsByLoad) {
- KvPartitionsDdl = "WITH (AUTO_PARTITIONING_BY_LOAD = ENABLED, AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = " +
- partsNum + ", UNIFORM_PARTITIONS = " + partsNum + ", AUTO_PARTITIONING_MAX_PARTITIONS_COUNT = 1000)";
- } else {
- KvPartitionsDdl = "WITH (AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = " +
- partsNum + ", UNIFORM_PARTITIONS = " + partsNum + ", AUTO_PARTITIONING_MAX_PARTITIONS_COUNT = 1000)";
- }
+ std::stringstream ss;
- static const char TablesDdl[] = R"(--!syntax_v1
- CREATE TABLE `%s/kv_test`(a Uint64, b Uint64, PRIMARY KEY(a, b)) %s;
- )";
+ ss << "--!syntax_v1\n";
+ ss << "CREATE TABLE `" << DbPath << "/kv_test`(c0 Uint64, ";
- char buf[sizeof(TablesDdl) + sizeof(KvPartitionsDdl) + 8192*3]; // 32*256 for DbPath
- int res = std::sprintf(buf, TablesDdl,
- DbPath.c_str(), KvPartitionsDdl.c_str()
- );
+ for (size_t i = 1; i < Params.ColumnsCnt; ++i) {
+ ss << "c" << i << " " << "String, ";
+ }
+
+ ss << "PRIMARY KEY(c0)) WITH (";
- if (res < 0) {
- return "";
+ if (Params.PartitionsByLoad) {
+ ss << "AUTO_PARTITIONING_BY_LOAD = ENABLED, ";
}
- return buf;
+ ss << "AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = " << partsNum << ", "
+ << "UNIFORM_PARTITIONS = " << partsNum << ", AUTO_PARTITIONING_MAX_PARTITIONS_COUNT = 1000)";
+
+ return ss.str();
}
TQueryInfoList TKvWorkloadGenerator::GetWorkload(int type) {
@@ -64,48 +64,70 @@ TQueryInfoList TKvWorkloadGenerator::GetWorkload(int type) {
}
TQueryInfoList TKvWorkloadGenerator::UpsertRandom() {
- std::string query = R"(
- --!syntax_v1
- DECLARE $a AS Uint64;
- DECLARE $b AS Uint64;
+ std::stringstream ss;
- UPSERT INTO `kv_test` (a, b) VALUES ($a, $b);
- )";
+ NYdb::TParamsBuilder paramsBuilder;
- ui64 a = UniformDistGen(Gen);
- ui64 b = Gen();
+ ss << "--!syntax_v1\n";
+ ss << "DECLARE $c0 AS Uint64;\n";
+ paramsBuilder.AddParam("$c0").Uint64(KeyUniformDistGen(Gen)).Build();
- NYdb::TParamsBuilder paramsBuilder;
- auto params = paramsBuilder
- .AddParam("$a")
- .Uint64(a)
- .Build()
- .AddParam("$b")
- .Uint64(b)
- .Build()
- .Build();
+ for (size_t col = 1; col < Params.ColumnsCnt; ++col) {
+ ss << "DECLARE $c" << col << " AS String;\n";
+ paramsBuilder.AddParam("$c" + std::to_string(col)).String(BigString).Build();
+ }
+
+ ss << "UPSERT INTO `kv_test`(";
+
+ for (size_t col = 0; col < Params.ColumnsCnt; ++col) {
+ ss << "c" << col;
+ if (col + 1 < Params.ColumnsCnt) {
+ ss << ", ";
+ }
+ }
+
+ ss << ") VALUES (";
+
+ for (size_t col = 0; col < Params.ColumnsCnt; ++col) {
+ ss << "$c" << col;
+ if (col + 1 < Params.ColumnsCnt) {
+ ss << ", ";
+ }
+ }
+
+ ss << ")";
- return TQueryInfoList(1, TQueryInfo(query, std::move(params)));
+ auto params = paramsBuilder.Build();
+
+ return TQueryInfoList(1, TQueryInfo(ss.str(), std::move(params)));
}
TQueryInfoList TKvWorkloadGenerator::SelectRandom() {
- std::string query = R"(
- --!syntax_v1
- DECLARE $a AS Uint64;
+ std::stringstream ss;
+
+ ss << "--!syntax_v1\n";
+ ss << "DECLARE $c0 AS Uint64;\n";
+ ss << "SELECT ";
+ for (size_t col = 0; col < Params.ColumnsCnt; ++col) {
+ ss << "c" << col;
+ if (col + 1 < Params.ColumnsCnt) {
+ ss << ",";
+ }
+ ss << " ";
+ }
- SELECT * FROM `kv_test` WHERE a = $a
- )";
+ ss << "FROM `kv_test` WHERE c0 = $c0";
- ui64 a = UniformDistGen(Gen);
+ ui64 x = KeyUniformDistGen(Gen);
NYdb::TParamsBuilder paramsBuilder;
auto params = paramsBuilder
- .AddParam("$a")
- .Uint64(a)
+ .AddParam("$c0")
+ .Uint64(x)
.Build()
.Build();
- return TQueryInfoList(1, TQueryInfo(query, std::move(params)));
+ return TQueryInfoList(1, TQueryInfo(ss.str(), std::move(params)));
}
TQueryInfoList TKvWorkloadGenerator::GetInitialData() {
@@ -114,7 +136,7 @@ TQueryInfoList TKvWorkloadGenerator::GetInitialData() {
auto queryInfos = UpsertRandom();
res.insert(res.end(), queryInfos.begin(), queryInfos.end());
}
-
+
return res;
}
diff --git a/ydb/library/workload/kv_workload.h b/ydb/library/workload/kv_workload.h
index b681d092eb..b5e492cfdd 100644
--- a/ydb/library/workload/kv_workload.h
+++ b/ydb/library/workload/kv_workload.h
@@ -11,6 +11,8 @@ struct TKvWorkloadParams : public TWorkloadParams {
ui64 MinPartitions = 1;
ui64 InitRowCount = 1000;
ui64 MaxFirstKey = 5000;
+ ui64 StringLen = 8;
+ ui64 ColumnsCnt = 2;
bool PartitionsByLoad = true;
};
@@ -44,17 +46,18 @@ public:
private:
TQueryInfoList UpsertRandom();
TQueryInfoList SelectRandom();
-
+
TKvWorkloadGenerator(const TKvWorkloadParams* params);
TQueryInfo FillKvData() const;
std::string DbPath;
TKvWorkloadParams Params;
+ TString BigString;
std::random_device Rd;
std::mt19937_64 Gen;
- std::uniform_int_distribution<ui64> UniformDistGen;
+ std::uniform_int_distribution<ui64> KeyUniformDistGen;
};
} // namespace NYdbWorkload \ No newline at end of file
diff --git a/ydb/library/workload/workload_factory.cpp b/ydb/library/workload/workload_factory.cpp
index 1b34bc3838..11038e697f 100644
--- a/ydb/library/workload/workload_factory.cpp
+++ b/ydb/library/workload/workload_factory.cpp
@@ -5,7 +5,7 @@
namespace NYdbWorkload {
- std::shared_ptr<IWorkloadQueryGenerator> TWorkloadFactory::GetWorkloadQueryGenerator(const EWorkload& type , const TWorkloadParams* params)
+ std::shared_ptr<IWorkloadQueryGenerator> TWorkloadFactory::GetWorkloadQueryGenerator(const EWorkload& type , const TWorkloadParams* params)
{
if (!params) {
throw yexception() << "Params not specified";
diff --git a/ydb/public/lib/ydb_cli/commands/kv_workload.cpp b/ydb/public/lib/ydb_cli/commands/kv_workload.cpp
index 1a0efbc07a..c8bff4f05c 100644
--- a/ydb/public/lib/ydb_cli/commands/kv_workload.cpp
+++ b/ydb/public/lib/ydb_cli/commands/kv_workload.cpp
@@ -19,7 +19,9 @@ TCommandKvInit::TCommandKvInit()
, InitRowCount(1000)
, MinPartitions(1)
, MaxFirstKey(5000)
- , PartitionsByLoad(true)
+ , StringLen(8)
+ , ColumnsCnt(2)
+ , PartitionsByLoad(true)
{}
void TCommandKvInit::Config(TConfig& config) {
@@ -35,6 +37,10 @@ void TCommandKvInit::Config(TConfig& config) {
.DefaultValue(true).StoreResult(&PartitionsByLoad);
config.Opts->AddLongOption("max-first-key", "maximum value of first primary key")
.DefaultValue(5000).StoreResult(&MaxFirstKey);
+ config.Opts->AddLongOption("len", "String len")
+ .DefaultValue(8).StoreResult(&StringLen);
+ config.Opts->AddLongOption("cols", "Number of columns")
+ .DefaultValue(2).StoreResult(&ColumnsCnt);
}
void TCommandKvInit::Parse(TConfig& config) {
@@ -50,34 +56,13 @@ int TCommandKvInit::Run(TConfig& config) {
params.MinPartitions = MinPartitions;
params.PartitionsByLoad = PartitionsByLoad;
params.MaxFirstKey = MaxFirstKey;
+ params.StringLen = StringLen;
+ params.ColumnsCnt = ColumnsCnt;
NYdbWorkload::TWorkloadFactory factory;
auto workloadGen = factory.GetWorkloadQueryGenerator(NYdbWorkload::EWorkload::KV, &params);
- auto session = GetSession();
- auto result = session.ExecuteSchemeQuery(workloadGen->GetDDLQueries()).GetValueSync();
- ThrowOnError(result);
-
- auto queryInfoList = workloadGen->GetInitialData();
- for (auto queryInfo : queryInfoList) {
- auto prepareResult = session.PrepareDataQuery(queryInfo.Query.c_str()).GetValueSync();
- if (!prepareResult.IsSuccess()) {
- Cerr << "Prepare failed: " << prepareResult.GetIssues().ToString() << Endl
- << "Query:\n" << queryInfo.Query << Endl;
- return EXIT_FAILURE;
- }
-
- auto dataQuery = prepareResult.GetQuery();
- auto result = dataQuery.Execute(NYdb::NTable::TTxControl::BeginTx(NYdb::NTable::TTxSettings::SerializableRW()).CommitTx(),
- std::move(queryInfo.Params)).GetValueSync();
- if (!result.IsSuccess()) {
- Cerr << "Query execution failed: " << result.GetIssues().ToString() << Endl
- << "Query:\n" << queryInfo.Query << Endl;
- return EXIT_FAILURE;
- }
- }
-
- return EXIT_SUCCESS;
+ return InitTables(workloadGen);
}
@@ -102,19 +87,7 @@ int TCommandKvClean::Run(TConfig& config) {
NYdbWorkload::TWorkloadFactory factory;
auto workloadGen = factory.GetWorkloadQueryGenerator(NYdbWorkload::EWorkload::KV, &params);
- auto session = GetSession();
-
- auto query = workloadGen->GetCleanDDLQueries();
- TStatus result(EStatus::SUCCESS, NYql::TIssues());
- result = session.ExecuteSchemeQuery(TString(query)).GetValueSync();
-
- if (!result.IsSuccess()) {
- Cerr << "Query execution failed: " << result.GetIssues().ToString() << Endl
- << "Query:\n" << query << Endl;
- return EXIT_FAILURE;
- }
-
- return EXIT_SUCCESS;
+ return CleanTables(workloadGen);
}
TCommandKvRun::TCommandKvRun()
@@ -134,6 +107,10 @@ void TCommandKvRunUpsertRandom::Config(TConfig& config) {
config.Opts->AddLongOption("max-first-key", "maximum value of first primary key")
.DefaultValue(5000).StoreResult(&MaxFirstKey);
+ config.Opts->AddLongOption("len", "String len")
+ .DefaultValue(8).StoreResult(&StringLen);
+ config.Opts->AddLongOption("cols", "Number of columns")
+ .DefaultValue(2).StoreResult(&ColumnsCnt);
}
void TCommandKvRunUpsertRandom::Parse(TConfig& config) {
@@ -146,6 +123,8 @@ int TCommandKvRunUpsertRandom::Run(TConfig& config) {
NYdbWorkload::TKvWorkloadParams params;
params.DbPath = config.Database;
params.MaxFirstKey = MaxFirstKey;
+ params.StringLen = StringLen;
+ params.ColumnsCnt = ColumnsCnt;
NYdbWorkload::TWorkloadFactory factory;
auto workloadGen = factory.GetWorkloadQueryGenerator(NYdbWorkload::EWorkload::KV, &params);
@@ -163,6 +142,8 @@ void TCommandKvRunSelectRandom::Config(TConfig& config) {
config.Opts->AddLongOption("max-first-key", "maximum value of first primary key")
.DefaultValue(5000).StoreResult(&MaxFirstKey);
+ config.Opts->AddLongOption("cols", "Number of columns")
+ .DefaultValue(2).StoreResult(&ColumnsCnt);
}
void TCommandKvRunSelectRandom::Parse(TConfig& config) {
@@ -175,6 +156,7 @@ int TCommandKvRunSelectRandom::Run(TConfig& config) {
NYdbWorkload::TKvWorkloadParams params;
params.DbPath = config.Database;
params.MaxFirstKey = MaxFirstKey;
+ params.ColumnsCnt = ColumnsCnt;
NYdbWorkload::TWorkloadFactory factory;
auto workloadGen = factory.GetWorkloadQueryGenerator(NYdbWorkload::EWorkload::KV, &params);
diff --git a/ydb/public/lib/ydb_cli/commands/kv_workload.h b/ydb/public/lib/ydb_cli/commands/kv_workload.h
index dc7b4fe01b..b3e5559831 100644
--- a/ydb/public/lib/ydb_cli/commands/kv_workload.h
+++ b/ydb/public/lib/ydb_cli/commands/kv_workload.h
@@ -21,6 +21,8 @@ private:
ui64 InitRowCount;
ui64 MinPartitions;
ui64 MaxFirstKey;
+ ui64 StringLen;
+ ui64 ColumnsCnt;
bool PartitionsByLoad;
};
@@ -46,6 +48,8 @@ public:
private:
ui64 MaxFirstKey;
+ ui64 StringLen;
+ ui64 ColumnsCnt;
};
@@ -58,6 +62,7 @@ public:
private:
ui64 MaxFirstKey;
+ ui64 ColumnsCnt;
};
diff --git a/ydb/public/lib/ydb_cli/commands/stock_workload.cpp b/ydb/public/lib/ydb_cli/commands/stock_workload.cpp
index 240625924e..dae5ec87af 100644
--- a/ydb/public/lib/ydb_cli/commands/stock_workload.cpp
+++ b/ydb/public/lib/ydb_cli/commands/stock_workload.cpp
@@ -67,30 +67,7 @@ int TCommandStockInit::Run(TConfig& config) {
NYdbWorkload::TWorkloadFactory factory;
auto workloadGen = factory.GetWorkloadQueryGenerator(NYdbWorkload::EWorkload::STOCK, &params);
- auto session = GetSession();
- auto result = session.ExecuteSchemeQuery(workloadGen->GetDDLQueries()).GetValueSync();
- ThrowOnError(result);
-
- auto queryInfoList = workloadGen->GetInitialData();
- for (auto queryInfo : queryInfoList) {
- auto prepareResult = session.PrepareDataQuery(queryInfo.Query.c_str()).GetValueSync();
- if (!prepareResult.IsSuccess()) {
- Cerr << "Prepare failed: " << prepareResult.GetIssues().ToString() << Endl
- << "Query:\n" << queryInfo.Query << Endl;
- return EXIT_FAILURE;
- }
-
- auto dataQuery = prepareResult.GetQuery();
- auto result = dataQuery.Execute(NYdb::NTable::TTxControl::BeginTx(NYdb::NTable::TTxSettings::SerializableRW()).CommitTx(),
- std::move(queryInfo.Params)).GetValueSync();
- if (!result.IsSuccess()) {
- Cerr << "Query execution failed: " << result.GetIssues().ToString() << Endl
- << "Query:\n" << queryInfo.Query << Endl;
- return EXIT_FAILURE;
- }
- }
-
- return EXIT_SUCCESS;
+ return InitTables(workloadGen);
}
TCommandStockClean::TCommandStockClean()
@@ -114,19 +91,7 @@ int TCommandStockClean::Run(TConfig& config) {
NYdbWorkload::TWorkloadFactory factory;
auto workloadGen = factory.GetWorkloadQueryGenerator(NYdbWorkload::EWorkload::STOCK, &params);
- auto session = GetSession();
-
- auto query = workloadGen->GetCleanDDLQueries();
- TStatus result(EStatus::SUCCESS, NYql::TIssues());
- result = session.ExecuteSchemeQuery(TString(query)).GetValueSync();
-
- if (!result.IsSuccess()) {
- Cerr << "Query execution failed: " << result.GetIssues().ToString() << Endl
- << "Query:\n" << query << Endl;
- return EXIT_FAILURE;
- }
-
- return EXIT_SUCCESS;
+ return CleanTables(workloadGen);
}
TCommandStockRun::TCommandStockRun()
diff --git a/ydb/public/lib/ydb_cli/commands/ydb_workload.cpp b/ydb/public/lib/ydb_cli/commands/ydb_workload.cpp
index 89c31c8406..559fe8171e 100644
--- a/ydb/public/lib/ydb_cli/commands/ydb_workload.cpp
+++ b/ydb/public/lib/ydb_cli/commands/ydb_workload.cpp
@@ -83,7 +83,7 @@ void TWorkloadCommand::PrepareForRun(TConfig& config) {
.SetDatabase(config.Database)
.SetBalancingPolicy(EBalancingPolicy::UseAllNodes)
.SetCredentialsProviderFactory(config.CredentialsGetter(config));
-
+
if (config.EnableSsl) {
driverConfig.UseSecureConnection(config.CaCerts);
}
@@ -214,4 +214,47 @@ void TWorkloadCommand::PrintWindowStats(int windowIt) {
}
}
+int TWorkloadCommand::InitTables(std::shared_ptr<NYdbWorkload::IWorkloadQueryGenerator> workloadGen) {
+ auto session = GetSession();
+ auto result = session.ExecuteSchemeQuery(workloadGen->GetDDLQueries()).GetValueSync();
+ ThrowOnError(result);
+
+ auto queryInfoList = workloadGen->GetInitialData();
+ for (auto queryInfo : queryInfoList) {
+ auto prepareResult = session.PrepareDataQuery(queryInfo.Query.c_str()).GetValueSync();
+ if (!prepareResult.IsSuccess()) {
+ Cerr << "Prepare failed: " << prepareResult.GetIssues().ToString() << Endl
+ << "Query:\n" << queryInfo.Query << Endl;
+ return EXIT_FAILURE;
+ }
+
+ auto dataQuery = prepareResult.GetQuery();
+ auto result = dataQuery.Execute(NYdb::NTable::TTxControl::BeginTx(NYdb::NTable::TTxSettings::SerializableRW()).CommitTx(),
+ std::move(queryInfo.Params)).GetValueSync();
+ if (!result.IsSuccess()) {
+ Cerr << "Query execution failed: " << result.GetIssues().ToString() << Endl
+ << "Query:\n" << queryInfo.Query << Endl;
+ return EXIT_FAILURE;
+ }
+ }
+
+ return EXIT_SUCCESS;
+}
+
+int TWorkloadCommand::CleanTables(std::shared_ptr<NYdbWorkload::IWorkloadQueryGenerator> workloadGen) {
+ auto session = GetSession();
+
+ auto query = workloadGen->GetCleanDDLQueries();
+ TStatus result(EStatus::SUCCESS, NYql::TIssues());
+ result = session.ExecuteSchemeQuery(TString(query)).GetValueSync();
+
+ if (!result.IsSuccess()) {
+ Cerr << "Query execution failed: " << result.GetIssues().ToString() << Endl
+ << "Query:\n" << query << Endl;
+ return EXIT_FAILURE;
+ }
+
+ return EXIT_SUCCESS;
+}
+
} // namespace NYdb::NConsoleClient
diff --git a/ydb/public/lib/ydb_cli/commands/ydb_workload.h b/ydb/public/lib/ydb_cli/commands/ydb_workload.h
index 07221c56a6..6ea3abce25 100644
--- a/ydb/public/lib/ydb_cli/commands/ydb_workload.h
+++ b/ydb/public/lib/ydb_cli/commands/ydb_workload.h
@@ -66,6 +66,12 @@ protected:
std::atomic_uint64_t WindowRetryCount;
std::atomic_uint64_t TotalErrors;
std::atomic_uint64_t WindowErrors;
+
+protected:
+ int InitTables(std::shared_ptr<NYdbWorkload::IWorkloadQueryGenerator> workloadGen);
+
+ int CleanTables(std::shared_ptr<NYdbWorkload::IWorkloadQueryGenerator> workloadGen);
+
};
}