about | summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author Pisarenko Grigoriy <79596613+GrigoriyPA@users.noreply.github.com> 2024-07-30 20:08:14 +0300
committer GitHub <noreply@github.com> 2024-07-30 20:08:14 +0300
commit ea1b205d749cc1c3e37c7ae42817534ede33e390 (patch)
tree f6f835847a0d8af1aebf3794951bdd0be91d599f
parent 18c3f7ac94fafeafc9428a185b0111435ccb7d84 (diff)
download ydb-ea1b205d749cc1c3e37c7ae42817534ede33e390.tar.gz
YQ-3446 add queued time into query stats (#6965)
-rw-r--r-- ydb/core/kqp/opt/kqp_query_plan.cpp 33
-rw-r--r-- ydb/core/kqp/session_actor/kqp_query_state.h 1
-rw-r--r-- ydb/core/kqp/session_actor/kqp_query_stats.cpp 1
-rw-r--r-- ydb/core/kqp/session_actor/kqp_query_stats.h 1
-rw-r--r-- ydb/core/kqp/session_actor/kqp_session_actor.cpp 4
-rw-r--r-- ydb/core/kqp/workload_service/actors/pool_handlers_acors.cpp 12
-rw-r--r-- ydb/core/protos/kqp_stats.proto 1
7 files changed, 49 insertions, 4 deletions
diff --git a/ydb/core/kqp/opt/kqp_query_plan.cpp b/ydb/core/kqp/opt/kqp_query_plan.cpp
index ad7893cbd9..f9935c156e 100644
--- a/ydb/core/kqp/opt/kqp_query_plan.cpp
+++ b/ydb/core/kqp/opt/kqp_query_plan.cpp
@@ -2243,7 +2243,7 @@ TString AddSimplifiedPlan(const TString& planText, TIntrusivePtr<NOpt::TKqpOptim
return planJson.GetStringRobust();
}
-TString SerializeTxPlans(const TVector<const TString>& txPlans, TIntrusivePtr<NOpt::TKqpOptimizeContext> optCtx, const TString commonPlanInfo = "") {
+TString SerializeTxPlans(const TVector<const TString>& txPlans, TIntrusivePtr<NOpt::TKqpOptimizeContext> optCtx, const TString commonPlanInfo = "", const TString& queryStats = "") {
NJsonWriter::TBuf writer;
writer.SetIndentSpaces(2);
@@ -2266,6 +2266,15 @@ TString SerializeTxPlans(const TVector<const TString>& txPlans, TIntrusivePtr<NO
writer.BeginObject();
writer.WriteKey("Node Type").WriteString("Query");
writer.WriteKey("PlanNodeType").WriteString("Query");
+
+ if (queryStats) {
+ NJson::TJsonValue queryStatsJson;
+ NJson::ReadJsonTree(queryStats, &queryStatsJson, true);
+
+ writer.WriteKey("Stats");
+ writer.WriteJsonValue(&queryStatsJson);
+ }
+
writer.WriteKey("Plans");
writer.BeginList();
@@ -2717,7 +2726,27 @@ TString SerializeAnalyzePlan(const NKqpProto::TKqpStatsQuery& queryStats) {
txPlans.push_back(txPlan);
}
}
- return SerializeTxPlans(txPlans, TIntrusivePtr<NOpt::TKqpOptimizeContext>());
+
+ NJsonWriter::TBuf writer;
+ writer.BeginObject();
+
+ if (queryStats.HasCompilation()) {
+ const auto& compilation = queryStats.GetCompilation();
+
+ writer.WriteKey("Compilation");
+ writer.BeginObject();
+ writer.WriteKey("FromCache").WriteBool(compilation.GetFromCache());
+ writer.WriteKey("DurationUs").WriteLongLong(compilation.GetDurationUs());
+ writer.WriteKey("CpuTimeUs").WriteLongLong(compilation.GetCpuTimeUs());
+ writer.EndObject();
+ }
+
+ writer.WriteKey("ProcessCpuTimeUs").WriteLongLong(queryStats.GetWorkerCpuTimeUs());
+ writer.WriteKey("TotalDurationUs").WriteLongLong(queryStats.GetDurationUs());
+ writer.WriteKey("QueuedTimeUs").WriteLongLong(queryStats.GetQueuedTimeUs());
+ writer.EndObject();
+
+ return SerializeTxPlans(txPlans, TIntrusivePtr<NOpt::TKqpOptimizeContext>(), "", writer.Str());
}
TString SerializeScriptPlan(const TVector<const TString>& queryPlans) {
diff --git a/ydb/core/kqp/session_actor/kqp_query_state.h b/ydb/core/kqp/session_actor/kqp_query_state.h
index 9aae171d57..ea52bf1d0f 100644
--- a/ydb/core/kqp/session_actor/kqp_query_state.h
+++ b/ydb/core/kqp/session_actor/kqp_query_state.h
@@ -114,6 +114,7 @@ public:
bool IsDocumentApiRestricted_ = false;
TInstant StartTime;
+ TInstant ContinueTime;
NYql::TKikimrQueryDeadlines QueryDeadlines;
TKqpQueryStats QueryStats;
bool KeepSession = false;
diff --git a/ydb/core/kqp/session_actor/kqp_query_stats.cpp b/ydb/core/kqp/session_actor/kqp_query_stats.cpp
index e26d6b5e7b..922b788419 100644
--- a/ydb/core/kqp/session_actor/kqp_query_stats.cpp
+++ b/ydb/core/kqp/session_actor/kqp_query_stats.cpp
@@ -210,6 +210,7 @@ ui64 CalcRequestUnit(const TKqpQueryStats& stats) {
NKqpProto::TKqpStatsQuery TKqpQueryStats::ToProto() const {
NKqpProto::TKqpStatsQuery result;
result.SetDurationUs(DurationUs);
+ result.SetQueuedTimeUs(QueuedTimeUs);
if (Compilation) {
result.MutableCompilation()->SetFromCache(Compilation->FromCache);
diff --git a/ydb/core/kqp/session_actor/kqp_query_stats.h b/ydb/core/kqp/session_actor/kqp_query_stats.h
index f73ce6316f..9cda3417be 100644
--- a/ydb/core/kqp/session_actor/kqp_query_stats.h
+++ b/ydb/core/kqp/session_actor/kqp_query_stats.h
@@ -8,6 +8,7 @@ namespace NKikimr::NKqp {
struct TKqpQueryStats {
ui64 DurationUs = 0;
+ ui64 QueuedTimeUs = 0;
std::optional<TKqpStatsCompile> Compilation;
ui64 WorkerCpuTimeUs = 0;
diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
index 7d0fcd8de8..a2c708a81f 100644
--- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp
+++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
@@ -475,6 +475,7 @@ public:
void Handle(NWorkload::TEvContinueRequest::TPtr& ev) {
YQL_ENSURE(QueryState);
+ QueryState->ContinueTime = TInstant::Now();
if (ev->Get()->Status == Ydb::StatusIds::UNSUPPORTED) {
LOG_T("Failed to place request in resource pool, feature flag is disabled");
@@ -1551,6 +1552,9 @@ public:
stats->DurationUs = ((TInstant::Now() - QueryState->StartTime).MicroSeconds());
stats->WorkerCpuTimeUs = (QueryState->GetCpuTime().MicroSeconds());
+ if (const auto continueTime = QueryState->ContinueTime) {
+ stats->QueuedTimeUs = (continueTime - QueryState->StartTime).MicroSeconds();
+ }
if (QueryState->CompileResult) {
stats->Compilation.emplace();
stats->Compilation->FromCache = (QueryState->CompileStats.FromCache);
diff --git a/ydb/core/kqp/workload_service/actors/pool_handlers_acors.cpp b/ydb/core/kqp/workload_service/actors/pool_handlers_acors.cpp
index f0f11628a0..c6b4ef3821 100644
--- a/ydb/core/kqp/workload_service/actors/pool_handlers_acors.cpp
+++ b/ydb/core/kqp/workload_service/actors/pool_handlers_acors.cpp
@@ -54,6 +54,12 @@ class TPoolHandlerActorBase : public TActor<TDerived> {
UpdateConfigCounters(poolConfig);
}
+ void CollectRequestLatency(TInstant continueTime) {
+ if (continueTime) {
+ RequestsLatencyMs->Collect((TInstant::Now() - continueTime).MilliSeconds());
+ }
+ }
+
void UpdateConfigCounters(const NResourcePool::TPoolSettings& poolConfig) {
InFlightLimit->Set(std::max(poolConfig.ConcurrentQueryLimit, 0));
QueueSizeLimit->Set(std::max(poolConfig.QueueSize, 0));
@@ -106,6 +112,7 @@ protected:
const TActorId WorkerActorId;
const TString SessionId;
const TInstant StartTime = TInstant::Now();
+ TInstant ContinueTime;
EState State = EState::Pending;
bool Started = false; // after TEvContinueRequest success
@@ -267,6 +274,7 @@ public:
if (status == Ydb::StatusIds::SUCCESS) {
LocalInFlight++;
request->Started = true;
+ request->ContinueTime = TInstant::Now();
Counters.LocalInFly->Inc();
Counters.ContinueOk->Inc();
Counters.DelayedTimeMs->Collect((TInstant::Now() - request->StartTime).MilliSeconds());
@@ -387,7 +395,7 @@ private:
if (status == Ydb::StatusIds::SUCCESS) {
Counters.CleanupOk->Inc();
- Counters.RequestsLatencyMs->Collect((TInstant::Now() - request->StartTime).MilliSeconds());
+ Counters.CollectRequestLatency(request->ContinueTime);
LOG_D("Reply cleanup success to " << request->WorkerActorId << ", session id: " << request->SessionId << ", local in flight: " << LocalInFlight);
} else {
Counters.CleanupError->Inc();
@@ -401,7 +409,7 @@ private:
this->Send(MakeKqpProxyID(this->SelfId().NodeId()), ev.release());
Counters.Cancelled->Inc();
- Counters.RequestsLatencyMs->Collect((TInstant::Now() - request->StartTime).MilliSeconds());
+ Counters.CollectRequestLatency(request->ContinueTime);
LOG_I("Cancel request for worker " << request->WorkerActorId << ", session id: " << request->SessionId << ", local in flight: " << LocalInFlight);
}
diff --git a/ydb/core/protos/kqp_stats.proto b/ydb/core/protos/kqp_stats.proto
index 24e09f2dcd..df70faff1a 100644
--- a/ydb/core/protos/kqp_stats.proto
+++ b/ydb/core/protos/kqp_stats.proto
@@ -73,6 +73,7 @@ message TKqpExecutionExtraStats {
message TKqpStatsQuery {
// Basic stats
uint64 DurationUs = 1;
+ uint64 QueuedTimeUs = 9;
TKqpStatsCompile Compilation = 2;
reserved 3; // repeated TKqpStatsExecution Executions = 3;