diff options
author | va-kuznecov <va-kuznecov@ydb.tech> | 2022-12-29 13:45:41 +0300 |
---|---|---|
committer | va-kuznecov <va-kuznecov@ydb.tech> | 2022-12-29 13:45:41 +0300 |
commit | b7f64a4cbbcf1013a148f05b41f7cf46aaa10489 (patch) | |
tree | c67a51df505f8db2b565338cf19faa9c4a53fb8b | |
parent | 49f67ead5717d06751bf522fb78d0e8b2c2494e1 (diff) | |
download | ydb-b7f64a4cbbcf1013a148f05b41f7cf46aaa10489.tar.gz |
Rework Kqp Stats
-rw-r--r-- | ydb/core/load_test/kqp.cpp | 217 |
1 files changed, 53 insertions, 164 deletions
diff --git a/ydb/core/load_test/kqp.cpp b/ydb/core/load_test/kqp.cpp index f705d427ce7..6f36a096e9c 100644 --- a/ydb/core/load_test/kqp.cpp +++ b/ydb/core/load_test/kqp.cpp @@ -38,25 +38,22 @@ enum { struct MonitoringData { public: MonitoringData() - : WindowHist(60000, 2) - , WindowErrors(0) {} + {} MonitoringData(const NHdr::THistogram& hist, ui64 window_errors) - : WindowHist(60000, 2) - , WindowErrors(window_errors) + : Errors(window_errors) { - WindowHist.Add(hist); + LatencyHist.Add(hist); } void Add(const MonitoringData& other) { - WindowHist.Add(other.WindowHist); - WindowErrors += other.WindowErrors; + LatencyHist.Add(other.LatencyHist); + Errors += other.Errors; } public: - NHdr::THistogram WindowHist; - ui64 WindowErrors; - + NHdr::THistogram LatencyHist{60000, 2}; + ui64 Errors = 0; }; void SendQueryRequest(const TActorContext& ctx, NYdbWorkload::TQueryInfo& q, const TString& session, const TString& workingDir) { @@ -87,14 +84,12 @@ void SendQueryRequest(const TActorContext& ctx, NYdbWorkload::TQueryInfo& q, con struct TEvKqpWorkerResponse : TEventLocal<TEvKqpWorkerResponse, EvKqpWorkerResponse> { public: - TEvKqpWorkerResponse(const NHdr::THistogram& hist, ui64 window_errors, ui64 phase, ui64 worker_tag) - : Data(hist, window_errors) - , Phase(phase) - , WorkerTag(worker_tag) {} + TEvKqpWorkerResponse(const NHdr::THistogram& hist, ui64 errors, ui64 workerTag) + : Data(hist, errors) + , WorkerTag(workerTag) {} public: MonitoringData Data; - ui64 Phase; ui64 WorkerTag; }; @@ -107,8 +102,6 @@ public: ui64 parentTag, ui64 workerTag, TInstant endTimestamp, - ui64 windowDuration, - ui64 windowCount, NMonitoring::TDynamicCounters::TCounterPtr transactions, NMonitoring::TDynamicCounters::TCounterPtr transactionsBytesWritten) : Parent(std::move(parent)) @@ -118,9 +111,7 @@ public: , ParentTag(parentTag) , WorkerTag(workerTag) , EndTimestamp(endTimestamp) - , WindowHist(60000, 2) - , WindowDuration(windowDuration) - , WindowCount(windowCount) + , LatencyHist(60000, 2) , Transactions(transactions) , TransactionsBytesWritten(transactionsBytesWritten) {} @@ -129,32 +120,21 @@ public: LOG_INFO_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Worker Tag# " << ParentTag << "." << WorkerTag << " TKqpLoadWorker Bootstrap called"); ctx.Schedule(EndTimestamp, new TEvents::TEvPoisonPill); - ctx.Schedule(TDuration::Seconds(WindowDuration), new TEvUpdateMonitoring); Become(&TKqpLoadWorker::StateFunc); CreateWorkingSession(ctx); } - STRICT_STFUNC(StateFunc, - CFunc(TEvents::TSystem::PoisonPill, HandlePoisonPill) - HFunc(NKqp::TEvKqp::TEvQueryResponse, Handle) - HFunc(NKqp::TEvKqp::TEvCreateSessionResponse, Handle) - HFunc(TEvUpdateMonitoring, HandleWindowTimer) - ) - private: - // death void HandlePoisonPill(const TActorContext& ctx) { LOG_INFO_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Worker Tag# " << ParentTag << "." << WorkerTag << " HandlePoisonPill"); - if (Phase < WindowCount) { - SendMonitoringEvent(ctx); - } + Send(Parent, new TEvKqpWorkerResponse(LatencyHist, Errors, WorkerTag)); CloseSession(ctx); - Die(ctx); + PassAway(); } void CloseSession(const TActorContext& ctx) { @@ -168,8 +148,6 @@ private: ctx.Send(kqp_proxy, ev.Release()); } -private: - // working void CreateWorkingSession(const TActorContext& ctx) { @@ -215,7 +193,6 @@ private: SendQueryRequest(ctx, q, WorkerSession, WorkingDir); } - void Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx) { auto& response = ev->Get()->Record.GetRef(); @@ -224,72 +201,35 @@ private: if (response.GetYdbStatus() == Ydb::StatusIds_StatusCode_SUCCESS) { LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Worker Tag# " << ParentTag << "." << WorkerTag << " data request status: Success"); TransactionsBytesWritten->Add(response.GetResponse().GetQueryStats().ByteSize()); - WindowHist.RecordValue(response.GetResponse().GetQueryStats().GetDurationUs()); + LatencyHist.RecordValue(response.GetResponse().GetQueryStats().GetDurationUs()); } else { LOG_INFO_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Worker Tag# " << ParentTag << "." << WorkerTag << " data request status: Fail, Issue: " + ev->Get()->ToString()); - ++WindowErrors; + ++Errors; } - if (Phase < WindowCount) { - CreateDataQuery(ctx); - } - } - -private: - - // monitoring - - void HandleWindowTimer(TEvUpdateMonitoring::TPtr& /*ev*/, const TActorContext& ctx) { - LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Worker Tag# " << ParentTag << "." << WorkerTag - << " handle TEvUpdateMonitoring, Phase: " << Phase); - - SendMonitoringEvent(ctx); - - if (Phase < WindowCount) { - LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Worker Tag# " << ParentTag << "." << WorkerTag - << " reschedule TEvUpdateMonitoring, Phase: " << Phase); - ctx.Schedule(TDuration::Seconds(WindowDuration), new TEvUpdateMonitoring); - } + CreateDataQuery(ctx); } -private: - - // common - - void SendMonitoringEvent(const TActorContext& ctx) { - auto ev = MakeHolder<TEvKqpWorkerResponse>(WindowHist, WindowErrors, Phase, WorkerTag); - - WindowHist.Reset(); - WindowErrors = 0; - ++Phase; - - ctx.Send(Parent, ev.Release()); - } + STRICT_STFUNC(StateFunc, + CFunc(TEvents::TSystem::PoisonPill, HandlePoisonPill) + HFunc(NKqp::TEvKqp::TEvQueryResponse, Handle) + HFunc(NKqp::TEvKqp::TEvCreateSessionResponse, Handle) + ) -private: TActorId Parent; TString WorkingDir; std::shared_ptr<NYdbWorkload::IWorkloadQueryGenerator> WorkloadQueryGen; ui64 WorkloadType; ui64 ParentTag; ui64 WorkerTag; - + TInstant EndTimestamp; NYdbWorkload::TQueryInfoList Queries; - TString WorkerSession = "wrong sessionId"; -private: - TInstant EndTimestamp; - // for monitoring - NHdr::THistogram WindowHist; - ui64 WindowErrors = 0; - - ui64 WindowDuration; - ui64 WindowCount; - - ui64 Phase = 0; - + // monitoring + NHdr::THistogram LatencyHist; + ui64 Errors = 0; NMonitoring::TDynamicCounters::TCounterPtr Transactions; NMonitoring::TDynamicCounters::TCounterPtr TransactionsBytesWritten; }; @@ -315,19 +255,11 @@ public: WorkingDir = cmd.GetWorkingDir(); WorkloadType = cmd.GetWorkloadType(); DurationSeconds = cmd.GetDurationSeconds(); - WindowDuration = cmd.GetWindowDuration(); - WindowCount = (DurationSeconds + WindowDuration - 1) / WindowDuration; NumOfSessions = cmd.GetNumOfSessions(); IncreaseSessions = cmd.GetIncreaseSessions(); - AnswersReceived.resize(WindowCount); - Chunk.reserve(WindowCount); Total = std::make_unique<MonitoringData>(); - for (size_t i = 0; i < WindowCount; ++i) { - Chunk.push_back(std::make_unique<MonitoringData>()); - } NYdbWorkload::TWorkloadFactory factory; - if (cmd.Workload_case() == NKikimr::TEvLoadTestRequest_TKqpLoad::WorkloadCase::kStock) { WorkloadClass = NYdbWorkload::EWorkload::STOCK; NYdbWorkload::TStockWorkloadParams params; @@ -398,7 +330,8 @@ public: } LOG_INFO_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " Schedule PoisonPill"); - ctx.Schedule(TDuration::Seconds(DurationSeconds * 2), new TEvents::TEvPoisonPill); + // TODO: report error in case of such death + ctx.Schedule(TDuration::Seconds(DurationSeconds + 10), new TEvents::TEvPoisonPill); CreateSessionForTablesDDL(ctx); } @@ -428,7 +361,7 @@ public: CFunc(TEvents::TSystem::PoisonPill, HandlePoisonPill) CFunc(TEvents::TSystem::Wakeup, HandleWakeup) HFunc(NKqp::TEvKqp::TEvQueryResponse, HandleDataQueryResponse) - HFunc(TEvKqpWorkerResponse, HandleMonitoring) + HFunc(TEvKqpWorkerResponse, HandleResult) HFunc(NMon::TEvHttpInfo, HandleHTML) ) @@ -441,7 +374,8 @@ private: // death void HandlePoisonPill(const TActorContext& ctx) { - LOG_INFO_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " HandlePoisonPill, starting death process"); + LOG_CRIT_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " HandlePoisonPill, " + << "but it is supposed to pass away by receiving TEvKqpWorkerResponse from all of the workers"); StartDeathProcess(ctx); } @@ -490,11 +424,11 @@ private: TIntrusivePtr<TEvLoad::TLoadReport> Report(new TEvLoad::TLoadReport()); Report->Duration = TDuration::Seconds(DurationSeconds); - auto* finishEv = new TEvLoad::TEvLoadTestFinished(Tag, Report, "OK called StartDeathProcess"); + auto* finishEv = new TEvLoad::TEvLoadTestFinished(Tag, Report, "OK, called StartDeathProcess"); finishEv->LastHtmlPage = RenderHTML(); finishEv->JsonResult = GetJsonResult(); ctx.Send(Parent, finishEv); - Die(ctx); + PassAway(); } private: @@ -502,50 +436,34 @@ private: NJson::TJsonValue GetJsonResult() const { NJson::TJsonValue value; value["duration_s"] = DurationSeconds; - value["tx/s"] = Total->WindowHist.GetTotalCount() / (WindowDuration * std::max(ui64(1), Phase) * 1.0); - value["errors"] = Total->WindowErrors; + value["rps"] = Total->LatencyHist.GetTotalCount() / static_cast<double>(DurationSeconds); + value["errors"] = Total->Errors; { auto& p = value["percentile"]; - p["50"] = Total->WindowHist.GetValueAtPercentile(50.0) / 1000.0; - p["95"] = Total->WindowHist.GetValueAtPercentile(95.0) / 1000.0; - p["99"] = Total->WindowHist.GetValueAtPercentile(99.0) / 1000.0; - p["100"] = Total->WindowHist.GetMax() / 1000.0; + p["50"] = Total->LatencyHist.GetValueAtPercentile(50.0) / 1000.0; + p["95"] = Total->LatencyHist.GetValueAtPercentile(95.0) / 1000.0; + p["99"] = Total->LatencyHist.GetValueAtPercentile(99.0) / 1000.0; + p["100"] = Total->LatencyHist.GetMax() / 1000.0; } value["config"] = ConfigString; return value; } // monitoring - void HandleMonitoring(TEvKqpWorkerResponse::TPtr& ev, const TActorContext& ctx) { + void HandleResult(TEvKqpWorkerResponse::TPtr& ev, const TActorContext& ctx) { const auto& response = ev->Get(); - LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " got monitoring response from worker Tag# " << response->WorkerTag - << " Phase: " << response->Phase - << " Min: " << response->Data.WindowHist.GetMin() - << " Max: " << response->Data.WindowHist.GetMax() - << " Count: " << response->Data.WindowHist.GetTotalCount()); + LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " got monitoring response from worker Tag# " + << response->WorkerTag); - Chunk[response->Phase]->Add(response->Data); Total->Add(response->Data); - AnswersReceived[response->Phase] += 1; - - if (AnswersReceived[Phase] == NumOfSessions) { - UpdatePhase(ctx); - } - } - - void UpdatePhase(const TActorContext& ctx) { - Phase += 1; - - LOG_INFO_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " total: Phase: " << Phase); - - if (Phase >= WindowCount) { + ++ResultsReceived; + if (ResultsReceived == NumOfSessions) { StartDeathProcess(ctx); } } -private: - // creating tables + // tables creation void CreateSessionForTablesDDL(const TActorContext& ctx) { LOG_NOTICE_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " creating event for session creation"); @@ -599,8 +517,6 @@ private: } } -private: - // table initialization void InsertInitData(const TActorContext& ctx) { @@ -637,8 +553,6 @@ private: } } -private: - TString RenderHTML() { TStringStream str; HTML(str) { @@ -674,25 +588,13 @@ private: TABLEBODY() { TABLER() { TABLED() { str << "total"; }; - TABLED() { str << Total->WindowHist.GetTotalCount(); }; - TABLED() { str << Total->WindowHist.GetTotalCount() / (WindowDuration * std::max(ui64(1), Phase) * 1.0); }; - TABLED() { str << Total->WindowErrors; }; - TABLED() { str << Total->WindowHist.GetValueAtPercentile(50.0) / 1000.0; }; - TABLED() { str << Total->WindowHist.GetValueAtPercentile(95.0) / 1000.0; }; - TABLED() { str << Total->WindowHist.GetValueAtPercentile(99.0) / 1000.0; }; - TABLED() { str << Total->WindowHist.GetMax() / 1000.0; }; - } - for (size_t i = Phase; i >= 1; --i) { - TABLER() { - TABLED() { str << i; }; - TABLED() { str << Chunk[i - 1]->WindowHist.GetTotalCount(); }; - TABLED() { str << Chunk[i - 1]->WindowHist.GetTotalCount() / (WindowDuration * 1.0); }; - TABLED() { str << Chunk[i - 1]->WindowErrors; }; - TABLED() { str << Chunk[i - 1]->WindowHist.GetValueAtPercentile(50.0) / 1000.0; }; - TABLED() { str << Chunk[i - 1]->WindowHist.GetValueAtPercentile(95.0) / 1000.0; }; - TABLED() { str << Chunk[i - 1]->WindowHist.GetValueAtPercentile(99.0) / 1000.0; }; - TABLED() { str << Chunk[i - 1]->WindowHist.GetMax() / 1000.0; }; - } + TABLED() { str << Total->LatencyHist.GetTotalCount(); }; + TABLED() { str << Total->LatencyHist.GetTotalCount() / static_cast<double>(DurationSeconds); }; + TABLED() { str << Total->Errors; }; + TABLED() { str << Total->LatencyHist.GetValueAtPercentile(50.0) / 1000.0; }; + TABLED() { str << Total->LatencyHist.GetValueAtPercentile(95.0) / 1000.0; }; + TABLED() { str << Total->LatencyHist.GetValueAtPercentile(99.0) / 1000.0; }; + TABLED() { str << Total->LatencyHist.GetMax() / 1000.0; }; } } } @@ -707,9 +609,6 @@ private: ctx.Send(ev->Sender, new NMon::TEvHttpInfoRes(RenderHTML(), ev->Get()->SubRequestId)); } - -private: - // common void AppendWorker(const TActorContext& ctx) { @@ -719,10 +618,8 @@ private: WorkloadQueryGen, WorkloadType, Tag, - CurrentSessions++, + Workers.size(), Start + TDuration::Seconds(DurationSeconds), - WindowDuration, - WindowCount, Transactions, TransactionsBytesWritten); Workers.push_back(ctx.Register(worker)); @@ -739,20 +636,17 @@ private: ctx.Send(kqp_proxy, ev.Release()); } -private: TInstant Start; TString TableSession = "wrong sessionId"; TString WorkingDir; ui64 WorkloadType; - ui64 WindowCount; - ui64 WindowDuration; std::vector<TActorId> Workers; TString ConfigString; ui64 UniformPartitionsCount; bool DeleteTableOnFinish; - ui32 CurrentSessions = 0; size_t NumOfSessions = 0; bool IncreaseSessions = false; + size_t ResultsReceived = 0; NYdbWorkload::EWorkload WorkloadClass; NYdbWorkload::TQueryInfoList InitData; @@ -763,12 +657,7 @@ private: std::shared_ptr<NYdbWorkload::IWorkloadQueryGenerator> WorkloadQueryGen; // Monitoring - std::vector<std::unique_ptr<MonitoringData>> Chunk; - std::vector<ui64> AnswersReceived; std::unique_ptr<MonitoringData> Total; - ui64 Phase = 0; - - // counters TIntrusivePtr<::NMonitoring::TDynamicCounters> LoadCounters; NMonitoring::TDynamicCounters::TCounterPtr Transactions; NMonitoring::TDynamicCounters::TCounterPtr TransactionsBytesWritten; |