diff options
author | Devtools Arcadia <[email protected]> | 2022-02-07 18:08:42 +0300 |
---|---|---|
committer | Devtools Arcadia <[email protected]> | 2022-02-07 18:08:42 +0300 |
commit | 1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch) | |
tree | e26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/actors/helpers/flow_controlled_queue.cpp |
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/actors/helpers/flow_controlled_queue.cpp')
-rw-r--r-- | library/cpp/actors/helpers/flow_controlled_queue.cpp | 215 |
1 files changed, 215 insertions, 0 deletions
diff --git a/library/cpp/actors/helpers/flow_controlled_queue.cpp b/library/cpp/actors/helpers/flow_controlled_queue.cpp new file mode 100644 index 00000000000..d75cc540236 --- /dev/null +++ b/library/cpp/actors/helpers/flow_controlled_queue.cpp @@ -0,0 +1,215 @@ +#include "flow_controlled_queue.h" + +#include <library/cpp/actors/core/interconnect.h> +#include <library/cpp/actors/core/hfunc.h> +#include <library/cpp/actors/util/datetime.h> + +#include <util/generic/deque.h> +#include <util/datetime/cputimer.h> +#include <util/generic/algorithm.h> + +namespace NActors { + +class TFlowControlledRequestQueue; + +class TFlowControlledRequestActor : public IActor { + TFlowControlledRequestQueue * const QueueActor; + + void HandleReply(TAutoPtr<IEventHandle> &ev); + void HandleUndelivered(TEvents::TEvUndelivered::TPtr &ev); +public: + const TActorId Source; + const ui64 Cookie; + const ui32 Flags; + const ui64 StartCounter; + + TFlowControlledRequestActor(ui32 activity, TFlowControlledRequestQueue *queue, TActorId source, ui64 cookie, ui32 flags) + : IActor(static_cast<TReceiveFunc>(&TFlowControlledRequestActor::StateWait), activity) + , QueueActor(queue) + , Source(source) + , Cookie(cookie) + , Flags(flags) + , StartCounter(GetCycleCountFast()) + {} + + STATEFN(StateWait) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvents::TEvUndelivered, HandleUndelivered); + default: + HandleReply(ev); + } + } + + TDuration AccumulatedLatency() const { + const ui64 cc = GetCycleCountFast() - StartCounter; + return CyclesToDuration(cc); + } + + using IActor::PassAway; +}; + +class TFlowControlledRequestQueue : public IActor { + const TActorId Target; + const TFlowControlledQueueConfig Config; + + TDeque<THolder<IEventHandle>> UnhandledRequests; + TDeque<TFlowControlledRequestActor *> RegisteredRequests; + + bool Subscribed = false; + + TDuration MinimalSeenLatency; + + bool CanRegister() { + const ui64 inFly = RegisteredRequests.size(); + if (inFly <= Config.MinAllowedInFly) // <= for handling minAllowed == 0 + return true; + + if (inFly >= Config.MaxAllowedInFly) + return false; + + if (Config.TargetDynamicRate) { + if (const ui64 dynMax = MinimalSeenLatency.MicroSeconds() * Config.TargetDynamicRate / 1000000) { + if (inFly >= dynMax) + return false; + } + } + + const TDuration currentLatency = RegisteredRequests.front()->AccumulatedLatency(); + if (currentLatency <= Config.MinTrackedLatency) + return true; + + if (currentLatency <= MinimalSeenLatency * Config.LatencyFactor) + return true; + + return false; + } + + void HandleForwardedEvent(TAutoPtr<IEventHandle> &ev) { + if (CanRegister()) { + RegisterReqActor(ev); + } else { + UnhandledRequests.emplace_back(ev.Release()); + } + } + + void RegisterReqActor(THolder<IEventHandle> ev) { + TFlowControlledRequestActor *reqActor = new TFlowControlledRequestActor(ActivityType, this, ev->Sender, ev->Cookie, ev->Flags); + const TActorId reqActorId = RegisterWithSameMailbox(reqActor); + RegisteredRequests.emplace_back(reqActor); + + if (!Subscribed && (Target.NodeId() != SelfId().NodeId())) { + Send(TActivationContext::InterconnectProxy(Target.NodeId()), new TEvents::TEvSubscribe(), IEventHandle::FlagTrackDelivery); + Subscribed = true; + } + + TActivationContext::Send(new IEventHandle(Target, reqActorId, ev->ReleaseBase().Release(), IEventHandle::FlagTrackDelivery, ev->Cookie)); + } + + void PumpQueue() { + while (RegisteredRequests && RegisteredRequests.front() == nullptr) + RegisteredRequests.pop_front(); + + while (UnhandledRequests && CanRegister()) { + RegisterReqActor(std::move(UnhandledRequests.front())); + UnhandledRequests.pop_front(); + } + } + + void HandleDisconnected() { + Subscribed = false; + + const ui32 nodeid = Target.NodeId(); + for (TFlowControlledRequestActor *reqActor : RegisteredRequests) { + if (reqActor) { + if (reqActor->Flags & IEventHandle::FlagSubscribeOnSession) { + TActivationContext::Send( + new IEventHandle(reqActor->Source, TActorId(), new TEvInterconnect::TEvNodeDisconnected(nodeid), 0, reqActor->Cookie) + ); + } + reqActor->PassAway(); + } + } + + RegisteredRequests.clear(); + + for (auto &ev : UnhandledRequests) { + const auto reason = TEvents::TEvUndelivered::Disconnected; + if (ev->Flags & IEventHandle::FlagTrackDelivery) { + TActivationContext::Send( + new IEventHandle(ev->Sender, ev->Recipient, new TEvents::TEvUndelivered(ev->GetTypeRewrite(), reason), 0, ev->Cookie) + ); + } + } + + UnhandledRequests.clear(); + } + + void HandlePoison() { + HandleDisconnected(); + + if (SelfId().NodeId() != Target.NodeId()) + Send(TActivationContext::InterconnectProxy(Target.NodeId()), new TEvents::TEvUnsubscribe()); + + PassAway(); + } +public: + TFlowControlledRequestQueue(TActorId target, ui32 activity, const TFlowControlledQueueConfig &config) + : IActor(static_cast<TReceiveFunc>(&TFlowControlledRequestQueue::StateWork), activity) + , Target(target) + , Config(config) + , MinimalSeenLatency(TDuration::Seconds(1)) + {} + + STATEFN(StateWork) { + switch (ev->GetTypeRewrite()) { + cFunc(TEvInterconnect::TEvNodeDisconnected::EventType, HandleDisconnected); + IgnoreFunc(TEvInterconnect::TEvNodeConnected); + cFunc(TEvents::TEvUndelivered::EventType, HandleDisconnected); + cFunc(TEvents::TEvPoison::EventType, HandlePoison); + default: + HandleForwardedEvent(ev); + } + } + + void HandleRequestReply(TAutoPtr<IEventHandle> &ev, TFlowControlledRequestActor *reqActor) { + auto it = Find(RegisteredRequests, reqActor); + if (it == RegisteredRequests.end()) + return; + + TActivationContext::Send(ev->Forward(reqActor->Source)); + const TDuration reqLatency = reqActor->AccumulatedLatency(); + if (reqLatency < MinimalSeenLatency) + MinimalSeenLatency = reqLatency; + + *it = nullptr; + PumpQueue(); + } + + void HandleRequestUndelivered(TEvents::TEvUndelivered::TPtr &ev, TFlowControlledRequestActor *reqActor) { + auto it = Find(RegisteredRequests, reqActor); + if (it == RegisteredRequests.end()) + return; + + TActivationContext::Send(ev->Forward(reqActor->Source)); + + *it = nullptr; + PumpQueue(); + } +}; + +void TFlowControlledRequestActor::HandleReply(TAutoPtr<IEventHandle> &ev) { + QueueActor->HandleRequestReply(ev, this); + PassAway(); +} + +void TFlowControlledRequestActor::HandleUndelivered(TEvents::TEvUndelivered::TPtr &ev) { + QueueActor->HandleRequestUndelivered(ev, this); + PassAway(); +} + + +IActor* CreateFlowControlledRequestQueue(TActorId targetId, ui32 activity, const TFlowControlledQueueConfig &config) { + return new TFlowControlledRequestQueue(targetId, activity, config); +} + +} |