diff options
author | hor911 <hor911@yandex-team.ru> | 2022-02-16 13:00:00 +0300 |
---|---|---|
committer | hor911 <hor911@yandex-team.ru> | 2022-02-16 13:00:00 +0300 |
commit | 48684e2fe80e81a941bd3f9889f11c9dc9ac7fa4 (patch) | |
tree | dbb21247a612a4edd71b3d63ea65338ab46662fc | |
parent | 0f46dfd1395d095238ce05a88e7538202f51ae03 (diff) | |
download | ydb-48684e2fe80e81a941bd3f9889f11c9dc9ac7fa4.tar.gz |
Correct metrics cleanup. YQ-694
ref:f43545073880111938bdc44ca026efc0559fe45f
-rw-r--r-- | ydb/core/yq/libs/actors/pending_fetcher.cpp | 120 | ||||
-rw-r--r-- | ydb/core/yq/libs/actors/proxy.h | 3 | ||||
-rw-r--r-- | ydb/core/yq/libs/actors/run_actor.cpp | 86 | ||||
-rw-r--r-- | ydb/core/yq/libs/init/init.cpp | 2 |
4 files changed, 115 insertions, 96 deletions
diff --git a/ydb/core/yq/libs/actors/pending_fetcher.cpp b/ydb/core/yq/libs/actors/pending_fetcher.cpp index 0084f9c37d2..f849e9aef29 100644 --- a/ydb/core/yq/libs/actors/pending_fetcher.cpp +++ b/ydb/core/yq/libs/actors/pending_fetcher.cpp @@ -59,6 +59,8 @@ #define LOG_E(stream) \ LOG_ERROR_S(*TlsActivationContext, NKikimrServices::YQL_PROXY, "Fetcher: " << stream) +#define LOG_W(stream) \ + LOG_WARN_S(*TlsActivationContext, NKikimrServices::YQL_PROXY, "Fetcher: " << stream) #define LOG_I(stream) \ LOG_INFO_S(*TlsActivationContext, NKikimrServices::YQL_PROXY, "Fetcher: " << stream) #define LOG_D(stream) \ @@ -93,9 +95,14 @@ TVector<TElement> VectorFromProto(const ::google::protobuf::RepeatedPtrField<TEl } // namespace -class TYqlPendingFetcher : public NActors::TActorBootstrapped<TYqlPendingFetcher> { +constexpr auto WAKEUP_TAG_FETCH = 1; +constexpr auto WAKEUP_TAG_CLEANUP = 2; + +constexpr auto CLEANUP_PERIOD = TDuration::Seconds(60); + +class TPendingFetcher : public NActors::TActorBootstrapped<TPendingFetcher> { public: - TYqlPendingFetcher( + TPendingFetcher( const NYq::TYqSharedResources::TPtr& yqSharedResources, const ::NYq::NConfig::TCommonConfig& commonConfig, const ::NYq::NConfig::TCheckpointCoordinatorConfig& checkpointCoordinatorConfig, @@ -147,12 +154,12 @@ public: } void Bootstrap(const TActorContext& ctx) { - Become(&TYqlPendingFetcher::StateFunc); + Become(&TPendingFetcher::StateFunc); Y_UNUSED(ctx); DatabaseResolver = Register(CreateDatabaseResolver(MakeYqlAnalyticsHttpProxyId(), CredentialsFactory)); - Send(SelfId(), new NActors::TEvents::TEvWakeup()); + Send(SelfId(), new NActors::TEvents::TEvWakeup(WAKEUP_TAG_FETCH)); LOG_I("STARTED"); LogScope.ConstructInPlace(NActors::TActivationContext::ActorSystem(), NKikimrServices::YQL_PROXY, Guid); @@ -165,12 +172,33 @@ private: HasRunningRequest = false; } - void HandleWakeup(NActors::TEvents::TEvWakeup::TPtr&, const NActors::TActorContext&) { - Schedule(PendingFetchPeriod, new 
NActors::TEvents::TEvWakeup()); - - if (!HasRunningRequest) { - HasRunningRequest = true; - GetPendingTask(); + void HandleWakeup(NActors::TEvents::TEvWakeup::TPtr& ev) { + switch(ev->Get()->Tag) { + case WAKEUP_TAG_FETCH: + Schedule(PendingFetchPeriod, new NActors::TEvents::TEvWakeup(WAKEUP_TAG_FETCH)); + if (!HasRunningRequest) { + HasRunningRequest = true; + GetPendingTask(); + } + break; + case WAKEUP_TAG_CLEANUP: + auto now = Now(); + std::vector<TString> erased; + for (auto& [queryId, counters] : CountersMap) { + if (counters.RunActorId == TActorId() && counters.CleanupDeadline <= now) { + if (counters.RootCountersParent) { + counters.RootCountersParent->RemoveSubgroup("query_id", queryId); + } + if (counters.PublicCountersParent) { + counters.PublicCountersParent->RemoveSubgroup("query_id", queryId); + } + erased.push_back(queryId); + } + } + for (auto q : erased) { + CountersMap.erase(q); + } + break; } } @@ -192,6 +220,29 @@ private: } } + void HandlePoisonTaken(NActors::TEvents::TEvPoisonTaken::TPtr& ev) { + auto runActorId = ev->Sender; + + auto itA = RunActorMap.find(runActorId); + if (itA == RunActorMap.end()) { + LOG_W("Unknown RunActor " << runActorId << " destroyed"); + return; + } + auto queryId = itA->second; + RunActorMap.erase(itA); + + auto itC = CountersMap.find(queryId); + if (itC != CountersMap.end()) { + auto& info = itC->second; + if (info.RunActorId == runActorId) + { + info.RunActorId = TActorId(); + info.CleanupDeadline = Now() + CLEANUP_PERIOD; + Schedule(CLEANUP_PERIOD, new NActors::TEvents::TEvWakeup(WAKEUP_TAG_CLEANUP)); + } + } + } + void GetPendingTask() { LOG_D("Request Private::GetTask" << ", Owner: " << Guid << ", Host: " << HostName()); Yq::Private::GetTaskRequest request; @@ -265,16 +316,39 @@ private: ClientCounters); NDq::SetYqlLogLevels(NActors::NLog::PRI_TRACE); - Register(CreateRunActor(ServiceCounters, std::move(params))); - } - STRICT_STFUNC( - StateFunc, + const TVector<TString> path = 
StringSplitter(params.Scope.ToString()).Split('/').SkipEmpty(); // yandexcloud://{folder_id} + const TString folderId = path.size() == 2 && path.front().StartsWith(NYdb::NYq::TScope::YandexCloudScopeSchema) + ? path.back() : TString{}; + + ::NYq::NCommon::TServiceCounters queryCounters(ServiceCounters); + auto publicCountersParent = ServiceCounters.PublicCounters; + + if (params.CloudId && folderId) { + publicCountersParent = publicCountersParent->GetSubgroup("cloud_id", params.CloudId)->GetSubgroup("folder_id", folderId); + } + queryCounters.PublicCounters = publicCountersParent->GetSubgroup("query_id", + params.Automatic ? (params.QueryName ? params.QueryName : "automatic") : params.QueryId); + + auto rootCountersParent = ServiceCounters.RootCounters; + queryCounters.RootCounters = rootCountersParent->GetSubgroup("query_id", + params.Automatic ? (folderId ? "automatic_" + folderId : "automatic") : params.QueryId); + queryCounters.Counters = queryCounters.RootCounters; + + auto runActorId = Register(CreateRunActor(SelfId(), queryCounters, std::move(params))); - HFunc(NActors::TEvents::TEvWakeup, HandleWakeup) + RunActorMap[runActorId] = params.QueryId; + if (!params.Automatic) { + CountersMap[params.QueryId] = { rootCountersParent, publicCountersParent, runActorId, TInstant::Zero() }; + } + } + + STRICT_STFUNC(StateFunc, + hFunc(NActors::TEvents::TEvWakeup, HandleWakeup) HFunc(NActors::TEvents::TEvUndelivered, OnUndelivered) hFunc(TEvGetTaskInternalResponse, HandleResponse) - ); + hFunc(NActors::TEvents::TEvPoisonTaken, HandlePoisonTaken) + ); NYq::TYqSharedResources::TPtr YqSharedResources; NYq::NConfig::TCommonConfig CommonConfig; @@ -306,6 +380,16 @@ private: TPrivateClient Client; TMaybe<NYql::NLog::TScopedBackend<NYql::NDq::TYqlLogScope>> LogScope; + + struct TQueryCountersInfo { + NMonitoring::TDynamicCounterPtr RootCountersParent; + NMonitoring::TDynamicCounterPtr PublicCountersParent; + TActorId RunActorId; + TInstant CleanupDeadline; + }; + + 
TMap<TString, TQueryCountersInfo> CountersMap; + TMap<TActorId, TString> RunActorMap; }; @@ -326,7 +410,7 @@ NActors::IActor* CreatePendingFetcher( ::NPq::NConfigurationManager::IConnections::TPtr pqCmConnections, const NMonitoring::TDynamicCounterPtr& clientCounters) { - return new TYqlPendingFetcher( + return new TPendingFetcher( yqSharedResources, commonConfig, checkpointCoordinatorConfig, @@ -344,7 +428,7 @@ NActors::IActor* CreatePendingFetcher( clientCounters); } -TActorId MakeYqlAnalyticsFetcherId(ui32 nodeId) { +TActorId MakePendingFetcherId(ui32 nodeId) { constexpr TStringBuf name = "YQLFETCHER"; return NActors::TActorId(nodeId, name); } diff --git a/ydb/core/yq/libs/actors/proxy.h b/ydb/core/yq/libs/actors/proxy.h index 442d29ac444..c535f76ffb4 100644 --- a/ydb/core/yq/libs/actors/proxy.h +++ b/ydb/core/yq/libs/actors/proxy.h @@ -31,7 +31,7 @@ namespace NKikimr { namespace NYq { NActors::TActorId MakeYqlAnalyticsHttpProxyId(); -NActors::TActorId MakeYqlAnalyticsFetcherId(ui32 nodeId); +NActors::TActorId MakePendingFetcherId(ui32 nodeId); NActors::IActor* CreatePendingFetcher( const NYq::TYqSharedResources::TPtr& yqSharedResources, @@ -52,6 +52,7 @@ NActors::IActor* CreatePendingFetcher( ); NActors::IActor* CreateRunActor( + const NActors::TActorId& fetcherId, const ::NYq::NCommon::TServiceCounters& serviceCounters, TRunActorParams&& params ); diff --git a/ydb/core/yq/libs/actors/run_actor.cpp b/ydb/core/yq/libs/actors/run_actor.cpp index ef272b0404b..0ca3332432e 100644 --- a/ydb/core/yq/libs/actors/run_actor.cpp +++ b/ydb/core/yq/libs/actors/run_actor.cpp @@ -83,53 +83,16 @@ using namespace NActors; using namespace NYql; using namespace NDqs; -class TDeferredCountersCleanupActor : public NActors::TActorBootstrapped<TDeferredCountersCleanupActor> { -public: - TDeferredCountersCleanupActor( - const NMonitoring::TDynamicCounterPtr& rootCountersParent, - const NMonitoring::TDynamicCounterPtr& publicCountersParent, - const TString& queryId) - : 
RootCountersParent(rootCountersParent) - , PublicCountersParent(publicCountersParent) - , QueryId(queryId) - { - } - - static constexpr char ActorName[] = "YQ_DEFERRED_COUNTERS_CLEANUP"; - - void Bootstrap() { - Become(&TDeferredCountersCleanupActor::StateFunc, TDuration::Seconds(60), new NActors::TEvents::TEvWakeup()); - } - - STRICT_STFUNC(StateFunc, - hFunc(NActors::TEvents::TEvWakeup, Handle) - ) - - void Handle(NActors::TEvents::TEvWakeup::TPtr&) { - if (RootCountersParent) { - RootCountersParent->RemoveSubgroup("query_id", QueryId); - } - if (PublicCountersParent) { - PublicCountersParent->RemoveSubgroup("query_id", QueryId); - } - PassAway(); - } - -private: - const NMonitoring::TDynamicCounterPtr RootCountersParent; - const NMonitoring::TDynamicCounterPtr PublicCountersParent; - const TString QueryId; -}; - class TRunActor : public NActors::TActorBootstrapped<TRunActor> { public: explicit TRunActor( - const ::NYq::NCommon::TServiceCounters& serviceCounters + const NActors::TActorId& fetcherId + , const ::NYq::NCommon::TServiceCounters& queryCounters , TRunActorParams&& params) - : Params(std::move(params)) + : FetcherId(fetcherId) + , Params(std::move(params)) , CreatedAt(TInstant::Now()) - , ServiceCounters(serviceCounters) - , QueryCounters(serviceCounters) + , QueryCounters(queryCounters) , EnableCheckpointCoordinator(Params.QueryType == YandexQuery::QueryContent::STREAMING && Params.CheckpointCoordinatorConfig.GetEnabled()) , MaxTasksPerOperation(Params.CommonConfig.GetMaxTasksPerOperation() ? 
Params.CommonConfig.GetMaxTasksPerOperation() : 40) { @@ -237,13 +200,8 @@ private: } void PassAway() override { - if (!Params.Automatic) { - // Cleanup non-automatic counters only - Register(new TDeferredCountersCleanupActor(RootCountersParent, PublicCountersParent, Params.QueryId)); - } - + Send(FetcherId, new NActors::TEvents::TEvPoisonTaken()); KillChildrenActors(); - NActors::TActorBootstrapped<TRunActor>::PassAway(); } @@ -833,31 +791,9 @@ private: return; } */ - PrepareQueryCounters(); RunNextDqGraph(); } - void PrepareQueryCounters() { - const TVector<TString> path = StringSplitter(Params.Scope.ToString()).Split('/').SkipEmpty(); // yandexcloud://{folder_id} - const TString folderId = path.size() == 2 && path.front().StartsWith(NYdb::NYq::TScope::YandexCloudScopeSchema) - ? path.back() : TString{}; - - - QueryCounters = ServiceCounters; - PublicCountersParent = ServiceCounters.PublicCounters; - - if (Params.CloudId && folderId) { - PublicCountersParent = PublicCountersParent->GetSubgroup("cloud_id", Params.CloudId)->GetSubgroup("folder_id", folderId); - } - QueryCounters.PublicCounters = PublicCountersParent->GetSubgroup("query_id", - Params.Automatic ? (Params.QueryName ? Params.QueryName : "automatic") : Params.QueryId); - - RootCountersParent = ServiceCounters.RootCounters; - QueryCounters.RootCounters = RootCountersParent->GetSubgroup("query_id", - Params.Automatic ? (folderId ? 
"automatic_" + folderId : "automatic") : Params.QueryId); - QueryCounters.Counters = QueryCounters.RootCounters; - } - void RunNextDqGraph() { auto& dqGraphParams = DqGraphParams.at(DqGraphIndex); TDqConfiguration::TPtr dqConfiguration = MakeIntrusive<TDqConfiguration>(); @@ -865,7 +801,7 @@ private: dqConfiguration->FreezeDefaults(); dqConfiguration->FallbackPolicy = "never"; - ExecuterId = NActors::TActivationContext::Register(NYql::NDq::MakeDqExecuter(MakeYqlNodesManagerId(), SelfId(), Params.QueryId, "", dqConfiguration, ServiceCounters.Counters, TInstant::Now(), EnableCheckpointCoordinator)); + ExecuterId = NActors::TActivationContext::Register(NYql::NDq::MakeDqExecuter(MakeYqlNodesManagerId(), SelfId(), Params.QueryId, "", dqConfiguration, QueryCounters.Counters, TInstant::Now(), EnableCheckpointCoordinator)); NActors::TActorId resultId; if (dqGraphParams.GetResultType()) { @@ -1112,7 +1048,6 @@ private: } DqGraphIndex = Params.DqGraphIndex; UpdateResultIndices(); - PrepareQueryCounters(); RunNextDqGraph(); } @@ -1323,6 +1258,7 @@ private: } private: + TActorId FetcherId; TRunActorParams Params; THashMap<TString, YandexQuery::Connection> YqConnections; @@ -1337,13 +1273,10 @@ private: std::vector<NYq::NProto::TGraphParams> DqGraphParams; std::vector<i32> DqGrapResultIndices; i32 DqGraphIndex = 0; - NMonitoring::TDynamicCounterPtr RootCountersParent; - NMonitoring::TDynamicCounterPtr PublicCountersParent; NActors::TActorId ExecuterId; NActors::TActorId ControlId; NActors::TActorId CheckpointCoordinatorId; TString SessionId; - ::NYq::NCommon::TServiceCounters ServiceCounters; ::NYq::NCommon::TServiceCounters QueryCounters; bool EnableCheckpointCoordinator = false; bool RetryNeeded = false; @@ -1376,10 +1309,11 @@ private: IActor* CreateRunActor( + const NActors::TActorId& fetcherId, const ::NYq::NCommon::TServiceCounters& serviceCounters, TRunActorParams&& params ) { - return new TRunActor(serviceCounters, std::move(params)); + return new TRunActor(fetcherId, 
serviceCounters, std::move(params)); } } /* NYq */ diff --git a/ydb/core/yq/libs/init/init.cpp b/ydb/core/yq/libs/init/init.cpp index d66ac202bfa..4faa00b74aa 100644 --- a/ydb/core/yq/libs/init/init.cpp +++ b/ydb/core/yq/libs/init/init.cpp @@ -256,7 +256,7 @@ void Init( clientCounters ); - actorRegistrator(MakeYqlAnalyticsFetcherId(nodeId), fetcher); + actorRegistrator(MakePendingFetcherId(nodeId), fetcher); } if (protoConfig.GetPrivateProxy().GetEnabled()) { |