diff options
4 files changed, 20 insertions, 0 deletions
diff --git a/ydb/core/tx/columnshard/columnshard.cpp b/ydb/core/tx/columnshard/columnshard.cpp index fd27f0010f5..82f967a16e3 100644 --- a/ydb/core/tx/columnshard/columnshard.cpp +++ b/ydb/core/tx/columnshard/columnshard.cpp @@ -260,6 +260,9 @@ void TColumnShard::Handle(NActors::TEvents::TEvWakeup::TPtr& ev, const TActorCon ctx.Schedule(TDuration::Seconds(1), new NActors::TEvents::TEvWakeup(0)); } else if (ev->Get()->Tag == 1) { WriteTasksQueue->Drain(true, ctx); + } else if (ev->Get()->Tag == 2) { + OverloadSubscribers.NotifyAllOverloadSubscribers(SelfId(), TabletID()); + OverloadSubscribers.ProcessNotification(); } } diff --git a/ydb/core/tx/columnshard/columnshard__write.cpp b/ydb/core/tx/columnshard/columnshard__write.cpp index 57240c0bb7a..a5670f2553c 100644 --- a/ydb/core/tx/columnshard/columnshard__write.cpp +++ b/ydb/core/tx/columnshard/columnshard__write.cpp @@ -499,6 +499,7 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor if (!outOfSpace && record.HasOverloadSubscribe()) { const auto rejectReasons = NOverload::MakeRejectReasons(overloadStatus); OverloadSubscribers.SetOverloadSubscribed(record.GetOverloadSubscribe(), ev->Recipient, ev->Sender, rejectReasons, result->Record); + OverloadSubscribers.ScheduleNotification(SelfId()); } OverloadWriteFail(overloadStatus, NEvWrite::TWriteMeta(0, pathId, source, {}, TGUID::CreateTimebased().AsGuidString(), diff --git a/ydb/core/tx/columnshard/overload/overload_subscribers.cpp b/ydb/core/tx/columnshard/overload/overload_subscribers.cpp index 81f37291119..b4727417b4a 100644 --- a/ydb/core/tx/columnshard/overload/overload_subscribers.cpp +++ b/ydb/core/tx/columnshard/overload/overload_subscribers.cpp @@ -161,4 +161,16 @@ void TOverloadSubscribers::RemoveOverloadSubscriber(TSeqNo seqNo, const TActorId } } +void TOverloadSubscribers::ScheduleNotification(const TActorId& actorId) { + if (InFlightNotification) { + return; + } + InFlightNotification = true; + TActivationContext::Schedule(TDuration::MilliSeconds(200), new IEventHandle(actorId, actorId, new NActors::TEvents::TEvWakeup(2))); +} + +void TOverloadSubscribers::ProcessNotification() { + InFlightNotification = false; +} + } // namespace NKikimr::NColumnShard::NOverload diff --git a/ydb/core/tx/columnshard/overload/overload_subscribers.h b/ydb/core/tx/columnshard/overload/overload_subscribers.h index d755a8e25b2..edbfea1428b 100644 --- a/ydb/core/tx/columnshard/overload/overload_subscribers.h +++ b/ydb/core/tx/columnshard/overload/overload_subscribers.h @@ -36,6 +36,9 @@ public: } } + void ScheduleNotification(const TActorId& actorId); + void ProcessNotification(); + private: struct TPipeServerInfoOverloadSubscribersTag {}; @@ -61,6 +64,7 @@ private: using TPipeServersWithOverloadSubscribers = TIntrusiveList<TPipeServerInfo, TPipeServerInfoOverloadSubscribersTag>; TPipeServersWithOverloadSubscribers PipeServersWithOverloadSubscribers; size_t OverloadSubscribersByReason[RejectReasonCount] = {0}; + bool InFlightNotification = false; }; } // namespace NKikimr::NColumnShard::NOverload |