diff options
| author | grigoriypisar <[email protected]> | 2023-08-31 14:13:45 +0300 |
|---|---|---|
| committer | grigoriypisar <[email protected]> | 2023-08-31 14:39:15 +0300 |
| commit | 3e428d48d3eb01a7ee492f0c28dc96fe6a15f600 (patch) | |
| tree | 997c014db1803337a68009dbc95dd04c4d2d8493 | |
| parent | 7cf7f8667d7e892b7ee24f2c349373faf4a14fd3 (diff) | |
Counters in lease updater
Passed counters and normalized lease updates.
| -rw-r--r-- | ydb/core/kqp/counters/kqp_counters.cpp | 14 | ||||
| -rw-r--r-- | ydb/core/kqp/counters/kqp_counters.h | 7 | ||||
| -rw-r--r-- | ydb/core/kqp/proxy_service/kqp_proxy_service.cpp | 2 | ||||
| -rw-r--r-- | ydb/core/kqp/proxy_service/kqp_script_executions.cpp | 23 | ||||
| -rw-r--r-- | ydb/core/kqp/proxy_service/kqp_script_executions.h | 5 | ||||
| -rw-r--r-- | ydb/core/kqp/proxy_service/kqp_script_executions_ut.cpp | 2 | ||||
| -rw-r--r-- | ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp | 14 | ||||
| -rw-r--r-- | ydb/core/kqp/run_script_actor/kqp_run_script_actor.h | 4 |
8 files changed, 55 insertions, 16 deletions
diff --git a/ydb/core/kqp/counters/kqp_counters.cpp b/ydb/core/kqp/counters/kqp_counters.cpp index fdf85085c8f..a01abce3b14 100644 --- a/ydb/core/kqp/counters/kqp_counters.cpp +++ b/ydb/core/kqp/counters/kqp_counters.cpp @@ -738,6 +738,12 @@ TKqpCounters::TKqpCounters(const ::NMonitoring::TDynamicCounterPtr& counters, co } } + /* Lease updates counters */ + LeaseUpdateLatency = KqpGroup->GetHistogram( + "LeaseUpdatesLatencyMs", NMonitoring::ExponentialHistogram(20, 2, 1)); + RunActorLeaseUpdateBacklog = KqpGroup->GetHistogram( + "LeaseUpdatesBacklogMs", NMonitoring::LinearHistogram(30, 0, 1000)); + /* Transactions */ CreateTxKindCounters(TKqpTransactionInfo::EKind::Pure, "Pure"); CreateTxKindCounters(TKqpTransactionInfo::EKind::ReadOnly, "ReadOnly"); @@ -1000,6 +1006,14 @@ void TKqpCounters::ReportTransaction(TKqpDbCountersPtr dbCounters, const TKqpTra } } +void TKqpCounters::ReportLeaseUpdateLatency(const TDuration& duration) { + LeaseUpdateLatency->Collect(duration.MilliSeconds()); +} + +void TKqpCounters::ReportRunActorLeaseUpdateBacklog(const TDuration& duration) { + RunActorLeaseUpdateBacklog->Collect(duration.MilliSeconds()); +} + void TKqpCounters::ReportSqlVersion(TKqpDbCountersPtr dbCounters, ui16 sqlVersion) { TKqpCountersBase::ReportSqlVersion(sqlVersion); if (dbCounters) { diff --git a/ydb/core/kqp/counters/kqp_counters.h b/ydb/core/kqp/counters/kqp_counters.h index d638e706d76..c7d69185982 100644 --- a/ydb/core/kqp/counters/kqp_counters.h +++ b/ydb/core/kqp/counters/kqp_counters.h @@ -294,6 +294,9 @@ public: void ReportSqlVersion(TKqpDbCountersPtr dbCounters, ui16 sqlVersion); void ReportTransaction(TKqpDbCountersPtr dbCounters, const TKqpTransactionInfo& txInfo); + void ReportLeaseUpdateLatency(const TDuration& duration); + void ReportRunActorLeaseUpdateBacklog(const TDuration& duration); + void ReportWorkerCreated(TKqpDbCountersPtr dbCounters); void ReportWorkerFinished(TKqpDbCountersPtr dbCounters, TDuration lifeSpan); void ReportWorkerCleanupLatency(TKqpDbCountersPtr dbCounters, TDuration cleanupTime); @@ -341,6 +344,10 @@ public: void RemoveDbCounters(const TString& database); public: + // Lease updates counters + ::NMonitoring::THistogramPtr LeaseUpdateLatency; + ::NMonitoring::THistogramPtr RunActorLeaseUpdateBacklog; + // Transactions THashMap<TKqpTransactionInfo::EKind, TTxByKindCounters> TxByKind; ::NMonitoring::TDynamicCounters::TCounterPtr TxReplySizeExceededError; diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp index b5594b83ad3..e3f6fd077db 100644 --- a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp +++ b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp @@ -669,7 +669,7 @@ public: if (req->GetCancelAfterMs()) { maxRunTime = TDuration::MilliSeconds(Min(req->GetCancelAfterMs(), maxRunTime.MilliSeconds())); } - Register(CreateScriptExecutionCreatorActor(std::move(ev), QueryServiceConfig, maxRunTime), TMailboxType::HTSwap, AppData()->SystemPoolId); + Register(CreateScriptExecutionCreatorActor(std::move(ev), QueryServiceConfig, Counters, maxRunTime), TMailboxType::HTSwap, AppData()->SystemPoolId); } } diff --git a/ydb/core/kqp/proxy_service/kqp_script_executions.cpp b/ydb/core/kqp/proxy_service/kqp_script_executions.cpp index f33e4d100ec..5f9a25b9349 100644 --- a/ydb/core/kqp/proxy_service/kqp_script_executions.cpp +++ b/ydb/core/kqp/proxy_service/kqp_script_executions.cpp @@ -326,9 +326,10 @@ private: }; struct TCreateScriptExecutionActor : public TActorBootstrapped<TCreateScriptExecutionActor> { - TCreateScriptExecutionActor(TEvKqp::TEvScriptRequest::TPtr&& ev, const NKikimrConfig::TQueryServiceConfig& queryServiceConfig, TDuration maxRunTime = SCRIPT_TIMEOUT_LIMIT, TDuration leaseDuration = TDuration::Zero()) + TCreateScriptExecutionActor(TEvKqp::TEvScriptRequest::TPtr&& ev, const NKikimrConfig::TQueryServiceConfig& queryServiceConfig, TIntrusivePtr<TKqpCounters> counters, TDuration maxRunTime = SCRIPT_TIMEOUT_LIMIT, TDuration leaseDuration = TDuration::Zero()) : Event(std::move(ev)) , QueryServiceConfig(queryServiceConfig) + , Counters(counters) , LeaseDuration(leaseDuration ? leaseDuration : LEASE_DURATION) , MaxRunTime(maxRunTime) { @@ -346,7 +347,7 @@ struct TCreateScriptExecutionActor : public TActorBootstrapped<TCreateScriptExec } // Start request - RunScriptActorId = Register(CreateRunScriptActor(ExecutionId, Event->Get()->Record, Event->Get()->Record.GetRequest().GetDatabase(), 1, LeaseDuration, QueryServiceConfig)); + RunScriptActorId = Register(CreateRunScriptActor(ExecutionId, Event->Get()->Record, Event->Get()->Record.GetRequest().GetDatabase(), 1, LeaseDuration, QueryServiceConfig, Counters)); Register(new TCreateScriptOperationQuery(ExecutionId, RunScriptActorId, Event->Get()->Record, operationTtl, resultsTtl, LeaseDuration, MaxRunTime)); } @@ -367,6 +368,7 @@ struct TCreateScriptExecutionActor : public TActorBootstrapped<TCreateScriptExec private: TEvKqp::TEvScriptRequest::TPtr Event; const NKikimrConfig::TQueryServiceConfig QueryServiceConfig; + TIntrusivePtr<TKqpCounters> Counters; TString ExecutionId; NActors::TActorId RunScriptActorId; const TDuration LeaseDuration; @@ -465,11 +467,13 @@ class TScriptLeaseUpdateActor : public TActorBootstrapped<TScriptLeaseUpdateActo public: using IRetryPolicy = IRetryPolicy<const Ydb::StatusIds::StatusCode&>; - TScriptLeaseUpdateActor(const TActorId& runScriptActorId, const TString& database, const TString& executionId, TDuration leaseDuration) + TScriptLeaseUpdateActor(const TActorId& runScriptActorId, const TString& database, const TString& executionId, TDuration leaseDuration, TIntrusivePtr<TKqpCounters> counters) : RunScriptActorId(runScriptActorId) , Database(database) , ExecutionId(executionId) , LeaseDuration(leaseDuration) + , Counters(counters) + , LeaseUpdateStartTime(TInstant::Now()) {} void CreateScriptLeaseUpdater() { @@ -510,6 +514,9 @@ public: } void Reply(TEvScriptLeaseUpdateResponse::TPtr&& ev) { + if (Counters) { + Counters->ReportLeaseUpdateLatency(TInstant::Now() - LeaseUpdateStartTime); + } Send(RunScriptActorId, ev->Release().Release()); PassAway(); } @@ -546,6 +553,8 @@ private: TString Database; TString ExecutionId; TDuration LeaseDuration; + TIntrusivePtr<TKqpCounters> Counters; + TInstant LeaseUpdateStartTime; IRetryPolicy::IRetryState::TPtr RetryState = nullptr; }; @@ -2081,8 +2090,8 @@ private: } // anonymous namespace -NActors::IActor* CreateScriptExecutionCreatorActor(TEvKqp::TEvScriptRequest::TPtr&& ev, const NKikimrConfig::TQueryServiceConfig& queryServiceConfig, TDuration maxRunTime) { - return new TCreateScriptExecutionActor(std::move(ev), queryServiceConfig, maxRunTime); +NActors::IActor* CreateScriptExecutionCreatorActor(TEvKqp::TEvScriptRequest::TPtr&& ev, const NKikimrConfig::TQueryServiceConfig& queryServiceConfig, TIntrusivePtr<TKqpCounters> counters, TDuration maxRunTime) { + return new TCreateScriptExecutionActor(std::move(ev), queryServiceConfig, counters, maxRunTime); } NActors::IActor* CreateScriptExecutionsTablesCreator(THolder<NActors::IEventBase> resultEvent) { @@ -2120,8 +2129,8 @@ NActors::IActor* CreateCancelScriptExecutionOperationActor(TEvCancelScriptExecut return new TCancelScriptExecutionOperationActor(std::move(ev)); } -NActors::IActor* CreateScriptLeaseUpdateActor(const TActorId& runScriptActorId, const TString& database, const TString& executionId, TDuration leaseDuration) { - return new TScriptLeaseUpdateActor(runScriptActorId, database, executionId, leaseDuration); +NActors::IActor* CreateScriptLeaseUpdateActor(const TActorId& runScriptActorId, const TString& database, const TString& executionId, TDuration leaseDuration, TIntrusivePtr<TKqpCounters> counters) { + return new TScriptLeaseUpdateActor(runScriptActorId, database, executionId, leaseDuration, counters); } NActors::IActor* CreateSaveScriptExecutionResultMetaActor(const NActors::TActorId& replyActorId, const TString& database, const TString& executionId, const TString& serializedMeta) { diff --git a/ydb/core/kqp/proxy_service/kqp_script_executions.h b/ydb/core/kqp/proxy_service/kqp_script_executions.h index 84c6594d31f..cda7cbe179d 100644 --- a/ydb/core/kqp/proxy_service/kqp_script_executions.h +++ b/ydb/core/kqp/proxy_service/kqp_script_executions.h @@ -15,7 +15,7 @@ namespace NKikimr::NKqp { NActors::IActor* CreateScriptExecutionsTablesCreator(THolder<NActors::IEventBase> resultEvent); // Create script execution and run it. -NActors::IActor* CreateScriptExecutionCreatorActor(TEvKqp::TEvScriptRequest::TPtr&& ev, const NKikimrConfig::TQueryServiceConfig& queryServiceConfig, TDuration maxRunTime = SCRIPT_TIMEOUT_LIMIT); +NActors::IActor* CreateScriptExecutionCreatorActor(TEvKqp::TEvScriptRequest::TPtr&& ev, const NKikimrConfig::TQueryServiceConfig& queryServiceConfig, TIntrusivePtr<TKqpCounters> counters, TDuration maxRunTime = SCRIPT_TIMEOUT_LIMIT); // Operation API impl. NActors::IActor* CreateForgetScriptExecutionOperationActor(TEvForgetScriptExecutionOperation::TPtr ev); @@ -37,7 +37,8 @@ NActors::IActor* CreateScriptExecutionFinisher( ); // Updates lease deadline in database. -NActors::IActor* CreateScriptLeaseUpdateActor(const TActorId& runScriptActorId, const TString& database, const TString& executionId, TDuration leaseDuration); +NActors::IActor* CreateScriptLeaseUpdateActor(const TActorId& runScriptActorId, const TString& database, const TString& executionId, TDuration leaseDuration, TIntrusivePtr<TKqpCounters> counters); + // Store and fetch results. NActors::IActor* CreateSaveScriptExecutionResultMetaActor(const NActors::TActorId& runScriptActorId, const TString& database, const TString& executionId, const TString& serializedMeta); NActors::IActor* CreateSaveScriptExecutionResultActor(const NActors::TActorId& runScriptActorId, const TString& database, const TString& executionId, i32 resultSetId, TInstant expireAt, i64 firstRow, std::vector<TString>&& serializedRows); diff --git a/ydb/core/kqp/proxy_service/kqp_script_executions_ut.cpp b/ydb/core/kqp/proxy_service/kqp_script_executions_ut.cpp index e4ba0d1a738..28ecd4aa7c0 100644 --- a/ydb/core/kqp/proxy_service/kqp_script_executions_ut.cpp +++ b/ydb/core/kqp/proxy_service/kqp_script_executions_ut.cpp @@ -234,7 +234,7 @@ struct TScriptExecutionsYdbSetup { } THolder<TEvScriptLeaseUpdateResponse> UpdateLease(const TString& executionId, TDuration leaseDuration) { - GetRuntime()->Register(CreateScriptLeaseUpdateActor(GetRuntime()->AllocateEdgeActor(), TestDatabase, executionId, leaseDuration)); + GetRuntime()->Register(CreateScriptLeaseUpdateActor(GetRuntime()->AllocateEdgeActor(), TestDatabase, executionId, leaseDuration, nullptr)); auto reply = GetRuntime()->GrabEdgeEvent<TEvScriptLeaseUpdateResponse>(); UNIT_ASSERT(reply != nullptr); diff --git a/ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp b/ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp index 37427c93723..df18cb0b808 100644 --- a/ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp +++ b/ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp @@ -48,13 +48,14 @@ class TRunScriptActor : public NActors::TActorBootstrapped<TRunScriptActor> { }; public: - TRunScriptActor(const TString& executionId, const NKikimrKqp::TEvQueryRequest& request, const TString& database, ui64 leaseGeneration, TDuration leaseDuration, NKikimrConfig::TQueryServiceConfig&& queryServiceConfig) + TRunScriptActor(const TString& executionId, const NKikimrKqp::TEvQueryRequest& request, const TString& database, ui64 leaseGeneration, TDuration leaseDuration, NKikimrConfig::TQueryServiceConfig&& queryServiceConfig, TIntrusivePtr<TKqpCounters> counters) : ExecutionId(executionId) , Request(request) , Database(database) , LeaseGeneration(leaseGeneration) , LeaseDuration(leaseDuration) , QueryServiceConfig(queryServiceConfig) + , Counters(counters) {} static constexpr char ActorName[] = "KQP_RUN_SCRIPT_ACTOR"; @@ -138,14 +139,16 @@ private: } void RunLeaseUpdater() { - Register(CreateScriptLeaseUpdateActor(SelfId(), Database, ExecutionId, LeaseDuration)); + Register(CreateScriptLeaseUpdateActor(SelfId(), Database, ExecutionId, LeaseDuration, Counters)); LeaseUpdateQueryRunning = true; + Counters->ReportRunActorLeaseUpdateBacklog(TInstant::Now() - LeaseUpdateScheduleTime); } // TODO: remove this after there will be a normal way to store results and generate execution id void Handle(NActors::TEvents::TEvWakeup::TPtr& ev) { switch (ev->Get()->Tag) { case EWakeUp::RunEvent: + LeaseUpdateScheduleTime = TInstant::Now(); Schedule(LeaseDuration / LEASE_UPDATE_FREQUENCY, new NActors::TEvents::TEvWakeup(EWakeUp::UpdateLeaseEvent)); RunState = ERunState::Running; CreateSession(); @@ -198,6 +201,7 @@ private: if (IsExecuting()) { TInstant leaseUpdateTime = ev->Get()->CurrentDeadline - LeaseDuration / LEASE_UPDATE_FREQUENCY; + LeaseUpdateScheduleTime = TInstant::Now(); if (TInstant::Now() >= leaseUpdateTime) { RunLeaseUpdater(); } else { @@ -478,7 +482,9 @@ private: const ui64 LeaseGeneration; const TDuration LeaseDuration; const NKikimrConfig::TQueryServiceConfig QueryServiceConfig; + TIntrusivePtr<TKqpCounters> Counters; TString SessionId; + TInstant LeaseUpdateScheduleTime; bool LeaseUpdateQueryRunning = false; bool FinalStatusIsSaved = false; bool FinishAfterLeaseUpdate = false; @@ -506,8 +512,8 @@ private: } // namespace -NActors::IActor* CreateRunScriptActor(const TString& executionId, const NKikimrKqp::TEvQueryRequest& request, const TString& database, ui64 leaseGeneration, TDuration leaseDuration, NKikimrConfig::TQueryServiceConfig queryServiceConfig) { - return new TRunScriptActor(executionId, request, database, leaseGeneration, leaseDuration, std::move(queryServiceConfig)); +NActors::IActor* CreateRunScriptActor(const TString& executionId, const NKikimrKqp::TEvQueryRequest& request, const TString& database, ui64 leaseGeneration, TDuration leaseDuration, NKikimrConfig::TQueryServiceConfig queryServiceConfig, TIntrusivePtr<TKqpCounters> counters) { + return new TRunScriptActor(executionId, request, database, leaseGeneration, leaseDuration, std::move(queryServiceConfig), counters); } } // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/run_script_actor/kqp_run_script_actor.h b/ydb/core/kqp/run_script_actor/kqp_run_script_actor.h index aeffbe6c49d..5db2f092a47 100644 --- a/ydb/core/kqp/run_script_actor/kqp_run_script_actor.h +++ b/ydb/core/kqp/run_script_actor/kqp_run_script_actor.h @@ -1,5 +1,7 @@ #pragma once +#include <ydb/core/kqp/counters/kqp_counters.h> + #include <ydb/core/protos/kqp.pb.h> #include <ydb/core/base/appdata.h> @@ -10,6 +12,6 @@ namespace NKikimr::NKqp { struct TEvKqpRunScriptActor { }; -NActors::IActor* CreateRunScriptActor(const TString& executionId, const NKikimrKqp::TEvQueryRequest& request, const TString& database, ui64 leaseGeneration, TDuration leaseDuration, NKikimrConfig::TQueryServiceConfig queryServiceConfig); +NActors::IActor* CreateRunScriptActor(const TString& executionId, const NKikimrKqp::TEvQueryRequest& request, const TString& database, ui64 leaseGeneration, TDuration leaseDuration, NKikimrConfig::TQueryServiceConfig queryServiceConfig, TIntrusivePtr<TKqpCounters> counters); } // namespace NKikimr::NKqp |
