diff options
| author | Ilia Shakhov <[email protected]> | 2025-05-26 21:12:09 +0300 |
|---|---|---|
| committer | GitHub <[email protected]> | 2025-05-26 21:12:09 +0300 |
| commit | 8a6842a271888b4f365c2797e24c93ff239bc8e0 (patch) | |
| tree | 1d89f89a2987e02b347d9952c15a5c6e5e01bd79 | |
| parent | 5053f6d8c3a36e298fd3806e0f3a02eb0dc01650 (diff) | |
Make processing subscribers queue more granular (#18790)
| -rw-r--r-- | ydb/core/mind/node_broker.cpp | 20 | ||||
| -rw-r--r-- | ydb/core/mind/node_broker__extend_lease.cpp | 1 | ||||
| -rw-r--r-- | ydb/core/mind/node_broker__register_node.cpp | 1 | ||||
| -rw-r--r-- | ydb/core/mind/node_broker__update_epoch.cpp | 1 | ||||
| -rw-r--r-- | ydb/core/mind/node_broker_impl.h | 1 |
5 files changed, 13 insertions, 11 deletions
diff --git a/ydb/core/mind/node_broker.cpp b/ydb/core/mind/node_broker.cpp index 77e980851e4..e979427833d 100644 --- a/ydb/core/mind/node_broker.cpp +++ b/ydb/core/mind/node_broker.cpp @@ -459,7 +459,10 @@ void TNodeBroker::ScheduleEpochUpdate(const TActorContext &ctx) void TNodeBroker::ScheduleProcessSubscribersQueue(const TActorContext &ctx) { - ctx.Schedule(TDuration::Seconds(1), new TEvPrivate::TEvProcessSubscribersQueue); + if (!ScheduledProcessSubscribersQueue && !SubscribersQueue.Empty()) { + ctx.Schedule(TDuration::MilliSeconds(1), new TEvPrivate::TEvProcessSubscribersQueue); + ScheduledProcessSubscribersQueue = true; + } } void TNodeBroker::FillNodeInfo(const TNodeInfo &node, @@ -1687,19 +1690,14 @@ void TNodeBroker::Handle(TEvPrivate::TEvResolvedRegistrationRequest::TPtr &ev, void TNodeBroker::Handle(TEvPrivate::TEvProcessSubscribersQueue::TPtr &, const TActorContext &ctx) { - constexpr size_t MAX_BATCH_SIZE = 1000; - - size_t batchSize = 0; - while (batchSize < SubscribersQueue.Size() && batchSize < MAX_BATCH_SIZE) { + ScheduledProcessSubscribersQueue = false; + if (!SubscribersQueue.Empty()) { auto& subscriber = *SubscribersQueue.Front(); - if (subscriber.SentVersion >= Committed.Epoch.Version) { - break; + if (subscriber.SentVersion < Committed.Epoch.Version) { + SendUpdateNodes(subscriber, ctx); + ScheduleProcessSubscribersQueue(ctx); } - SendUpdateNodes(subscriber, ctx); - ++batchSize; } - - ScheduleProcessSubscribersQueue(ctx); } TNodeBroker::TState::TState(TNodeBroker* self) diff --git a/ydb/core/mind/node_broker__extend_lease.cpp b/ydb/core/mind/node_broker__extend_lease.cpp index ed5d6b22f16..de797e0b4c4 100644 --- a/ydb/core/mind/node_broker__extend_lease.cpp +++ b/ydb/core/mind/node_broker__extend_lease.cpp @@ -88,6 +88,7 @@ public: Self->Committed.UpdateEpochVersion(); Self->AddNodeToEpochCache(node); Self->AddNodeToUpdateNodesLog(node); + Self->ScheduleProcessSubscribersQueue(ctx); } } diff --git a/ydb/core/mind/node_broker__register_node.cpp b/ydb/core/mind/node_broker__register_node.cpp index 2688a945f0c..8d2ddc86c8a 100644 --- a/ydb/core/mind/node_broker__register_node.cpp +++ b/ydb/core/mind/node_broker__register_node.cpp @@ -222,6 +222,7 @@ public: Self->Committed.UpdateEpochVersion(); Self->AddNodeToEpochCache(node); Self->AddNodeToUpdateNodesLog(node); + Self->ScheduleProcessSubscribersQueue(ctx); } Reply(ctx); diff --git a/ydb/core/mind/node_broker__update_epoch.cpp b/ydb/core/mind/node_broker__update_epoch.cpp index 15ffa583447..b1742cd43f6 100644 --- a/ydb/core/mind/node_broker__update_epoch.cpp +++ b/ydb/core/mind/node_broker__update_epoch.cpp @@ -35,6 +35,7 @@ public: Self->PrepareEpochCache(); Self->PrepareUpdateNodesLog(); Self->ProcessDelayedListNodesRequests(); + Self->ScheduleProcessSubscribersQueue(ctx); } private: diff --git a/ydb/core/mind/node_broker_impl.h b/ydb/core/mind/node_broker_impl.h index 546f726009d..4c201ea4423 100644 --- a/ydb/core/mind/node_broker_impl.h +++ b/ydb/core/mind/node_broker_impl.h @@ -350,6 +350,7 @@ private: THashMap<TActorId, TPipeServerInfo> PipeServers; THashMap<TActorId, TSubscriberInfo> Subscribers; TIntrusiveList<TSubscriberInfo> SubscribersQueue; // sorted by version + bool ScheduledProcessSubscribersQueue = false; TTabletCountersBase* TabletCounters; TAutoPtr<TTabletCountersBase> TabletCountersPtr; |
