diff options
| author | Vladislav Kuznetsov <[email protected]> | 2022-05-19 16:53:21 +0300 |
|---|---|---|
| committer | Vladislav Kuznetsov <[email protected]> | 2022-05-19 16:53:21 +0300 |
| commit | 4c76e660dfc0633423adb03f3af4d11741ff1f63 (patch) | |
| tree | b0f3d523810cb9f22c2d26ee57eed7b19abfb9e0 | |
| parent | 9b18c778aaca9023c365db7f100490faa5aa8ceb (diff) | |
Add missing session actor's counters KIKIMR-14934
ref:57d03b73f4224b00b6739ffb5b46789ea4cddae6
| -rw-r--r-- | ydb/core/kqp/counters/kqp_counters.cpp | 14 | ||||
| -rw-r--r-- | ydb/core/kqp/counters/kqp_counters.h | 1 | ||||
| -rw-r--r-- | ydb/core/kqp/kqp_session_actor.cpp | 65 | ||||
| -rw-r--r-- | ydb/core/kqp/kqp_worker_actor.cpp | 39 | ||||
| -rw-r--r-- | ydb/core/kqp/kqp_worker_common.h | 39 |
5 files changed, 107 insertions, 51 deletions
diff --git a/ydb/core/kqp/counters/kqp_counters.cpp b/ydb/core/kqp/counters/kqp_counters.cpp index f62713bdd2c..00730e98f40 100644 --- a/ydb/core/kqp/counters/kqp_counters.cpp +++ b/ydb/core/kqp/counters/kqp_counters.cpp @@ -203,6 +203,8 @@ void TKqpCountersBase::Init() { SessionActorsClosedError = KqpGroup->GetCounter("SessionActors/ClosedError", true); SessionActorsClosedRequest = KqpGroup->GetCounter("SessionActors/ClosedRequest", true); ActiveSessionActors = KqpGroup->GetCounter("SessionActors/Active", false); + SessionActorCleanupLatency = KqpGroup->GetHistogram( + "SessionActors/CleanupLatencyMs", NMonitoring::ExponentialHistogram(10, 2, 1)); SessionBalancerCV = KqpGroup->GetCounter("SessionBalancer/CV", false); SessionBalancerShutdowns = KqpGroup->GetCounter("SessionBalancer/Shutdown", true); @@ -452,7 +454,6 @@ void TKqpCountersBase::ReportWorkerFinished(TDuration lifeSpan) { YdbSessionsActiveCount->Dec(); } - void TKqpCountersBase::ReportWorkerCleanupLatency(TDuration cleanupTime) { WorkerCleanupLatency->Collect(cleanupTime.MilliSeconds()); } @@ -491,6 +492,10 @@ void TKqpCountersBase::ReportSessionActorFinished(TDuration lifeSpan) { YdbSessionsActiveCount->Dec(); } +void TKqpCountersBase::ReportSessionActorCleanupLatency(TDuration cleanupTime) { + SessionActorCleanupLatency->Collect(cleanupTime.MilliSeconds()); +} + void TKqpCountersBase::ReportSessionActorClosedError() { SessionActorsClosedError->Inc(); } @@ -1018,6 +1023,13 @@ void TKqpCounters::ReportSessionActorFinished(TKqpDbCountersPtr dbCounters, TDur } } +void TKqpCounters::ReportSessionActorCleanupLatency(TKqpDbCountersPtr dbCounters, TDuration cleanupTime) { + TKqpCountersBase::ReportSessionActorCleanupLatency(cleanupTime); + if (dbCounters) { + dbCounters->ReportSessionActorCleanupLatency(cleanupTime); + } +} + void TKqpCounters::ReportSessionActorClosedError(TKqpDbCountersPtr dbCounters) { TKqpCountersBase::ReportSessionActorClosedError(); if (dbCounters) { diff --git a/ydb/core/kqp/counters/kqp_counters.h b/ydb/core/kqp/counters/kqp_counters.h index 36b99703cbb..f5815d97cff 100644 --- a/ydb/core/kqp/counters/kqp_counters.h +++ b/ydb/core/kqp/counters/kqp_counters.h @@ -180,6 +180,7 @@ protected: NMonitoring::TDynamicCounters::TCounterPtr SessionActorsClosedError; NMonitoring::TDynamicCounters::TCounterPtr SessionActorsClosedRequest; NMonitoring::TDynamicCounters::TCounterPtr ActiveSessionActors; + NMonitoring::THistogramPtr SessionActorCleanupLatency; // Transactions NMonitoring::TDynamicCounters::TCounterPtr TxCreated; diff --git a/ydb/core/kqp/kqp_session_actor.cpp b/ydb/core/kqp/kqp_session_actor.cpp index 2322ea88432..663e9877ad7 100644 --- a/ydb/core/kqp/kqp_session_actor.cpp +++ b/ydb/core/kqp/kqp_session_actor.cpp @@ -93,7 +93,7 @@ struct TKqpCleanupCtx { ui64 TransactionsToBeAborted = 0; std::vector<IKqpGateway::TExecPhysicalRequest> ExecuterAbortRequests; bool Final = false; - TInstant Start; + TInstant Start = TInstant::Now(); }; EKikimrStatsMode GetStatsModeInt(const NKikimrKqp::TQueryRequest& queryRequest, EKikimrStatsMode minMode) { @@ -558,6 +558,7 @@ public: it.Value()->Invalidate(); TransactionsToBeAborted.emplace_back(std::move(it.Value())); ExplicitTransactions.Erase(it); + ++EvictedTx; } else { auto requestInfo = TKqpRequestInfo(QueryState->TraceId, SessionId); std::vector<TIssue> issues{ @@ -581,6 +582,10 @@ public: QueryState->TxCtx = MakeIntrusive<TKqpTransactionContext>(false); SetIsolationLevel(settings); CreateNewTx(); + + Counters->ReportTxCreated(Settings.DbCounters); + Counters->ReportBeginTransaction(Settings.DbCounters, EvictedTx, ExplicitTransactions.Size(), + TransactionsToBeAborted.size()); } std::pair<bool, TIssues> ApplyTableOperations(TKqpTransactionContext* txCtx, const NKqpProto::TKqpPhyQuery& query) { @@ -1033,7 +1038,6 @@ public: TransactionsToBeAborted.emplace_back(txCtx); RemoveTransaction(QueryState->TxId); - TIssues issues; issues.AddIssue(YqlIssue({}, TIssuesIds::CORE_EXEC, "Execution")); TIssues subIssues; @@ -1271,6 +1275,41 @@ public: } } + void UpdateQueryExecutionCountes() { + auto now = TInstant::Now(); + auto queryDuration = now - QueryState->StartTime; + + Counters->ReportQueryLatency(Settings.DbCounters, QueryState->Request.GetAction(), queryDuration); + + auto& stats = QueryState->Stats; + auto plan = SerializeAnalyzePlan(stats); + + auto maxReadType = ExtractMostHeavyReadType(plan); + if (maxReadType == ETableReadType::FullScan) { + Counters->ReportQueryWithFullScan(Settings.DbCounters); + } else if (maxReadType == ETableReadType::Scan) { + Counters->ReportQueryWithRangeScan(Settings.DbCounters); + } + + ui32 affectedShardsCount = 0; + ui64 readBytesCount = 0; + ui64 readRowsCount = 0; + for (const auto& exec : stats.GetExecutions()) { + for (const auto& table : exec.GetTables()) { + affectedShardsCount = std::max(affectedShardsCount, table.GetAffectedPartitions()); + readBytesCount += table.GetReadBytes(); + readRowsCount += table.GetReadRows(); + } + } + + Counters->ReportQueryAffectedShards(Settings.DbCounters, affectedShardsCount); + Counters->ReportQueryReadRows(Settings.DbCounters, readRowsCount); + Counters->ReportQueryReadBytes(Settings.DbCounters, readBytesCount); + Counters->ReportQueryReadSets(Settings.DbCounters, stats.GetReadSetsCount()); + Counters->ReportQueryMaxShardReplySize(Settings.DbCounters, stats.GetMaxShardReplySize()); + Counters->ReportQueryMaxShardProgramSize(Settings.DbCounters, stats.GetMaxShardProgramSize()); + } + void ReplySuccess() { auto resEv = std::make_unique<TEvKqp::TEvQueryResponse>(); std::shared_ptr<google::protobuf::Arena> arena(new google::protobuf::Arena()); @@ -1291,6 +1330,8 @@ public: FillTxInfo(response); + UpdateQueryExecutionCountes(); + bool replyQueryId = false; bool replyQueryParameters = false; auto& queryRequest = QueryState->Request; @@ -1422,10 +1463,7 @@ public: YQL_ENSURE(QueryState); auto requestInfo = TKqpRequestInfo(QueryState->TraceId, SessionId); - auto& queryRequest = QueryState->Request; - auto queryDuration = TInstant::Now() - QueryState->StartTime; YQL_ENSURE(Counters); - Counters->ReportQueryLatency(Settings.DbCounters, queryRequest.GetAction(), queryDuration); auto& record = QueryResponse->Record.GetRef(); auto& response = *record.MutableResponse(); @@ -1444,11 +1482,8 @@ public: LOG_D(requestInfo << "Sent query response back to proxy, proxyRequestId: " << QueryState->ProxyRequestId << ", proxyId: " << QueryState->Sender.ToString()); - if (status == Ydb::StatusIds::INTERNAL_ERROR) { - LOG_D(requestInfo << "SessionActor destroyed due to internal error"); - Counters->ReportSessionActorClosedError(Settings.DbCounters); - } else if (status == Ydb::StatusIds::BAD_SESSION) { - LOG_D(requestInfo << "SessionActor destroyed due to session error"); + if (IsFatalError(status)) { + LOG_N(requestInfo << "SessionActor destroyed due to " << status); Counters->ReportSessionActorClosedError(Settings.DbCounters); } } @@ -1608,11 +1643,15 @@ public: void Cleanup(bool isFinal = false) { isFinal = isFinal || !QueryState->KeepSession; + if (isFinal) + Counters->ReportSessionActorClosedRequest(Settings.DbCounters); + if (isFinal) { for (auto it = ExplicitTransactions.Begin(); it != ExplicitTransactions.End(); ++it) { it.Value()->Invalidate(); TransactionsToBeAborted.emplace_back(std::move(it.Value())); } + Counters->ReportTxAborted(Settings.DbCounters, TransactionsToBeAborted.size()); ExplicitTransactions.Clear(); } @@ -1661,6 +1700,9 @@ public: if (QueryResponse) Reply(); + if (CleanupCtx) + Counters->ReportSessionActorCleanupLatency(Settings.DbCounters, TInstant::Now() - CleanupCtx->Start); + if (isFinal) { auto lifeSpan = TInstant::Now() - CreationTime; Counters->ReportSessionActorFinished(Settings.DbCounters, lifeSpan); @@ -1831,7 +1873,7 @@ private: auto requestInfo = TKqpRequestInfo(QueryState->TraceId, SessionId); ReplyQueryError(requestInfo, Ydb::StatusIds::INTERNAL_ERROR, message); } else { - PassAway(); + FinalCleanup(); } } @@ -1854,6 +1896,7 @@ private: TKikimrConfiguration::TPtr Config; TLRUCache<TString, TIntrusivePtr<TKqpTransactionContext>> ExplicitTransactions; std::vector<TIntrusivePtr<TKqpTransactionContext>> TransactionsToBeAborted; + ui64 EvictedTx = 0; std::unique_ptr<TEvKqp::TEvQueryResponse> QueryResponse; TActorId IdleTimerActorId; diff --git a/ydb/core/kqp/kqp_worker_actor.cpp b/ydb/core/kqp/kqp_worker_actor.cpp index 7a05f12b229..17c4e8e4170 100644 --- a/ydb/core/kqp/kqp_worker_actor.cpp +++ b/ydb/core/kqp/kqp_worker_actor.cpp @@ -22,7 +22,6 @@ #include <library/cpp/actors/core/event_pb.h> #include <library/cpp/actors/core/hfunc.h> #include <library/cpp/actors/core/log.h> -#include <library/cpp/json/json_reader.h> #include <util/string/escape.h> @@ -84,12 +83,6 @@ struct TKqpCleanupState { TIntrusivePtr<IKqpHost::IAsyncQueryResult> AsyncResult; }; -enum ETableReadType { - Other = 0, - Scan = 1, - FullScan = 2, -}; - EKikimrStatsMode GetStatsMode(const NKikimrKqp::TQueryRequest& queryRequest, EKikimrStatsMode minMode) { if (queryRequest.GetProfile()) { // TODO: Deprecate, StatsMode is the new way to enable stats. @@ -1542,38 +1535,6 @@ private: return Reply(std::move(responseEv), ctx); } - ETableReadType ExtractMostHeavyReadType(const TString& queryPlan) { - ETableReadType maxReadType = ETableReadType::Other; - - if (queryPlan.empty()) { - return maxReadType; - } - - NJson::TJsonValue root; - NJson::ReadJsonTree(queryPlan, &root, false); - - if (root.Has("tables")) { - for (const auto& table : root["tables"].GetArray()) { - if (!table.Has("reads")) { - continue; - } - - for (const auto& read : table["reads"].GetArray()) { - Y_VERIFY(read.Has("type")); - const auto& type = read["type"].GetString(); - - if (type == "Scan") { - maxReadType = Max(maxReadType, ETableReadType::Scan); - } else if (type == "FullScan") { - return ETableReadType::FullScan; - } - } - } - } - - return maxReadType; - } - bool ReplyQueryResult(const TActorContext& ctx) { Y_VERIFY(QueryState); auto& queryRequest = QueryState->Request; diff --git a/ydb/core/kqp/kqp_worker_common.h b/ydb/core/kqp/kqp_worker_common.h index 9885cdb476a..bc29a7e93d1 100644 --- a/ydb/core/kqp/kqp_worker_common.h +++ b/ydb/core/kqp/kqp_worker_common.h @@ -1,5 +1,6 @@ #include <library/cpp/actors/core/actor.h> #include <library/cpp/actors/core/log.h> +#include <library/cpp/json/json_reader.h> #include <util/datetime/base.h> #include <util/generic/string.h> @@ -81,4 +82,42 @@ inline TIntrusivePtr<NYql::TKikimrConfiguration> CreateConfig(const TKqpSettings return cfg; } +enum ETableReadType { + Other = 0, + Scan = 1, + FullScan = 2, +}; + +inline ETableReadType ExtractMostHeavyReadType(const TString& queryPlan) { + ETableReadType maxReadType = ETableReadType::Other; + + if (queryPlan.empty()) { + return maxReadType; + } + + NJson::TJsonValue root; + NJson::ReadJsonTree(queryPlan, &root, false); + + if (root.Has("tables")) { + for (const auto& table : root["tables"].GetArray()) { + if (!table.Has("reads")) { + continue; + } + + for (const auto& read : table["reads"].GetArray()) { + Y_VERIFY(read.Has("type")); + const auto& type = read["type"].GetString(); + + if (type == "Scan") { + maxReadType = Max(maxReadType, ETableReadType::Scan); + } else if (type == "FullScan") { + return ETableReadType::FullScan; + } + } + } + } + + return maxReadType; +} + } |
