aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/actors/helpers
diff options
context:
space:
mode:
authorddoarn <ddoarn@yandex-team.ru>2022-02-10 16:49:52 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:49:52 +0300
commit0783fe3f48d91a3b741ce2ea32b11fbfc1637e7e (patch)
tree6d6a79d83e5003eaf4d45cac346113c1137cb886 /library/cpp/actors/helpers
parent9541fc30d6f0877db9ff199a16f7fc2505d46a5c (diff)
downloadydb-0783fe3f48d91a3b741ce2ea32b11fbfc1637e7e.tar.gz
Restoring authorship annotation for <ddoarn@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/actors/helpers')
-rw-r--r--library/cpp/actors/helpers/flow_controlled_queue.cpp406
-rw-r--r--library/cpp/actors/helpers/flow_controlled_queue.h32
-rw-r--r--library/cpp/actors/helpers/mon_histogram_helper.h20
-rw-r--r--library/cpp/actors/helpers/ya.make24
4 files changed, 241 insertions, 241 deletions
diff --git a/library/cpp/actors/helpers/flow_controlled_queue.cpp b/library/cpp/actors/helpers/flow_controlled_queue.cpp
index d75cc540236..81d26c3e55f 100644
--- a/library/cpp/actors/helpers/flow_controlled_queue.cpp
+++ b/library/cpp/actors/helpers/flow_controlled_queue.cpp
@@ -1,215 +1,215 @@
-#include "flow_controlled_queue.h"
-
+#include "flow_controlled_queue.h"
+
#include <library/cpp/actors/core/interconnect.h>
#include <library/cpp/actors/core/hfunc.h>
#include <library/cpp/actors/util/datetime.h>
-
-#include <util/generic/deque.h>
-#include <util/datetime/cputimer.h>
-#include <util/generic/algorithm.h>
-
-namespace NActors {
-
-class TFlowControlledRequestQueue;
-
-class TFlowControlledRequestActor : public IActor {
- TFlowControlledRequestQueue * const QueueActor;
-
- void HandleReply(TAutoPtr<IEventHandle> &ev);
- void HandleUndelivered(TEvents::TEvUndelivered::TPtr &ev);
-public:
+
+#include <util/generic/deque.h>
+#include <util/datetime/cputimer.h>
+#include <util/generic/algorithm.h>
+
+namespace NActors {
+
+class TFlowControlledRequestQueue;
+
+class TFlowControlledRequestActor : public IActor {
+ TFlowControlledRequestQueue * const QueueActor;
+
+ void HandleReply(TAutoPtr<IEventHandle> &ev);
+ void HandleUndelivered(TEvents::TEvUndelivered::TPtr &ev);
+public:
const TActorId Source;
- const ui64 Cookie;
- const ui32 Flags;
- const ui64 StartCounter;
-
+ const ui64 Cookie;
+ const ui32 Flags;
+ const ui64 StartCounter;
+
TFlowControlledRequestActor(ui32 activity, TFlowControlledRequestQueue *queue, TActorId source, ui64 cookie, ui32 flags)
- : IActor(static_cast<TReceiveFunc>(&TFlowControlledRequestActor::StateWait), activity)
- , QueueActor(queue)
- , Source(source)
- , Cookie(cookie)
- , Flags(flags)
+ : IActor(static_cast<TReceiveFunc>(&TFlowControlledRequestActor::StateWait), activity)
+ , QueueActor(queue)
+ , Source(source)
+ , Cookie(cookie)
+ , Flags(flags)
, StartCounter(GetCycleCountFast())
- {}
-
- STATEFN(StateWait) {
- switch (ev->GetTypeRewrite()) {
- hFunc(TEvents::TEvUndelivered, HandleUndelivered);
- default:
- HandleReply(ev);
- }
- }
-
- TDuration AccumulatedLatency() const {
+ {}
+
+ STATEFN(StateWait) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(TEvents::TEvUndelivered, HandleUndelivered);
+ default:
+ HandleReply(ev);
+ }
+ }
+
+ TDuration AccumulatedLatency() const {
const ui64 cc = GetCycleCountFast() - StartCounter;
- return CyclesToDuration(cc);
- }
-
- using IActor::PassAway;
-};
-
-class TFlowControlledRequestQueue : public IActor {
+ return CyclesToDuration(cc);
+ }
+
+ using IActor::PassAway;
+};
+
+class TFlowControlledRequestQueue : public IActor {
const TActorId Target;
- const TFlowControlledQueueConfig Config;
-
- TDeque<THolder<IEventHandle>> UnhandledRequests;
- TDeque<TFlowControlledRequestActor *> RegisteredRequests;
-
- bool Subscribed = false;
-
- TDuration MinimalSeenLatency;
-
- bool CanRegister() {
- const ui64 inFly = RegisteredRequests.size();
- if (inFly <= Config.MinAllowedInFly) // <= for handling minAllowed == 0
- return true;
-
- if (inFly >= Config.MaxAllowedInFly)
- return false;
-
- if (Config.TargetDynamicRate) {
- if (const ui64 dynMax = MinimalSeenLatency.MicroSeconds() * Config.TargetDynamicRate / 1000000) {
- if (inFly >= dynMax)
- return false;
- }
- }
-
- const TDuration currentLatency = RegisteredRequests.front()->AccumulatedLatency();
- if (currentLatency <= Config.MinTrackedLatency)
- return true;
-
- if (currentLatency <= MinimalSeenLatency * Config.LatencyFactor)
- return true;
-
- return false;
- }
-
- void HandleForwardedEvent(TAutoPtr<IEventHandle> &ev) {
- if (CanRegister()) {
- RegisterReqActor(ev);
- } else {
- UnhandledRequests.emplace_back(ev.Release());
- }
- }
-
- void RegisterReqActor(THolder<IEventHandle> ev) {
- TFlowControlledRequestActor *reqActor = new TFlowControlledRequestActor(ActivityType, this, ev->Sender, ev->Cookie, ev->Flags);
+ const TFlowControlledQueueConfig Config;
+
+ TDeque<THolder<IEventHandle>> UnhandledRequests;
+ TDeque<TFlowControlledRequestActor *> RegisteredRequests;
+
+ bool Subscribed = false;
+
+ TDuration MinimalSeenLatency;
+
+ bool CanRegister() {
+ const ui64 inFly = RegisteredRequests.size();
+ if (inFly <= Config.MinAllowedInFly) // <= for handling minAllowed == 0
+ return true;
+
+ if (inFly >= Config.MaxAllowedInFly)
+ return false;
+
+ if (Config.TargetDynamicRate) {
+ if (const ui64 dynMax = MinimalSeenLatency.MicroSeconds() * Config.TargetDynamicRate / 1000000) {
+ if (inFly >= dynMax)
+ return false;
+ }
+ }
+
+ const TDuration currentLatency = RegisteredRequests.front()->AccumulatedLatency();
+ if (currentLatency <= Config.MinTrackedLatency)
+ return true;
+
+ if (currentLatency <= MinimalSeenLatency * Config.LatencyFactor)
+ return true;
+
+ return false;
+ }
+
+ void HandleForwardedEvent(TAutoPtr<IEventHandle> &ev) {
+ if (CanRegister()) {
+ RegisterReqActor(ev);
+ } else {
+ UnhandledRequests.emplace_back(ev.Release());
+ }
+ }
+
+ void RegisterReqActor(THolder<IEventHandle> ev) {
+ TFlowControlledRequestActor *reqActor = new TFlowControlledRequestActor(ActivityType, this, ev->Sender, ev->Cookie, ev->Flags);
const TActorId reqActorId = RegisterWithSameMailbox(reqActor);
- RegisteredRequests.emplace_back(reqActor);
-
- if (!Subscribed && (Target.NodeId() != SelfId().NodeId())) {
- Send(TActivationContext::InterconnectProxy(Target.NodeId()), new TEvents::TEvSubscribe(), IEventHandle::FlagTrackDelivery);
- Subscribed = true;
- }
-
- TActivationContext::Send(new IEventHandle(Target, reqActorId, ev->ReleaseBase().Release(), IEventHandle::FlagTrackDelivery, ev->Cookie));
- }
-
- void PumpQueue() {
- while (RegisteredRequests && RegisteredRequests.front() == nullptr)
- RegisteredRequests.pop_front();
-
- while (UnhandledRequests && CanRegister()) {
- RegisterReqActor(std::move(UnhandledRequests.front()));
- UnhandledRequests.pop_front();
- }
- }
-
- void HandleDisconnected() {
- Subscribed = false;
-
- const ui32 nodeid = Target.NodeId();
- for (TFlowControlledRequestActor *reqActor : RegisteredRequests) {
- if (reqActor) {
- if (reqActor->Flags & IEventHandle::FlagSubscribeOnSession) {
- TActivationContext::Send(
+ RegisteredRequests.emplace_back(reqActor);
+
+ if (!Subscribed && (Target.NodeId() != SelfId().NodeId())) {
+ Send(TActivationContext::InterconnectProxy(Target.NodeId()), new TEvents::TEvSubscribe(), IEventHandle::FlagTrackDelivery);
+ Subscribed = true;
+ }
+
+ TActivationContext::Send(new IEventHandle(Target, reqActorId, ev->ReleaseBase().Release(), IEventHandle::FlagTrackDelivery, ev->Cookie));
+ }
+
+ void PumpQueue() {
+ while (RegisteredRequests && RegisteredRequests.front() == nullptr)
+ RegisteredRequests.pop_front();
+
+ while (UnhandledRequests && CanRegister()) {
+ RegisterReqActor(std::move(UnhandledRequests.front()));
+ UnhandledRequests.pop_front();
+ }
+ }
+
+ void HandleDisconnected() {
+ Subscribed = false;
+
+ const ui32 nodeid = Target.NodeId();
+ for (TFlowControlledRequestActor *reqActor : RegisteredRequests) {
+ if (reqActor) {
+ if (reqActor->Flags & IEventHandle::FlagSubscribeOnSession) {
+ TActivationContext::Send(
new IEventHandle(reqActor->Source, TActorId(), new TEvInterconnect::TEvNodeDisconnected(nodeid), 0, reqActor->Cookie)
- );
- }
- reqActor->PassAway();
- }
- }
-
- RegisteredRequests.clear();
-
- for (auto &ev : UnhandledRequests) {
- const auto reason = TEvents::TEvUndelivered::Disconnected;
- if (ev->Flags & IEventHandle::FlagTrackDelivery) {
- TActivationContext::Send(
- new IEventHandle(ev->Sender, ev->Recipient, new TEvents::TEvUndelivered(ev->GetTypeRewrite(), reason), 0, ev->Cookie)
- );
- }
- }
-
- UnhandledRequests.clear();
- }
-
- void HandlePoison() {
- HandleDisconnected();
-
- if (SelfId().NodeId() != Target.NodeId())
- Send(TActivationContext::InterconnectProxy(Target.NodeId()), new TEvents::TEvUnsubscribe());
-
- PassAway();
- }
-public:
+ );
+ }
+ reqActor->PassAway();
+ }
+ }
+
+ RegisteredRequests.clear();
+
+ for (auto &ev : UnhandledRequests) {
+ const auto reason = TEvents::TEvUndelivered::Disconnected;
+ if (ev->Flags & IEventHandle::FlagTrackDelivery) {
+ TActivationContext::Send(
+ new IEventHandle(ev->Sender, ev->Recipient, new TEvents::TEvUndelivered(ev->GetTypeRewrite(), reason), 0, ev->Cookie)
+ );
+ }
+ }
+
+ UnhandledRequests.clear();
+ }
+
+ void HandlePoison() {
+ HandleDisconnected();
+
+ if (SelfId().NodeId() != Target.NodeId())
+ Send(TActivationContext::InterconnectProxy(Target.NodeId()), new TEvents::TEvUnsubscribe());
+
+ PassAway();
+ }
+public:
TFlowControlledRequestQueue(TActorId target, ui32 activity, const TFlowControlledQueueConfig &config)
- : IActor(static_cast<TReceiveFunc>(&TFlowControlledRequestQueue::StateWork), activity)
- , Target(target)
- , Config(config)
- , MinimalSeenLatency(TDuration::Seconds(1))
- {}
-
- STATEFN(StateWork) {
- switch (ev->GetTypeRewrite()) {
- cFunc(TEvInterconnect::TEvNodeDisconnected::EventType, HandleDisconnected);
- IgnoreFunc(TEvInterconnect::TEvNodeConnected);
- cFunc(TEvents::TEvUndelivered::EventType, HandleDisconnected);
- cFunc(TEvents::TEvPoison::EventType, HandlePoison);
- default:
- HandleForwardedEvent(ev);
- }
- }
-
- void HandleRequestReply(TAutoPtr<IEventHandle> &ev, TFlowControlledRequestActor *reqActor) {
- auto it = Find(RegisteredRequests, reqActor);
- if (it == RegisteredRequests.end())
- return;
-
- TActivationContext::Send(ev->Forward(reqActor->Source));
- const TDuration reqLatency = reqActor->AccumulatedLatency();
- if (reqLatency < MinimalSeenLatency)
- MinimalSeenLatency = reqLatency;
-
- *it = nullptr;
- PumpQueue();
- }
-
- void HandleRequestUndelivered(TEvents::TEvUndelivered::TPtr &ev, TFlowControlledRequestActor *reqActor) {
- auto it = Find(RegisteredRequests, reqActor);
- if (it == RegisteredRequests.end())
- return;
-
- TActivationContext::Send(ev->Forward(reqActor->Source));
-
- *it = nullptr;
- PumpQueue();
- }
-};
-
-void TFlowControlledRequestActor::HandleReply(TAutoPtr<IEventHandle> &ev) {
- QueueActor->HandleRequestReply(ev, this);
- PassAway();
-}
-
-void TFlowControlledRequestActor::HandleUndelivered(TEvents::TEvUndelivered::TPtr &ev) {
- QueueActor->HandleRequestUndelivered(ev, this);
- PassAway();
-}
-
-
+ : IActor(static_cast<TReceiveFunc>(&TFlowControlledRequestQueue::StateWork), activity)
+ , Target(target)
+ , Config(config)
+ , MinimalSeenLatency(TDuration::Seconds(1))
+ {}
+
+ STATEFN(StateWork) {
+ switch (ev->GetTypeRewrite()) {
+ cFunc(TEvInterconnect::TEvNodeDisconnected::EventType, HandleDisconnected);
+ IgnoreFunc(TEvInterconnect::TEvNodeConnected);
+ cFunc(TEvents::TEvUndelivered::EventType, HandleDisconnected);
+ cFunc(TEvents::TEvPoison::EventType, HandlePoison);
+ default:
+ HandleForwardedEvent(ev);
+ }
+ }
+
+ void HandleRequestReply(TAutoPtr<IEventHandle> &ev, TFlowControlledRequestActor *reqActor) {
+ auto it = Find(RegisteredRequests, reqActor);
+ if (it == RegisteredRequests.end())
+ return;
+
+ TActivationContext::Send(ev->Forward(reqActor->Source));
+ const TDuration reqLatency = reqActor->AccumulatedLatency();
+ if (reqLatency < MinimalSeenLatency)
+ MinimalSeenLatency = reqLatency;
+
+ *it = nullptr;
+ PumpQueue();
+ }
+
+ void HandleRequestUndelivered(TEvents::TEvUndelivered::TPtr &ev, TFlowControlledRequestActor *reqActor) {
+ auto it = Find(RegisteredRequests, reqActor);
+ if (it == RegisteredRequests.end())
+ return;
+
+ TActivationContext::Send(ev->Forward(reqActor->Source));
+
+ *it = nullptr;
+ PumpQueue();
+ }
+};
+
+void TFlowControlledRequestActor::HandleReply(TAutoPtr<IEventHandle> &ev) {
+ QueueActor->HandleRequestReply(ev, this);
+ PassAway();
+}
+
+void TFlowControlledRequestActor::HandleUndelivered(TEvents::TEvUndelivered::TPtr &ev) {
+ QueueActor->HandleRequestUndelivered(ev, this);
+ PassAway();
+}
+
+
IActor* CreateFlowControlledRequestQueue(TActorId targetId, ui32 activity, const TFlowControlledQueueConfig &config) {
- return new TFlowControlledRequestQueue(targetId, activity, config);
-}
-
-}
+ return new TFlowControlledRequestQueue(targetId, activity, config);
+}
+
+}
diff --git a/library/cpp/actors/helpers/flow_controlled_queue.h b/library/cpp/actors/helpers/flow_controlled_queue.h
index d2504053047..0699b35ca6e 100644
--- a/library/cpp/actors/helpers/flow_controlled_queue.h
+++ b/library/cpp/actors/helpers/flow_controlled_queue.h
@@ -1,18 +1,18 @@
-#pragma once
-
+#pragma once
+
#include <library/cpp/actors/core/actor.h>
-
-namespace NActors {
-
- struct TFlowControlledQueueConfig {
- ui32 MinAllowedInFly = 20;
- ui32 MaxAllowedInFly = 100;
- ui32 TargetDynamicRate = 0;
-
- TDuration MinTrackedLatency = TDuration::MilliSeconds(20);
- ui32 LatencyFactor = 4;
- };
-
+
+namespace NActors {
+
+ struct TFlowControlledQueueConfig {
+ ui32 MinAllowedInFly = 20;
+ ui32 MaxAllowedInFly = 100;
+ ui32 TargetDynamicRate = 0;
+
+ TDuration MinTrackedLatency = TDuration::MilliSeconds(20);
+ ui32 LatencyFactor = 4;
+ };
+
IActor* CreateFlowControlledRequestQueue(TActorId targetId, ui32 activity = IActor::ACTORLIB_COMMON, const TFlowControlledQueueConfig &config = TFlowControlledQueueConfig());
-
-}
+
+}
diff --git a/library/cpp/actors/helpers/mon_histogram_helper.h b/library/cpp/actors/helpers/mon_histogram_helper.h
index a9a57e38238..35d030b822f 100644
--- a/library/cpp/actors/helpers/mon_histogram_helper.h
+++ b/library/cpp/actors/helpers/mon_histogram_helper.h
@@ -1,10 +1,10 @@
-#pragma once
-
+#pragma once
+
#include <library/cpp/monlib/dynamic_counters/counters.h>
#include <util/string/cast.h>
-namespace NActors {
+namespace NActors {
namespace NMon {
class THistogramCounterHelper {
public:
@@ -13,7 +13,7 @@ namespace NActors {
, BucketCount(0)
{
}
-
+
THistogramCounterHelper(const THistogramCounterHelper&) = default;
void Init(NMonitoring::TDynamicCounters* group, const TString& baseName, const TString& unit,
@@ -21,7 +21,7 @@ namespace NActors {
{
Y_ASSERT(FirstBucketVal == 0);
Y_ASSERT(BucketCount == 0);
-
+
FirstBucketVal = firstBucket;
BucketCount = bucketCnt;
BucketsHolder.reserve(BucketCount);
@@ -33,7 +33,7 @@ namespace NActors {
Buckets.push_back(BucketsHolder.back().Get());
}
}
-
+
void Add(ui64 val) {
Y_ASSERT(FirstBucketVal != 0);
Y_ASSERT(BucketCount != 0);
@@ -47,7 +47,7 @@ namespace NActors {
}
Buckets[ind]->Inc();
}
-
+
ui64 GetBucketCount() const {
return BucketCount;
}
@@ -73,8 +73,8 @@ namespace NActors {
// Last slot is up to +INF
return "INF";
}
- }
-
+ }
+
private:
ui64 FirstBucketVal;
ui64 BucketCount;
@@ -82,5 +82,5 @@ namespace NActors {
TVector<NMonitoring::TDeprecatedCounter*> Buckets;
};
- }
+ }
}
diff --git a/library/cpp/actors/helpers/ya.make b/library/cpp/actors/helpers/ya.make
index d8771179de8..b6cce2cc237 100644
--- a/library/cpp/actors/helpers/ya.make
+++ b/library/cpp/actors/helpers/ya.make
@@ -1,23 +1,23 @@
-LIBRARY()
-
+LIBRARY()
+
OWNER(g:kikimr)
-
-SRCS(
+
+SRCS(
activeactors.cpp
activeactors.h
- flow_controlled_queue.cpp
- flow_controlled_queue.h
+ flow_controlled_queue.cpp
+ flow_controlled_queue.h
future_callback.h
mon_histogram_helper.h
selfping_actor.cpp
-)
-
-PEERDIR(
+)
+
+PEERDIR(
library/cpp/actors/core
library/cpp/monlib/dynamic_counters
-)
-
-END()
+)
+
+END()
RECURSE_FOR_TESTS(
ut