aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorhor911 <hor911@yandex-team.ru>2022-02-16 13:00:00 +0300
committerhor911 <hor911@yandex-team.ru>2022-02-16 13:00:00 +0300
commit48684e2fe80e81a941bd3f9889f11c9dc9ac7fa4 (patch)
treedbb21247a612a4edd71b3d63ea65338ab46662fc
parent0f46dfd1395d095238ce05a88e7538202f51ae03 (diff)
downloadydb-48684e2fe80e81a941bd3f9889f11c9dc9ac7fa4.tar.gz
Correct metrics cleanup.YQ-694
ref:f43545073880111938bdc44ca026efc0559fe45f
-rw-r--r--ydb/core/yq/libs/actors/pending_fetcher.cpp120
-rw-r--r--ydb/core/yq/libs/actors/proxy.h3
-rw-r--r--ydb/core/yq/libs/actors/run_actor.cpp86
-rw-r--r--ydb/core/yq/libs/init/init.cpp2
4 files changed, 115 insertions, 96 deletions
diff --git a/ydb/core/yq/libs/actors/pending_fetcher.cpp b/ydb/core/yq/libs/actors/pending_fetcher.cpp
index 0084f9c37d2..f849e9aef29 100644
--- a/ydb/core/yq/libs/actors/pending_fetcher.cpp
+++ b/ydb/core/yq/libs/actors/pending_fetcher.cpp
@@ -59,6 +59,8 @@
#define LOG_E(stream) \
LOG_ERROR_S(*TlsActivationContext, NKikimrServices::YQL_PROXY, "Fetcher: " << stream)
+#define LOG_W(stream) \
+ LOG_WARN_S(*TlsActivationContext, NKikimrServices::YQL_PROXY, "Fetcher: " << stream)
#define LOG_I(stream) \
LOG_INFO_S(*TlsActivationContext, NKikimrServices::YQL_PROXY, "Fetcher: " << stream)
#define LOG_D(stream) \
@@ -93,9 +95,14 @@ TVector<TElement> VectorFromProto(const ::google::protobuf::RepeatedPtrField<TEl
} // namespace
-class TYqlPendingFetcher : public NActors::TActorBootstrapped<TYqlPendingFetcher> {
+constexpr auto WAKEUP_TAG_FETCH = 1;
+constexpr auto WAKEUP_TAG_CLEANUP = 2;
+
+constexpr auto CLEANUP_PERIOD = TDuration::Seconds(60);
+
+class TPendingFetcher : public NActors::TActorBootstrapped<TPendingFetcher> {
public:
- TYqlPendingFetcher(
+ TPendingFetcher(
const NYq::TYqSharedResources::TPtr& yqSharedResources,
const ::NYq::NConfig::TCommonConfig& commonConfig,
const ::NYq::NConfig::TCheckpointCoordinatorConfig& checkpointCoordinatorConfig,
@@ -147,12 +154,12 @@ public:
}
void Bootstrap(const TActorContext& ctx) {
- Become(&TYqlPendingFetcher::StateFunc);
+ Become(&TPendingFetcher::StateFunc);
Y_UNUSED(ctx);
DatabaseResolver = Register(CreateDatabaseResolver(MakeYqlAnalyticsHttpProxyId(), CredentialsFactory));
- Send(SelfId(), new NActors::TEvents::TEvWakeup());
+ Send(SelfId(), new NActors::TEvents::TEvWakeup(WAKEUP_TAG_FETCH));
LOG_I("STARTED");
LogScope.ConstructInPlace(NActors::TActivationContext::ActorSystem(), NKikimrServices::YQL_PROXY, Guid);
@@ -165,12 +172,33 @@ private:
HasRunningRequest = false;
}
- void HandleWakeup(NActors::TEvents::TEvWakeup::TPtr&, const NActors::TActorContext&) {
- Schedule(PendingFetchPeriod, new NActors::TEvents::TEvWakeup());
-
- if (!HasRunningRequest) {
- HasRunningRequest = true;
- GetPendingTask();
+ void HandleWakeup(NActors::TEvents::TEvWakeup::TPtr& ev) {
+ switch(ev->Get()->Tag) {
+ case WAKEUP_TAG_FETCH:
+ Schedule(PendingFetchPeriod, new NActors::TEvents::TEvWakeup(WAKEUP_TAG_FETCH));
+ if (!HasRunningRequest) {
+ HasRunningRequest = true;
+ GetPendingTask();
+ }
+ break;
+ case WAKEUP_TAG_CLEANUP:
+ auto now = Now();
+ std::vector<TString> erased;
+ for (auto& [queryId, counters] : CountersMap) {
+ if (counters.RunActorId == TActorId() && counters.CleanupDeadline <= now) {
+ if (counters.RootCountersParent) {
+ counters.RootCountersParent->RemoveSubgroup("query_id", queryId);
+ }
+ if (counters.PublicCountersParent) {
+ counters.PublicCountersParent->RemoveSubgroup("query_id", queryId);
+ }
+ erased.push_back(queryId);
+ }
+ }
+ for (auto q : erased) {
+ CountersMap.erase(q);
+ }
+ break;
}
}
@@ -192,6 +220,29 @@ private:
}
}
+ void HandlePoisonTaken(NActors::TEvents::TEvPoisonTaken::TPtr& ev) {
+ auto runActorId = ev->Sender;
+
+ auto itA = RunActorMap.find(runActorId);
+ if (itA == RunActorMap.end()) {
+ LOG_W("Unknown RunActor " << runActorId << " destroyed");
+ return;
+ }
+ auto queryId = itA->second;
+ RunActorMap.erase(itA);
+
+ auto itC = CountersMap.find(queryId);
+ if (itC != CountersMap.end()) {
+ auto& info = itC->second;
+ if (info.RunActorId == runActorId)
+ {
+ info.RunActorId = TActorId();
+ info.CleanupDeadline = Now() + CLEANUP_PERIOD;
+ Schedule(CLEANUP_PERIOD, new NActors::TEvents::TEvWakeup(WAKEUP_TAG_CLEANUP));
+ }
+ }
+ }
+
void GetPendingTask() {
LOG_D("Request Private::GetTask" << ", Owner: " << Guid << ", Host: " << HostName());
Yq::Private::GetTaskRequest request;
@@ -265,16 +316,39 @@ private:
ClientCounters);
NDq::SetYqlLogLevels(NActors::NLog::PRI_TRACE);
- Register(CreateRunActor(ServiceCounters, std::move(params)));
- }
- STRICT_STFUNC(
- StateFunc,
+ const TVector<TString> path = StringSplitter(params.Scope.ToString()).Split('/').SkipEmpty(); // yandexcloud://{folder_id}
+ const TString folderId = path.size() == 2 && path.front().StartsWith(NYdb::NYq::TScope::YandexCloudScopeSchema)
+ ? path.back() : TString{};
+
+ ::NYq::NCommon::TServiceCounters queryCounters(ServiceCounters);
+ auto publicCountersParent = ServiceCounters.PublicCounters;
+
+ if (params.CloudId && folderId) {
+ publicCountersParent = publicCountersParent->GetSubgroup("cloud_id", params.CloudId)->GetSubgroup("folder_id", folderId);
+ }
+ queryCounters.PublicCounters = publicCountersParent->GetSubgroup("query_id",
+ params.Automatic ? (params.QueryName ? params.QueryName : "automatic") : params.QueryId);
+
+ auto rootCountersParent = ServiceCounters.RootCounters;
+ queryCounters.RootCounters = rootCountersParent->GetSubgroup("query_id",
+ params.Automatic ? (folderId ? "automatic_" + folderId : "automatic") : params.QueryId);
+ queryCounters.Counters = queryCounters.RootCounters;
+
+ auto runActorId = Register(CreateRunActor(SelfId(), queryCounters, std::move(params)));
- HFunc(NActors::TEvents::TEvWakeup, HandleWakeup)
+ RunActorMap[runActorId] = params.QueryId;
+ if (!params.Automatic) {
+ CountersMap[params.QueryId] = { rootCountersParent, publicCountersParent, runActorId, TInstant::Zero() };
+ }
+ }
+
+ STRICT_STFUNC(StateFunc,
+ hFunc(NActors::TEvents::TEvWakeup, HandleWakeup)
HFunc(NActors::TEvents::TEvUndelivered, OnUndelivered)
hFunc(TEvGetTaskInternalResponse, HandleResponse)
- );
+ hFunc(NActors::TEvents::TEvPoisonTaken, HandlePoisonTaken)
+ );
NYq::TYqSharedResources::TPtr YqSharedResources;
NYq::NConfig::TCommonConfig CommonConfig;
@@ -306,6 +380,16 @@ private:
TPrivateClient Client;
TMaybe<NYql::NLog::TScopedBackend<NYql::NDq::TYqlLogScope>> LogScope;
+
+ struct TQueryCountersInfo {
+ NMonitoring::TDynamicCounterPtr RootCountersParent;
+ NMonitoring::TDynamicCounterPtr PublicCountersParent;
+ TActorId RunActorId;
+ TInstant CleanupDeadline;
+ };
+
+ TMap<TString, TQueryCountersInfo> CountersMap;
+ TMap<TActorId, TString> RunActorMap;
};
@@ -326,7 +410,7 @@ NActors::IActor* CreatePendingFetcher(
::NPq::NConfigurationManager::IConnections::TPtr pqCmConnections,
const NMonitoring::TDynamicCounterPtr& clientCounters)
{
- return new TYqlPendingFetcher(
+ return new TPendingFetcher(
yqSharedResources,
commonConfig,
checkpointCoordinatorConfig,
@@ -344,7 +428,7 @@ NActors::IActor* CreatePendingFetcher(
clientCounters);
}
-TActorId MakeYqlAnalyticsFetcherId(ui32 nodeId) {
+TActorId MakePendingFetcherId(ui32 nodeId) {
constexpr TStringBuf name = "YQLFETCHER";
return NActors::TActorId(nodeId, name);
}
diff --git a/ydb/core/yq/libs/actors/proxy.h b/ydb/core/yq/libs/actors/proxy.h
index 442d29ac444..c535f76ffb4 100644
--- a/ydb/core/yq/libs/actors/proxy.h
+++ b/ydb/core/yq/libs/actors/proxy.h
@@ -31,7 +31,7 @@ namespace NKikimr {
namespace NYq {
NActors::TActorId MakeYqlAnalyticsHttpProxyId();
-NActors::TActorId MakeYqlAnalyticsFetcherId(ui32 nodeId);
+NActors::TActorId MakePendingFetcherId(ui32 nodeId);
NActors::IActor* CreatePendingFetcher(
const NYq::TYqSharedResources::TPtr& yqSharedResources,
@@ -52,6 +52,7 @@ NActors::IActor* CreatePendingFetcher(
);
NActors::IActor* CreateRunActor(
+ const NActors::TActorId& fetcherId,
const ::NYq::NCommon::TServiceCounters& serviceCounters,
TRunActorParams&& params
);
diff --git a/ydb/core/yq/libs/actors/run_actor.cpp b/ydb/core/yq/libs/actors/run_actor.cpp
index ef272b0404b..0ca3332432e 100644
--- a/ydb/core/yq/libs/actors/run_actor.cpp
+++ b/ydb/core/yq/libs/actors/run_actor.cpp
@@ -83,53 +83,16 @@ using namespace NActors;
using namespace NYql;
using namespace NDqs;
-class TDeferredCountersCleanupActor : public NActors::TActorBootstrapped<TDeferredCountersCleanupActor> {
-public:
- TDeferredCountersCleanupActor(
- const NMonitoring::TDynamicCounterPtr& rootCountersParent,
- const NMonitoring::TDynamicCounterPtr& publicCountersParent,
- const TString& queryId)
- : RootCountersParent(rootCountersParent)
- , PublicCountersParent(publicCountersParent)
- , QueryId(queryId)
- {
- }
-
- static constexpr char ActorName[] = "YQ_DEFERRED_COUNTERS_CLEANUP";
-
- void Bootstrap() {
- Become(&TDeferredCountersCleanupActor::StateFunc, TDuration::Seconds(60), new NActors::TEvents::TEvWakeup());
- }
-
- STRICT_STFUNC(StateFunc,
- hFunc(NActors::TEvents::TEvWakeup, Handle)
- )
-
- void Handle(NActors::TEvents::TEvWakeup::TPtr&) {
- if (RootCountersParent) {
- RootCountersParent->RemoveSubgroup("query_id", QueryId);
- }
- if (PublicCountersParent) {
- PublicCountersParent->RemoveSubgroup("query_id", QueryId);
- }
- PassAway();
- }
-
-private:
- const NMonitoring::TDynamicCounterPtr RootCountersParent;
- const NMonitoring::TDynamicCounterPtr PublicCountersParent;
- const TString QueryId;
-};
-
class TRunActor : public NActors::TActorBootstrapped<TRunActor> {
public:
explicit TRunActor(
- const ::NYq::NCommon::TServiceCounters& serviceCounters
+ const NActors::TActorId& fetcherId
+ , const ::NYq::NCommon::TServiceCounters& queryCounters
, TRunActorParams&& params)
- : Params(std::move(params))
+ : FetcherId(fetcherId)
+ , Params(std::move(params))
, CreatedAt(TInstant::Now())
- , ServiceCounters(serviceCounters)
- , QueryCounters(serviceCounters)
+ , QueryCounters(queryCounters)
, EnableCheckpointCoordinator(Params.QueryType == YandexQuery::QueryContent::STREAMING && Params.CheckpointCoordinatorConfig.GetEnabled())
, MaxTasksPerOperation(Params.CommonConfig.GetMaxTasksPerOperation() ? Params.CommonConfig.GetMaxTasksPerOperation() : 40)
{
@@ -237,13 +200,8 @@ private:
}
void PassAway() override {
- if (!Params.Automatic) {
- // Cleanup non-automatic counters only
- Register(new TDeferredCountersCleanupActor(RootCountersParent, PublicCountersParent, Params.QueryId));
- }
-
+ Send(FetcherId, new NActors::TEvents::TEvPoisonTaken());
KillChildrenActors();
-
NActors::TActorBootstrapped<TRunActor>::PassAway();
}
@@ -833,31 +791,9 @@ private:
return;
}
*/
- PrepareQueryCounters();
RunNextDqGraph();
}
- void PrepareQueryCounters() {
- const TVector<TString> path = StringSplitter(Params.Scope.ToString()).Split('/').SkipEmpty(); // yandexcloud://{folder_id}
- const TString folderId = path.size() == 2 && path.front().StartsWith(NYdb::NYq::TScope::YandexCloudScopeSchema)
- ? path.back() : TString{};
-
-
- QueryCounters = ServiceCounters;
- PublicCountersParent = ServiceCounters.PublicCounters;
-
- if (Params.CloudId && folderId) {
- PublicCountersParent = PublicCountersParent->GetSubgroup("cloud_id", Params.CloudId)->GetSubgroup("folder_id", folderId);
- }
- QueryCounters.PublicCounters = PublicCountersParent->GetSubgroup("query_id",
- Params.Automatic ? (Params.QueryName ? Params.QueryName : "automatic") : Params.QueryId);
-
- RootCountersParent = ServiceCounters.RootCounters;
- QueryCounters.RootCounters = RootCountersParent->GetSubgroup("query_id",
- Params.Automatic ? (folderId ? "automatic_" + folderId : "automatic") : Params.QueryId);
- QueryCounters.Counters = QueryCounters.RootCounters;
- }
-
void RunNextDqGraph() {
auto& dqGraphParams = DqGraphParams.at(DqGraphIndex);
TDqConfiguration::TPtr dqConfiguration = MakeIntrusive<TDqConfiguration>();
@@ -865,7 +801,7 @@ private:
dqConfiguration->FreezeDefaults();
dqConfiguration->FallbackPolicy = "never";
- ExecuterId = NActors::TActivationContext::Register(NYql::NDq::MakeDqExecuter(MakeYqlNodesManagerId(), SelfId(), Params.QueryId, "", dqConfiguration, ServiceCounters.Counters, TInstant::Now(), EnableCheckpointCoordinator));
+ ExecuterId = NActors::TActivationContext::Register(NYql::NDq::MakeDqExecuter(MakeYqlNodesManagerId(), SelfId(), Params.QueryId, "", dqConfiguration, QueryCounters.Counters, TInstant::Now(), EnableCheckpointCoordinator));
NActors::TActorId resultId;
if (dqGraphParams.GetResultType()) {
@@ -1112,7 +1048,6 @@ private:
}
DqGraphIndex = Params.DqGraphIndex;
UpdateResultIndices();
- PrepareQueryCounters();
RunNextDqGraph();
}
@@ -1323,6 +1258,7 @@ private:
}
private:
+ TActorId FetcherId;
TRunActorParams Params;
THashMap<TString, YandexQuery::Connection> YqConnections;
@@ -1337,13 +1273,10 @@ private:
std::vector<NYq::NProto::TGraphParams> DqGraphParams;
std::vector<i32> DqGrapResultIndices;
i32 DqGraphIndex = 0;
- NMonitoring::TDynamicCounterPtr RootCountersParent;
- NMonitoring::TDynamicCounterPtr PublicCountersParent;
NActors::TActorId ExecuterId;
NActors::TActorId ControlId;
NActors::TActorId CheckpointCoordinatorId;
TString SessionId;
- ::NYq::NCommon::TServiceCounters ServiceCounters;
::NYq::NCommon::TServiceCounters QueryCounters;
bool EnableCheckpointCoordinator = false;
bool RetryNeeded = false;
@@ -1376,10 +1309,11 @@ private:
IActor* CreateRunActor(
+ const NActors::TActorId& fetcherId,
const ::NYq::NCommon::TServiceCounters& serviceCounters,
TRunActorParams&& params
) {
- return new TRunActor(serviceCounters, std::move(params));
+ return new TRunActor(fetcherId, serviceCounters, std::move(params));
}
} /* NYq */
diff --git a/ydb/core/yq/libs/init/init.cpp b/ydb/core/yq/libs/init/init.cpp
index d66ac202bfa..4faa00b74aa 100644
--- a/ydb/core/yq/libs/init/init.cpp
+++ b/ydb/core/yq/libs/init/init.cpp
@@ -256,7 +256,7 @@ void Init(
clientCounters
);
- actorRegistrator(MakeYqlAnalyticsFetcherId(nodeId), fetcher);
+ actorRegistrator(MakePendingFetcherId(nodeId), fetcher);
}
if (protoConfig.GetPrivateProxy().GetEnabled()) {