summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorIlia Shakhov <[email protected]>2025-05-26 21:12:09 +0300
committerGitHub <[email protected]>2025-05-26 21:12:09 +0300
commit8a6842a271888b4f365c2797e24c93ff239bc8e0 (patch)
tree1d89f89a2987e02b347d9952c15a5c6e5e01bd79
parent5053f6d8c3a36e298fd3806e0f3a02eb0dc01650 (diff)
Make processing subscribers queue more granular (#18790)
-rw-r--r--ydb/core/mind/node_broker.cpp20
-rw-r--r--ydb/core/mind/node_broker__extend_lease.cpp1
-rw-r--r--ydb/core/mind/node_broker__register_node.cpp1
-rw-r--r--ydb/core/mind/node_broker__update_epoch.cpp1
-rw-r--r--ydb/core/mind/node_broker_impl.h1
5 files changed, 13 insertions, 11 deletions
diff --git a/ydb/core/mind/node_broker.cpp b/ydb/core/mind/node_broker.cpp
index 77e980851e4..e979427833d 100644
--- a/ydb/core/mind/node_broker.cpp
+++ b/ydb/core/mind/node_broker.cpp
@@ -459,7 +459,10 @@ void TNodeBroker::ScheduleEpochUpdate(const TActorContext &ctx)
void TNodeBroker::ScheduleProcessSubscribersQueue(const TActorContext &ctx)
{
- ctx.Schedule(TDuration::Seconds(1), new TEvPrivate::TEvProcessSubscribersQueue);
+ if (!ScheduledProcessSubscribersQueue && !SubscribersQueue.Empty()) {
+ ctx.Schedule(TDuration::MilliSeconds(1), new TEvPrivate::TEvProcessSubscribersQueue);
+ ScheduledProcessSubscribersQueue = true;
+ }
}
void TNodeBroker::FillNodeInfo(const TNodeInfo &node,
@@ -1687,19 +1690,14 @@ void TNodeBroker::Handle(TEvPrivate::TEvResolvedRegistrationRequest::TPtr &ev,
void TNodeBroker::Handle(TEvPrivate::TEvProcessSubscribersQueue::TPtr &, const TActorContext &ctx)
{
- constexpr size_t MAX_BATCH_SIZE = 1000;
-
- size_t batchSize = 0;
- while (batchSize < SubscribersQueue.Size() && batchSize < MAX_BATCH_SIZE) {
+ ScheduledProcessSubscribersQueue = false;
+ if (!SubscribersQueue.Empty()) {
auto& subscriber = *SubscribersQueue.Front();
- if (subscriber.SentVersion >= Committed.Epoch.Version) {
- break;
+ if (subscriber.SentVersion < Committed.Epoch.Version) {
+ SendUpdateNodes(subscriber, ctx);
+ ScheduleProcessSubscribersQueue(ctx);
}
- SendUpdateNodes(subscriber, ctx);
- ++batchSize;
}
-
- ScheduleProcessSubscribersQueue(ctx);
}
TNodeBroker::TState::TState(TNodeBroker* self)
diff --git a/ydb/core/mind/node_broker__extend_lease.cpp b/ydb/core/mind/node_broker__extend_lease.cpp
index ed5d6b22f16..de797e0b4c4 100644
--- a/ydb/core/mind/node_broker__extend_lease.cpp
+++ b/ydb/core/mind/node_broker__extend_lease.cpp
@@ -88,6 +88,7 @@ public:
Self->Committed.UpdateEpochVersion();
Self->AddNodeToEpochCache(node);
Self->AddNodeToUpdateNodesLog(node);
+ Self->ScheduleProcessSubscribersQueue(ctx);
}
}
diff --git a/ydb/core/mind/node_broker__register_node.cpp b/ydb/core/mind/node_broker__register_node.cpp
index 2688a945f0c..8d2ddc86c8a 100644
--- a/ydb/core/mind/node_broker__register_node.cpp
+++ b/ydb/core/mind/node_broker__register_node.cpp
@@ -222,6 +222,7 @@ public:
Self->Committed.UpdateEpochVersion();
Self->AddNodeToEpochCache(node);
Self->AddNodeToUpdateNodesLog(node);
+ Self->ScheduleProcessSubscribersQueue(ctx);
}
Reply(ctx);
diff --git a/ydb/core/mind/node_broker__update_epoch.cpp b/ydb/core/mind/node_broker__update_epoch.cpp
index 15ffa583447..b1742cd43f6 100644
--- a/ydb/core/mind/node_broker__update_epoch.cpp
+++ b/ydb/core/mind/node_broker__update_epoch.cpp
@@ -35,6 +35,7 @@ public:
Self->PrepareEpochCache();
Self->PrepareUpdateNodesLog();
Self->ProcessDelayedListNodesRequests();
+ Self->ScheduleProcessSubscribersQueue(ctx);
}
private:
diff --git a/ydb/core/mind/node_broker_impl.h b/ydb/core/mind/node_broker_impl.h
index 546f726009d..4c201ea4423 100644
--- a/ydb/core/mind/node_broker_impl.h
+++ b/ydb/core/mind/node_broker_impl.h
@@ -350,6 +350,7 @@ private:
THashMap<TActorId, TPipeServerInfo> PipeServers;
THashMap<TActorId, TSubscriberInfo> Subscribers;
TIntrusiveList<TSubscriberInfo> SubscribersQueue; // sorted by version
+ bool ScheduledProcessSubscribersQueue = false;
TTabletCountersBase* TabletCounters;
TAutoPtr<TTabletCountersBase> TabletCountersPtr;