aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorva-kuznecov <va-kuznecov@ydb.tech>2022-12-29 13:45:41 +0300
committerva-kuznecov <va-kuznecov@ydb.tech>2022-12-29 13:45:41 +0300
commitb7f64a4cbbcf1013a148f05b41f7cf46aaa10489 (patch)
treec67a51df505f8db2b565338cf19faa9c4a53fb8b
parent49f67ead5717d06751bf522fb78d0e8b2c2494e1 (diff)
downloadydb-b7f64a4cbbcf1013a148f05b41f7cf46aaa10489.tar.gz
Rework Kqp Stats
-rw-r--r--ydb/core/load_test/kqp.cpp217
1 files changed, 53 insertions, 164 deletions
diff --git a/ydb/core/load_test/kqp.cpp b/ydb/core/load_test/kqp.cpp
index f705d427ce7..6f36a096e9c 100644
--- a/ydb/core/load_test/kqp.cpp
+++ b/ydb/core/load_test/kqp.cpp
@@ -38,25 +38,22 @@ enum {
struct MonitoringData {
public:
MonitoringData()
- : WindowHist(60000, 2)
- , WindowErrors(0) {}
+ {}
MonitoringData(const NHdr::THistogram& hist, ui64 window_errors)
- : WindowHist(60000, 2)
- , WindowErrors(window_errors)
+ : Errors(window_errors)
{
- WindowHist.Add(hist);
+ LatencyHist.Add(hist);
}
void Add(const MonitoringData& other) {
- WindowHist.Add(other.WindowHist);
- WindowErrors += other.WindowErrors;
+ LatencyHist.Add(other.LatencyHist);
+ Errors += other.Errors;
}
public:
- NHdr::THistogram WindowHist;
- ui64 WindowErrors;
-
+ NHdr::THistogram LatencyHist{60000, 2};
+ ui64 Errors = 0;
};
void SendQueryRequest(const TActorContext& ctx, NYdbWorkload::TQueryInfo& q, const TString& session, const TString& workingDir) {
@@ -87,14 +84,12 @@ void SendQueryRequest(const TActorContext& ctx, NYdbWorkload::TQueryInfo& q, con
struct TEvKqpWorkerResponse : TEventLocal<TEvKqpWorkerResponse, EvKqpWorkerResponse> {
public:
- TEvKqpWorkerResponse(const NHdr::THistogram& hist, ui64 window_errors, ui64 phase, ui64 worker_tag)
- : Data(hist, window_errors)
- , Phase(phase)
- , WorkerTag(worker_tag) {}
+ TEvKqpWorkerResponse(const NHdr::THistogram& hist, ui64 errors, ui64 workerTag)
+ : Data(hist, errors)
+ , WorkerTag(workerTag) {}
public:
MonitoringData Data;
- ui64 Phase;
ui64 WorkerTag;
};
@@ -107,8 +102,6 @@ public:
ui64 parentTag,
ui64 workerTag,
TInstant endTimestamp,
- ui64 windowDuration,
- ui64 windowCount,
NMonitoring::TDynamicCounters::TCounterPtr transactions,
NMonitoring::TDynamicCounters::TCounterPtr transactionsBytesWritten)
: Parent(std::move(parent))
@@ -118,9 +111,7 @@ public:
, ParentTag(parentTag)
, WorkerTag(workerTag)
, EndTimestamp(endTimestamp)
- , WindowHist(60000, 2)
- , WindowDuration(windowDuration)
- , WindowCount(windowCount)
+ , LatencyHist(60000, 2)
, Transactions(transactions)
, TransactionsBytesWritten(transactionsBytesWritten)
{}
@@ -129,32 +120,21 @@ public:
LOG_INFO_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Worker Tag# " << ParentTag << "." << WorkerTag << " TKqpLoadWorker Bootstrap called");
ctx.Schedule(EndTimestamp, new TEvents::TEvPoisonPill);
- ctx.Schedule(TDuration::Seconds(WindowDuration), new TEvUpdateMonitoring);
Become(&TKqpLoadWorker::StateFunc);
CreateWorkingSession(ctx);
}
- STRICT_STFUNC(StateFunc,
- CFunc(TEvents::TSystem::PoisonPill, HandlePoisonPill)
- HFunc(NKqp::TEvKqp::TEvQueryResponse, Handle)
- HFunc(NKqp::TEvKqp::TEvCreateSessionResponse, Handle)
- HFunc(TEvUpdateMonitoring, HandleWindowTimer)
- )
-
private:
-
// death
void HandlePoisonPill(const TActorContext& ctx) {
LOG_INFO_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Worker Tag# " << ParentTag << "." << WorkerTag << " HandlePoisonPill");
- if (Phase < WindowCount) {
- SendMonitoringEvent(ctx);
- }
+ Send(Parent, new TEvKqpWorkerResponse(LatencyHist, Errors, WorkerTag));
CloseSession(ctx);
- Die(ctx);
+ PassAway();
}
void CloseSession(const TActorContext& ctx) {
@@ -168,8 +148,6 @@ private:
ctx.Send(kqp_proxy, ev.Release());
}
-private:
-
// working
void CreateWorkingSession(const TActorContext& ctx) {
@@ -215,7 +193,6 @@ private:
SendQueryRequest(ctx, q, WorkerSession, WorkingDir);
}
-
void Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx) {
auto& response = ev->Get()->Record.GetRef();
@@ -224,72 +201,35 @@ private:
if (response.GetYdbStatus() == Ydb::StatusIds_StatusCode_SUCCESS) {
LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Worker Tag# " << ParentTag << "." << WorkerTag << " data request status: Success");
TransactionsBytesWritten->Add(response.GetResponse().GetQueryStats().ByteSize());
- WindowHist.RecordValue(response.GetResponse().GetQueryStats().GetDurationUs());
+ LatencyHist.RecordValue(response.GetResponse().GetQueryStats().GetDurationUs());
} else {
LOG_INFO_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Worker Tag# " << ParentTag << "." << WorkerTag
<< " data request status: Fail, Issue: " + ev->Get()->ToString());
- ++WindowErrors;
+ ++Errors;
}
- if (Phase < WindowCount) {
- CreateDataQuery(ctx);
- }
- }
-
-private:
-
- // monitoring
-
- void HandleWindowTimer(TEvUpdateMonitoring::TPtr& /*ev*/, const TActorContext& ctx) {
- LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Worker Tag# " << ParentTag << "." << WorkerTag
- << " handle TEvUpdateMonitoring, Phase: " << Phase);
-
- SendMonitoringEvent(ctx);
-
- if (Phase < WindowCount) {
- LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Worker Tag# " << ParentTag << "." << WorkerTag
- << " reschedule TEvUpdateMonitoring, Phase: " << Phase);
- ctx.Schedule(TDuration::Seconds(WindowDuration), new TEvUpdateMonitoring);
- }
+ CreateDataQuery(ctx);
}
-private:
-
- // common
-
- void SendMonitoringEvent(const TActorContext& ctx) {
- auto ev = MakeHolder<TEvKqpWorkerResponse>(WindowHist, WindowErrors, Phase, WorkerTag);
-
- WindowHist.Reset();
- WindowErrors = 0;
- ++Phase;
-
- ctx.Send(Parent, ev.Release());
- }
+ STRICT_STFUNC(StateFunc,
+ CFunc(TEvents::TSystem::PoisonPill, HandlePoisonPill)
+ HFunc(NKqp::TEvKqp::TEvQueryResponse, Handle)
+ HFunc(NKqp::TEvKqp::TEvCreateSessionResponse, Handle)
+ )
-private:
TActorId Parent;
TString WorkingDir;
std::shared_ptr<NYdbWorkload::IWorkloadQueryGenerator> WorkloadQueryGen;
ui64 WorkloadType;
ui64 ParentTag;
ui64 WorkerTag;
-
+ TInstant EndTimestamp;
NYdbWorkload::TQueryInfoList Queries;
-
TString WorkerSession = "wrong sessionId";
-private:
- TInstant EndTimestamp;
- // for monitoring
- NHdr::THistogram WindowHist;
- ui64 WindowErrors = 0;
-
- ui64 WindowDuration;
- ui64 WindowCount;
-
- ui64 Phase = 0;
-
+ // monitoring
+ NHdr::THistogram LatencyHist;
+ ui64 Errors = 0;
NMonitoring::TDynamicCounters::TCounterPtr Transactions;
NMonitoring::TDynamicCounters::TCounterPtr TransactionsBytesWritten;
};
@@ -315,19 +255,11 @@ public:
WorkingDir = cmd.GetWorkingDir();
WorkloadType = cmd.GetWorkloadType();
DurationSeconds = cmd.GetDurationSeconds();
- WindowDuration = cmd.GetWindowDuration();
- WindowCount = (DurationSeconds + WindowDuration - 1) / WindowDuration;
NumOfSessions = cmd.GetNumOfSessions();
IncreaseSessions = cmd.GetIncreaseSessions();
- AnswersReceived.resize(WindowCount);
- Chunk.reserve(WindowCount);
Total = std::make_unique<MonitoringData>();
- for (size_t i = 0; i < WindowCount; ++i) {
- Chunk.push_back(std::make_unique<MonitoringData>());
- }
NYdbWorkload::TWorkloadFactory factory;
-
if (cmd.Workload_case() == NKikimr::TEvLoadTestRequest_TKqpLoad::WorkloadCase::kStock) {
WorkloadClass = NYdbWorkload::EWorkload::STOCK;
NYdbWorkload::TStockWorkloadParams params;
@@ -398,7 +330,8 @@ public:
}
LOG_INFO_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " Schedule PoisonPill");
- ctx.Schedule(TDuration::Seconds(DurationSeconds * 2), new TEvents::TEvPoisonPill);
+ // TODO: report error in case of such death
+ ctx.Schedule(TDuration::Seconds(DurationSeconds + 10), new TEvents::TEvPoisonPill);
CreateSessionForTablesDDL(ctx);
}
@@ -428,7 +361,7 @@ public:
CFunc(TEvents::TSystem::PoisonPill, HandlePoisonPill)
CFunc(TEvents::TSystem::Wakeup, HandleWakeup)
HFunc(NKqp::TEvKqp::TEvQueryResponse, HandleDataQueryResponse)
- HFunc(TEvKqpWorkerResponse, HandleMonitoring)
+ HFunc(TEvKqpWorkerResponse, HandleResult)
HFunc(NMon::TEvHttpInfo, HandleHTML)
)
@@ -441,7 +374,8 @@ private:
// death
void HandlePoisonPill(const TActorContext& ctx) {
- LOG_INFO_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " HandlePoisonPill, starting death process");
+ LOG_CRIT_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " HandlePoisonPill, "
+ << "but it is supposed to pass away by receiving TEvKqpWorkerResponse from all of the workers");
StartDeathProcess(ctx);
}
@@ -490,11 +424,11 @@ private:
TIntrusivePtr<TEvLoad::TLoadReport> Report(new TEvLoad::TLoadReport());
Report->Duration = TDuration::Seconds(DurationSeconds);
- auto* finishEv = new TEvLoad::TEvLoadTestFinished(Tag, Report, "OK called StartDeathProcess");
+ auto* finishEv = new TEvLoad::TEvLoadTestFinished(Tag, Report, "OK, called StartDeathProcess");
finishEv->LastHtmlPage = RenderHTML();
finishEv->JsonResult = GetJsonResult();
ctx.Send(Parent, finishEv);
- Die(ctx);
+ PassAway();
}
private:
@@ -502,50 +436,34 @@ private:
NJson::TJsonValue GetJsonResult() const {
NJson::TJsonValue value;
value["duration_s"] = DurationSeconds;
- value["tx/s"] = Total->WindowHist.GetTotalCount() / (WindowDuration * std::max(ui64(1), Phase) * 1.0);
- value["errors"] = Total->WindowErrors;
+ value["rps"] = Total->LatencyHist.GetTotalCount() / static_cast<double>(DurationSeconds);
+ value["errors"] = Total->Errors;
{
auto& p = value["percentile"];
- p["50"] = Total->WindowHist.GetValueAtPercentile(50.0) / 1000.0;
- p["95"] = Total->WindowHist.GetValueAtPercentile(95.0) / 1000.0;
- p["99"] = Total->WindowHist.GetValueAtPercentile(99.0) / 1000.0;
- p["100"] = Total->WindowHist.GetMax() / 1000.0;
+ p["50"] = Total->LatencyHist.GetValueAtPercentile(50.0) / 1000.0;
+ p["95"] = Total->LatencyHist.GetValueAtPercentile(95.0) / 1000.0;
+ p["99"] = Total->LatencyHist.GetValueAtPercentile(99.0) / 1000.0;
+ p["100"] = Total->LatencyHist.GetMax() / 1000.0;
}
value["config"] = ConfigString;
return value;
}
// monitoring
- void HandleMonitoring(TEvKqpWorkerResponse::TPtr& ev, const TActorContext& ctx) {
+ void HandleResult(TEvKqpWorkerResponse::TPtr& ev, const TActorContext& ctx) {
const auto& response = ev->Get();
- LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " got monitoring response from worker Tag# " << response->WorkerTag
- << " Phase: " << response->Phase
- << " Min: " << response->Data.WindowHist.GetMin()
- << " Max: " << response->Data.WindowHist.GetMax()
- << " Count: " << response->Data.WindowHist.GetTotalCount());
+ LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " got monitoring response from worker Tag# "
+ << response->WorkerTag);
- Chunk[response->Phase]->Add(response->Data);
Total->Add(response->Data);
- AnswersReceived[response->Phase] += 1;
-
- if (AnswersReceived[Phase] == NumOfSessions) {
- UpdatePhase(ctx);
- }
- }
-
- void UpdatePhase(const TActorContext& ctx) {
- Phase += 1;
-
- LOG_INFO_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " total: Phase: " << Phase);
-
- if (Phase >= WindowCount) {
+ ++ResultsReceived;
+ if (ResultsReceived == NumOfSessions) {
StartDeathProcess(ctx);
}
}
-private:
- // creating tables
+ // tables creation
void CreateSessionForTablesDDL(const TActorContext& ctx) {
LOG_NOTICE_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " creating event for session creation");
@@ -599,8 +517,6 @@ private:
}
}
-private:
-
// table initialization
void InsertInitData(const TActorContext& ctx) {
@@ -637,8 +553,6 @@ private:
}
}
-private:
-
TString RenderHTML() {
TStringStream str;
HTML(str) {
@@ -674,25 +588,13 @@ private:
TABLEBODY() {
TABLER() {
TABLED() { str << "total"; };
- TABLED() { str << Total->WindowHist.GetTotalCount(); };
- TABLED() { str << Total->WindowHist.GetTotalCount() / (WindowDuration * std::max(ui64(1), Phase) * 1.0); };
- TABLED() { str << Total->WindowErrors; };
- TABLED() { str << Total->WindowHist.GetValueAtPercentile(50.0) / 1000.0; };
- TABLED() { str << Total->WindowHist.GetValueAtPercentile(95.0) / 1000.0; };
- TABLED() { str << Total->WindowHist.GetValueAtPercentile(99.0) / 1000.0; };
- TABLED() { str << Total->WindowHist.GetMax() / 1000.0; };
- }
- for (size_t i = Phase; i >= 1; --i) {
- TABLER() {
- TABLED() { str << i; };
- TABLED() { str << Chunk[i - 1]->WindowHist.GetTotalCount(); };
- TABLED() { str << Chunk[i - 1]->WindowHist.GetTotalCount() / (WindowDuration * 1.0); };
- TABLED() { str << Chunk[i - 1]->WindowErrors; };
- TABLED() { str << Chunk[i - 1]->WindowHist.GetValueAtPercentile(50.0) / 1000.0; };
- TABLED() { str << Chunk[i - 1]->WindowHist.GetValueAtPercentile(95.0) / 1000.0; };
- TABLED() { str << Chunk[i - 1]->WindowHist.GetValueAtPercentile(99.0) / 1000.0; };
- TABLED() { str << Chunk[i - 1]->WindowHist.GetMax() / 1000.0; };
- }
+ TABLED() { str << Total->LatencyHist.GetTotalCount(); };
+ TABLED() { str << Total->LatencyHist.GetTotalCount() / static_cast<double>(DurationSeconds); };
+ TABLED() { str << Total->Errors; };
+ TABLED() { str << Total->LatencyHist.GetValueAtPercentile(50.0) / 1000.0; };
+ TABLED() { str << Total->LatencyHist.GetValueAtPercentile(95.0) / 1000.0; };
+ TABLED() { str << Total->LatencyHist.GetValueAtPercentile(99.0) / 1000.0; };
+ TABLED() { str << Total->LatencyHist.GetMax() / 1000.0; };
}
}
}
@@ -707,9 +609,6 @@ private:
ctx.Send(ev->Sender, new NMon::TEvHttpInfoRes(RenderHTML(), ev->Get()->SubRequestId));
}
-
-private:
-
// common
void AppendWorker(const TActorContext& ctx) {
@@ -719,10 +618,8 @@ private:
WorkloadQueryGen,
WorkloadType,
Tag,
- CurrentSessions++,
+ Workers.size(),
Start + TDuration::Seconds(DurationSeconds),
- WindowDuration,
- WindowCount,
Transactions,
TransactionsBytesWritten);
Workers.push_back(ctx.Register(worker));
@@ -739,20 +636,17 @@ private:
ctx.Send(kqp_proxy, ev.Release());
}
-private:
TInstant Start;
TString TableSession = "wrong sessionId";
TString WorkingDir;
ui64 WorkloadType;
- ui64 WindowCount;
- ui64 WindowDuration;
std::vector<TActorId> Workers;
TString ConfigString;
ui64 UniformPartitionsCount;
bool DeleteTableOnFinish;
- ui32 CurrentSessions = 0;
size_t NumOfSessions = 0;
bool IncreaseSessions = false;
+ size_t ResultsReceived = 0;
NYdbWorkload::EWorkload WorkloadClass;
NYdbWorkload::TQueryInfoList InitData;
@@ -763,12 +657,7 @@ private:
std::shared_ptr<NYdbWorkload::IWorkloadQueryGenerator> WorkloadQueryGen;
// Monitoring
- std::vector<std::unique_ptr<MonitoringData>> Chunk;
- std::vector<ui64> AnswersReceived;
std::unique_ptr<MonitoringData> Total;
- ui64 Phase = 0;
-
- // counters
TIntrusivePtr<::NMonitoring::TDynamicCounters> LoadCounters;
NMonitoring::TDynamicCounters::TCounterPtr Transactions;
NMonitoring::TDynamicCounters::TCounterPtr TransactionsBytesWritten;