aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorva-kuznecov <va-kuznecov@ydb.tech>2022-12-16 14:44:28 +0300
committerva-kuznecov <va-kuznecov@ydb.tech>2022-12-16 14:44:28 +0300
commita40a42963c070dac90d92674b640a3e2a13c4ed1 (patch)
treed3f931da880e913e1760732396ac057d9a5671ec
parent878e605686b4bf9b99124dde8df0fd048a8098b5 (diff)
downloadydb-a40a42963c070dac90d92674b640a3e2a13c4ed1.tar.gz
Add IncreaseSessions mode to KqpLoad
-rw-r--r--ydb/core/load_test/kqp.cpp192
-rw-r--r--ydb/core/protos/load_test.proto1
2 files changed, 110 insertions, 83 deletions
diff --git a/ydb/core/load_test/kqp.cpp b/ydb/core/load_test/kqp.cpp
index 4a5a5475da..f705d427ce 100644
--- a/ydb/core/load_test/kqp.cpp
+++ b/ydb/core/load_test/kqp.cpp
@@ -35,6 +35,30 @@ enum {
EvKqpWorkerResponse
};
+struct MonitoringData {
+public:
+ MonitoringData()
+ : WindowHist(60000, 2)
+ , WindowErrors(0) {}
+
+ MonitoringData(const NHdr::THistogram& hist, ui64 window_errors)
+ : WindowHist(60000, 2)
+ , WindowErrors(window_errors)
+ {
+ WindowHist.Add(hist);
+ }
+
+ void Add(const MonitoringData& other) {
+ WindowHist.Add(other.WindowHist);
+ WindowErrors += other.WindowErrors;
+ }
+
+public:
+ NHdr::THistogram WindowHist;
+ ui64 WindowErrors;
+
+};
+
void SendQueryRequest(const TActorContext& ctx, NYdbWorkload::TQueryInfo& q, const TString& session, const TString& workingDir) {
TString query_text = TString(q.Query);
auto request = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>();
@@ -59,31 +83,7 @@ void SendQueryRequest(const TActorContext& ctx, NYdbWorkload::TQueryInfo& q, con
auto kqp_proxy = NKqp::MakeKqpProxyID(ctx.SelfID.NodeId());
ctx.Send(kqp_proxy, request.Release());
-};
-
-struct MonitoringData {
-public:
- MonitoringData()
- : WindowHist(60000, 2)
- , WindowErrors(0) {}
-
- MonitoringData(const NHdr::THistogram& hist, ui64 window_errors)
- : WindowHist(60000, 2)
- , WindowErrors(window_errors)
- {
- WindowHist.Add(hist);
- }
-
- void Add(const MonitoringData& other) {
- WindowHist.Add(other.WindowHist);
- WindowErrors += other.WindowErrors;
- }
-
-public:
- NHdr::THistogram WindowHist;
- ui64 WindowErrors;
-
-};
+}
struct TEvKqpWorkerResponse : TEventLocal<TEvKqpWorkerResponse, EvKqpWorkerResponse> {
public:
@@ -96,39 +96,39 @@ public:
MonitoringData Data;
ui64 Phase;
ui64 WorkerTag;
-
};
class TKqpLoadWorker : public TActorBootstrapped<TKqpLoadWorker> {
public:
TKqpLoadWorker(TActorId parent,
- TString working_dir,
- std::shared_ptr<NYdbWorkload::IWorkloadQueryGenerator> workload_query_gen,
- ui64 workload_type,
- ui64 parentTag,
- ui64 workerTag,
- ui64 durationSeconds,
- ui64 windowDuration,
- ui64 windowCount,
- NMonitoring::TDynamicCounters::TCounterPtr transactions,
- NMonitoring::TDynamicCounters::TCounterPtr transactionsBytesWritten)
+ TString working_dir,
+ std::shared_ptr<NYdbWorkload::IWorkloadQueryGenerator> workload_query_gen,
+ ui64 workload_type,
+ ui64 parentTag,
+ ui64 workerTag,
+ TInstant endTimestamp,
+ ui64 windowDuration,
+ ui64 windowCount,
+ NMonitoring::TDynamicCounters::TCounterPtr transactions,
+ NMonitoring::TDynamicCounters::TCounterPtr transactionsBytesWritten)
: Parent(std::move(parent))
, WorkingDir(std::move(working_dir))
, WorkloadQueryGen(workload_query_gen)
, WorkloadType(workload_type)
, ParentTag(parentTag)
, WorkerTag(workerTag)
- , DurationSeconds(durationSeconds)
+ , EndTimestamp(endTimestamp)
, WindowHist(60000, 2)
, WindowDuration(windowDuration)
, WindowCount(windowCount)
, Transactions(transactions)
- , TransactionsBytesWritten(transactionsBytesWritten) {}
+ , TransactionsBytesWritten(transactionsBytesWritten)
+ {}
void Bootstrap(const TActorContext& ctx) {
LOG_INFO_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Worker Tag# " << ParentTag << "." << WorkerTag << " TKqpLoadWorker Bootstrap called");
- ctx.Schedule(TDuration::Seconds(DurationSeconds), new TEvents::TEvPoisonPill);
+ ctx.Schedule(EndTimestamp, new TEvents::TEvPoisonPill);
ctx.Schedule(TDuration::Seconds(WindowDuration), new TEvUpdateMonitoring);
Become(&TKqpLoadWorker::StateFunc);
@@ -137,8 +137,8 @@ public:
STRICT_STFUNC(StateFunc,
CFunc(TEvents::TSystem::PoisonPill, HandlePoisonPill)
- HFunc(NKqp::TEvKqp::TEvQueryResponse, HandleResponse)
- HFunc(NKqp::TEvKqp::TEvCreateSessionResponse, HandleCreateSessionResponse)
+ HFunc(NKqp::TEvKqp::TEvQueryResponse, Handle)
+ HFunc(NKqp::TEvKqp::TEvCreateSessionResponse, Handle)
HFunc(TEvUpdateMonitoring, HandleWindowTimer)
)
@@ -183,7 +183,7 @@ private:
Send(kqp_proxy, ev.Release());
}
- void HandleCreateSessionResponse(NKqp::TEvKqp::TEvCreateSessionResponse::TPtr& ev, const TActorContext& ctx) {
+ void Handle(NKqp::TEvKqp::TEvCreateSessionResponse::TPtr& ev, const TActorContext& ctx) {
auto& response = ev->Get()->Record;
if (response.GetYdbStatus() == Ydb::StatusIds_StatusCode_SUCCESS) {
@@ -197,24 +197,26 @@ private:
}
void CreateDataQuery(const TActorContext& ctx) {
- if (queries.empty()) {
- queries = WorkloadQueryGen->GetWorkload(WorkloadType);
+ if (Queries.empty()) {
+ Queries = WorkloadQueryGen->GetWorkload(WorkloadType);
}
- Y_VERIFY(!queries.empty());
- auto q = std::move(queries.front());
- queries.pop_front();
+ Y_VERIFY(!Queries.empty());
+ auto q = std::move(Queries.front());
+ Queries.pop_front();
LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Worker Tag# " << ParentTag << "." << WorkerTag
- << " session: " << WorkerSession << " query type: " << WorkloadType
- << ", params size: " << q.Params.GetValues().size());
+ << " query type: " << WorkloadType << ", params size: " << q.Params.GetValues().size());
Transactions->Inc();
+ LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Worker Tag# " << ParentTag << "." << WorkerTag << " using session: " << WorkerSession);
+
SendQueryRequest(ctx, q, WorkerSession, WorkingDir);
}
- void HandleResponse(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx) {
+
+ void Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx) {
auto& response = ev->Get()->Record.GetRef();
Transactions->Dec();
@@ -273,12 +275,12 @@ private:
ui64 ParentTag;
ui64 WorkerTag;
- NYdbWorkload::TQueryInfoList queries;
+ NYdbWorkload::TQueryInfoList Queries;
TString WorkerSession = "wrong sessionId";
- ui64 DurationSeconds = 1;
private:
+ TInstant EndTimestamp;
// for monitoring
NHdr::THistogram WindowHist;
ui64 WindowErrors = 0;
@@ -298,9 +300,8 @@ public:
return NKikimrServices::TActivity::KQP_TEST_WORKLOAD;
}
- TKqpLoadActor(const NKikimr::TEvLoadTestRequest::TKqpLoad& cmd,
- const TActorId& parent, const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters,
- ui64 index, ui64 tag)
+ TKqpLoadActor(const NKikimr::TEvLoadTestRequest::TKqpLoad& cmd, const TActorId& parent,
+ const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters, ui64 index, ui64 tag)
: Parent(parent)
, Tag(tag)
{
@@ -317,6 +318,7 @@ public:
WindowDuration = cmd.GetWindowDuration();
WindowCount = (DurationSeconds + WindowDuration - 1) / WindowDuration;
NumOfSessions = cmd.GetNumOfSessions();
+ IncreaseSessions = cmd.GetIncreaseSessions();
AnswersReceived.resize(WindowCount);
Chunk.reserve(WindowCount);
Total = std::make_unique<MonitoringData>();
@@ -369,6 +371,7 @@ public:
void Bootstrap(const TActorContext& ctx) {
LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " TKqpLoadActor Bootstrap called");
+
Become(&TKqpLoadActor::StateStart);
if (WorkloadClass == NYdbWorkload::EWorkload::STOCK) {
@@ -400,15 +403,30 @@ public:
CreateSessionForTablesDDL(ctx);
}
+ void HandleWakeup(const TActorContext& ctx) {
+ size_t targetSessions;
+ if (IncreaseSessions) {
+ targetSessions = 1 + NumOfSessions * (TInstant::Now() - Start).Seconds() / DurationSeconds;
+ targetSessions = std::min(targetSessions, NumOfSessions);
+ } else {
+ targetSessions = NumOfSessions;
+ }
+ while (Workers.size() < targetSessions) {
+ AppendWorker(ctx);
+ }
+ ctx.Schedule(TDuration::Seconds(1), new TEvents::TEvWakeup);
+ }
+
STRICT_STFUNC(StateStart,
CFunc(TEvents::TSystem::PoisonPill, HandlePoisonPill)
- HFunc(NKqp::TEvKqp::TEvCreateSessionResponse, HandleCreateSessionResponse)
+ HFunc(NKqp::TEvKqp::TEvCreateSessionResponse, Handle)
HFunc(NKqp::TEvKqp::TEvQueryResponse, HandleCreateTableResponse)
HFunc(NMon::TEvHttpInfo, HandleHTML)
)
STRICT_STFUNC(StateMain,
CFunc(TEvents::TSystem::PoisonPill, HandlePoisonPill)
+ CFunc(TEvents::TSystem::Wakeup, HandleWakeup)
HFunc(NKqp::TEvKqp::TEvQueryResponse, HandleDataQueryResponse)
HFunc(TEvKqpWorkerResponse, HandleMonitoring)
HFunc(NMon::TEvHttpInfo, HandleHTML)
@@ -508,10 +526,10 @@ private:
<< " Count: " << response->Data.WindowHist.GetTotalCount());
Chunk[response->Phase]->Add(response->Data);
+ Total->Add(response->Data);
AnswersReceived[response->Phase] += 1;
if (AnswersReceived[Phase] == NumOfSessions) {
- Total->Add(*Chunk[Phase]);
UpdatePhase(ctx);
}
}
@@ -519,13 +537,14 @@ private:
void UpdatePhase(const TActorContext& ctx) {
Phase += 1;
+ LOG_INFO_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " total: Phase: " << Phase);
+
if (Phase >= WindowCount) {
StartDeathProcess(ctx);
}
}
private:
-
// creating tables
void CreateSessionForTablesDDL(const TActorContext& ctx) {
@@ -539,7 +558,7 @@ private:
Send(kqp_proxy, ev.Release());
}
- void HandleCreateSessionResponse(NKqp::TEvKqp::TEvCreateSessionResponse::TPtr& ev, const TActorContext& ctx) {
+ void Handle(NKqp::TEvKqp::TEvCreateSessionResponse::TPtr& ev, const TActorContext& ctx) {
auto& response = ev->Get()->Record;
if (response.GetYdbStatus() == Ydb::StatusIds_StatusCode_SUCCESS) {
@@ -585,11 +604,6 @@ private:
// table initialization
void InsertInitData(const TActorContext& ctx) {
- if (InitData.empty()) {
- InitWorkers(ctx);
- return;
- }
-
Y_VERIFY(!InitData.empty());
auto q = std::move(InitData.front());
InitData.pop_front();
@@ -606,10 +620,21 @@ private:
if (response.GetYdbStatus() == Ydb::StatusIds_StatusCode_SUCCESS) {
LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " init query status: SUCCESS");
} else {
- LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " init query status: FAIL, reason: " + ev->Get()->ToString());
+ LOG_ERROR_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " init query status: FAIL, reason: " + ev->Get()->ToString());
}
- InsertInitData(ctx);
+ if (InitData.empty()) {
+ Start = TInstant::Now();
+ if (IncreaseSessions) {
+ ctx.Schedule(TDuration::Seconds(1), new TEvents::TEvWakeup);
+ } else {
+ for (ui64 i = 0; i < NumOfSessions; ++i) {
+ AppendWorker(ctx);
+ }
+ }
+ } else {
+ InsertInitData(ctx);
+ }
}
private:
@@ -687,22 +712,20 @@ private:
// common
- void InitWorkers(const TActorContext& ctx) {
- for (ui64 i = 0; i < NumOfSessions; ++i) {
- auto* worker = new TKqpLoadWorker(
- SelfId(),
- WorkingDir,
- WorkloadQueryGen,
- WorkloadType,
- Tag,
- i,
- DurationSeconds,
- WindowDuration,
- WindowCount,
- Transactions,
- TransactionsBytesWritten);
- Workers.push_back(ctx.Register(worker));
- }
+ void AppendWorker(const TActorContext& ctx) {
+ auto* worker = new TKqpLoadWorker(
+ SelfId(),
+ WorkingDir,
+ WorkloadQueryGen,
+ WorkloadType,
+ Tag,
+ CurrentSessions++,
+ Start + TDuration::Seconds(DurationSeconds),
+ WindowDuration,
+ WindowCount,
+ Transactions,
+ TransactionsBytesWritten);
+ Workers.push_back(ctx.Register(worker));
}
void CloseSession(const TActorContext& ctx) {
@@ -717,6 +740,7 @@ private:
}
private:
+ TInstant Start;
TString TableSession = "wrong sessionId";
TString WorkingDir;
ui64 WorkloadType;
@@ -726,7 +750,9 @@ private:
TString ConfigString;
ui64 UniformPartitionsCount;
bool DeleteTableOnFinish;
- ui32 NumOfSessions;
+ ui32 CurrentSessions = 0;
+ size_t NumOfSessions = 0;
+ bool IncreaseSessions = false;
NYdbWorkload::EWorkload WorkloadClass;
NYdbWorkload::TQueryInfoList InitData;
diff --git a/ydb/core/protos/load_test.proto b/ydb/core/protos/load_test.proto
index e01cdd3b0c..0ab718b3f2 100644
--- a/ydb/core/protos/load_test.proto
+++ b/ydb/core/protos/load_test.proto
@@ -212,6 +212,7 @@ message TEvLoadTestRequest {
optional uint32 WindowDuration = 3;
optional string WorkingDir = 4;
optional uint32 NumOfSessions = 5;
+ optional bool IncreaseSessions = 11;
optional bool DeleteTableOnFinish = 6;
optional uint32 UniformPartitionsCount = 7;
optional uint32 WorkloadType = 8;