diff options
author | Ilnaz Nizametdinov <i.nizametdinov@gmail.com> | 2022-04-18 18:40:09 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-04-18 18:40:09 +0300 |
commit | 12328e5654d72ab12dd1d4af6e88426969fd33e9 (patch) | |
tree | 77ad874a96e7890d4f95674cd031c533f0400682 | |
parent | b3825bd66b590d894faeb04a3db47e4c82bdc0d8 (diff) | |
download | ydb-12328e5654d72ab12dd1d4af6e88426969fd33e9.tar.gz |
22-2: Batch processing of sending notifications upon commit KIKIMR-14597
merge from trunk: r9294377
REVIEW: 2427442
x-ydb-stable-ref: 57b7333387ffa67d0c728f84e8832ab205ecada5
-rw-r--r-- | ydb/core/tx/scheme_board/replica.cpp | 78 |
1 files changed, 72 insertions, 6 deletions
diff --git a/ydb/core/tx/scheme_board/replica.cpp b/ydb/core/tx/scheme_board/replica.cpp index 638656e91d..7aa4b2d4d3 100644 --- a/ydb/core/tx/scheme_board/replica.cpp +++ b/ydb/core/tx/scheme_board/replica.cpp @@ -36,6 +36,26 @@ class TReplica: public TMonitorableActor<TReplica> { using TDescribeSchemeResult = NKikimrScheme::TEvDescribeSchemeResult; using TCapabilities = NKikimrSchemeBoard::TEvSubscribe::TCapabilities; + struct TEvPrivate { + enum EEv { + EvSendStrongNotifications = EventSpaceBegin(TKikimrEvents::ES_PRIVATE), + + EvEnd, + }; + + static_assert(EvEnd < EventSpaceEnd(TKikimrEvents::ES_PRIVATE), "expect EvEnd < EventSpaceEnd(TKikimrEvents::ES_PRIVATE)"); + + struct TEvSendStrongNotifications: public TEventLocal<TEvSendStrongNotifications, EvSendStrongNotifications> { + static constexpr ui32 BatchSize = 1000; + const ui64 Owner; + + explicit TEvSendStrongNotifications(ui64 owner) + : Owner(owner) + { + } + }; + }; + public: enum ESubscriptionType { SUBSCRIPTION_UNSPECIFIED, // for filtration @@ -692,6 +712,8 @@ private: if (!notify->Record.GetStrong()) { auto& info = desc->GetSubscriberInfo(subscriber); info.NeedStrongNotification(); + + WaitStrongNotifications[domainOwnerId].insert(subscriber); } Send(subscriber, std::move(notify), flags); @@ -1048,22 +1070,58 @@ private: info.Generation = info.PendingGeneration; Send(ev->Sender, new TSchemeBoardEvents::TEvCommitResponse(owner, info.Generation), 0, ev->Cookie); + Send(SelfId(), new TEvPrivate::TEvSendStrongNotifications(owner)); + } + + void Handle(TEvPrivate::TEvSendStrongNotifications::TPtr& ev) { + const auto limit = ev->Get()->BatchSize; + const auto owner = ev->Get()->Owner; + + SBR_LOG_D("Handle TEvPrivate::TEvSendStrongNotifications" + << ": self# " << SelfId() + << ", owner# " << owner); + + if (!IsPopulatorCommited(owner)) { + SBR_LOG_N("Populator is not commited" + << ": self# " << SelfId() + << ", owner# " << owner); + return; + } + + auto itSubscribers = WaitStrongNotifications.find(owner); + if (itSubscribers == WaitStrongNotifications.end()) { + SBR_LOG_E("Invalid owner" + << ": self# " << SelfId() + << ", owner# " << owner); + return; + } + + auto& subscribers = itSubscribers->second; + auto it = subscribers.begin(); + ui32 count = 0; + + while (count++ < limit && it != subscribers.end()) { + const TActorId subscriber = *it; + it = subscribers.erase(it); + + auto jt = Subscribers.find(subscriber); + if (jt == Subscribers.end()) { + continue; + } - for (const auto& [subscriber, id] : Subscribers) { TDescription* desc = nullptr; - if (const TString* path = std::get_if<TString>(&id)) { + if (const TString* path = std::get_if<TString>(&jt->second)) { desc = Descriptions.FindPtr(*path); - } else if (const TPathId* pathId = std::get_if<TPathId>(&id)) { + } else if (const TPathId* pathId = std::get_if<TPathId>(&jt->second)) { desc = Descriptions.FindPtr(*pathId); } Y_VERIFY(desc); auto& info = desc->GetSubscriberInfo(subscriber); - if (info.GetDomainOwnerId() != owner - || info.IsNotifiedStrongly() - || info.IsWaitForAck()) { + Y_VERIFY(info.GetDomainOwnerId() == owner); + if (info.IsNotifiedStrongly() || info.IsWaitForAck()) { continue; } @@ -1071,6 +1129,12 @@ private: info.EnqueueVersion(notify.Get()); Send(subscriber, std::move(notify)); } + + if (subscribers) { + Send(SelfId(), new TEvPrivate::TEvSendStrongNotifications(owner)); + } else { + WaitStrongNotifications.erase(itSubscribers); + } } void Handle(TSchemeBoardEvents::TEvSubscribe::TPtr& ev) { @@ -1313,6 +1377,7 @@ public: hFunc(TSchemeBoardEvents::TEvHandshakeRequest, Handle); hFunc(TSchemeBoardEvents::TEvUpdate, Handle); hFunc(TSchemeBoardEvents::TEvCommitRequest, Handle); + hFunc(TEvPrivate::TEvSendStrongNotifications, Handle); hFunc(TSchemeBoardEvents::TEvSubscribe, Handle); hFunc(TSchemeBoardEvents::TEvUnsubscribe, Handle); hFunc(TSchemeBoardEvents::TEvNotifyAck, Handle); @@ -1330,6 +1395,7 @@ private: THashMap<ui64, TPopulatorInfo> Populators; TDoubleIndexedMap<TString, TPathId, TDescription, TMerger, THashMap, TMap> Descriptions; TMap<TActorId, std::variant<TString, TPathId>, TActorId::TOrderedCmp> Subscribers; + THashMap<ui64, TSet<TActorId>> WaitStrongNotifications; }; // TReplica |