diff options
author | va-kuznecov <va-kuznecov@ydb.tech> | 2022-12-16 14:44:28 +0300 |
---|---|---|
committer | va-kuznecov <va-kuznecov@ydb.tech> | 2022-12-16 14:44:28 +0300 |
commit | a40a42963c070dac90d92674b640a3e2a13c4ed1 (patch) | |
tree | d3f931da880e913e1760732396ac057d9a5671ec | |
parent | 878e605686b4bf9b99124dde8df0fd048a8098b5 (diff) | |
download | ydb-a40a42963c070dac90d92674b640a3e2a13c4ed1.tar.gz |
Add IncreaseSessions mode to KqpLoad
-rw-r--r-- | ydb/core/load_test/kqp.cpp | 192 | ||||
-rw-r--r-- | ydb/core/protos/load_test.proto | 1 |
2 files changed, 110 insertions, 83 deletions
diff --git a/ydb/core/load_test/kqp.cpp b/ydb/core/load_test/kqp.cpp index 4a5a5475da..f705d427ce 100644 --- a/ydb/core/load_test/kqp.cpp +++ b/ydb/core/load_test/kqp.cpp @@ -35,6 +35,30 @@ enum { EvKqpWorkerResponse }; +struct MonitoringData { +public: + MonitoringData() + : WindowHist(60000, 2) + , WindowErrors(0) {} + + MonitoringData(const NHdr::THistogram& hist, ui64 window_errors) + : WindowHist(60000, 2) + , WindowErrors(window_errors) + { + WindowHist.Add(hist); + } + + void Add(const MonitoringData& other) { + WindowHist.Add(other.WindowHist); + WindowErrors += other.WindowErrors; + } + +public: + NHdr::THistogram WindowHist; + ui64 WindowErrors; + +}; + void SendQueryRequest(const TActorContext& ctx, NYdbWorkload::TQueryInfo& q, const TString& session, const TString& workingDir) { TString query_text = TString(q.Query); auto request = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>(); @@ -59,31 +83,7 @@ void SendQueryRequest(const TActorContext& ctx, NYdbWorkload::TQueryInfo& q, con auto kqp_proxy = NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()); ctx.Send(kqp_proxy, request.Release()); -}; - -struct MonitoringData { -public: - MonitoringData() - : WindowHist(60000, 2) - , WindowErrors(0) {} - - MonitoringData(const NHdr::THistogram& hist, ui64 window_errors) - : WindowHist(60000, 2) - , WindowErrors(window_errors) - { - WindowHist.Add(hist); - } - - void Add(const MonitoringData& other) { - WindowHist.Add(other.WindowHist); - WindowErrors += other.WindowErrors; - } - -public: - NHdr::THistogram WindowHist; - ui64 WindowErrors; - -}; +} struct TEvKqpWorkerResponse : TEventLocal<TEvKqpWorkerResponse, EvKqpWorkerResponse> { public: @@ -96,39 +96,39 @@ public: MonitoringData Data; ui64 Phase; ui64 WorkerTag; - }; class TKqpLoadWorker : public TActorBootstrapped<TKqpLoadWorker> { public: TKqpLoadWorker(TActorId parent, - TString working_dir, - std::shared_ptr<NYdbWorkload::IWorkloadQueryGenerator> workload_query_gen, - ui64 workload_type, - ui64 parentTag, - ui64 workerTag, - ui64 durationSeconds, - ui64 windowDuration, - ui64 windowCount, - NMonitoring::TDynamicCounters::TCounterPtr transactions, - NMonitoring::TDynamicCounters::TCounterPtr transactionsBytesWritten) + TString working_dir, + std::shared_ptr<NYdbWorkload::IWorkloadQueryGenerator> workload_query_gen, + ui64 workload_type, + ui64 parentTag, + ui64 workerTag, + TInstant endTimestamp, + ui64 windowDuration, + ui64 windowCount, + NMonitoring::TDynamicCounters::TCounterPtr transactions, + NMonitoring::TDynamicCounters::TCounterPtr transactionsBytesWritten) : Parent(std::move(parent)) , WorkingDir(std::move(working_dir)) , WorkloadQueryGen(workload_query_gen) , WorkloadType(workload_type) , ParentTag(parentTag) , WorkerTag(workerTag) - , DurationSeconds(durationSeconds) + , EndTimestamp(endTimestamp) , WindowHist(60000, 2) , WindowDuration(windowDuration) , WindowCount(windowCount) , Transactions(transactions) - , TransactionsBytesWritten(transactionsBytesWritten) {} + , TransactionsBytesWritten(transactionsBytesWritten) + {} void Bootstrap(const TActorContext& ctx) { LOG_INFO_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Worker Tag# " << ParentTag << "." << WorkerTag << " TKqpLoadWorker Bootstrap called"); - ctx.Schedule(TDuration::Seconds(DurationSeconds), new TEvents::TEvPoisonPill); + ctx.Schedule(EndTimestamp, new TEvents::TEvPoisonPill); ctx.Schedule(TDuration::Seconds(WindowDuration), new TEvUpdateMonitoring); Become(&TKqpLoadWorker::StateFunc); @@ -137,8 +137,8 @@ public: STRICT_STFUNC(StateFunc, CFunc(TEvents::TSystem::PoisonPill, HandlePoisonPill) - HFunc(NKqp::TEvKqp::TEvQueryResponse, HandleResponse) - HFunc(NKqp::TEvKqp::TEvCreateSessionResponse, HandleCreateSessionResponse) + HFunc(NKqp::TEvKqp::TEvQueryResponse, Handle) + HFunc(NKqp::TEvKqp::TEvCreateSessionResponse, Handle) HFunc(TEvUpdateMonitoring, HandleWindowTimer) ) @@ -183,7 +183,7 @@ private: Send(kqp_proxy, ev.Release()); } - void HandleCreateSessionResponse(NKqp::TEvKqp::TEvCreateSessionResponse::TPtr& ev, const TActorContext& ctx) { + void Handle(NKqp::TEvKqp::TEvCreateSessionResponse::TPtr& ev, const TActorContext& ctx) { auto& response = ev->Get()->Record; if (response.GetYdbStatus() == Ydb::StatusIds_StatusCode_SUCCESS) { @@ -197,24 +197,26 @@ private: } void CreateDataQuery(const TActorContext& ctx) { - if (queries.empty()) { - queries = WorkloadQueryGen->GetWorkload(WorkloadType); + if (Queries.empty()) { + Queries = WorkloadQueryGen->GetWorkload(WorkloadType); } - Y_VERIFY(!queries.empty()); - auto q = std::move(queries.front()); - queries.pop_front(); + Y_VERIFY(!Queries.empty()); + auto q = std::move(Queries.front()); + Queries.pop_front(); LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Worker Tag# " << ParentTag << "." << WorkerTag - << " session: " << WorkerSession << " query type: " << WorkloadType - << ", params size: " << q.Params.GetValues().size()); + << " query type: " << WorkloadType << ", params size: " << q.Params.GetValues().size()); Transactions->Inc(); + LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Worker Tag# " << ParentTag << "." << WorkerTag << " using session: " << WorkerSession); + SendQueryRequest(ctx, q, WorkerSession, WorkingDir); } - void HandleResponse(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx) { + + void Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx) { auto& response = ev->Get()->Record.GetRef(); Transactions->Dec(); @@ -273,12 +275,12 @@ private: ui64 ParentTag; ui64 WorkerTag; - NYdbWorkload::TQueryInfoList queries; + NYdbWorkload::TQueryInfoList Queries; TString WorkerSession = "wrong sessionId"; - ui64 DurationSeconds = 1; private: + TInstant EndTimestamp; // for monitoring NHdr::THistogram WindowHist; ui64 WindowErrors = 0; @@ -298,9 +300,8 @@ public: return NKikimrServices::TActivity::KQP_TEST_WORKLOAD; } - TKqpLoadActor(const NKikimr::TEvLoadTestRequest::TKqpLoad& cmd, - const TActorId& parent, const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters, - ui64 index, ui64 tag) + TKqpLoadActor(const NKikimr::TEvLoadTestRequest::TKqpLoad& cmd, const TActorId& parent, + const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters, ui64 index, ui64 tag) : Parent(parent) , Tag(tag) { @@ -317,6 +318,7 @@ public: WindowDuration = cmd.GetWindowDuration(); WindowCount = (DurationSeconds + WindowDuration - 1) / WindowDuration; NumOfSessions = cmd.GetNumOfSessions(); + IncreaseSessions = cmd.GetIncreaseSessions(); AnswersReceived.resize(WindowCount); Chunk.reserve(WindowCount); Total = std::make_unique<MonitoringData>(); @@ -369,6 +371,7 @@ public: void Bootstrap(const TActorContext& ctx) { LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " TKqpLoadActor Bootstrap called"); + Become(&TKqpLoadActor::StateStart); if (WorkloadClass == NYdbWorkload::EWorkload::STOCK) { @@ -400,15 +403,30 @@ public: CreateSessionForTablesDDL(ctx); } + void HandleWakeup(const TActorContext& ctx) { + size_t targetSessions; + if (IncreaseSessions) { + targetSessions = 1 + NumOfSessions * (TInstant::Now() - Start).Seconds() / DurationSeconds; + targetSessions = std::min(targetSessions, NumOfSessions); + } else { + targetSessions = NumOfSessions; + } + while (Workers.size() < targetSessions) { + AppendWorker(ctx); + } + ctx.Schedule(TDuration::Seconds(1), new TEvents::TEvWakeup); + } + STRICT_STFUNC(StateStart, CFunc(TEvents::TSystem::PoisonPill, HandlePoisonPill) - HFunc(NKqp::TEvKqp::TEvCreateSessionResponse, HandleCreateSessionResponse) + HFunc(NKqp::TEvKqp::TEvCreateSessionResponse, Handle) HFunc(NKqp::TEvKqp::TEvQueryResponse, HandleCreateTableResponse) HFunc(NMon::TEvHttpInfo, HandleHTML) ) STRICT_STFUNC(StateMain, CFunc(TEvents::TSystem::PoisonPill, HandlePoisonPill) + CFunc(TEvents::TSystem::Wakeup, HandleWakeup) HFunc(NKqp::TEvKqp::TEvQueryResponse, HandleDataQueryResponse) HFunc(TEvKqpWorkerResponse, HandleMonitoring) HFunc(NMon::TEvHttpInfo, HandleHTML) @@ -508,10 +526,10 @@ private: << " Count: " << response->Data.WindowHist.GetTotalCount()); Chunk[response->Phase]->Add(response->Data); + Total->Add(response->Data); AnswersReceived[response->Phase] += 1; if (AnswersReceived[Phase] == NumOfSessions) { - Total->Add(*Chunk[Phase]); UpdatePhase(ctx); } } @@ -519,13 +537,14 @@ private: void UpdatePhase(const TActorContext& ctx) { Phase += 1; + LOG_INFO_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " total: Phase: " << Phase); + if (Phase >= WindowCount) { StartDeathProcess(ctx); } } private: - // creating tables void CreateSessionForTablesDDL(const TActorContext& ctx) { @@ -539,7 +558,7 @@ private: Send(kqp_proxy, ev.Release()); } - void HandleCreateSessionResponse(NKqp::TEvKqp::TEvCreateSessionResponse::TPtr& ev, const TActorContext& ctx) { + void Handle(NKqp::TEvKqp::TEvCreateSessionResponse::TPtr& ev, const TActorContext& ctx) { auto& response = ev->Get()->Record; if (response.GetYdbStatus() == Ydb::StatusIds_StatusCode_SUCCESS) { @@ -585,11 +604,6 @@ private: // table initialization void InsertInitData(const TActorContext& ctx) { - if (InitData.empty()) { - InitWorkers(ctx); - return; - } - Y_VERIFY(!InitData.empty()); auto q = std::move(InitData.front()); InitData.pop_front(); @@ -606,10 +620,21 @@ private: if (response.GetYdbStatus() == Ydb::StatusIds_StatusCode_SUCCESS) { LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " init query status: SUCCESS"); } else { - LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " init query status: FAIL, reason: " + ev->Get()->ToString()); + LOG_ERROR_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " init query status: FAIL, reason: " + ev->Get()->ToString()); } - InsertInitData(ctx); + if (InitData.empty()) { + Start = TInstant::Now(); + if (IncreaseSessions) { + ctx.Schedule(TDuration::Seconds(1), new TEvents::TEvWakeup); + } else { + for (ui64 i = 0; i < NumOfSessions; ++i) { + AppendWorker(ctx); + } + } + } else { + InsertInitData(ctx); + } } private: @@ -687,22 +712,20 @@ private: // common - void InitWorkers(const TActorContext& ctx) { - for (ui64 i = 0; i < NumOfSessions; ++i) { - auto* worker = new TKqpLoadWorker( - SelfId(), - WorkingDir, - WorkloadQueryGen, - WorkloadType, - Tag, - i, - DurationSeconds, - WindowDuration, - WindowCount, - Transactions, - TransactionsBytesWritten); - Workers.push_back(ctx.Register(worker)); - } + void AppendWorker(const TActorContext& ctx) { + auto* worker = new TKqpLoadWorker( + SelfId(), + WorkingDir, + WorkloadQueryGen, + WorkloadType, + Tag, + CurrentSessions++, + Start + TDuration::Seconds(DurationSeconds), + WindowDuration, + WindowCount, + Transactions, + TransactionsBytesWritten); + Workers.push_back(ctx.Register(worker)); } void CloseSession(const TActorContext& ctx) { @@ -717,6 +740,7 @@ private: } private: + TInstant Start; TString TableSession = "wrong sessionId"; TString WorkingDir; ui64 WorkloadType; @@ -726,7 +750,9 @@ private: TString ConfigString; ui64 UniformPartitionsCount; bool DeleteTableOnFinish; - ui32 NumOfSessions; + ui32 CurrentSessions = 0; + size_t NumOfSessions = 0; + bool IncreaseSessions = false; NYdbWorkload::EWorkload WorkloadClass; NYdbWorkload::TQueryInfoList InitData; diff --git a/ydb/core/protos/load_test.proto b/ydb/core/protos/load_test.proto index e01cdd3b0c..0ab718b3f2 100644 --- a/ydb/core/protos/load_test.proto +++ b/ydb/core/protos/load_test.proto @@ -212,6 +212,7 @@ message TEvLoadTestRequest { optional uint32 WindowDuration = 3; optional string WorkingDir = 4; optional uint32 NumOfSessions = 5; + optional bool IncreaseSessions = 11; optional bool DeleteTableOnFinish = 6; optional uint32 UniformPartitionsCount = 7; optional uint32 WorkloadType = 8; |