summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author Vladislav Kuznetsov <[email protected]> 2022-05-19 16:53:21 +0300
committer Vladislav Kuznetsov <[email protected]> 2022-05-19 16:53:21 +0300
commit 4c76e660dfc0633423adb03f3af4d11741ff1f63 (patch)
tree b0f3d523810cb9f22c2d26ee57eed7b19abfb9e0
parent 9b18c778aaca9023c365db7f100490faa5aa8ceb (diff)
Add missing session actor's counters KIKIMR-14934
ref:57d03b73f4224b00b6739ffb5b46789ea4cddae6
-rw-r--r-- ydb/core/kqp/counters/kqp_counters.cpp 14
-rw-r--r-- ydb/core/kqp/counters/kqp_counters.h 1
-rw-r--r-- ydb/core/kqp/kqp_session_actor.cpp 65
-rw-r--r-- ydb/core/kqp/kqp_worker_actor.cpp 39
-rw-r--r-- ydb/core/kqp/kqp_worker_common.h 39
5 files changed, 107 insertions, 51 deletions
diff --git a/ydb/core/kqp/counters/kqp_counters.cpp b/ydb/core/kqp/counters/kqp_counters.cpp
index f62713bdd2c..00730e98f40 100644
--- a/ydb/core/kqp/counters/kqp_counters.cpp
+++ b/ydb/core/kqp/counters/kqp_counters.cpp
@@ -203,6 +203,8 @@ void TKqpCountersBase::Init() {
SessionActorsClosedError = KqpGroup->GetCounter("SessionActors/ClosedError", true);
SessionActorsClosedRequest = KqpGroup->GetCounter("SessionActors/ClosedRequest", true);
ActiveSessionActors = KqpGroup->GetCounter("SessionActors/Active", false);
+ SessionActorCleanupLatency = KqpGroup->GetHistogram(
+ "SessionActors/CleanupLatencyMs", NMonitoring::ExponentialHistogram(10, 2, 1));
SessionBalancerCV = KqpGroup->GetCounter("SessionBalancer/CV", false);
SessionBalancerShutdowns = KqpGroup->GetCounter("SessionBalancer/Shutdown", true);
@@ -452,7 +454,6 @@ void TKqpCountersBase::ReportWorkerFinished(TDuration lifeSpan) {
YdbSessionsActiveCount->Dec();
}
-
void TKqpCountersBase::ReportWorkerCleanupLatency(TDuration cleanupTime) {
WorkerCleanupLatency->Collect(cleanupTime.MilliSeconds());
}
@@ -491,6 +492,10 @@ void TKqpCountersBase::ReportSessionActorFinished(TDuration lifeSpan) {
YdbSessionsActiveCount->Dec();
}
+void TKqpCountersBase::ReportSessionActorCleanupLatency(TDuration cleanupTime) {
+ SessionActorCleanupLatency->Collect(cleanupTime.MilliSeconds());
+}
+
void TKqpCountersBase::ReportSessionActorClosedError() {
SessionActorsClosedError->Inc();
}
@@ -1018,6 +1023,13 @@ void TKqpCounters::ReportSessionActorFinished(TKqpDbCountersPtr dbCounters, TDur
}
}
+void TKqpCounters::ReportSessionActorCleanupLatency(TKqpDbCountersPtr dbCounters, TDuration cleanupTime) {
+ TKqpCountersBase::ReportSessionActorCleanupLatency(cleanupTime);
+ if (dbCounters) {
+ dbCounters->ReportSessionActorCleanupLatency(cleanupTime);
+ }
+}
+
void TKqpCounters::ReportSessionActorClosedError(TKqpDbCountersPtr dbCounters) {
TKqpCountersBase::ReportSessionActorClosedError();
if (dbCounters) {
diff --git a/ydb/core/kqp/counters/kqp_counters.h b/ydb/core/kqp/counters/kqp_counters.h
index 36b99703cbb..f5815d97cff 100644
--- a/ydb/core/kqp/counters/kqp_counters.h
+++ b/ydb/core/kqp/counters/kqp_counters.h
@@ -180,6 +180,7 @@ protected:
NMonitoring::TDynamicCounters::TCounterPtr SessionActorsClosedError;
NMonitoring::TDynamicCounters::TCounterPtr SessionActorsClosedRequest;
NMonitoring::TDynamicCounters::TCounterPtr ActiveSessionActors;
+ NMonitoring::THistogramPtr SessionActorCleanupLatency;
// Transactions
NMonitoring::TDynamicCounters::TCounterPtr TxCreated;
diff --git a/ydb/core/kqp/kqp_session_actor.cpp b/ydb/core/kqp/kqp_session_actor.cpp
index 2322ea88432..663e9877ad7 100644
--- a/ydb/core/kqp/kqp_session_actor.cpp
+++ b/ydb/core/kqp/kqp_session_actor.cpp
@@ -93,7 +93,7 @@ struct TKqpCleanupCtx {
ui64 TransactionsToBeAborted = 0;
std::vector<IKqpGateway::TExecPhysicalRequest> ExecuterAbortRequests;
bool Final = false;
- TInstant Start;
+ TInstant Start = TInstant::Now();
};
EKikimrStatsMode GetStatsModeInt(const NKikimrKqp::TQueryRequest& queryRequest, EKikimrStatsMode minMode) {
@@ -558,6 +558,7 @@ public:
it.Value()->Invalidate();
TransactionsToBeAborted.emplace_back(std::move(it.Value()));
ExplicitTransactions.Erase(it);
+ ++EvictedTx;
} else {
auto requestInfo = TKqpRequestInfo(QueryState->TraceId, SessionId);
std::vector<TIssue> issues{
@@ -581,6 +582,10 @@ public:
QueryState->TxCtx = MakeIntrusive<TKqpTransactionContext>(false);
SetIsolationLevel(settings);
CreateNewTx();
+
+ Counters->ReportTxCreated(Settings.DbCounters);
+ Counters->ReportBeginTransaction(Settings.DbCounters, EvictedTx, ExplicitTransactions.Size(),
+ TransactionsToBeAborted.size());
}
std::pair<bool, TIssues> ApplyTableOperations(TKqpTransactionContext* txCtx, const NKqpProto::TKqpPhyQuery& query) {
@@ -1033,7 +1038,6 @@ public:
TransactionsToBeAborted.emplace_back(txCtx);
RemoveTransaction(QueryState->TxId);
-
TIssues issues;
issues.AddIssue(YqlIssue({}, TIssuesIds::CORE_EXEC, "Execution"));
TIssues subIssues;
@@ -1271,6 +1275,41 @@ public:
}
}
+ void UpdateQueryExecutionCountes() {
+ auto now = TInstant::Now();
+ auto queryDuration = now - QueryState->StartTime;
+
+ Counters->ReportQueryLatency(Settings.DbCounters, QueryState->Request.GetAction(), queryDuration);
+
+ auto& stats = QueryState->Stats;
+ auto plan = SerializeAnalyzePlan(stats);
+
+ auto maxReadType = ExtractMostHeavyReadType(plan);
+ if (maxReadType == ETableReadType::FullScan) {
+ Counters->ReportQueryWithFullScan(Settings.DbCounters);
+ } else if (maxReadType == ETableReadType::Scan) {
+ Counters->ReportQueryWithRangeScan(Settings.DbCounters);
+ }
+
+ ui32 affectedShardsCount = 0;
+ ui64 readBytesCount = 0;
+ ui64 readRowsCount = 0;
+ for (const auto& exec : stats.GetExecutions()) {
+ for (const auto& table : exec.GetTables()) {
+ affectedShardsCount = std::max(affectedShardsCount, table.GetAffectedPartitions());
+ readBytesCount += table.GetReadBytes();
+ readRowsCount += table.GetReadRows();
+ }
+ }
+
+ Counters->ReportQueryAffectedShards(Settings.DbCounters, affectedShardsCount);
+ Counters->ReportQueryReadRows(Settings.DbCounters, readRowsCount);
+ Counters->ReportQueryReadBytes(Settings.DbCounters, readBytesCount);
+ Counters->ReportQueryReadSets(Settings.DbCounters, stats.GetReadSetsCount());
+ Counters->ReportQueryMaxShardReplySize(Settings.DbCounters, stats.GetMaxShardReplySize());
+ Counters->ReportQueryMaxShardProgramSize(Settings.DbCounters, stats.GetMaxShardProgramSize());
+ }
+
void ReplySuccess() {
auto resEv = std::make_unique<TEvKqp::TEvQueryResponse>();
std::shared_ptr<google::protobuf::Arena> arena(new google::protobuf::Arena());
@@ -1291,6 +1330,8 @@ public:
FillTxInfo(response);
+ UpdateQueryExecutionCountes();
+
bool replyQueryId = false;
bool replyQueryParameters = false;
auto& queryRequest = QueryState->Request;
@@ -1422,10 +1463,7 @@ public:
YQL_ENSURE(QueryState);
auto requestInfo = TKqpRequestInfo(QueryState->TraceId, SessionId);
- auto& queryRequest = QueryState->Request;
- auto queryDuration = TInstant::Now() - QueryState->StartTime;
YQL_ENSURE(Counters);
- Counters->ReportQueryLatency(Settings.DbCounters, queryRequest.GetAction(), queryDuration);
auto& record = QueryResponse->Record.GetRef();
auto& response = *record.MutableResponse();
@@ -1444,11 +1482,8 @@ public:
LOG_D(requestInfo << "Sent query response back to proxy, proxyRequestId: " << QueryState->ProxyRequestId
<< ", proxyId: " << QueryState->Sender.ToString());
- if (status == Ydb::StatusIds::INTERNAL_ERROR) {
- LOG_D(requestInfo << "SessionActor destroyed due to internal error");
- Counters->ReportSessionActorClosedError(Settings.DbCounters);
- } else if (status == Ydb::StatusIds::BAD_SESSION) {
- LOG_D(requestInfo << "SessionActor destroyed due to session error");
+ if (IsFatalError(status)) {
+ LOG_N(requestInfo << "SessionActor destroyed due to " << status);
Counters->ReportSessionActorClosedError(Settings.DbCounters);
}
}
@@ -1608,11 +1643,15 @@ public:
void Cleanup(bool isFinal = false) {
isFinal = isFinal || !QueryState->KeepSession;
+ if (isFinal)
+ Counters->ReportSessionActorClosedRequest(Settings.DbCounters);
+
if (isFinal) {
for (auto it = ExplicitTransactions.Begin(); it != ExplicitTransactions.End(); ++it) {
it.Value()->Invalidate();
TransactionsToBeAborted.emplace_back(std::move(it.Value()));
}
+ Counters->ReportTxAborted(Settings.DbCounters, TransactionsToBeAborted.size());
ExplicitTransactions.Clear();
}
@@ -1661,6 +1700,9 @@ public:
if (QueryResponse)
Reply();
+ if (CleanupCtx)
+ Counters->ReportSessionActorCleanupLatency(Settings.DbCounters, TInstant::Now() - CleanupCtx->Start);
+
if (isFinal) {
auto lifeSpan = TInstant::Now() - CreationTime;
Counters->ReportSessionActorFinished(Settings.DbCounters, lifeSpan);
@@ -1831,7 +1873,7 @@ private:
auto requestInfo = TKqpRequestInfo(QueryState->TraceId, SessionId);
ReplyQueryError(requestInfo, Ydb::StatusIds::INTERNAL_ERROR, message);
} else {
- PassAway();
+ FinalCleanup();
}
}
@@ -1854,6 +1896,7 @@ private:
TKikimrConfiguration::TPtr Config;
TLRUCache<TString, TIntrusivePtr<TKqpTransactionContext>> ExplicitTransactions;
std::vector<TIntrusivePtr<TKqpTransactionContext>> TransactionsToBeAborted;
+ ui64 EvictedTx = 0;
std::unique_ptr<TEvKqp::TEvQueryResponse> QueryResponse;
TActorId IdleTimerActorId;
diff --git a/ydb/core/kqp/kqp_worker_actor.cpp b/ydb/core/kqp/kqp_worker_actor.cpp
index 7a05f12b229..17c4e8e4170 100644
--- a/ydb/core/kqp/kqp_worker_actor.cpp
+++ b/ydb/core/kqp/kqp_worker_actor.cpp
@@ -22,7 +22,6 @@
#include <library/cpp/actors/core/event_pb.h>
#include <library/cpp/actors/core/hfunc.h>
#include <library/cpp/actors/core/log.h>
-#include <library/cpp/json/json_reader.h>
#include <util/string/escape.h>
@@ -84,12 +83,6 @@ struct TKqpCleanupState {
TIntrusivePtr<IKqpHost::IAsyncQueryResult> AsyncResult;
};
-enum ETableReadType {
- Other = 0,
- Scan = 1,
- FullScan = 2,
-};
-
EKikimrStatsMode GetStatsMode(const NKikimrKqp::TQueryRequest& queryRequest, EKikimrStatsMode minMode) {
if (queryRequest.GetProfile()) {
// TODO: Deprecate, StatsMode is the new way to enable stats.
@@ -1542,38 +1535,6 @@ private:
return Reply(std::move(responseEv), ctx);
}
- ETableReadType ExtractMostHeavyReadType(const TString& queryPlan) {
- ETableReadType maxReadType = ETableReadType::Other;
-
- if (queryPlan.empty()) {
- return maxReadType;
- }
-
- NJson::TJsonValue root;
- NJson::ReadJsonTree(queryPlan, &root, false);
-
- if (root.Has("tables")) {
- for (const auto& table : root["tables"].GetArray()) {
- if (!table.Has("reads")) {
- continue;
- }
-
- for (const auto& read : table["reads"].GetArray()) {
- Y_VERIFY(read.Has("type"));
- const auto& type = read["type"].GetString();
-
- if (type == "Scan") {
- maxReadType = Max(maxReadType, ETableReadType::Scan);
- } else if (type == "FullScan") {
- return ETableReadType::FullScan;
- }
- }
- }
- }
-
- return maxReadType;
- }
-
bool ReplyQueryResult(const TActorContext& ctx) {
Y_VERIFY(QueryState);
auto& queryRequest = QueryState->Request;
diff --git a/ydb/core/kqp/kqp_worker_common.h b/ydb/core/kqp/kqp_worker_common.h
index 9885cdb476a..bc29a7e93d1 100644
--- a/ydb/core/kqp/kqp_worker_common.h
+++ b/ydb/core/kqp/kqp_worker_common.h
@@ -1,5 +1,6 @@
#include <library/cpp/actors/core/actor.h>
#include <library/cpp/actors/core/log.h>
+#include <library/cpp/json/json_reader.h>
#include <util/datetime/base.h>
#include <util/generic/string.h>
@@ -81,4 +82,42 @@ inline TIntrusivePtr<NYql::TKikimrConfiguration> CreateConfig(const TKqpSettings
return cfg;
}
+enum ETableReadType {
+ Other = 0,
+ Scan = 1,
+ FullScan = 2,
+};
+
+inline ETableReadType ExtractMostHeavyReadType(const TString& queryPlan) {
+ ETableReadType maxReadType = ETableReadType::Other;
+
+ if (queryPlan.empty()) {
+ return maxReadType;
+ }
+
+ NJson::TJsonValue root;
+ NJson::ReadJsonTree(queryPlan, &root, false);
+
+ if (root.Has("tables")) {
+ for (const auto& table : root["tables"].GetArray()) {
+ if (!table.Has("reads")) {
+ continue;
+ }
+
+ for (const auto& read : table["reads"].GetArray()) {
+ Y_VERIFY(read.Has("type"));
+ const auto& type = read["type"].GetString();
+
+ if (type == "Scan") {
+ maxReadType = Max(maxReadType, ETableReadType::Scan);
+ } else if (type == "FullScan") {
+ return ETableReadType::FullScan;
+ }
+ }
+ }
+ }
+
+ return maxReadType;
+}
+
}