aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authort1mursadykov <t1mursadykov@ydb.tech>2022-11-01 19:46:22 +0300
committert1mursadykov <t1mursadykov@ydb.tech>2022-11-01 19:46:22 +0300
commit9292fd6cefee14107e345c52c309a4b8f270d0d3 (patch)
treec2bd2ae74aa028950f5fa329bde5ca2f7d81424d
parent89a1197513c1d767967e489061db3fc25c56f890 (diff)
downloadydb-9292fd6cefee14107e345c52c309a4b8f270d0d3.tar.gz
Remove InfoCollector creation for each walle task
-rw-r--r--ydb/core/cms/cms.cpp30
-rw-r--r--ydb/core/cms/cms.h16
-rw-r--r--ydb/core/cms/cms_impl.h10
-rw-r--r--ydb/core/cms/walle_create_task_adapter.cpp10
4 files changed, 57 insertions, 9 deletions
diff --git a/ydb/core/cms/cms.cpp b/ydb/core/cms/cms.cpp
index 37446cc70eb..6698252bdb9 100644
--- a/ydb/core/cms/cms.cpp
+++ b/ydb/core/cms/cms.cpp
@@ -1,6 +1,7 @@
#include "cms_impl.h"
#include "info_collector.h"
#include "library/cpp/actors/core/actor.h"
+#include "library/cpp/actors/core/hfunc.h"
#include "scheme.h"
#include "sentinel.h"
#include "erasure_checkers.h"
@@ -1313,12 +1314,20 @@ bool TCms::RemoveNotification(const TString &id, const TString &user, bool remov
void TCms::EnqueueRequest(TAutoPtr<IEventHandle> ev, const TActorContext &ctx)
{
- if (Queue.empty()) {
- auto collector = CreateInfoCollector(SelfId(), State->Config.InfoCollectionTimeout);
- ctx.ExecutorThread.RegisterActor(collector);
+ if (Queue.empty() && NextQueue.empty()) {
+ ctx.Schedule(TDuration::MilliSeconds(100), new TEvPrivate::TEvStartCollecting);
}
- Queue.push(ev);
+ NextQueue.push(ev);
+}
+
+void TCms::StartCollecting(const TActorContext &ctx)
+{
+ Y_VERIFY(Queue.empty());
+ std::swap(NextQueue, Queue);
+
+ auto collector = CreateInfoCollector(SelfId(), State->Config.InfoCollectionTimeout);
+ ctx.ExecutorThread.RegisterActor(collector);
}
void TCms::CheckAndEnqueueRequest(TEvCms::TEvPermissionRequest::TPtr &ev, const TActorContext &ctx)
@@ -1396,6 +1405,11 @@ void TCms::ProcessQueue(const TActorContext &ctx)
ProcessRequest(Queue.front(), ctx);
Queue.pop();
}
+
+ // Process events received while collecting
+ if (!NextQueue.empty()) {
+ StartCollecting(ctx);
+ }
}
void TCms::ProcessRequest(TAutoPtr<IEventHandle> &ev, const TActorContext &ctx)
@@ -1409,6 +1423,7 @@ void TCms::ProcessRequest(TAutoPtr<IEventHandle> &ev, const TActorContext &ctx)
HFuncTraced(TEvCms::TEvNotification, Handle);
HFuncTraced(TEvCms::TEvResetMarkerRequest, Handle);
HFuncTraced(TEvCms::TEvSetMarkerRequest, Handle);
+ HFuncTraced(TEvCms::TEvGetClusterInfoRequest, Handle);
default:
Y_FAIL("Unexpected request type");
@@ -1428,6 +1443,13 @@ void TCms::OnBSCPipeDestroyed(const TActorContext &ctx)
ctx.Send(State->Sentinel, new TEvSentinel::TEvBSCPipeDisconnected);
}
+void TCms::Handle(TEvCms::TEvGetClusterInfoRequest::TPtr &ev, const TActorContext &ctx) {
+ TAutoPtr<TEvCms::TEvGetClusterInfoResponse> resp = new TEvCms::TEvGetClusterInfoResponse;
+ resp->Info = ClusterInfo;
+
+ ctx.Send(ev->Sender, resp.Release());
+}
+
void TCms::Handle(TEvStateStorage::TEvListStateStorageResult::TPtr& ev, const TActorContext &ctx) {
auto& info = ev->Get()->Info;
if (!info) {
diff --git a/ydb/core/cms/cms.h b/ydb/core/cms/cms.h
index 9084538a34f..c8826cba52b 100644
--- a/ydb/core/cms/cms.h
+++ b/ydb/core/cms/cms.h
@@ -74,6 +74,8 @@ struct TEvCms {
EvSetMarkerResponse,
EvResetMarkerResponse,
EvStoreWalleTaskFailed,
+ EvGetClusterInfoRequest,
+ EvGetClusterInfoResponse,
EvEnd
};
@@ -255,6 +257,20 @@ struct TEvCms {
return Sprintf("%s { TaskId: %s }", ToStringHeader().data(), TaskId.data());
}
};
+
+ struct TEvGetClusterInfoRequest : public TEventLocal<TEvGetClusterInfoRequest, EvGetClusterInfoRequest> {
+ TString ToString() const override {
+ return "Get Cluster Info Request";
+ }
+ };
+
+ struct TEvGetClusterInfoResponse : public TEventLocal<TEvGetClusterInfoResponse, EvGetClusterInfoResponse> {
+ TClusterInfoPtr Info;
+
+ TString ToString() const override {
+ return "Get Cluster Info Response";
+ }
+ };
struct TEvGetConfigRequest : public TEventPB<TEvGetConfigRequest,
NKikimrCms::TGetConfigRequest,
diff --git a/ydb/core/cms/cms_impl.h b/ydb/core/cms/cms_impl.h
index 693c4fa09e3..a4e69f0b8ff 100644
--- a/ydb/core/cms/cms_impl.h
+++ b/ydb/core/cms/cms_impl.h
@@ -40,6 +40,7 @@ public:
EvCleanupWalle,
EvLogAndSend,
EvCleanupLog,
+ EvStartCollecting,
EvEnd
};
@@ -58,6 +59,8 @@ public:
struct TEvUpdateClusterInfo : public TEventLocal<TEvUpdateClusterInfo, EvUpdateClusterInfo> {};
+ struct TEvStartCollecting : public TEventLocal<TEvStartCollecting, EvStartCollecting> {};
+
struct TEvCleanupExpired : public TEventLocal<TEvCleanupExpired, EvCleanupExpired> {};
struct TEvCleanupWalle : public TEventLocal<TEvCleanupWalle, EvCleanupWalle> {};
@@ -217,6 +220,7 @@ private:
CFunc(TEvPrivate::EvCleanupExpired, CleanupExpired);
CFunc(TEvPrivate::EvCleanupLog, CleanupLog);
CFunc(TEvPrivate::EvCleanupWalle, CleanupWalleTasks);
+ CFunc(TEvPrivate::EvStartCollecting, StartCollecting);
FFunc(TEvCms::EvClusterStateRequest, EnqueueRequest);
HFunc(TEvCms::TEvPermissionRequest, CheckAndEnqueueRequest);
HFunc(TEvCms::TEvManageRequestRequest, Handle);
@@ -237,6 +241,7 @@ private:
HFunc(TEvCms::TEvSetMarkerRequest, Handle);
HFunc(TEvCms::TEvGetLogTailRequest, Handle);
HFunc(TEvCms::TEvGetSentinelStateRequest, Handle);
+ FFunc(TEvCms::EvGetClusterInfoRequest, EnqueueRequest);
HFunc(TEvConsole::TEvConfigNotificationRequest, Handle);
HFunc(TEvConsole::TEvReplaceConfigSubscriptionsResponse, Handle);
HFunc(TEvents::TEvPoisonPill, Handle);
@@ -330,6 +335,7 @@ private:
void DoPermissionsCleanup(const TActorContext &ctx);
void CleanupWalleTasks(const TActorContext &ctx);
void RemoveEmptyWalleTasks(const TActorContext &ctx);
+ void StartCollecting(const TActorContext &ctx);
bool CheckNotificationDeadline(const NKikimrCms::TAction &action, TInstant time,
TErrorInfo &error, const TActorContext &ctx) const;
bool CheckNotificationRestartServices(const NKikimrCms::TAction &action, TInstant time,
@@ -394,6 +400,7 @@ private:
void Handle(TEvCms::TEvSetMarkerRequest::TPtr &ev, const TActorContext &ctx);
void Handle(TEvCms::TEvGetLogTailRequest::TPtr &ev, const TActorContext &ctx);
void Handle(TEvCms::TEvGetSentinelStateRequest::TPtr &ev, const TActorContext &ctx);
+ void Handle(TEvCms::TEvGetClusterInfoRequest::TPtr &ev, const TActorContext &ctx);
void Handle(TEvConsole::TEvConfigNotificationRequest::TPtr &ev,
const TActorContext &ctx);
void Handle(TEvConsole::TEvReplaceConfigSubscriptionsResponse::TPtr &ev,
@@ -407,7 +414,10 @@ private:
TStack<TInstant> ScheduledCleanups;
TString NotSupportedReason;
TQueue<TAutoPtr<IEventHandle>> InitQueue;
+
TQueue<TAutoPtr<IEventHandle>> Queue;
+ TQueue<TAutoPtr<IEventHandle>> NextQueue;
+
TCmsStatePtr State;
TLogger Logger;
// Shortcut to State->ClusterInfo.
diff --git a/ydb/core/cms/walle_create_task_adapter.cpp b/ydb/core/cms/walle_create_task_adapter.cpp
index 46a9fa52061..37035940f0a 100644
--- a/ydb/core/cms/walle_create_task_adapter.cpp
+++ b/ydb/core/cms/walle_create_task_adapter.cpp
@@ -51,8 +51,7 @@ public:
return;
}
- auto collector = CreateInfoCollector(SelfId(), TDuration::Seconds(15));
- ctx.RegisterWithSameMailbox(collector);
+ ctx.Send(Cms, new TEvCms::TEvGetClusterInfoRequest);
Become(&TThis::StateWork);
}
@@ -62,8 +61,9 @@ private:
{
switch (ev->GetTypeRewrite()) {
HFunc(TEvCms::TEvPermissionResponse, Handle);
- HFunc(TCms::TEvPrivate::TEvClusterInfo, Handle);
+ HFunc(TEvCms::TEvGetClusterInfoResponse, Handle);
CFunc(TEvCms::EvWalleTaskStored, Finish);
+ HFunc(TEvCms::TEvStoreWalleTaskFailed, Handle);
default:
LOG_DEBUG(ctx, NKikimrServices::CMS,
"TWalleCreateTaskAdapter::StateWork ignored event type: %" PRIx32 " event: %s",
@@ -118,9 +118,9 @@ private:
}
- void Handle(TCms::TEvPrivate::TEvClusterInfo::TPtr &ev, const TActorContext &ctx)
+ void Handle(TEvCms::TEvGetClusterInfoResponse::TPtr &ev, const TActorContext &ctx)
{
- if (!ev->Get()->Success) {
+ if (ev->Get()->Info->IsOutdated()) {
ReplyWithErrorAndDie(TStatus::ERROR_TEMP, "Cannot collect cluster info", ctx);
return;
}