diff options
author | ddoarn <ddoarn@yandex-team.ru> | 2022-02-10 16:49:52 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:49:52 +0300 |
commit | 0783fe3f48d91a3b741ce2ea32b11fbfc1637e7e (patch) | |
tree | 6d6a79d83e5003eaf4d45cac346113c1137cb886 /library/cpp/actors/helpers | |
parent | 9541fc30d6f0877db9ff199a16f7fc2505d46a5c (diff) | |
download | ydb-0783fe3f48d91a3b741ce2ea32b11fbfc1637e7e.tar.gz |
Restoring authorship annotation for <ddoarn@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/actors/helpers')
-rw-r--r-- | library/cpp/actors/helpers/flow_controlled_queue.cpp | 406 | ||||
-rw-r--r-- | library/cpp/actors/helpers/flow_controlled_queue.h | 32 | ||||
-rw-r--r-- | library/cpp/actors/helpers/mon_histogram_helper.h | 20 | ||||
-rw-r--r-- | library/cpp/actors/helpers/ya.make | 24 |
4 files changed, 241 insertions, 241 deletions
diff --git a/library/cpp/actors/helpers/flow_controlled_queue.cpp b/library/cpp/actors/helpers/flow_controlled_queue.cpp index d75cc540236..81d26c3e55f 100644 --- a/library/cpp/actors/helpers/flow_controlled_queue.cpp +++ b/library/cpp/actors/helpers/flow_controlled_queue.cpp @@ -1,215 +1,215 @@ -#include "flow_controlled_queue.h" - +#include "flow_controlled_queue.h" + #include <library/cpp/actors/core/interconnect.h> #include <library/cpp/actors/core/hfunc.h> #include <library/cpp/actors/util/datetime.h> - -#include <util/generic/deque.h> -#include <util/datetime/cputimer.h> -#include <util/generic/algorithm.h> - -namespace NActors { - -class TFlowControlledRequestQueue; - -class TFlowControlledRequestActor : public IActor { - TFlowControlledRequestQueue * const QueueActor; - - void HandleReply(TAutoPtr<IEventHandle> &ev); - void HandleUndelivered(TEvents::TEvUndelivered::TPtr &ev); -public: + +#include <util/generic/deque.h> +#include <util/datetime/cputimer.h> +#include <util/generic/algorithm.h> + +namespace NActors { + +class TFlowControlledRequestQueue; + +class TFlowControlledRequestActor : public IActor { + TFlowControlledRequestQueue * const QueueActor; + + void HandleReply(TAutoPtr<IEventHandle> &ev); + void HandleUndelivered(TEvents::TEvUndelivered::TPtr &ev); +public: const TActorId Source; - const ui64 Cookie; - const ui32 Flags; - const ui64 StartCounter; - + const ui64 Cookie; + const ui32 Flags; + const ui64 StartCounter; + TFlowControlledRequestActor(ui32 activity, TFlowControlledRequestQueue *queue, TActorId source, ui64 cookie, ui32 flags) - : IActor(static_cast<TReceiveFunc>(&TFlowControlledRequestActor::StateWait), activity) - , QueueActor(queue) - , Source(source) - , Cookie(cookie) - , Flags(flags) + : IActor(static_cast<TReceiveFunc>(&TFlowControlledRequestActor::StateWait), activity) + , QueueActor(queue) + , Source(source) + , Cookie(cookie) + , Flags(flags) , StartCounter(GetCycleCountFast()) - {} - - STATEFN(StateWait) { - switch (ev->GetTypeRewrite()) { - hFunc(TEvents::TEvUndelivered, HandleUndelivered); - default: - HandleReply(ev); - } - } - - TDuration AccumulatedLatency() const { + {} + + STATEFN(StateWait) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvents::TEvUndelivered, HandleUndelivered); + default: + HandleReply(ev); + } + } + + TDuration AccumulatedLatency() const { const ui64 cc = GetCycleCountFast() - StartCounter; - return CyclesToDuration(cc); - } - - using IActor::PassAway; -}; - -class TFlowControlledRequestQueue : public IActor { + return CyclesToDuration(cc); + } + + using IActor::PassAway; +}; + +class TFlowControlledRequestQueue : public IActor { const TActorId Target; - const TFlowControlledQueueConfig Config; - - TDeque<THolder<IEventHandle>> UnhandledRequests; - TDeque<TFlowControlledRequestActor *> RegisteredRequests; - - bool Subscribed = false; - - TDuration MinimalSeenLatency; - - bool CanRegister() { - const ui64 inFly = RegisteredRequests.size(); - if (inFly <= Config.MinAllowedInFly) // <= for handling minAllowed == 0 - return true; - - if (inFly >= Config.MaxAllowedInFly) - return false; - - if (Config.TargetDynamicRate) { - if (const ui64 dynMax = MinimalSeenLatency.MicroSeconds() * Config.TargetDynamicRate / 1000000) { - if (inFly >= dynMax) - return false; - } - } - - const TDuration currentLatency = RegisteredRequests.front()->AccumulatedLatency(); - if (currentLatency <= Config.MinTrackedLatency) - return true; - - if (currentLatency <= MinimalSeenLatency * Config.LatencyFactor) - return true; - - return false; - } - - void HandleForwardedEvent(TAutoPtr<IEventHandle> &ev) { - if (CanRegister()) { - RegisterReqActor(ev); - } else { - UnhandledRequests.emplace_back(ev.Release()); - } - } - - void RegisterReqActor(THolder<IEventHandle> ev) { - TFlowControlledRequestActor *reqActor = new TFlowControlledRequestActor(ActivityType, this, ev->Sender, ev->Cookie, ev->Flags); + const TFlowControlledQueueConfig Config; + + TDeque<THolder<IEventHandle>> UnhandledRequests; + TDeque<TFlowControlledRequestActor *> RegisteredRequests; + + bool Subscribed = false; + + TDuration MinimalSeenLatency; + + bool CanRegister() { + const ui64 inFly = RegisteredRequests.size(); + if (inFly <= Config.MinAllowedInFly) // <= for handling minAllowed == 0 + return true; + + if (inFly >= Config.MaxAllowedInFly) + return false; + + if (Config.TargetDynamicRate) { + if (const ui64 dynMax = MinimalSeenLatency.MicroSeconds() * Config.TargetDynamicRate / 1000000) { + if (inFly >= dynMax) + return false; + } + } + + const TDuration currentLatency = RegisteredRequests.front()->AccumulatedLatency(); + if (currentLatency <= Config.MinTrackedLatency) + return true; + + if (currentLatency <= MinimalSeenLatency * Config.LatencyFactor) + return true; + + return false; + } + + void HandleForwardedEvent(TAutoPtr<IEventHandle> &ev) { + if (CanRegister()) { + RegisterReqActor(ev); + } else { + UnhandledRequests.emplace_back(ev.Release()); + } + } + + void RegisterReqActor(THolder<IEventHandle> ev) { + TFlowControlledRequestActor *reqActor = new TFlowControlledRequestActor(ActivityType, this, ev->Sender, ev->Cookie, ev->Flags); const TActorId reqActorId = RegisterWithSameMailbox(reqActor); - RegisteredRequests.emplace_back(reqActor); - - if (!Subscribed && (Target.NodeId() != SelfId().NodeId())) { - Send(TActivationContext::InterconnectProxy(Target.NodeId()), new TEvents::TEvSubscribe(), IEventHandle::FlagTrackDelivery); - Subscribed = true; - } - - TActivationContext::Send(new IEventHandle(Target, reqActorId, ev->ReleaseBase().Release(), IEventHandle::FlagTrackDelivery, ev->Cookie)); - } - - void PumpQueue() { - while (RegisteredRequests && RegisteredRequests.front() == nullptr) - RegisteredRequests.pop_front(); - - while (UnhandledRequests && CanRegister()) { - RegisterReqActor(std::move(UnhandledRequests.front())); - UnhandledRequests.pop_front(); - } - } - - void HandleDisconnected() { - Subscribed = false; - - const ui32 nodeid = Target.NodeId(); - for (TFlowControlledRequestActor *reqActor : RegisteredRequests) { - if (reqActor) { - if (reqActor->Flags & IEventHandle::FlagSubscribeOnSession) { - TActivationContext::Send( + RegisteredRequests.emplace_back(reqActor); + + if (!Subscribed && (Target.NodeId() != SelfId().NodeId())) { + Send(TActivationContext::InterconnectProxy(Target.NodeId()), new TEvents::TEvSubscribe(), IEventHandle::FlagTrackDelivery); + Subscribed = true; + } + + TActivationContext::Send(new IEventHandle(Target, reqActorId, ev->ReleaseBase().Release(), IEventHandle::FlagTrackDelivery, ev->Cookie)); + } + + void PumpQueue() { + while (RegisteredRequests && RegisteredRequests.front() == nullptr) + RegisteredRequests.pop_front(); + + while (UnhandledRequests && CanRegister()) { + RegisterReqActor(std::move(UnhandledRequests.front())); + UnhandledRequests.pop_front(); + } + } + + void HandleDisconnected() { + Subscribed = false; + + const ui32 nodeid = Target.NodeId(); + for (TFlowControlledRequestActor *reqActor : RegisteredRequests) { + if (reqActor) { + if (reqActor->Flags & IEventHandle::FlagSubscribeOnSession) { + TActivationContext::Send( new IEventHandle(reqActor->Source, TActorId(), new TEvInterconnect::TEvNodeDisconnected(nodeid), 0, reqActor->Cookie) - ); - } - reqActor->PassAway(); - } - } - - RegisteredRequests.clear(); - - for (auto &ev : UnhandledRequests) { - const auto reason = TEvents::TEvUndelivered::Disconnected; - if (ev->Flags & IEventHandle::FlagTrackDelivery) { - TActivationContext::Send( - new IEventHandle(ev->Sender, ev->Recipient, new TEvents::TEvUndelivered(ev->GetTypeRewrite(), reason), 0, ev->Cookie) - ); - } - } - - UnhandledRequests.clear(); - } - - void HandlePoison() { - HandleDisconnected(); - - if (SelfId().NodeId() != Target.NodeId()) - Send(TActivationContext::InterconnectProxy(Target.NodeId()), new TEvents::TEvUnsubscribe()); - - PassAway(); - } -public: + ); + } + reqActor->PassAway(); + } + } + + RegisteredRequests.clear(); + + for (auto &ev : UnhandledRequests) { + const auto reason = TEvents::TEvUndelivered::Disconnected; + if (ev->Flags & IEventHandle::FlagTrackDelivery) { + TActivationContext::Send( + new IEventHandle(ev->Sender, ev->Recipient, new TEvents::TEvUndelivered(ev->GetTypeRewrite(), reason), 0, ev->Cookie) + ); + } + } + + UnhandledRequests.clear(); + } + + void HandlePoison() { + HandleDisconnected(); + + if (SelfId().NodeId() != Target.NodeId()) + Send(TActivationContext::InterconnectProxy(Target.NodeId()), new TEvents::TEvUnsubscribe()); + + PassAway(); + } +public: TFlowControlledRequestQueue(TActorId target, ui32 activity, const TFlowControlledQueueConfig &config) - : IActor(static_cast<TReceiveFunc>(&TFlowControlledRequestQueue::StateWork), activity) - , Target(target) - , Config(config) - , MinimalSeenLatency(TDuration::Seconds(1)) - {} - - STATEFN(StateWork) { - switch (ev->GetTypeRewrite()) { - cFunc(TEvInterconnect::TEvNodeDisconnected::EventType, HandleDisconnected); - IgnoreFunc(TEvInterconnect::TEvNodeConnected); - cFunc(TEvents::TEvUndelivered::EventType, HandleDisconnected); - cFunc(TEvents::TEvPoison::EventType, HandlePoison); - default: - HandleForwardedEvent(ev); - } - } - - void HandleRequestReply(TAutoPtr<IEventHandle> &ev, TFlowControlledRequestActor *reqActor) { - auto it = Find(RegisteredRequests, reqActor); - if (it == RegisteredRequests.end()) - return; - - TActivationContext::Send(ev->Forward(reqActor->Source)); - const TDuration reqLatency = reqActor->AccumulatedLatency(); - if (reqLatency < MinimalSeenLatency) - MinimalSeenLatency = reqLatency; - - *it = nullptr; - PumpQueue(); - } - - void HandleRequestUndelivered(TEvents::TEvUndelivered::TPtr &ev, TFlowControlledRequestActor *reqActor) { - auto it = Find(RegisteredRequests, reqActor); - if (it == RegisteredRequests.end()) - return; - - TActivationContext::Send(ev->Forward(reqActor->Source)); - - *it = nullptr; - PumpQueue(); - } -}; - -void TFlowControlledRequestActor::HandleReply(TAutoPtr<IEventHandle> &ev) { - QueueActor->HandleRequestReply(ev, this); - PassAway(); -} - -void TFlowControlledRequestActor::HandleUndelivered(TEvents::TEvUndelivered::TPtr &ev) { - QueueActor->HandleRequestUndelivered(ev, this); - PassAway(); -} - - + : IActor(static_cast<TReceiveFunc>(&TFlowControlledRequestQueue::StateWork), activity) + , Target(target) + , Config(config) + , MinimalSeenLatency(TDuration::Seconds(1)) + {} + + STATEFN(StateWork) { + switch (ev->GetTypeRewrite()) { + cFunc(TEvInterconnect::TEvNodeDisconnected::EventType, HandleDisconnected); + IgnoreFunc(TEvInterconnect::TEvNodeConnected); + cFunc(TEvents::TEvUndelivered::EventType, HandleDisconnected); + cFunc(TEvents::TEvPoison::EventType, HandlePoison); + default: + HandleForwardedEvent(ev); + } + } + + void HandleRequestReply(TAutoPtr<IEventHandle> &ev, TFlowControlledRequestActor *reqActor) { + auto it = Find(RegisteredRequests, reqActor); + if (it == RegisteredRequests.end()) + return; + + TActivationContext::Send(ev->Forward(reqActor->Source)); + const TDuration reqLatency = reqActor->AccumulatedLatency(); + if (reqLatency < MinimalSeenLatency) + MinimalSeenLatency = reqLatency; + + *it = nullptr; + PumpQueue(); + } + + void HandleRequestUndelivered(TEvents::TEvUndelivered::TPtr &ev, TFlowControlledRequestActor *reqActor) { + auto it = Find(RegisteredRequests, reqActor); + if (it == RegisteredRequests.end()) + return; + + TActivationContext::Send(ev->Forward(reqActor->Source)); + + *it = nullptr; + PumpQueue(); + } +}; + +void TFlowControlledRequestActor::HandleReply(TAutoPtr<IEventHandle> &ev) { + QueueActor->HandleRequestReply(ev, this); + PassAway(); +} + +void TFlowControlledRequestActor::HandleUndelivered(TEvents::TEvUndelivered::TPtr &ev) { + QueueActor->HandleRequestUndelivered(ev, this); + PassAway(); +} + + IActor* CreateFlowControlledRequestQueue(TActorId targetId, ui32 activity, const TFlowControlledQueueConfig &config) { - return new TFlowControlledRequestQueue(targetId, activity, config); -} - -} + return new TFlowControlledRequestQueue(targetId, activity, config); +} + +} diff --git a/library/cpp/actors/helpers/flow_controlled_queue.h b/library/cpp/actors/helpers/flow_controlled_queue.h index d2504053047..0699b35ca6e 100644 --- a/library/cpp/actors/helpers/flow_controlled_queue.h +++ b/library/cpp/actors/helpers/flow_controlled_queue.h @@ -1,18 +1,18 @@ -#pragma once - +#pragma once + #include <library/cpp/actors/core/actor.h> - -namespace NActors { - - struct TFlowControlledQueueConfig { - ui32 MinAllowedInFly = 20; - ui32 MaxAllowedInFly = 100; - ui32 TargetDynamicRate = 0; - - TDuration MinTrackedLatency = TDuration::MilliSeconds(20); - ui32 LatencyFactor = 4; - }; - + +namespace NActors { + + struct TFlowControlledQueueConfig { + ui32 MinAllowedInFly = 20; + ui32 MaxAllowedInFly = 100; + ui32 TargetDynamicRate = 0; + + TDuration MinTrackedLatency = TDuration::MilliSeconds(20); + ui32 LatencyFactor = 4; + }; + IActor* CreateFlowControlledRequestQueue(TActorId targetId, ui32 activity = IActor::ACTORLIB_COMMON, const TFlowControlledQueueConfig &config = TFlowControlledQueueConfig()); - -} + +} diff --git a/library/cpp/actors/helpers/mon_histogram_helper.h b/library/cpp/actors/helpers/mon_histogram_helper.h index a9a57e38238..35d030b822f 100644 --- a/library/cpp/actors/helpers/mon_histogram_helper.h +++ b/library/cpp/actors/helpers/mon_histogram_helper.h @@ -1,10 +1,10 @@ -#pragma once - +#pragma once + #include <library/cpp/monlib/dynamic_counters/counters.h> #include <util/string/cast.h> -namespace NActors { +namespace NActors { namespace NMon { class THistogramCounterHelper { public: @@ -13,7 +13,7 @@ namespace NActors { , BucketCount(0) { } - + THistogramCounterHelper(const THistogramCounterHelper&) = default; void Init(NMonitoring::TDynamicCounters* group, const TString& baseName, const TString& unit, @@ -21,7 +21,7 @@ namespace NActors { { Y_ASSERT(FirstBucketVal == 0); Y_ASSERT(BucketCount == 0); - + FirstBucketVal = firstBucket; BucketCount = bucketCnt; BucketsHolder.reserve(BucketCount); @@ -33,7 +33,7 @@ namespace NActors { Buckets.push_back(BucketsHolder.back().Get()); } } - + void Add(ui64 val) { Y_ASSERT(FirstBucketVal != 0); Y_ASSERT(BucketCount != 0); @@ -47,7 +47,7 @@ namespace NActors { } Buckets[ind]->Inc(); } - + ui64 GetBucketCount() const { return BucketCount; } @@ -73,8 +73,8 @@ namespace NActors { // Last slot is up to +INF return "INF"; } - } - + } + private: ui64 FirstBucketVal; ui64 BucketCount; @@ -82,5 +82,5 @@ namespace NActors { TVector<NMonitoring::TDeprecatedCounter*> Buckets; }; - } + } } diff --git a/library/cpp/actors/helpers/ya.make b/library/cpp/actors/helpers/ya.make index d8771179de8..b6cce2cc237 100644 --- a/library/cpp/actors/helpers/ya.make +++ b/library/cpp/actors/helpers/ya.make @@ -1,23 +1,23 @@ -LIBRARY() - +LIBRARY() + OWNER(g:kikimr) - -SRCS( + +SRCS( activeactors.cpp activeactors.h - flow_controlled_queue.cpp - flow_controlled_queue.h + flow_controlled_queue.cpp + flow_controlled_queue.h future_callback.h mon_histogram_helper.h selfping_actor.cpp -) - -PEERDIR( +) + +PEERDIR( library/cpp/actors/core library/cpp/monlib/dynamic_counters -) - -END() +) + +END() RECURSE_FOR_TESTS( ut |