aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--ydb/core/tx/columnshard/columnshard.cpp3
-rw-r--r--ydb/core/tx/columnshard/columnshard__write.cpp1
-rw-r--r--ydb/core/tx/columnshard/overload/overload_subscribers.cpp12
-rw-r--r--ydb/core/tx/columnshard/overload/overload_subscribers.h4
4 files changed, 20 insertions, 0 deletions
diff --git a/ydb/core/tx/columnshard/columnshard.cpp b/ydb/core/tx/columnshard/columnshard.cpp
index fd27f0010f5..82f967a16e3 100644
--- a/ydb/core/tx/columnshard/columnshard.cpp
+++ b/ydb/core/tx/columnshard/columnshard.cpp
@@ -260,6 +260,9 @@ void TColumnShard::Handle(NActors::TEvents::TEvWakeup::TPtr& ev, const TActorCon
ctx.Schedule(TDuration::Seconds(1), new NActors::TEvents::TEvWakeup(0));
} else if (ev->Get()->Tag == 1) {
WriteTasksQueue->Drain(true, ctx);
+ } else if (ev->Get()->Tag == 2) {
+ OverloadSubscribers.NotifyAllOverloadSubscribers(SelfId(), TabletID());
+ OverloadSubscribers.ProcessNotification();
}
}
diff --git a/ydb/core/tx/columnshard/columnshard__write.cpp b/ydb/core/tx/columnshard/columnshard__write.cpp
index 57240c0bb7a..a5670f2553c 100644
--- a/ydb/core/tx/columnshard/columnshard__write.cpp
+++ b/ydb/core/tx/columnshard/columnshard__write.cpp
@@ -499,6 +499,7 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor
if (!outOfSpace && record.HasOverloadSubscribe()) {
const auto rejectReasons = NOverload::MakeRejectReasons(overloadStatus);
OverloadSubscribers.SetOverloadSubscribed(record.GetOverloadSubscribe(), ev->Recipient, ev->Sender, rejectReasons, result->Record);
+ OverloadSubscribers.ScheduleNotification(SelfId());
}
OverloadWriteFail(overloadStatus,
NEvWrite::TWriteMeta(0, pathId, source, {}, TGUID::CreateTimebased().AsGuidString(),
diff --git a/ydb/core/tx/columnshard/overload/overload_subscribers.cpp b/ydb/core/tx/columnshard/overload/overload_subscribers.cpp
index 81f37291119..b4727417b4a 100644
--- a/ydb/core/tx/columnshard/overload/overload_subscribers.cpp
+++ b/ydb/core/tx/columnshard/overload/overload_subscribers.cpp
@@ -161,4 +161,16 @@ void TOverloadSubscribers::RemoveOverloadSubscriber(TSeqNo seqNo, const TActorId
}
}
+void TOverloadSubscribers::ScheduleNotification(const TActorId& actorId) {
+ if (InFlightNotification) {
+ return;
+ }
+ InFlightNotification = true;
+ TActivationContext::Schedule(TDuration::MilliSeconds(200), new IEventHandle(actorId, actorId, new NActors::TEvents::TEvWakeup(2)));
+}
+
+void TOverloadSubscribers::ProcessNotification() {
+ InFlightNotification = false;
+}
+
} // namespace NKikimr::NColumnShard::NOverload
diff --git a/ydb/core/tx/columnshard/overload/overload_subscribers.h b/ydb/core/tx/columnshard/overload/overload_subscribers.h
index d755a8e25b2..edbfea1428b 100644
--- a/ydb/core/tx/columnshard/overload/overload_subscribers.h
+++ b/ydb/core/tx/columnshard/overload/overload_subscribers.h
@@ -36,6 +36,9 @@ public:
}
}
+ void ScheduleNotification(const TActorId& actorId);
+ void ProcessNotification();
+
private:
struct TPipeServerInfoOverloadSubscribersTag {};
@@ -61,6 +64,7 @@ private:
using TPipeServersWithOverloadSubscribers = TIntrusiveList<TPipeServerInfo, TPipeServerInfoOverloadSubscribersTag>;
TPipeServersWithOverloadSubscribers PipeServersWithOverloadSubscribers;
size_t OverloadSubscribersByReason[RejectReasonCount] = {0};
+ bool InFlightNotification = false;
};
} // namespace NKikimr::NColumnShard::NOverload