aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorIlnaz Nizametdinov <i.nizametdinov@gmail.com>2022-04-18 18:40:09 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-04-18 18:40:09 +0300
commit12328e5654d72ab12dd1d4af6e88426969fd33e9 (patch)
tree77ad874a96e7890d4f95674cd031c533f0400682
parentb3825bd66b590d894faeb04a3db47e4c82bdc0d8 (diff)
downloadydb-12328e5654d72ab12dd1d4af6e88426969fd33e9.tar.gz
22-2: Batch processing of sending notifications upon commit KIKIMR-14597
merge from trunk: r9294377 REVIEW: 2427442 x-ydb-stable-ref: 57b7333387ffa67d0c728f84e8832ab205ecada5
-rw-r--r--ydb/core/tx/scheme_board/replica.cpp78
1 files changed, 72 insertions, 6 deletions
diff --git a/ydb/core/tx/scheme_board/replica.cpp b/ydb/core/tx/scheme_board/replica.cpp
index 638656e91d..7aa4b2d4d3 100644
--- a/ydb/core/tx/scheme_board/replica.cpp
+++ b/ydb/core/tx/scheme_board/replica.cpp
@@ -36,6 +36,26 @@ class TReplica: public TMonitorableActor<TReplica> {
using TDescribeSchemeResult = NKikimrScheme::TEvDescribeSchemeResult;
using TCapabilities = NKikimrSchemeBoard::TEvSubscribe::TCapabilities;
+ struct TEvPrivate {
+ enum EEv {
+ EvSendStrongNotifications = EventSpaceBegin(TKikimrEvents::ES_PRIVATE),
+
+ EvEnd,
+ };
+
+ static_assert(EvEnd < EventSpaceEnd(TKikimrEvents::ES_PRIVATE), "expect EvEnd < EventSpaceEnd(TKikimrEvents::ES_PRIVATE)");
+
+ struct TEvSendStrongNotifications: public TEventLocal<TEvSendStrongNotifications, EvSendStrongNotifications> {
+ static constexpr ui32 BatchSize = 1000;
+ const ui64 Owner;
+
+ explicit TEvSendStrongNotifications(ui64 owner)
+ : Owner(owner)
+ {
+ }
+ };
+ };
+
public:
enum ESubscriptionType {
SUBSCRIPTION_UNSPECIFIED, // for filtration
@@ -692,6 +712,8 @@ private:
if (!notify->Record.GetStrong()) {
auto& info = desc->GetSubscriberInfo(subscriber);
info.NeedStrongNotification();
+
+ WaitStrongNotifications[domainOwnerId].insert(subscriber);
}
Send(subscriber, std::move(notify), flags);
@@ -1048,22 +1070,58 @@ private:
info.Generation = info.PendingGeneration;
Send(ev->Sender, new TSchemeBoardEvents::TEvCommitResponse(owner, info.Generation), 0, ev->Cookie);
+ Send(SelfId(), new TEvPrivate::TEvSendStrongNotifications(owner));
+ }
+
+ void Handle(TEvPrivate::TEvSendStrongNotifications::TPtr& ev) {
+ const auto limit = ev->Get()->BatchSize;
+ const auto owner = ev->Get()->Owner;
+
+ SBR_LOG_D("Handle TEvPrivate::TEvSendStrongNotifications"
+ << ": self# " << SelfId()
+ << ", owner# " << owner);
+
+ if (!IsPopulatorCommited(owner)) {
+ SBR_LOG_N("Populator is not commited"
+ << ": self# " << SelfId()
+ << ", owner# " << owner);
+ return;
+ }
+
+ auto itSubscribers = WaitStrongNotifications.find(owner);
+ if (itSubscribers == WaitStrongNotifications.end()) {
+ SBR_LOG_E("Invalid owner"
+ << ": self# " << SelfId()
+ << ", owner# " << owner);
+ return;
+ }
+
+ auto& subscribers = itSubscribers->second;
+ auto it = subscribers.begin();
+ ui32 count = 0;
+
+ while (count++ < limit && it != subscribers.end()) {
+ const TActorId subscriber = *it;
+ it = subscribers.erase(it);
+
+ auto jt = Subscribers.find(subscriber);
+ if (jt == Subscribers.end()) {
+ continue;
+ }
- for (const auto& [subscriber, id] : Subscribers) {
TDescription* desc = nullptr;
- if (const TString* path = std::get_if<TString>(&id)) {
+ if (const TString* path = std::get_if<TString>(&jt->second)) {
desc = Descriptions.FindPtr(*path);
- } else if (const TPathId* pathId = std::get_if<TPathId>(&id)) {
+ } else if (const TPathId* pathId = std::get_if<TPathId>(&jt->second)) {
desc = Descriptions.FindPtr(*pathId);
}
Y_VERIFY(desc);
auto& info = desc->GetSubscriberInfo(subscriber);
- if (info.GetDomainOwnerId() != owner
- || info.IsNotifiedStrongly()
- || info.IsWaitForAck()) {
+ Y_VERIFY(info.GetDomainOwnerId() == owner);
+ if (info.IsNotifiedStrongly() || info.IsWaitForAck()) {
continue;
}
@@ -1071,6 +1129,12 @@ private:
info.EnqueueVersion(notify.Get());
Send(subscriber, std::move(notify));
}
+
+ if (subscribers) {
+ Send(SelfId(), new TEvPrivate::TEvSendStrongNotifications(owner));
+ } else {
+ WaitStrongNotifications.erase(itSubscribers);
+ }
}
void Handle(TSchemeBoardEvents::TEvSubscribe::TPtr& ev) {
@@ -1313,6 +1377,7 @@ public:
hFunc(TSchemeBoardEvents::TEvHandshakeRequest, Handle);
hFunc(TSchemeBoardEvents::TEvUpdate, Handle);
hFunc(TSchemeBoardEvents::TEvCommitRequest, Handle);
+ hFunc(TEvPrivate::TEvSendStrongNotifications, Handle);
hFunc(TSchemeBoardEvents::TEvSubscribe, Handle);
hFunc(TSchemeBoardEvents::TEvUnsubscribe, Handle);
hFunc(TSchemeBoardEvents::TEvNotifyAck, Handle);
@@ -1330,6 +1395,7 @@ private:
THashMap<ui64, TPopulatorInfo> Populators;
TDoubleIndexedMap<TString, TPathId, TDescription, TMerger, THashMap, TMap> Descriptions;
TMap<TActorId, std::variant<TString, TPathId>, TActorId::TOrderedCmp> Subscribers;
+ THashMap<ui64, TSet<TActorId>> WaitStrongNotifications;
}; // TReplica