summaryrefslogtreecommitdiffstats
path: root/library/cpp/actors/helpers/flow_controlled_queue.cpp
diff options
context:
space:
mode:
authorDevtools Arcadia <[email protected]>2022-02-07 18:08:42 +0300
committerDevtools Arcadia <[email protected]>2022-02-07 18:08:42 +0300
commit1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch)
treee26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/actors/helpers/flow_controlled_queue.cpp
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/actors/helpers/flow_controlled_queue.cpp')
-rw-r--r--library/cpp/actors/helpers/flow_controlled_queue.cpp215
1 files changed, 215 insertions, 0 deletions
diff --git a/library/cpp/actors/helpers/flow_controlled_queue.cpp b/library/cpp/actors/helpers/flow_controlled_queue.cpp
new file mode 100644
index 00000000000..d75cc540236
--- /dev/null
+++ b/library/cpp/actors/helpers/flow_controlled_queue.cpp
@@ -0,0 +1,215 @@
+#include "flow_controlled_queue.h"
+
+#include <library/cpp/actors/core/interconnect.h>
+#include <library/cpp/actors/core/hfunc.h>
+#include <library/cpp/actors/util/datetime.h>
+
+#include <util/generic/deque.h>
+#include <util/datetime/cputimer.h>
+#include <util/generic/algorithm.h>
+
+namespace NActors {
+
+class TFlowControlledRequestQueue;
+
+class TFlowControlledRequestActor : public IActor {
+ TFlowControlledRequestQueue * const QueueActor;
+
+ void HandleReply(TAutoPtr<IEventHandle> &ev);
+ void HandleUndelivered(TEvents::TEvUndelivered::TPtr &ev);
+public:
+ const TActorId Source;
+ const ui64 Cookie;
+ const ui32 Flags;
+ const ui64 StartCounter;
+
+ TFlowControlledRequestActor(ui32 activity, TFlowControlledRequestQueue *queue, TActorId source, ui64 cookie, ui32 flags)
+ : IActor(static_cast<TReceiveFunc>(&TFlowControlledRequestActor::StateWait), activity)
+ , QueueActor(queue)
+ , Source(source)
+ , Cookie(cookie)
+ , Flags(flags)
+ , StartCounter(GetCycleCountFast())
+ {}
+
+ STATEFN(StateWait) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(TEvents::TEvUndelivered, HandleUndelivered);
+ default:
+ HandleReply(ev);
+ }
+ }
+
+ TDuration AccumulatedLatency() const {
+ const ui64 cc = GetCycleCountFast() - StartCounter;
+ return CyclesToDuration(cc);
+ }
+
+ using IActor::PassAway;
+};
+
+class TFlowControlledRequestQueue : public IActor {
+ const TActorId Target;
+ const TFlowControlledQueueConfig Config;
+
+ TDeque<THolder<IEventHandle>> UnhandledRequests;
+ TDeque<TFlowControlledRequestActor *> RegisteredRequests;
+
+ bool Subscribed = false;
+
+ TDuration MinimalSeenLatency;
+
+ bool CanRegister() {
+ const ui64 inFly = RegisteredRequests.size();
+ if (inFly <= Config.MinAllowedInFly) // <= for handling minAllowed == 0
+ return true;
+
+ if (inFly >= Config.MaxAllowedInFly)
+ return false;
+
+ if (Config.TargetDynamicRate) {
+ if (const ui64 dynMax = MinimalSeenLatency.MicroSeconds() * Config.TargetDynamicRate / 1000000) {
+ if (inFly >= dynMax)
+ return false;
+ }
+ }
+
+ const TDuration currentLatency = RegisteredRequests.front()->AccumulatedLatency();
+ if (currentLatency <= Config.MinTrackedLatency)
+ return true;
+
+ if (currentLatency <= MinimalSeenLatency * Config.LatencyFactor)
+ return true;
+
+ return false;
+ }
+
+ void HandleForwardedEvent(TAutoPtr<IEventHandle> &ev) {
+ if (CanRegister()) {
+ RegisterReqActor(ev);
+ } else {
+ UnhandledRequests.emplace_back(ev.Release());
+ }
+ }
+
+ void RegisterReqActor(THolder<IEventHandle> ev) {
+ TFlowControlledRequestActor *reqActor = new TFlowControlledRequestActor(ActivityType, this, ev->Sender, ev->Cookie, ev->Flags);
+ const TActorId reqActorId = RegisterWithSameMailbox(reqActor);
+ RegisteredRequests.emplace_back(reqActor);
+
+ if (!Subscribed && (Target.NodeId() != SelfId().NodeId())) {
+ Send(TActivationContext::InterconnectProxy(Target.NodeId()), new TEvents::TEvSubscribe(), IEventHandle::FlagTrackDelivery);
+ Subscribed = true;
+ }
+
+ TActivationContext::Send(new IEventHandle(Target, reqActorId, ev->ReleaseBase().Release(), IEventHandle::FlagTrackDelivery, ev->Cookie));
+ }
+
+ void PumpQueue() {
+ while (RegisteredRequests && RegisteredRequests.front() == nullptr)
+ RegisteredRequests.pop_front();
+
+ while (UnhandledRequests && CanRegister()) {
+ RegisterReqActor(std::move(UnhandledRequests.front()));
+ UnhandledRequests.pop_front();
+ }
+ }
+
+ void HandleDisconnected() {
+ Subscribed = false;
+
+ const ui32 nodeid = Target.NodeId();
+ for (TFlowControlledRequestActor *reqActor : RegisteredRequests) {
+ if (reqActor) {
+ if (reqActor->Flags & IEventHandle::FlagSubscribeOnSession) {
+ TActivationContext::Send(
+ new IEventHandle(reqActor->Source, TActorId(), new TEvInterconnect::TEvNodeDisconnected(nodeid), 0, reqActor->Cookie)
+ );
+ }
+ reqActor->PassAway();
+ }
+ }
+
+ RegisteredRequests.clear();
+
+ for (auto &ev : UnhandledRequests) {
+ const auto reason = TEvents::TEvUndelivered::Disconnected;
+ if (ev->Flags & IEventHandle::FlagTrackDelivery) {
+ TActivationContext::Send(
+ new IEventHandle(ev->Sender, ev->Recipient, new TEvents::TEvUndelivered(ev->GetTypeRewrite(), reason), 0, ev->Cookie)
+ );
+ }
+ }
+
+ UnhandledRequests.clear();
+ }
+
+ void HandlePoison() {
+ HandleDisconnected();
+
+ if (SelfId().NodeId() != Target.NodeId())
+ Send(TActivationContext::InterconnectProxy(Target.NodeId()), new TEvents::TEvUnsubscribe());
+
+ PassAway();
+ }
+public:
+ TFlowControlledRequestQueue(TActorId target, ui32 activity, const TFlowControlledQueueConfig &config)
+ : IActor(static_cast<TReceiveFunc>(&TFlowControlledRequestQueue::StateWork), activity)
+ , Target(target)
+ , Config(config)
+ , MinimalSeenLatency(TDuration::Seconds(1))
+ {}
+
+ STATEFN(StateWork) {
+ switch (ev->GetTypeRewrite()) {
+ cFunc(TEvInterconnect::TEvNodeDisconnected::EventType, HandleDisconnected);
+ IgnoreFunc(TEvInterconnect::TEvNodeConnected);
+ cFunc(TEvents::TEvUndelivered::EventType, HandleDisconnected);
+ cFunc(TEvents::TEvPoison::EventType, HandlePoison);
+ default:
+ HandleForwardedEvent(ev);
+ }
+ }
+
+ void HandleRequestReply(TAutoPtr<IEventHandle> &ev, TFlowControlledRequestActor *reqActor) {
+ auto it = Find(RegisteredRequests, reqActor);
+ if (it == RegisteredRequests.end())
+ return;
+
+ TActivationContext::Send(ev->Forward(reqActor->Source));
+ const TDuration reqLatency = reqActor->AccumulatedLatency();
+ if (reqLatency < MinimalSeenLatency)
+ MinimalSeenLatency = reqLatency;
+
+ *it = nullptr;
+ PumpQueue();
+ }
+
+ void HandleRequestUndelivered(TEvents::TEvUndelivered::TPtr &ev, TFlowControlledRequestActor *reqActor) {
+ auto it = Find(RegisteredRequests, reqActor);
+ if (it == RegisteredRequests.end())
+ return;
+
+ TActivationContext::Send(ev->Forward(reqActor->Source));
+
+ *it = nullptr;
+ PumpQueue();
+ }
+};
+
+void TFlowControlledRequestActor::HandleReply(TAutoPtr<IEventHandle> &ev) {
+ QueueActor->HandleRequestReply(ev, this);
+ PassAway();
+}
+
+void TFlowControlledRequestActor::HandleUndelivered(TEvents::TEvUndelivered::TPtr &ev) {
+ QueueActor->HandleRequestUndelivered(ev, this);
+ PassAway();
+}
+
+
+IActor* CreateFlowControlledRequestQueue(TActorId targetId, ui32 activity, const TFlowControlledQueueConfig &config) {
+ return new TFlowControlledRequestQueue(targetId, activity, config);
+}
+
+}