diff options
author | t1mursadykov <t1mursadykov@ydb.tech> | 2022-11-01 19:46:22 +0300 |
---|---|---|
committer | t1mursadykov <t1mursadykov@ydb.tech> | 2022-11-01 19:46:22 +0300 |
commit | 9292fd6cefee14107e345c52c309a4b8f270d0d3 (patch) | |
tree | c2bd2ae74aa028950f5fa329bde5ca2f7d81424d | |
parent | 89a1197513c1d767967e489061db3fc25c56f890 (diff) | |
download | ydb-9292fd6cefee14107e345c52c309a4b8f270d0d3.tar.gz |
Remove InfoCollector creation for each walle task
-rw-r--r-- | ydb/core/cms/cms.cpp | 30 | ||||
-rw-r--r-- | ydb/core/cms/cms.h | 16 | ||||
-rw-r--r-- | ydb/core/cms/cms_impl.h | 10 | ||||
-rw-r--r-- | ydb/core/cms/walle_create_task_adapter.cpp | 10 |
4 files changed, 57 insertions, 9 deletions
diff --git a/ydb/core/cms/cms.cpp b/ydb/core/cms/cms.cpp index 37446cc70eb..6698252bdb9 100644 --- a/ydb/core/cms/cms.cpp +++ b/ydb/core/cms/cms.cpp @@ -1,6 +1,7 @@ #include "cms_impl.h" #include "info_collector.h" #include "library/cpp/actors/core/actor.h" +#include "library/cpp/actors/core/hfunc.h" #include "scheme.h" #include "sentinel.h" #include "erasure_checkers.h" @@ -1313,12 +1314,20 @@ bool TCms::RemoveNotification(const TString &id, const TString &user, bool remov void TCms::EnqueueRequest(TAutoPtr<IEventHandle> ev, const TActorContext &ctx) { - if (Queue.empty()) { - auto collector = CreateInfoCollector(SelfId(), State->Config.InfoCollectionTimeout); - ctx.ExecutorThread.RegisterActor(collector); + if (Queue.empty() && NextQueue.empty()) { + ctx.Schedule(TDuration::MilliSeconds(100), new TEvPrivate::TEvStartCollecting); } - Queue.push(ev); + NextQueue.push(ev); +} + +void TCms::StartCollecting(const TActorContext &ctx) +{ + Y_VERIFY(Queue.empty()); + std::swap(NextQueue, Queue); + + auto collector = CreateInfoCollector(SelfId(), State->Config.InfoCollectionTimeout); + ctx.ExecutorThread.RegisterActor(collector); } void TCms::CheckAndEnqueueRequest(TEvCms::TEvPermissionRequest::TPtr &ev, const TActorContext &ctx) @@ -1396,6 +1405,11 @@ void TCms::ProcessQueue(const TActorContext &ctx) ProcessRequest(Queue.front(), ctx); Queue.pop(); } + + // Process events received while collecting + if (!NextQueue.empty()) { + StartCollecting(ctx); + } } void TCms::ProcessRequest(TAutoPtr<IEventHandle> &ev, const TActorContext &ctx) @@ -1409,6 +1423,7 @@ void TCms::ProcessRequest(TAutoPtr<IEventHandle> &ev, const TActorContext &ctx) HFuncTraced(TEvCms::TEvNotification, Handle); HFuncTraced(TEvCms::TEvResetMarkerRequest, Handle); HFuncTraced(TEvCms::TEvSetMarkerRequest, Handle); + HFuncTraced(TEvCms::TEvGetClusterInfoRequest, Handle); default: Y_FAIL("Unexpected request type"); @@ -1428,6 +1443,13 @@ void TCms::OnBSCPipeDestroyed(const TActorContext &ctx) ctx.Send(State->Sentinel, new TEvSentinel::TEvBSCPipeDisconnected); } +void TCms::Handle(TEvCms::TEvGetClusterInfoRequest::TPtr &ev, const TActorContext &ctx) { + TAutoPtr<TEvCms::TEvGetClusterInfoResponse> resp = new TEvCms::TEvGetClusterInfoResponse; + resp->Info = ClusterInfo; + + ctx.Send(ev->Sender, resp.Release()); +} + void TCms::Handle(TEvStateStorage::TEvListStateStorageResult::TPtr& ev, const TActorContext &ctx) { auto& info = ev->Get()->Info; if (!info) { diff --git a/ydb/core/cms/cms.h b/ydb/core/cms/cms.h index 9084538a34f..c8826cba52b 100644 --- a/ydb/core/cms/cms.h +++ b/ydb/core/cms/cms.h @@ -74,6 +74,8 @@ struct TEvCms { EvSetMarkerResponse, EvResetMarkerResponse, EvStoreWalleTaskFailed, + EvGetClusterInfoRequest, + EvGetClusterInfoResponse, EvEnd }; @@ -255,6 +257,20 @@ struct TEvCms { return Sprintf("%s { TaskId: %s }", ToStringHeader().data(), TaskId.data()); } }; + + struct TEvGetClusterInfoRequest : public TEventLocal<TEvGetClusterInfoRequest, EvGetClusterInfoRequest> { + TString ToString() const override { + return "Get Cluster Info Request"; + } + }; + + struct TEvGetClusterInfoResponse : public TEventLocal<TEvGetClusterInfoResponse, EvGetClusterInfoResponse> { + TClusterInfoPtr Info; + + TString ToString() const override { + return "Get Cluster Info Response"; + } + }; struct TEvGetConfigRequest : public TEventPB<TEvGetConfigRequest, NKikimrCms::TGetConfigRequest, diff --git a/ydb/core/cms/cms_impl.h b/ydb/core/cms/cms_impl.h index 693c4fa09e3..a4e69f0b8ff 100644 --- a/ydb/core/cms/cms_impl.h +++ b/ydb/core/cms/cms_impl.h @@ -40,6 +40,7 @@ public: EvCleanupWalle, EvLogAndSend, EvCleanupLog, + EvStartCollecting, EvEnd }; @@ -58,6 +59,8 @@ public: struct TEvUpdateClusterInfo : public TEventLocal<TEvUpdateClusterInfo, EvUpdateClusterInfo> {}; + struct TEvStartCollecting : public TEventLocal<TEvStartCollecting, EvStartCollecting> {}; + struct TEvCleanupExpired : public TEventLocal<TEvCleanupExpired, EvCleanupExpired> {}; struct TEvCleanupWalle : public TEventLocal<TEvCleanupWalle, EvCleanupWalle> {}; @@ -217,6 +220,7 @@ private: CFunc(TEvPrivate::EvCleanupExpired, CleanupExpired); CFunc(TEvPrivate::EvCleanupLog, CleanupLog); CFunc(TEvPrivate::EvCleanupWalle, CleanupWalleTasks); + CFunc(TEvPrivate::EvStartCollecting, StartCollecting); FFunc(TEvCms::EvClusterStateRequest, EnqueueRequest); HFunc(TEvCms::TEvPermissionRequest, CheckAndEnqueueRequest); HFunc(TEvCms::TEvManageRequestRequest, Handle); @@ -237,6 +241,7 @@ private: HFunc(TEvCms::TEvSetMarkerRequest, Handle); HFunc(TEvCms::TEvGetLogTailRequest, Handle); HFunc(TEvCms::TEvGetSentinelStateRequest, Handle); + FFunc(TEvCms::EvGetClusterInfoRequest, EnqueueRequest); HFunc(TEvConsole::TEvConfigNotificationRequest, Handle); HFunc(TEvConsole::TEvReplaceConfigSubscriptionsResponse, Handle); HFunc(TEvents::TEvPoisonPill, Handle); @@ -330,6 +335,7 @@ private: void DoPermissionsCleanup(const TActorContext &ctx); void CleanupWalleTasks(const TActorContext &ctx); void RemoveEmptyWalleTasks(const TActorContext &ctx); + void StartCollecting(const TActorContext &ctx); bool CheckNotificationDeadline(const NKikimrCms::TAction &action, TInstant time, TErrorInfo &error, const TActorContext &ctx) const; bool CheckNotificationRestartServices(const NKikimrCms::TAction &action, TInstant time, @@ -394,6 +400,7 @@ private: void Handle(TEvCms::TEvSetMarkerRequest::TPtr &ev, const TActorContext &ctx); void Handle(TEvCms::TEvGetLogTailRequest::TPtr &ev, const TActorContext &ctx); void Handle(TEvCms::TEvGetSentinelStateRequest::TPtr &ev, const TActorContext &ctx); + void Handle(TEvCms::TEvGetClusterInfoRequest::TPtr &ev, const TActorContext &ctx); void Handle(TEvConsole::TEvConfigNotificationRequest::TPtr &ev, const TActorContext &ctx); void Handle(TEvConsole::TEvReplaceConfigSubscriptionsResponse::TPtr &ev, @@ -407,7 +414,10 @@ private: TStack<TInstant> ScheduledCleanups; TString NotSupportedReason; TQueue<TAutoPtr<IEventHandle>> InitQueue; + TQueue<TAutoPtr<IEventHandle>> Queue; + TQueue<TAutoPtr<IEventHandle>> NextQueue; + TCmsStatePtr State; TLogger Logger; // Shortcut to State->ClusterInfo. diff --git a/ydb/core/cms/walle_create_task_adapter.cpp b/ydb/core/cms/walle_create_task_adapter.cpp index 46a9fa52061..37035940f0a 100644 --- a/ydb/core/cms/walle_create_task_adapter.cpp +++ b/ydb/core/cms/walle_create_task_adapter.cpp @@ -51,8 +51,7 @@ public: return; } - auto collector = CreateInfoCollector(SelfId(), TDuration::Seconds(15)); - ctx.RegisterWithSameMailbox(collector); + ctx.Send(Cms, new TEvCms::TEvGetClusterInfoRequest); Become(&TThis::StateWork); } @@ -62,8 +61,9 @@ private: { switch (ev->GetTypeRewrite()) { HFunc(TEvCms::TEvPermissionResponse, Handle); - HFunc(TCms::TEvPrivate::TEvClusterInfo, Handle); + HFunc(TEvCms::TEvGetClusterInfoResponse, Handle); CFunc(TEvCms::EvWalleTaskStored, Finish); + HFunc(TEvCms::TEvStoreWalleTaskFailed, Handle); default: LOG_DEBUG(ctx, NKikimrServices::CMS, "TWalleCreateTaskAdapter::StateWork ignored event type: %" PRIx32 " event: %s", @@ -118,9 +118,9 @@ private: } - void Handle(TCms::TEvPrivate::TEvClusterInfo::TPtr &ev, const TActorContext &ctx) + void Handle(TEvCms::TEvGetClusterInfoResponse::TPtr &ev, const TActorContext &ctx) { - if (!ev->Get()->Success) { + if (ev->Get()->Info->IsOutdated()) { ReplyWithErrorAndDie(TStatus::ERROR_TEMP, "Cannot collect cluster info", ctx); return; } |