summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorgrigoriypisar <[email protected]>2023-08-31 14:13:45 +0300
committergrigoriypisar <[email protected]>2023-08-31 14:39:15 +0300
commit3e428d48d3eb01a7ee492f0c28dc96fe6a15f600 (patch)
tree997c014db1803337a68009dbc95dd04c4d2d8493
parent7cf7f8667d7e892b7ee24f2c349373faf4a14fd3 (diff)
Counters in lease updater
Passed counters and normalized lease updates.
-rw-r--r--ydb/core/kqp/counters/kqp_counters.cpp14
-rw-r--r--ydb/core/kqp/counters/kqp_counters.h7
-rw-r--r--ydb/core/kqp/proxy_service/kqp_proxy_service.cpp2
-rw-r--r--ydb/core/kqp/proxy_service/kqp_script_executions.cpp23
-rw-r--r--ydb/core/kqp/proxy_service/kqp_script_executions.h5
-rw-r--r--ydb/core/kqp/proxy_service/kqp_script_executions_ut.cpp2
-rw-r--r--ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp14
-rw-r--r--ydb/core/kqp/run_script_actor/kqp_run_script_actor.h4
8 files changed, 55 insertions, 16 deletions
diff --git a/ydb/core/kqp/counters/kqp_counters.cpp b/ydb/core/kqp/counters/kqp_counters.cpp
index fdf85085c8f..a01abce3b14 100644
--- a/ydb/core/kqp/counters/kqp_counters.cpp
+++ b/ydb/core/kqp/counters/kqp_counters.cpp
@@ -738,6 +738,12 @@ TKqpCounters::TKqpCounters(const ::NMonitoring::TDynamicCounterPtr& counters, co
}
}
+ /* Lease updates counters */
+ LeaseUpdateLatency = KqpGroup->GetHistogram(
+ "LeaseUpdatesLatencyMs", NMonitoring::ExponentialHistogram(20, 2, 1));
+ RunActorLeaseUpdateBacklog = KqpGroup->GetHistogram(
+ "LeaseUpdatesBacklogMs", NMonitoring::LinearHistogram(30, 0, 1000));
+
/* Transactions */
CreateTxKindCounters(TKqpTransactionInfo::EKind::Pure, "Pure");
CreateTxKindCounters(TKqpTransactionInfo::EKind::ReadOnly, "ReadOnly");
@@ -1000,6 +1006,14 @@ void TKqpCounters::ReportTransaction(TKqpDbCountersPtr dbCounters, const TKqpTra
}
}
+void TKqpCounters::ReportLeaseUpdateLatency(const TDuration& duration) {
+ LeaseUpdateLatency->Collect(duration.MilliSeconds());
+}
+
+void TKqpCounters::ReportRunActorLeaseUpdateBacklog(const TDuration& duration) {
+ RunActorLeaseUpdateBacklog->Collect(duration.MilliSeconds());
+}
+
void TKqpCounters::ReportSqlVersion(TKqpDbCountersPtr dbCounters, ui16 sqlVersion) {
TKqpCountersBase::ReportSqlVersion(sqlVersion);
if (dbCounters) {
diff --git a/ydb/core/kqp/counters/kqp_counters.h b/ydb/core/kqp/counters/kqp_counters.h
index d638e706d76..c7d69185982 100644
--- a/ydb/core/kqp/counters/kqp_counters.h
+++ b/ydb/core/kqp/counters/kqp_counters.h
@@ -294,6 +294,9 @@ public:
void ReportSqlVersion(TKqpDbCountersPtr dbCounters, ui16 sqlVersion);
void ReportTransaction(TKqpDbCountersPtr dbCounters, const TKqpTransactionInfo& txInfo);
+ void ReportLeaseUpdateLatency(const TDuration& duration);
+ void ReportRunActorLeaseUpdateBacklog(const TDuration& duration);
+
void ReportWorkerCreated(TKqpDbCountersPtr dbCounters);
void ReportWorkerFinished(TKqpDbCountersPtr dbCounters, TDuration lifeSpan);
void ReportWorkerCleanupLatency(TKqpDbCountersPtr dbCounters, TDuration cleanupTime);
@@ -341,6 +344,10 @@ public:
void RemoveDbCounters(const TString& database);
public:
+ // Lease updates counters
+ ::NMonitoring::THistogramPtr LeaseUpdateLatency;
+ ::NMonitoring::THistogramPtr RunActorLeaseUpdateBacklog;
+
// Transactions
THashMap<TKqpTransactionInfo::EKind, TTxByKindCounters> TxByKind;
::NMonitoring::TDynamicCounters::TCounterPtr TxReplySizeExceededError;
diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp
index b5594b83ad3..e3f6fd077db 100644
--- a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp
+++ b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp
@@ -669,7 +669,7 @@ public:
if (req->GetCancelAfterMs()) {
maxRunTime = TDuration::MilliSeconds(Min(req->GetCancelAfterMs(), maxRunTime.MilliSeconds()));
}
- Register(CreateScriptExecutionCreatorActor(std::move(ev), QueryServiceConfig, maxRunTime), TMailboxType::HTSwap, AppData()->SystemPoolId);
+ Register(CreateScriptExecutionCreatorActor(std::move(ev), QueryServiceConfig, Counters, maxRunTime), TMailboxType::HTSwap, AppData()->SystemPoolId);
}
}
diff --git a/ydb/core/kqp/proxy_service/kqp_script_executions.cpp b/ydb/core/kqp/proxy_service/kqp_script_executions.cpp
index f33e4d100ec..5f9a25b9349 100644
--- a/ydb/core/kqp/proxy_service/kqp_script_executions.cpp
+++ b/ydb/core/kqp/proxy_service/kqp_script_executions.cpp
@@ -326,9 +326,10 @@ private:
};
struct TCreateScriptExecutionActor : public TActorBootstrapped<TCreateScriptExecutionActor> {
- TCreateScriptExecutionActor(TEvKqp::TEvScriptRequest::TPtr&& ev, const NKikimrConfig::TQueryServiceConfig& queryServiceConfig, TDuration maxRunTime = SCRIPT_TIMEOUT_LIMIT, TDuration leaseDuration = TDuration::Zero())
+ TCreateScriptExecutionActor(TEvKqp::TEvScriptRequest::TPtr&& ev, const NKikimrConfig::TQueryServiceConfig& queryServiceConfig, TIntrusivePtr<TKqpCounters> counters, TDuration maxRunTime = SCRIPT_TIMEOUT_LIMIT, TDuration leaseDuration = TDuration::Zero())
: Event(std::move(ev))
, QueryServiceConfig(queryServiceConfig)
+ , Counters(counters)
, LeaseDuration(leaseDuration ? leaseDuration : LEASE_DURATION)
, MaxRunTime(maxRunTime)
{
@@ -346,7 +347,7 @@ struct TCreateScriptExecutionActor : public TActorBootstrapped<TCreateScriptExec
}
// Start request
- RunScriptActorId = Register(CreateRunScriptActor(ExecutionId, Event->Get()->Record, Event->Get()->Record.GetRequest().GetDatabase(), 1, LeaseDuration, QueryServiceConfig));
+ RunScriptActorId = Register(CreateRunScriptActor(ExecutionId, Event->Get()->Record, Event->Get()->Record.GetRequest().GetDatabase(), 1, LeaseDuration, QueryServiceConfig, Counters));
Register(new TCreateScriptOperationQuery(ExecutionId, RunScriptActorId, Event->Get()->Record, operationTtl, resultsTtl, LeaseDuration, MaxRunTime));
}
@@ -367,6 +368,7 @@ struct TCreateScriptExecutionActor : public TActorBootstrapped<TCreateScriptExec
private:
TEvKqp::TEvScriptRequest::TPtr Event;
const NKikimrConfig::TQueryServiceConfig QueryServiceConfig;
+ TIntrusivePtr<TKqpCounters> Counters;
TString ExecutionId;
NActors::TActorId RunScriptActorId;
const TDuration LeaseDuration;
@@ -465,11 +467,13 @@ class TScriptLeaseUpdateActor : public TActorBootstrapped<TScriptLeaseUpdateActo
public:
using IRetryPolicy = IRetryPolicy<const Ydb::StatusIds::StatusCode&>;
- TScriptLeaseUpdateActor(const TActorId& runScriptActorId, const TString& database, const TString& executionId, TDuration leaseDuration)
+ TScriptLeaseUpdateActor(const TActorId& runScriptActorId, const TString& database, const TString& executionId, TDuration leaseDuration, TIntrusivePtr<TKqpCounters> counters)
: RunScriptActorId(runScriptActorId)
, Database(database)
, ExecutionId(executionId)
, LeaseDuration(leaseDuration)
+ , Counters(counters)
+ , LeaseUpdateStartTime(TInstant::Now())
{}
void CreateScriptLeaseUpdater() {
@@ -510,6 +514,9 @@ public:
}
void Reply(TEvScriptLeaseUpdateResponse::TPtr&& ev) {
+ if (Counters) {
+ Counters->ReportLeaseUpdateLatency(TInstant::Now() - LeaseUpdateStartTime);
+ }
Send(RunScriptActorId, ev->Release().Release());
PassAway();
}
@@ -546,6 +553,8 @@ private:
TString Database;
TString ExecutionId;
TDuration LeaseDuration;
+ TIntrusivePtr<TKqpCounters> Counters;
+ TInstant LeaseUpdateStartTime;
IRetryPolicy::IRetryState::TPtr RetryState = nullptr;
};
@@ -2081,8 +2090,8 @@ private:
} // anonymous namespace
-NActors::IActor* CreateScriptExecutionCreatorActor(TEvKqp::TEvScriptRequest::TPtr&& ev, const NKikimrConfig::TQueryServiceConfig& queryServiceConfig, TDuration maxRunTime) {
- return new TCreateScriptExecutionActor(std::move(ev), queryServiceConfig, maxRunTime);
+NActors::IActor* CreateScriptExecutionCreatorActor(TEvKqp::TEvScriptRequest::TPtr&& ev, const NKikimrConfig::TQueryServiceConfig& queryServiceConfig, TIntrusivePtr<TKqpCounters> counters, TDuration maxRunTime) {
+ return new TCreateScriptExecutionActor(std::move(ev), queryServiceConfig, counters, maxRunTime);
}
NActors::IActor* CreateScriptExecutionsTablesCreator(THolder<NActors::IEventBase> resultEvent) {
@@ -2120,8 +2129,8 @@ NActors::IActor* CreateCancelScriptExecutionOperationActor(TEvCancelScriptExecut
return new TCancelScriptExecutionOperationActor(std::move(ev));
}
-NActors::IActor* CreateScriptLeaseUpdateActor(const TActorId& runScriptActorId, const TString& database, const TString& executionId, TDuration leaseDuration) {
- return new TScriptLeaseUpdateActor(runScriptActorId, database, executionId, leaseDuration);
+NActors::IActor* CreateScriptLeaseUpdateActor(const TActorId& runScriptActorId, const TString& database, const TString& executionId, TDuration leaseDuration, TIntrusivePtr<TKqpCounters> counters) {
+ return new TScriptLeaseUpdateActor(runScriptActorId, database, executionId, leaseDuration, counters);
}
NActors::IActor* CreateSaveScriptExecutionResultMetaActor(const NActors::TActorId& replyActorId, const TString& database, const TString& executionId, const TString& serializedMeta) {
diff --git a/ydb/core/kqp/proxy_service/kqp_script_executions.h b/ydb/core/kqp/proxy_service/kqp_script_executions.h
index 84c6594d31f..cda7cbe179d 100644
--- a/ydb/core/kqp/proxy_service/kqp_script_executions.h
+++ b/ydb/core/kqp/proxy_service/kqp_script_executions.h
@@ -15,7 +15,7 @@ namespace NKikimr::NKqp {
NActors::IActor* CreateScriptExecutionsTablesCreator(THolder<NActors::IEventBase> resultEvent);
// Create script execution and run it.
-NActors::IActor* CreateScriptExecutionCreatorActor(TEvKqp::TEvScriptRequest::TPtr&& ev, const NKikimrConfig::TQueryServiceConfig& queryServiceConfig, TDuration maxRunTime = SCRIPT_TIMEOUT_LIMIT);
+NActors::IActor* CreateScriptExecutionCreatorActor(TEvKqp::TEvScriptRequest::TPtr&& ev, const NKikimrConfig::TQueryServiceConfig& queryServiceConfig, TIntrusivePtr<TKqpCounters> counters, TDuration maxRunTime = SCRIPT_TIMEOUT_LIMIT);
// Operation API impl.
NActors::IActor* CreateForgetScriptExecutionOperationActor(TEvForgetScriptExecutionOperation::TPtr ev);
@@ -37,7 +37,8 @@ NActors::IActor* CreateScriptExecutionFinisher(
);
// Updates lease deadline in database.
-NActors::IActor* CreateScriptLeaseUpdateActor(const TActorId& runScriptActorId, const TString& database, const TString& executionId, TDuration leaseDuration);
+NActors::IActor* CreateScriptLeaseUpdateActor(const TActorId& runScriptActorId, const TString& database, const TString& executionId, TDuration leaseDuration, TIntrusivePtr<TKqpCounters> counters);
+
// Store and fetch results.
NActors::IActor* CreateSaveScriptExecutionResultMetaActor(const NActors::TActorId& runScriptActorId, const TString& database, const TString& executionId, const TString& serializedMeta);
NActors::IActor* CreateSaveScriptExecutionResultActor(const NActors::TActorId& runScriptActorId, const TString& database, const TString& executionId, i32 resultSetId, TInstant expireAt, i64 firstRow, std::vector<TString>&& serializedRows);
diff --git a/ydb/core/kqp/proxy_service/kqp_script_executions_ut.cpp b/ydb/core/kqp/proxy_service/kqp_script_executions_ut.cpp
index e4ba0d1a738..28ecd4aa7c0 100644
--- a/ydb/core/kqp/proxy_service/kqp_script_executions_ut.cpp
+++ b/ydb/core/kqp/proxy_service/kqp_script_executions_ut.cpp
@@ -234,7 +234,7 @@ struct TScriptExecutionsYdbSetup {
}
THolder<TEvScriptLeaseUpdateResponse> UpdateLease(const TString& executionId, TDuration leaseDuration) {
- GetRuntime()->Register(CreateScriptLeaseUpdateActor(GetRuntime()->AllocateEdgeActor(), TestDatabase, executionId, leaseDuration));
+ GetRuntime()->Register(CreateScriptLeaseUpdateActor(GetRuntime()->AllocateEdgeActor(), TestDatabase, executionId, leaseDuration, nullptr));
auto reply = GetRuntime()->GrabEdgeEvent<TEvScriptLeaseUpdateResponse>();
UNIT_ASSERT(reply != nullptr);
diff --git a/ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp b/ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp
index 37427c93723..df18cb0b808 100644
--- a/ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp
+++ b/ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp
@@ -48,13 +48,14 @@ class TRunScriptActor : public NActors::TActorBootstrapped<TRunScriptActor> {
};
public:
- TRunScriptActor(const TString& executionId, const NKikimrKqp::TEvQueryRequest& request, const TString& database, ui64 leaseGeneration, TDuration leaseDuration, NKikimrConfig::TQueryServiceConfig&& queryServiceConfig)
+ TRunScriptActor(const TString& executionId, const NKikimrKqp::TEvQueryRequest& request, const TString& database, ui64 leaseGeneration, TDuration leaseDuration, NKikimrConfig::TQueryServiceConfig&& queryServiceConfig, TIntrusivePtr<TKqpCounters> counters)
: ExecutionId(executionId)
, Request(request)
, Database(database)
, LeaseGeneration(leaseGeneration)
, LeaseDuration(leaseDuration)
, QueryServiceConfig(queryServiceConfig)
+ , Counters(counters)
{}
static constexpr char ActorName[] = "KQP_RUN_SCRIPT_ACTOR";
@@ -138,14 +139,16 @@ private:
}
void RunLeaseUpdater() {
- Register(CreateScriptLeaseUpdateActor(SelfId(), Database, ExecutionId, LeaseDuration));
+ Register(CreateScriptLeaseUpdateActor(SelfId(), Database, ExecutionId, LeaseDuration, Counters));
LeaseUpdateQueryRunning = true;
+ Counters->ReportRunActorLeaseUpdateBacklog(TInstant::Now() - LeaseUpdateScheduleTime);
}
// TODO: remove this after there will be a normal way to store results and generate execution id
void Handle(NActors::TEvents::TEvWakeup::TPtr& ev) {
switch (ev->Get()->Tag) {
case EWakeUp::RunEvent:
+ LeaseUpdateScheduleTime = TInstant::Now();
Schedule(LeaseDuration / LEASE_UPDATE_FREQUENCY, new NActors::TEvents::TEvWakeup(EWakeUp::UpdateLeaseEvent));
RunState = ERunState::Running;
CreateSession();
@@ -198,6 +201,7 @@ private:
if (IsExecuting()) {
TInstant leaseUpdateTime = ev->Get()->CurrentDeadline - LeaseDuration / LEASE_UPDATE_FREQUENCY;
+ LeaseUpdateScheduleTime = TInstant::Now();
if (TInstant::Now() >= leaseUpdateTime) {
RunLeaseUpdater();
} else {
@@ -478,7 +482,9 @@ private:
const ui64 LeaseGeneration;
const TDuration LeaseDuration;
const NKikimrConfig::TQueryServiceConfig QueryServiceConfig;
+ TIntrusivePtr<TKqpCounters> Counters;
TString SessionId;
+ TInstant LeaseUpdateScheduleTime;
bool LeaseUpdateQueryRunning = false;
bool FinalStatusIsSaved = false;
bool FinishAfterLeaseUpdate = false;
@@ -506,8 +512,8 @@ private:
} // namespace
-NActors::IActor* CreateRunScriptActor(const TString& executionId, const NKikimrKqp::TEvQueryRequest& request, const TString& database, ui64 leaseGeneration, TDuration leaseDuration, NKikimrConfig::TQueryServiceConfig queryServiceConfig) {
- return new TRunScriptActor(executionId, request, database, leaseGeneration, leaseDuration, std::move(queryServiceConfig));
+NActors::IActor* CreateRunScriptActor(const TString& executionId, const NKikimrKqp::TEvQueryRequest& request, const TString& database, ui64 leaseGeneration, TDuration leaseDuration, NKikimrConfig::TQueryServiceConfig queryServiceConfig, TIntrusivePtr<TKqpCounters> counters) {
+ return new TRunScriptActor(executionId, request, database, leaseGeneration, leaseDuration, std::move(queryServiceConfig), counters);
}
} // namespace NKikimr::NKqp
diff --git a/ydb/core/kqp/run_script_actor/kqp_run_script_actor.h b/ydb/core/kqp/run_script_actor/kqp_run_script_actor.h
index aeffbe6c49d..5db2f092a47 100644
--- a/ydb/core/kqp/run_script_actor/kqp_run_script_actor.h
+++ b/ydb/core/kqp/run_script_actor/kqp_run_script_actor.h
@@ -1,5 +1,7 @@
#pragma once
+#include <ydb/core/kqp/counters/kqp_counters.h>
+
#include <ydb/core/protos/kqp.pb.h>
#include <ydb/core/base/appdata.h>
@@ -10,6 +12,6 @@ namespace NKikimr::NKqp {
struct TEvKqpRunScriptActor {
};
-NActors::IActor* CreateRunScriptActor(const TString& executionId, const NKikimrKqp::TEvQueryRequest& request, const TString& database, ui64 leaseGeneration, TDuration leaseDuration, NKikimrConfig::TQueryServiceConfig queryServiceConfig);
+NActors::IActor* CreateRunScriptActor(const TString& executionId, const NKikimrKqp::TEvQueryRequest& request, const TString& database, ui64 leaseGeneration, TDuration leaseDuration, NKikimrConfig::TQueryServiceConfig queryServiceConfig, TIntrusivePtr<TKqpCounters> counters);
} // namespace NKikimr::NKqp