aboutsummaryrefslogtreecommitdiffstats
path: root/library
diff options
context:
space:
mode:
authoraozeritsky <aozeritsky@yandex-team.ru>2022-02-10 16:46:40 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:46:40 +0300
commitb036a557f285146e5e35d4213e29a094ab907bcf (patch)
tree49e222ea1c5804306084bb3ae065bb702625360f /library
parent4a6816dea1bcaee46ce29a51a5fd7d3495012858 (diff)
downloadydb-b036a557f285146e5e35d4213e29a094ab907bcf.tar.gz
Restoring authorship annotation for <aozeritsky@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'library')
-rw-r--r--library/cpp/actors/core/actor.h64
-rw-r--r--library/cpp/actors/core/actor_coroutine.h2
-rw-r--r--library/cpp/actors/core/actor_ut.cpp26
-rw-r--r--library/cpp/actors/core/invoke.h8
-rw-r--r--library/cpp/actors/core/ya.make2
-rw-r--r--library/cpp/actors/helpers/pool_stats_collector.h628
-rw-r--r--library/cpp/actors/helpers/selfping_actor.cpp26
-rw-r--r--library/cpp/actors/helpers/selfping_actor.h2
-rw-r--r--library/cpp/actors/helpers/selfping_actor_ut.cpp26
-rw-r--r--library/cpp/actors/helpers/ut/ya.make72
-rw-r--r--library/cpp/actors/helpers/ya.make12
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_server.cpp24
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_server.h4
-rw-r--r--library/cpp/actors/util/local_process_key.h152
-rw-r--r--library/cpp/grpc/server/grpc_server.cpp36
-rw-r--r--library/cpp/grpc/server/grpc_server.h22
16 files changed, 553 insertions, 553 deletions
diff --git a/library/cpp/actors/core/actor.h b/library/cpp/actors/core/actor.h
index 68177a25bf..ed29bd14b9 100644
--- a/library/cpp/actors/core/actor.h
+++ b/library/cpp/actors/core/actor.h
@@ -233,7 +233,7 @@ namespace NActors {
INTERCONNECT_PROXY_TCP = 12,
INTERCONNECT_SESSION_TCP = 13,
INTERCONNECT_COMMON = 171,
- SELF_PING_ACTOR = 207,
+ SELF_PING_ACTOR = 207,
TEST_ACTOR_RUNTIME = 283,
INTERCONNECT_HANDSHAKE = 284,
INTERCONNECT_POLLER = 285,
@@ -248,11 +248,11 @@ namespace NActors {
INTERCONNECT_PROXY_WRAPPER = 546,
};
- using EActivityType = EActorActivity;
- ui32 ActivityType;
-
+ using EActivityType = EActorActivity;
+ ui32 ActivityType;
+
protected:
- IActor(TReceiveFunc stateFunc, ui32 activityType = OTHER)
+ IActor(TReceiveFunc stateFunc, ui32 activityType = OTHER)
: StateFunc(stateFunc)
, SelfActorId(TActorId())
, ElapsedTicks(0)
@@ -288,7 +288,7 @@ namespace NActors {
}
protected:
- void SetActivityType(ui32 activityType) {
+ void SetActivityType(ui32 activityType) {
ActivityType = activityType;
}
@@ -338,7 +338,7 @@ namespace NActors {
void AddElapsedTicks(i64 ticks) {
ElapsedTicks += ticks;
}
- auto GetActivityType() const {
+ auto GetActivityType() const {
return ActivityType;
}
ui64 GetHandledEvents() const {
@@ -393,28 +393,28 @@ namespace NActors {
return TLocalProcessKeyState<TActorActivityTag>::GetInstance().GetNameByIndex(index);
}
- template <typename TDerived>
- class TActor: public IActor {
- private:
- template <typename T, typename = const char*>
- struct HasActorName: std::false_type { };
- template <typename T>
- struct HasActorName<T, decltype((void)T::ActorName, (const char*)nullptr)>: std::true_type { };
-
+ template <typename TDerived>
+ class TActor: public IActor {
+ private:
+ template <typename T, typename = const char*>
+ struct HasActorName: std::false_type { };
+ template <typename T>
+ struct HasActorName<T, decltype((void)T::ActorName, (const char*)nullptr)>: std::true_type { };
+
static ui32 GetActivityTypeIndex() {
- if constexpr(HasActorName<TDerived>::value) {
- return TLocalProcessKey<TActorActivityTag, TDerived::ActorName>::GetIndex();
- } else {
- using TActorActivity = decltype(((TDerived*)nullptr)->ActorActivityType());
- // if constexpr(std::is_enum<TActorActivity>::value) {
- return TEnumProcessKey<TActorActivityTag, TActorActivity>::GetIndex(
- TDerived::ActorActivityType());
- //} else {
- // for int, ui32, ...
- // return TEnumProcessKey<TActorActivityTag, IActor::EActorActivity>::GetIndex(
- // static_cast<IActor::EActorActivity>(TDerived::ActorActivityType()));
- //}
- }
+ if constexpr(HasActorName<TDerived>::value) {
+ return TLocalProcessKey<TActorActivityTag, TDerived::ActorName>::GetIndex();
+ } else {
+ using TActorActivity = decltype(((TDerived*)nullptr)->ActorActivityType());
+ // if constexpr(std::is_enum<TActorActivity>::value) {
+ return TEnumProcessKey<TActorActivityTag, TActorActivity>::GetIndex(
+ TDerived::ActorActivityType());
+ //} else {
+ // for int, ui32, ...
+ // return TEnumProcessKey<TActorActivityTag, IActor::EActorActivity>::GetIndex(
+ // static_cast<IActor::EActorActivity>(TDerived::ActorActivityType()));
+ //}
+ }
}
protected:
@@ -423,17 +423,17 @@ namespace NActors {
return EActorActivity::OTHER;
} //*/
- // static constexpr char ActorName[] = "UNNAMED";
-
+ // static constexpr char ActorName[] = "UNNAMED";
+
TActor(void (TDerived::*func)(TAutoPtr<IEventHandle>& ev, const TActorContext& ctx), ui32 activityType = GetActivityTypeIndex())
: IActor(static_cast<TReceiveFunc>(func), activityType)
- { }
+ { }
public:
typedef TDerived TThis;
};
-
+
#define STFUNC_SIG TAutoPtr< ::NActors::IEventHandle>&ev, const ::NActors::TActorContext &ctx
#define STATEFN_SIG TAutoPtr<::NActors::IEventHandle>& ev
#define STFUNC(funcName) void funcName(TAutoPtr< ::NActors::IEventHandle>& ev, const ::NActors::TActorContext& ctx)
diff --git a/library/cpp/actors/core/actor_coroutine.h b/library/cpp/actors/core/actor_coroutine.h
index 8f1450c434..6bcb768eaf 100644
--- a/library/cpp/actors/core/actor_coroutine.h
+++ b/library/cpp/actors/core/actor_coroutine.h
@@ -154,7 +154,7 @@ namespace NActors {
THolder<TActorCoroImpl> Impl;
public:
- TActorCoro(THolder<TActorCoroImpl> impl, ui32 activityType = IActor::ACTORLIB_COMMON)
+ TActorCoro(THolder<TActorCoroImpl> impl, ui32 activityType = IActor::ACTORLIB_COMMON)
: IActor(static_cast<TReceiveFunc>(&TActorCoro::StateFunc), activityType)
, Impl(std::move(impl))
{}
diff --git a/library/cpp/actors/core/actor_ut.cpp b/library/cpp/actors/core/actor_ut.cpp
index a5d43b4619..e1b765ec72 100644
--- a/library/cpp/actors/core/actor_ut.cpp
+++ b/library/cpp/actors/core/actor_ut.cpp
@@ -55,7 +55,7 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) {
class TSendReceiveActor : public TActorBootstrapped<TSendReceiveActor> {
public:
- static constexpr auto ActorActivityType() {
+ static constexpr auto ActorActivityType() {
return ACTORLIB_COMMON;
}
@@ -525,12 +525,12 @@ Y_UNIT_TEST_SUITE(TestDecorator) {
}
};
- struct TTestActor : TActorBootstrapped<TTestActor> {
- static constexpr char ActorName[] = "TestActor";
+ struct TTestActor : TActorBootstrapped<TTestActor> {
+ static constexpr char ActorName[] = "TestActor";
void Bootstrap()
{
- const auto& activityTypeIndex = GetActivityType();
+ const auto& activityTypeIndex = GetActivityType();
Y_ENSURE(activityTypeIndex < GetActivityTypeCount());
Y_ENSURE(GetActivityTypeName(activityTypeIndex) == "TestActor");
PassAway();
@@ -566,13 +566,13 @@ Y_UNIT_TEST_SUITE(TestDecorator) {
actorSystem.Stop();
UNIT_ASSERT(pongCounter == 2 && pingCounter == 2);
}
-
- Y_UNIT_TEST(LocalProcessKey) {
- static constexpr char ActorName[] = "TestActor";
-
- UNIT_ASSERT((TEnumProcessKey<TActorActivityTag, IActor::EActorActivity>::GetName(IActor::INTERCONNECT_PROXY_TCP) == "INTERCONNECT_PROXY_TCP"));
-
- UNIT_ASSERT((TLocalProcessKey<TActorActivityTag, ActorName>::GetName() == ActorName));
- UNIT_ASSERT((TEnumProcessKey<TActorActivityTag, IActor::EActorActivity>::GetIndex(IActor::INTERCONNECT_PROXY_TCP) == IActor::INTERCONNECT_PROXY_TCP));
- }
+
+ Y_UNIT_TEST(LocalProcessKey) {
+ static constexpr char ActorName[] = "TestActor";
+
+ UNIT_ASSERT((TEnumProcessKey<TActorActivityTag, IActor::EActorActivity>::GetName(IActor::INTERCONNECT_PROXY_TCP) == "INTERCONNECT_PROXY_TCP"));
+
+ UNIT_ASSERT((TLocalProcessKey<TActorActivityTag, ActorName>::GetName() == ActorName));
+ UNIT_ASSERT((TEnumProcessKey<TActorActivityTag, IActor::EActorActivity>::GetIndex(IActor::INTERCONNECT_PROXY_TCP) == IActor::INTERCONNECT_PROXY_TCP));
+ }
}
diff --git a/library/cpp/actors/core/invoke.h b/library/cpp/actors/core/invoke.h
index 106097abc9..931a9767dd 100644
--- a/library/cpp/actors/core/invoke.h
+++ b/library/cpp/actors/core/invoke.h
@@ -77,14 +77,14 @@ namespace NActors {
// actor's class. But `complete` handler is invoked in parent context and can use its contents. Do not forget to
// handle TEvInvokeResult event by calling Process/GetResult method, whichever is necessary.
- template<typename TCallback, typename TCompletion, ui32 Activity>
+ template<typename TCallback, typename TCompletion, ui32 Activity>
class TInvokeActor : public TActorBootstrapped<TInvokeActor<TCallback, TCompletion, Activity>> {
TCallback Callback;
TCompletion Complete;
public:
- static constexpr auto ActorActivityType() {
- return static_cast<IActor::EActorActivity>(Activity);
+ static constexpr auto ActorActivityType() {
+ return static_cast<IActor::EActorActivity>(Activity);
}
TInvokeActor(TCallback&& callback, TCompletion&& complete)
@@ -101,7 +101,7 @@ namespace NActors {
}
};
- template<ui32 Activity, typename TCallback, typename TCompletion>
+ template<ui32 Activity, typename TCallback, typename TCompletion>
std::unique_ptr<IActor> CreateInvokeActor(TCallback&& callback, TCompletion&& complete) {
return std::make_unique<TInvokeActor<std::decay_t<TCallback>, std::decay_t<TCompletion>, Activity>>(
std::forward<TCallback>(callback), std::forward<TCompletion>(complete));
diff --git a/library/cpp/actors/core/ya.make b/library/cpp/actors/core/ya.make
index 9a5ab29e51..880a9d00db 100644
--- a/library/cpp/actors/core/ya.make
+++ b/library/cpp/actors/core/ya.make
@@ -100,7 +100,7 @@ SRCS(
)
GENERATE_ENUM_SERIALIZATION(defs.h)
-GENERATE_ENUM_SERIALIZATION(actor.h)
+GENERATE_ENUM_SERIALIZATION(actor.h)
PEERDIR(
library/cpp/actors/memory_log
diff --git a/library/cpp/actors/helpers/pool_stats_collector.h b/library/cpp/actors/helpers/pool_stats_collector.h
index 78c842900e..61d0b45780 100644
--- a/library/cpp/actors/helpers/pool_stats_collector.h
+++ b/library/cpp/actors/helpers/pool_stats_collector.h
@@ -1,314 +1,314 @@
-#pragma once
-
-#include <library/cpp/actors/core/actor_bootstrapped.h>
-#include <library/cpp/actors/core/actorsystem.h>
-#include <library/cpp/actors/core/executor_thread.h>
-#include <library/cpp/actors/core/hfunc.h>
-#include <library/cpp/monlib/dynamic_counters/counters.h>
-
-#include <util/generic/vector.h>
-#include <util/generic/xrange.h>
-#include <util/string/printf.h>
-
-namespace NActors {
-
-// Periodically collects stats from executor threads and exposes them as mon counters
-class TStatsCollectingActor : public TActorBootstrapped<TStatsCollectingActor> {
-private:
- struct THistogramCounters {
- void Init(NMonitoring::TDynamicCounters* group, const TString& baseName, const TString& unit, ui64 maxVal) {
- for (size_t i = 0; (1ull<<i) <= maxVal; ++i) {
- TString bucketName = ToString(1ull<<i) + " " + unit;
- Buckets.push_back(group->GetSubgroup("sensor", baseName)->GetNamedCounter("range", bucketName, true));
- }
- Buckets.push_back(group->GetSubgroup("sensor", baseName)->GetNamedCounter("range", "INF", true));
- }
-
- void Set(const TLogHistogram& data) {
- ui32 i = 0;
- for (;i < Y_ARRAY_SIZE(data.Buckets) && i < Buckets.size()-1; ++i)
- *Buckets[i] = data.Buckets[i];
- ui64 last = 0;
- for (;i < Y_ARRAY_SIZE(data.Buckets); ++i)
- last += data.Buckets[i];
- *Buckets.back() = last;
- }
-
- void Set(const TLogHistogram& data, double factor) {
- ui32 i = 0;
- for (;i < Y_ARRAY_SIZE(data.Buckets) && i < Buckets.size()-1; ++i)
- *Buckets[i] = data.Buckets[i]*factor;
- ui64 last = 0;
- for (;i < Y_ARRAY_SIZE(data.Buckets); ++i)
- last += data.Buckets[i];
- *Buckets.back() = last*factor;
- }
-
- private:
- TVector<NMonitoring::TDynamicCounters::TCounterPtr> Buckets;
- };
-
- struct TActivityStats {
- void Init(NMonitoring::TDynamicCounterPtr group) {
- Group = group;
-
- ElapsedMicrosecByActivityBuckets.resize(GetActivityTypeCount());
- ReceivedEventsByActivityBuckets.resize(GetActivityTypeCount());
- ActorsAliveByActivityBuckets.resize(GetActivityTypeCount());
- ScheduledEventsByActivityBuckets.resize(GetActivityTypeCount());
- }
-
- void Set(const TExecutorThreadStats& stats) {
- for (ui32 i : xrange(stats.MaxActivityType())) {
- Y_VERIFY(i < GetActivityTypeCount());
- ui64 ticks = stats.ElapsedTicksByActivity[i];
- ui64 events = stats.ReceivedEventsByActivity[i];
- ui64 actors = stats.ActorsAliveByActivity[i];
- ui64 scheduled = stats.ScheduledEventsByActivity[i];
-
- if (!ActorsAliveByActivityBuckets[i]) {
- if (ticks || events || actors || scheduled) {
- InitCountersForActivity(i);
- } else {
- continue;
- }
- }
-
- *ElapsedMicrosecByActivityBuckets[i] = ::NHPTimer::GetSeconds(ticks)*1000000;
- *ReceivedEventsByActivityBuckets[i] = events;
- *ActorsAliveByActivityBuckets[i] = actors;
- *ScheduledEventsByActivityBuckets[i] = scheduled;
- }
- }
-
- private:
- void InitCountersForActivity(ui32 activityType) {
- Y_VERIFY(activityType < GetActivityTypeCount());
-
- auto bucketName = TString(GetActivityTypeName(activityType));
-
- ElapsedMicrosecByActivityBuckets[activityType] =
- Group->GetSubgroup("sensor", "ElapsedMicrosecByActivity")->GetNamedCounter("activity", bucketName, true);
- ReceivedEventsByActivityBuckets[activityType] =
- Group->GetSubgroup("sensor", "ReceivedEventsByActivity")->GetNamedCounter("activity", bucketName, true);
- ActorsAliveByActivityBuckets[activityType] =
- Group->GetSubgroup("sensor", "ActorsAliveByActivity")->GetNamedCounter("activity", bucketName, false);
- ScheduledEventsByActivityBuckets[activityType] =
- Group->GetSubgroup("sensor", "ScheduledEventsByActivity")->GetNamedCounter("activity", bucketName, true);
- }
-
- private:
- NMonitoring::TDynamicCounterPtr Group;
-
- TVector<NMonitoring::TDynamicCounters::TCounterPtr> ElapsedMicrosecByActivityBuckets;
- TVector<NMonitoring::TDynamicCounters::TCounterPtr> ReceivedEventsByActivityBuckets;
- TVector<NMonitoring::TDynamicCounters::TCounterPtr> ActorsAliveByActivityBuckets;
- TVector<NMonitoring::TDynamicCounters::TCounterPtr> ScheduledEventsByActivityBuckets;
- };
-
- struct TExecutorPoolCounters {
- TIntrusivePtr<NMonitoring::TDynamicCounters> PoolGroup;
-
- NMonitoring::TDynamicCounters::TCounterPtr SentEvents;
- NMonitoring::TDynamicCounters::TCounterPtr ReceivedEvents;
- NMonitoring::TDynamicCounters::TCounterPtr PreemptedEvents;
- NMonitoring::TDynamicCounters::TCounterPtr NonDeliveredEvents;
- NMonitoring::TDynamicCounters::TCounterPtr DestroyedActors;
- NMonitoring::TDynamicCounters::TCounterPtr EmptyMailboxActivation;
- NMonitoring::TDynamicCounters::TCounterPtr CpuMicrosec;
- NMonitoring::TDynamicCounters::TCounterPtr ElapsedMicrosec;
- NMonitoring::TDynamicCounters::TCounterPtr ParkedMicrosec;
- NMonitoring::TDynamicCounters::TCounterPtr ActorRegistrations;
- NMonitoring::TDynamicCounters::TCounterPtr ActorsAlive;
- NMonitoring::TDynamicCounters::TCounterPtr AllocatedMailboxes;
- NMonitoring::TDynamicCounters::TCounterPtr MailboxPushedOutBySoftPreemption;
- NMonitoring::TDynamicCounters::TCounterPtr MailboxPushedOutByTime;
- NMonitoring::TDynamicCounters::TCounterPtr MailboxPushedOutByEventCount;
-
- THistogramCounters LegacyActivationTimeHistogram;
- NMonitoring::THistogramPtr ActivationTimeHistogram;
- THistogramCounters LegacyEventDeliveryTimeHistogram;
- NMonitoring::THistogramPtr EventDeliveryTimeHistogram;
- THistogramCounters LegacyEventProcessingCountHistogram;
- NMonitoring::THistogramPtr EventProcessingCountHistogram;
- THistogramCounters LegacyEventProcessingTimeHistogram;
- NMonitoring::THistogramPtr EventProcessingTimeHistogram;
-
- TActivityStats ActivityStats;
- NMonitoring::TDynamicCounters::TCounterPtr MaxUtilizationTime;
-
- double Usage = 0;
- double LastElapsedSeconds = 0;
- THPTimer UsageTimer;
- TString Name;
- ui32 Threads;
-
- void Init(NMonitoring::TDynamicCounters* group, const TString& poolName, ui32 threads) {
- LastElapsedSeconds = 0;
- Usage = 0;
- UsageTimer.Reset();
- Name = poolName;
- Threads = threads;
-
- PoolGroup = group->GetSubgroup("execpool", poolName);
-
- SentEvents = PoolGroup->GetCounter("SentEvents", true);
- ReceivedEvents = PoolGroup->GetCounter("ReceivedEvents", true);
- PreemptedEvents = PoolGroup->GetCounter("PreemptedEvents", true);
- NonDeliveredEvents = PoolGroup->GetCounter("NonDeliveredEvents", true);
- DestroyedActors = PoolGroup->GetCounter("DestroyedActors", true);
- CpuMicrosec = PoolGroup->GetCounter("CpuMicrosec", true);
- ElapsedMicrosec = PoolGroup->GetCounter("ElapsedMicrosec", true);
- ParkedMicrosec = PoolGroup->GetCounter("ParkedMicrosec", true);
- EmptyMailboxActivation = PoolGroup->GetCounter("EmptyMailboxActivation", true);
- ActorRegistrations = PoolGroup->GetCounter("ActorRegistrations", true);
- ActorsAlive = PoolGroup->GetCounter("ActorsAlive", false);
- AllocatedMailboxes = PoolGroup->GetCounter("AllocatedMailboxes", false);
- MailboxPushedOutBySoftPreemption = PoolGroup->GetCounter("MailboxPushedOutBySoftPreemption", true);
- MailboxPushedOutByTime = PoolGroup->GetCounter("MailboxPushedOutByTime", true);
- MailboxPushedOutByEventCount = PoolGroup->GetCounter("MailboxPushedOutByEventCount", true);
-
- LegacyActivationTimeHistogram.Init(PoolGroup.Get(), "ActivationTime", "usec", 5*1000*1000);
- ActivationTimeHistogram = PoolGroup->GetHistogram(
- "ActivationTimeUs", NMonitoring::ExponentialHistogram(24, 2, 1));
- LegacyEventDeliveryTimeHistogram.Init(PoolGroup.Get(), "EventDeliveryTime", "usec", 5*1000*1000);
- EventDeliveryTimeHistogram = PoolGroup->GetHistogram(
- "EventDeliveryTimeUs", NMonitoring::ExponentialHistogram(24, 2, 1));
- LegacyEventProcessingCountHistogram.Init(PoolGroup.Get(), "EventProcessingCount", "usec", 5*1000*1000);
- EventProcessingCountHistogram = PoolGroup->GetHistogram(
- "EventProcessingCountUs", NMonitoring::ExponentialHistogram(24, 2, 1));
- LegacyEventProcessingTimeHistogram.Init(PoolGroup.Get(), "EventProcessingTime", "usec", 5*1000*1000);
- EventProcessingTimeHistogram = PoolGroup->GetHistogram(
- "EventProcessingTimeUs", NMonitoring::ExponentialHistogram(24, 2, 1));
-
- ActivityStats.Init(PoolGroup.Get());
-
- MaxUtilizationTime = PoolGroup->GetCounter("MaxUtilizationTime", true);
- }
-
- void Set(const TExecutorPoolStats& poolStats, const TExecutorThreadStats& stats, ui32 numThreads) {
-#ifdef ACTORSLIB_COLLECT_EXEC_STATS
- *SentEvents = stats.SentEvents;
- *ReceivedEvents = stats.ReceivedEvents;
- *PreemptedEvents = stats.PreemptedEvents;
- *NonDeliveredEvents = stats.NonDeliveredEvents;
- *DestroyedActors = stats.PoolDestroyedActors;
- *EmptyMailboxActivation = stats.EmptyMailboxActivation;
- *CpuMicrosec = stats.CpuNs / 1000;
- *ElapsedMicrosec = ::NHPTimer::GetSeconds(stats.ElapsedTicks)*1000000;
- *ParkedMicrosec = ::NHPTimer::GetSeconds(stats.ParkedTicks)*1000000;
- *ActorRegistrations = stats.PoolActorRegistrations;
- *ActorsAlive = stats.PoolActorRegistrations - stats.PoolDestroyedActors;
- *AllocatedMailboxes = stats.PoolAllocatedMailboxes;
- *MailboxPushedOutBySoftPreemption = stats.MailboxPushedOutBySoftPreemption;
- *MailboxPushedOutByTime = stats.MailboxPushedOutByTime;
- *MailboxPushedOutByEventCount = stats.MailboxPushedOutByEventCount;
-
- LegacyActivationTimeHistogram.Set(stats.ActivationTimeHistogram);
- ActivationTimeHistogram->Reset();
- ActivationTimeHistogram->Collect(stats.ActivationTimeHistogram);
-
- LegacyEventDeliveryTimeHistogram.Set(stats.EventDeliveryTimeHistogram);
- EventDeliveryTimeHistogram->Reset();
- EventDeliveryTimeHistogram->Collect(stats.EventDeliveryTimeHistogram);
-
- LegacyEventProcessingCountHistogram.Set(stats.EventProcessingCountHistogram);
- EventProcessingCountHistogram->Reset();
- EventProcessingCountHistogram->Collect(stats.EventProcessingCountHistogram);
-
- double toMicrosec = 1000000 / NHPTimer::GetClockRate();
- LegacyEventProcessingTimeHistogram.Set(stats.EventProcessingTimeHistogram, toMicrosec);
- EventProcessingTimeHistogram->Reset();
- for (ui32 i = 0; i < stats.EventProcessingTimeHistogram.Count(); ++i) {
- EventProcessingTimeHistogram->Collect(
- stats.EventProcessingTimeHistogram.UpperBound(i),
- stats.EventProcessingTimeHistogram.Value(i) * toMicrosec);
- }
-
- ActivityStats.Set(stats);
-
- *MaxUtilizationTime = poolStats.MaxUtilizationTime;
-
- double seconds = UsageTimer.PassedReset();
-
- // TODO[serxa]: It doesn't account for contention. Use 1 - parkedTicksDelta / seconds / numThreads KIKIMR-11916
- const double elapsed = NHPTimer::GetSeconds(stats.ElapsedTicks);
- const double currentUsage = numThreads > 0 ? ((elapsed - LastElapsedSeconds) / seconds / numThreads) : 0;
- LastElapsedSeconds = elapsed;
-
- // update usage factor according to smoothness
- const double smoothness = 0.5;
- Usage = currentUsage * smoothness + Usage * (1.0 - smoothness);
-#else
- Y_UNUSED(poolStats);
- Y_UNUSED(stats);
- Y_UNUSED(numThreads);
-#endif
- }
- };
-
-public:
- static constexpr IActor::EActivityType ActorActivityType() {
- return IActor::ACTORLIB_STATS;
- }
-
- TStatsCollectingActor(
- ui32 intervalSec,
- const TActorSystemSetup& setup,
- NMonitoring::TDynamicCounterPtr counters)
- : IntervalSec(intervalSec)
- , Counters(counters)
- {
- PoolCounters.resize(setup.GetExecutorsCount());
- for (size_t poolId = 0; poolId < PoolCounters.size(); ++poolId) {
- PoolCounters[poolId].Init(Counters.Get(), setup.GetPoolName(poolId), setup.GetThreads(poolId));
- }
- }
-
- void Bootstrap(const TActorContext& ctx) {
- ctx.Schedule(TDuration::Seconds(IntervalSec), new TEvents::TEvWakeup());
- Become(&TThis::StateWork);
- }
-
- STFUNC(StateWork) {
- switch (ev->GetTypeRewrite()) {
- CFunc(TEvents::TSystem::Wakeup, Wakeup);
- }
- }
-
-private:
- virtual void OnWakeup(const TActorContext &ctx) {
- Y_UNUSED(ctx);
- }
-
- void Wakeup(const TActorContext &ctx) {
- for (size_t poolId = 0; poolId < PoolCounters.size(); ++poolId) {
- TVector<TExecutorThreadStats> stats;
- TExecutorPoolStats poolStats;
- ctx.ExecutorThread.ActorSystem->GetPoolStats(poolId, poolStats, stats);
- SetAggregatedCounters(PoolCounters[poolId], poolStats, stats);
- }
-
- OnWakeup(ctx);
-
- ctx.Schedule(TDuration::Seconds(IntervalSec), new TEvents::TEvWakeup());
- }
-
- void SetAggregatedCounters(TExecutorPoolCounters& poolCounters, TExecutorPoolStats& poolStats, TVector<TExecutorThreadStats>& stats) {
- // Sum all per-thread counters into the 0th element
- for (ui32 idx = 1; idx < stats.size(); ++idx) {
- stats[0].Aggregate(stats[idx]);
- }
- if (stats.size()) {
- poolCounters.Set(poolStats, stats[0], stats.size() - 1);
- }
- }
-
-protected:
- const ui32 IntervalSec;
- NMonitoring::TDynamicCounterPtr Counters;
-
- TVector<TExecutorPoolCounters> PoolCounters;
-};
-
-} // NActors
+#pragma once
+
+#include <library/cpp/actors/core/actor_bootstrapped.h>
+#include <library/cpp/actors/core/actorsystem.h>
+#include <library/cpp/actors/core/executor_thread.h>
+#include <library/cpp/actors/core/hfunc.h>
+#include <library/cpp/monlib/dynamic_counters/counters.h>
+
+#include <util/generic/vector.h>
+#include <util/generic/xrange.h>
+#include <util/string/printf.h>
+
+namespace NActors {
+
+// Periodically collects stats from executor threads and exposes them as mon counters
+class TStatsCollectingActor : public TActorBootstrapped<TStatsCollectingActor> {
+private:
+ struct THistogramCounters {
+ void Init(NMonitoring::TDynamicCounters* group, const TString& baseName, const TString& unit, ui64 maxVal) {
+ for (size_t i = 0; (1ull<<i) <= maxVal; ++i) {
+ TString bucketName = ToString(1ull<<i) + " " + unit;
+ Buckets.push_back(group->GetSubgroup("sensor", baseName)->GetNamedCounter("range", bucketName, true));
+ }
+ Buckets.push_back(group->GetSubgroup("sensor", baseName)->GetNamedCounter("range", "INF", true));
+ }
+
+ void Set(const TLogHistogram& data) {
+ ui32 i = 0;
+ for (;i < Y_ARRAY_SIZE(data.Buckets) && i < Buckets.size()-1; ++i)
+ *Buckets[i] = data.Buckets[i];
+ ui64 last = 0;
+ for (;i < Y_ARRAY_SIZE(data.Buckets); ++i)
+ last += data.Buckets[i];
+ *Buckets.back() = last;
+ }
+
+ void Set(const TLogHistogram& data, double factor) {
+ ui32 i = 0;
+ for (;i < Y_ARRAY_SIZE(data.Buckets) && i < Buckets.size()-1; ++i)
+ *Buckets[i] = data.Buckets[i]*factor;
+ ui64 last = 0;
+ for (;i < Y_ARRAY_SIZE(data.Buckets); ++i)
+ last += data.Buckets[i];
+ *Buckets.back() = last*factor;
+ }
+
+ private:
+ TVector<NMonitoring::TDynamicCounters::TCounterPtr> Buckets;
+ };
+
+ struct TActivityStats {
+ void Init(NMonitoring::TDynamicCounterPtr group) {
+ Group = group;
+
+ ElapsedMicrosecByActivityBuckets.resize(GetActivityTypeCount());
+ ReceivedEventsByActivityBuckets.resize(GetActivityTypeCount());
+ ActorsAliveByActivityBuckets.resize(GetActivityTypeCount());
+ ScheduledEventsByActivityBuckets.resize(GetActivityTypeCount());
+ }
+
+ void Set(const TExecutorThreadStats& stats) {
+ for (ui32 i : xrange(stats.MaxActivityType())) {
+ Y_VERIFY(i < GetActivityTypeCount());
+ ui64 ticks = stats.ElapsedTicksByActivity[i];
+ ui64 events = stats.ReceivedEventsByActivity[i];
+ ui64 actors = stats.ActorsAliveByActivity[i];
+ ui64 scheduled = stats.ScheduledEventsByActivity[i];
+
+ if (!ActorsAliveByActivityBuckets[i]) {
+ if (ticks || events || actors || scheduled) {
+ InitCountersForActivity(i);
+ } else {
+ continue;
+ }
+ }
+
+ *ElapsedMicrosecByActivityBuckets[i] = ::NHPTimer::GetSeconds(ticks)*1000000;
+ *ReceivedEventsByActivityBuckets[i] = events;
+ *ActorsAliveByActivityBuckets[i] = actors;
+ *ScheduledEventsByActivityBuckets[i] = scheduled;
+ }
+ }
+
+ private:
+ void InitCountersForActivity(ui32 activityType) {
+ Y_VERIFY(activityType < GetActivityTypeCount());
+
+ auto bucketName = TString(GetActivityTypeName(activityType));
+
+ ElapsedMicrosecByActivityBuckets[activityType] =
+ Group->GetSubgroup("sensor", "ElapsedMicrosecByActivity")->GetNamedCounter("activity", bucketName, true);
+ ReceivedEventsByActivityBuckets[activityType] =
+ Group->GetSubgroup("sensor", "ReceivedEventsByActivity")->GetNamedCounter("activity", bucketName, true);
+ ActorsAliveByActivityBuckets[activityType] =
+ Group->GetSubgroup("sensor", "ActorsAliveByActivity")->GetNamedCounter("activity", bucketName, false);
+ ScheduledEventsByActivityBuckets[activityType] =
+ Group->GetSubgroup("sensor", "ScheduledEventsByActivity")->GetNamedCounter("activity", bucketName, true);
+ }
+
+ private:
+ NMonitoring::TDynamicCounterPtr Group;
+
+ TVector<NMonitoring::TDynamicCounters::TCounterPtr> ElapsedMicrosecByActivityBuckets;
+ TVector<NMonitoring::TDynamicCounters::TCounterPtr> ReceivedEventsByActivityBuckets;
+ TVector<NMonitoring::TDynamicCounters::TCounterPtr> ActorsAliveByActivityBuckets;
+ TVector<NMonitoring::TDynamicCounters::TCounterPtr> ScheduledEventsByActivityBuckets;
+ };
+
+ struct TExecutorPoolCounters {
+ TIntrusivePtr<NMonitoring::TDynamicCounters> PoolGroup;
+
+ NMonitoring::TDynamicCounters::TCounterPtr SentEvents;
+ NMonitoring::TDynamicCounters::TCounterPtr ReceivedEvents;
+ NMonitoring::TDynamicCounters::TCounterPtr PreemptedEvents;
+ NMonitoring::TDynamicCounters::TCounterPtr NonDeliveredEvents;
+ NMonitoring::TDynamicCounters::TCounterPtr DestroyedActors;
+ NMonitoring::TDynamicCounters::TCounterPtr EmptyMailboxActivation;
+ NMonitoring::TDynamicCounters::TCounterPtr CpuMicrosec;
+ NMonitoring::TDynamicCounters::TCounterPtr ElapsedMicrosec;
+ NMonitoring::TDynamicCounters::TCounterPtr ParkedMicrosec;
+ NMonitoring::TDynamicCounters::TCounterPtr ActorRegistrations;
+ NMonitoring::TDynamicCounters::TCounterPtr ActorsAlive;
+ NMonitoring::TDynamicCounters::TCounterPtr AllocatedMailboxes;
+ NMonitoring::TDynamicCounters::TCounterPtr MailboxPushedOutBySoftPreemption;
+ NMonitoring::TDynamicCounters::TCounterPtr MailboxPushedOutByTime;
+ NMonitoring::TDynamicCounters::TCounterPtr MailboxPushedOutByEventCount;
+
+ THistogramCounters LegacyActivationTimeHistogram;
+ NMonitoring::THistogramPtr ActivationTimeHistogram;
+ THistogramCounters LegacyEventDeliveryTimeHistogram;
+ NMonitoring::THistogramPtr EventDeliveryTimeHistogram;
+ THistogramCounters LegacyEventProcessingCountHistogram;
+ NMonitoring::THistogramPtr EventProcessingCountHistogram;
+ THistogramCounters LegacyEventProcessingTimeHistogram;
+ NMonitoring::THistogramPtr EventProcessingTimeHistogram;
+
+ TActivityStats ActivityStats;
+ NMonitoring::TDynamicCounters::TCounterPtr MaxUtilizationTime;
+
+ double Usage = 0;
+ double LastElapsedSeconds = 0;
+ THPTimer UsageTimer;
+ TString Name;
+ ui32 Threads;
+
+ void Init(NMonitoring::TDynamicCounters* group, const TString& poolName, ui32 threads) {
+ LastElapsedSeconds = 0;
+ Usage = 0;
+ UsageTimer.Reset();
+ Name = poolName;
+ Threads = threads;
+
+ PoolGroup = group->GetSubgroup("execpool", poolName);
+
+ SentEvents = PoolGroup->GetCounter("SentEvents", true);
+ ReceivedEvents = PoolGroup->GetCounter("ReceivedEvents", true);
+ PreemptedEvents = PoolGroup->GetCounter("PreemptedEvents", true);
+ NonDeliveredEvents = PoolGroup->GetCounter("NonDeliveredEvents", true);
+ DestroyedActors = PoolGroup->GetCounter("DestroyedActors", true);
+ CpuMicrosec = PoolGroup->GetCounter("CpuMicrosec", true);
+ ElapsedMicrosec = PoolGroup->GetCounter("ElapsedMicrosec", true);
+ ParkedMicrosec = PoolGroup->GetCounter("ParkedMicrosec", true);
+ EmptyMailboxActivation = PoolGroup->GetCounter("EmptyMailboxActivation", true);
+ ActorRegistrations = PoolGroup->GetCounter("ActorRegistrations", true);
+ ActorsAlive = PoolGroup->GetCounter("ActorsAlive", false);
+ AllocatedMailboxes = PoolGroup->GetCounter("AllocatedMailboxes", false);
+ MailboxPushedOutBySoftPreemption = PoolGroup->GetCounter("MailboxPushedOutBySoftPreemption", true);
+ MailboxPushedOutByTime = PoolGroup->GetCounter("MailboxPushedOutByTime", true);
+ MailboxPushedOutByEventCount = PoolGroup->GetCounter("MailboxPushedOutByEventCount", true);
+
+ LegacyActivationTimeHistogram.Init(PoolGroup.Get(), "ActivationTime", "usec", 5*1000*1000);
+ ActivationTimeHistogram = PoolGroup->GetHistogram(
+ "ActivationTimeUs", NMonitoring::ExponentialHistogram(24, 2, 1));
+ LegacyEventDeliveryTimeHistogram.Init(PoolGroup.Get(), "EventDeliveryTime", "usec", 5*1000*1000);
+ EventDeliveryTimeHistogram = PoolGroup->GetHistogram(
+ "EventDeliveryTimeUs", NMonitoring::ExponentialHistogram(24, 2, 1));
+ LegacyEventProcessingCountHistogram.Init(PoolGroup.Get(), "EventProcessingCount", "usec", 5*1000*1000);
+ EventProcessingCountHistogram = PoolGroup->GetHistogram(
+ "EventProcessingCountUs", NMonitoring::ExponentialHistogram(24, 2, 1));
+ LegacyEventProcessingTimeHistogram.Init(PoolGroup.Get(), "EventProcessingTime", "usec", 5*1000*1000);
+ EventProcessingTimeHistogram = PoolGroup->GetHistogram(
+ "EventProcessingTimeUs", NMonitoring::ExponentialHistogram(24, 2, 1));
+
+ ActivityStats.Init(PoolGroup.Get());
+
+ MaxUtilizationTime = PoolGroup->GetCounter("MaxUtilizationTime", true);
+ }
+
+ void Set(const TExecutorPoolStats& poolStats, const TExecutorThreadStats& stats, ui32 numThreads) {
+#ifdef ACTORSLIB_COLLECT_EXEC_STATS
+ *SentEvents = stats.SentEvents;
+ *ReceivedEvents = stats.ReceivedEvents;
+ *PreemptedEvents = stats.PreemptedEvents;
+ *NonDeliveredEvents = stats.NonDeliveredEvents;
+ *DestroyedActors = stats.PoolDestroyedActors;
+ *EmptyMailboxActivation = stats.EmptyMailboxActivation;
+ *CpuMicrosec = stats.CpuNs / 1000;
+ *ElapsedMicrosec = ::NHPTimer::GetSeconds(stats.ElapsedTicks)*1000000;
+ *ParkedMicrosec = ::NHPTimer::GetSeconds(stats.ParkedTicks)*1000000;
+ *ActorRegistrations = stats.PoolActorRegistrations;
+ *ActorsAlive = stats.PoolActorRegistrations - stats.PoolDestroyedActors;
+ *AllocatedMailboxes = stats.PoolAllocatedMailboxes;
+ *MailboxPushedOutBySoftPreemption = stats.MailboxPushedOutBySoftPreemption;
+ *MailboxPushedOutByTime = stats.MailboxPushedOutByTime;
+ *MailboxPushedOutByEventCount = stats.MailboxPushedOutByEventCount;
+
+ LegacyActivationTimeHistogram.Set(stats.ActivationTimeHistogram);
+ ActivationTimeHistogram->Reset();
+ ActivationTimeHistogram->Collect(stats.ActivationTimeHistogram);
+
+ LegacyEventDeliveryTimeHistogram.Set(stats.EventDeliveryTimeHistogram);
+ EventDeliveryTimeHistogram->Reset();
+ EventDeliveryTimeHistogram->Collect(stats.EventDeliveryTimeHistogram);
+
+ LegacyEventProcessingCountHistogram.Set(stats.EventProcessingCountHistogram);
+ EventProcessingCountHistogram->Reset();
+ EventProcessingCountHistogram->Collect(stats.EventProcessingCountHistogram);
+
+ double toMicrosec = 1000000 / NHPTimer::GetClockRate();
+ LegacyEventProcessingTimeHistogram.Set(stats.EventProcessingTimeHistogram, toMicrosec);
+ EventProcessingTimeHistogram->Reset();
+ for (ui32 i = 0; i < stats.EventProcessingTimeHistogram.Count(); ++i) {
+ EventProcessingTimeHistogram->Collect(
+ stats.EventProcessingTimeHistogram.UpperBound(i),
+ stats.EventProcessingTimeHistogram.Value(i) * toMicrosec);
+ }
+
+ ActivityStats.Set(stats);
+
+ *MaxUtilizationTime = poolStats.MaxUtilizationTime;
+
+ double seconds = UsageTimer.PassedReset();
+
+ // TODO[serxa]: It doesn't account for contention. Use 1 - parkedTicksDelta / seconds / numThreads KIKIMR-11916
+ const double elapsed = NHPTimer::GetSeconds(stats.ElapsedTicks);
+ const double currentUsage = numThreads > 0 ? ((elapsed - LastElapsedSeconds) / seconds / numThreads) : 0;
+ LastElapsedSeconds = elapsed;
+
+ // update usage factor according to smoothness
+ const double smoothness = 0.5;
+ Usage = currentUsage * smoothness + Usage * (1.0 - smoothness);
+#else
+ Y_UNUSED(poolStats);
+ Y_UNUSED(stats);
+ Y_UNUSED(numThreads);
+#endif
+ }
+ };
+
+public:
+ static constexpr IActor::EActivityType ActorActivityType() {
+ return IActor::ACTORLIB_STATS;
+ }
+
+ TStatsCollectingActor(
+ ui32 intervalSec,
+ const TActorSystemSetup& setup,
+ NMonitoring::TDynamicCounterPtr counters)
+ : IntervalSec(intervalSec)
+ , Counters(counters)
+ {
+ PoolCounters.resize(setup.GetExecutorsCount());
+ for (size_t poolId = 0; poolId < PoolCounters.size(); ++poolId) {
+ PoolCounters[poolId].Init(Counters.Get(), setup.GetPoolName(poolId), setup.GetThreads(poolId));
+ }
+ }
+
+ void Bootstrap(const TActorContext& ctx) {
+ ctx.Schedule(TDuration::Seconds(IntervalSec), new TEvents::TEvWakeup());
+ Become(&TThis::StateWork);
+ }
+
+ STFUNC(StateWork) {
+ switch (ev->GetTypeRewrite()) {
+ CFunc(TEvents::TSystem::Wakeup, Wakeup);
+ }
+ }
+
+private:
+ virtual void OnWakeup(const TActorContext &ctx) {
+ Y_UNUSED(ctx);
+ }
+
+ void Wakeup(const TActorContext &ctx) {
+ for (size_t poolId = 0; poolId < PoolCounters.size(); ++poolId) {
+ TVector<TExecutorThreadStats> stats;
+ TExecutorPoolStats poolStats;
+ ctx.ExecutorThread.ActorSystem->GetPoolStats(poolId, poolStats, stats);
+ SetAggregatedCounters(PoolCounters[poolId], poolStats, stats);
+ }
+
+ OnWakeup(ctx);
+
+ ctx.Schedule(TDuration::Seconds(IntervalSec), new TEvents::TEvWakeup());
+ }
+
+ void SetAggregatedCounters(TExecutorPoolCounters& poolCounters, TExecutorPoolStats& poolStats, TVector<TExecutorThreadStats>& stats) {
+ // Sum all per-thread counters into the 0th element
+ for (ui32 idx = 1; idx < stats.size(); ++idx) {
+ stats[0].Aggregate(stats[idx]);
+ }
+ if (stats.size()) {
+ poolCounters.Set(poolStats, stats[0], stats.size() - 1);
+ }
+ }
+
+protected:
+ const ui32 IntervalSec;
+ NMonitoring::TDynamicCounterPtr Counters;
+
+ TVector<TExecutorPoolCounters> PoolCounters;
+};
+
+} // NActors
diff --git a/library/cpp/actors/helpers/selfping_actor.cpp b/library/cpp/actors/helpers/selfping_actor.cpp
index 7902ff6837..f9bfaf8dc0 100644
--- a/library/cpp/actors/helpers/selfping_actor.cpp
+++ b/library/cpp/actors/helpers/selfping_actor.cpp
@@ -10,14 +10,14 @@ namespace NActors {
namespace {
-struct TEvPing: public TEventLocal<TEvPing, TEvents::THelloWorld::Ping> {
- TEvPing(double timeStart)
- : TimeStart(timeStart)
- {}
-
- const double TimeStart;
-};
-
+struct TEvPing: public TEventLocal<TEvPing, TEvents::THelloWorld::Ping> {
+ TEvPing(double timeStart)
+ : TimeStart(timeStart)
+ {}
+
+ const double TimeStart;
+};
+
template <class TValueType_>
struct TAvgOperation {
struct TValueType {
@@ -70,8 +70,8 @@ private:
THPTimer Timer;
public:
- static constexpr auto ActorActivityType() {
- return SELF_PING_ACTOR;
+ static constexpr auto ActorActivityType() {
+ return SELF_PING_ACTOR;
}
TSelfPingActor(TDuration sendInterval, const NMonitoring::TDynamicCounters::TCounterPtr& counter,
@@ -93,7 +93,7 @@ public:
STFUNC(RunningState)
{
switch (ev->GetTypeRewrite()) {
- HFunc(TEvPing, HandlePing);
+ HFunc(TEvPing, HandlePing);
default:
Y_FAIL("TSelfPingActor::RunningState: unexpected event 0x%08" PRIx32, ev->GetTypeRewrite());
}
@@ -146,7 +146,7 @@ public:
return (ui64)d;
}
- void HandlePing(TEvPing::TPtr &ev, const TActorContext &ctx)
+ void HandlePing(TEvPing::TPtr &ev, const TActorContext &ctx)
{
const auto now = ctx.Now();
const double hpNow = Timer.Passed();
@@ -166,7 +166,7 @@ public:
private:
void SchedulePing(const TActorContext &ctx, double hpNow) const
{
- ctx.Schedule(SendInterval, new TEvPing(hpNow));
+ ctx.Schedule(SendInterval, new TEvPing(hpNow));
}
};
diff --git a/library/cpp/actors/helpers/selfping_actor.h b/library/cpp/actors/helpers/selfping_actor.h
index 4217fa4293..d7d07f9fa8 100644
--- a/library/cpp/actors/helpers/selfping_actor.h
+++ b/library/cpp/actors/helpers/selfping_actor.h
@@ -1,6 +1,6 @@
#pragma once
-#include <library/cpp/actors/core/actor.h>
+#include <library/cpp/actors/core/actor.h>
#include <library/cpp/monlib/dynamic_counters/counters.h>
namespace NActors {
diff --git a/library/cpp/actors/helpers/selfping_actor_ut.cpp b/library/cpp/actors/helpers/selfping_actor_ut.cpp
index 3643397c0b..459635fa24 100644
--- a/library/cpp/actors/helpers/selfping_actor_ut.cpp
+++ b/library/cpp/actors/helpers/selfping_actor_ut.cpp
@@ -1,24 +1,24 @@
#include "selfping_actor.h"
#include <library/cpp/testing/unittest/registar.h>
-#include <library/cpp/actors/testlib/test_runtime.h>
+#include <library/cpp/actors/testlib/test_runtime.h>
namespace NActors {
namespace Tests {
-THolder<TTestActorRuntimeBase> CreateRuntime() {
- auto runtime = MakeHolder<TTestActorRuntimeBase>();
- runtime->SetScheduledEventFilter([](auto&&, auto&&, auto&&, auto&&) { return false; });
- runtime->Initialize();
- return runtime;
-}
-
+THolder<TTestActorRuntimeBase> CreateRuntime() {
+ auto runtime = MakeHolder<TTestActorRuntimeBase>();
+ runtime->SetScheduledEventFilter([](auto&&, auto&&, auto&&, auto&&) { return false; });
+ runtime->Initialize();
+ return runtime;
+}
+
Y_UNIT_TEST_SUITE(TSelfPingTest) {
Y_UNIT_TEST(Basic)
{
- auto runtime = CreateRuntime();
+ auto runtime = CreateRuntime();
- //const TActorId sender = runtime.AllocateEdgeActor();
+ //const TActorId sender = runtime.AllocateEdgeActor();
NMonitoring::TDynamicCounters::TCounterPtr counter(new NMonitoring::TCounterForPtr());
NMonitoring::TDynamicCounters::TCounterPtr counter2(new NMonitoring::TCounterForPtr());
@@ -30,10 +30,10 @@ Y_UNIT_TEST_SUITE(TSelfPingTest) {
UNIT_ASSERT_VALUES_EQUAL(counter->Val(), 0);
UNIT_ASSERT_VALUES_EQUAL(counter2->Val(), 0);
- const TActorId actorId = runtime->Register(actor);
- Y_UNUSED(actorId);
+ const TActorId actorId = runtime->Register(actor);
+ Y_UNUSED(actorId);
- //runtime.Send(new IEventHandle(actorId, sender, new TEvSelfPing::TEvPing(0.0)));
+ //runtime.Send(new IEventHandle(actorId, sender, new TEvSelfPing::TEvPing(0.0)));
// TODO check after events are handled
//Sleep(TDuration::Seconds(1));
diff --git a/library/cpp/actors/helpers/ut/ya.make b/library/cpp/actors/helpers/ut/ya.make
index 933544d0e7..cba4d6d1d9 100644
--- a/library/cpp/actors/helpers/ut/ya.make
+++ b/library/cpp/actors/helpers/ut/ya.make
@@ -1,36 +1,36 @@
-UNITTEST_FOR(library/cpp/actors/helpers)
-
-OWNER(
- alexvru
- g:kikimr
-)
-
-FORK_SUBTESTS()
-IF (SANITIZER_TYPE)
- SIZE(LARGE)
- TIMEOUT(1200)
- TAG(ya:fat)
- SPLIT_FACTOR(20)
- REQUIREMENTS(
- ram:32
- )
-ELSE()
- SIZE(MEDIUM)
- TIMEOUT(600)
- REQUIREMENTS(
- ram:16
- )
-ENDIF()
-
-
-PEERDIR(
- library/cpp/actors/interconnect
- library/cpp/actors/testlib
- library/cpp/actors/core
-)
-
-SRCS(
- selfping_actor_ut.cpp
-)
-
-END()
+UNITTEST_FOR(library/cpp/actors/helpers)
+
+OWNER(
+ alexvru
+ g:kikimr
+)
+
+FORK_SUBTESTS()
+IF (SANITIZER_TYPE)
+ SIZE(LARGE)
+ TIMEOUT(1200)
+ TAG(ya:fat)
+ SPLIT_FACTOR(20)
+ REQUIREMENTS(
+ ram:32
+ )
+ELSE()
+ SIZE(MEDIUM)
+ TIMEOUT(600)
+ REQUIREMENTS(
+ ram:16
+ )
+ENDIF()
+
+
+PEERDIR(
+ library/cpp/actors/interconnect
+ library/cpp/actors/testlib
+ library/cpp/actors/core
+)
+
+SRCS(
+ selfping_actor_ut.cpp
+)
+
+END()
diff --git a/library/cpp/actors/helpers/ya.make b/library/cpp/actors/helpers/ya.make
index 10cf704d82..d8771179de 100644
--- a/library/cpp/actors/helpers/ya.make
+++ b/library/cpp/actors/helpers/ya.make
@@ -9,7 +9,7 @@ SRCS(
flow_controlled_queue.h
future_callback.h
mon_histogram_helper.h
- selfping_actor.cpp
+ selfping_actor.cpp
)
PEERDIR(
@@ -18,8 +18,8 @@ PEERDIR(
)
END()
-
-RECURSE_FOR_TESTS(
- ut
-)
-
+
+RECURSE_FOR_TESTS(
+ ut
+)
+
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_server.cpp b/library/cpp/actors/interconnect/interconnect_tcp_server.cpp
index fae41db0b3..b95c994598 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_server.cpp
+++ b/library/cpp/actors/interconnect/interconnect_tcp_server.cpp
@@ -7,21 +7,21 @@
#include "interconnect_common.h"
namespace NActors {
- TInterconnectListenerTCP::TInterconnectListenerTCP(const TString& address, ui16 port, TInterconnectProxyCommon::TPtr common, const TMaybe<SOCKET>& socket)
+ TInterconnectListenerTCP::TInterconnectListenerTCP(const TString& address, ui16 port, TInterconnectProxyCommon::TPtr common, const TMaybe<SOCKET>& socket)
: TActor(&TThis::Initial)
, TInterconnectLoggingBase(Sprintf("ICListener: %s", SelfId().ToString().data()))
, Address(address.c_str(), port)
- , Listener(
- socket
- ? new NInterconnect::TStreamSocket(*socket)
- : nullptr)
- , ExternalSocket(!!Listener)
+ , Listener(
+ socket
+ ? new NInterconnect::TStreamSocket(*socket)
+ : nullptr)
+ , ExternalSocket(!!Listener)
, ProxyCommonCtx(std::move(common))
- {
- if (ExternalSocket) {
- SetNonBlock(*Listener);
- }
- }
+ {
+ if (ExternalSocket) {
+ SetNonBlock(*Listener);
+ }
+ }
TAutoPtr<IEventHandle> TInterconnectListenerTCP::AfterRegister(const TActorId& self, const TActorId& parentId) {
return new IEventHandle(self, parentId, new TEvents::TEvBootstrap, 0);
@@ -102,7 +102,7 @@ namespace NActors {
ctx.Register(CreateIncomingHandshakeActor(ProxyCommonCtx, std::move(socket)));
continue;
} else if (-r != EAGAIN && -r != EWOULDBLOCK) {
- Y_VERIFY(-r != ENFILE && -r != EMFILE && !ExternalSocket);
+ Y_VERIFY(-r != ENFILE && -r != EMFILE && !ExternalSocket);
LOG_ERROR_IC("ICL06", "Listen failed: %s (%s)", strerror(-r), Address.ToString().data());
Listener.Reset();
PollerToken.Reset();
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_server.h b/library/cpp/actors/interconnect/interconnect_tcp_server.h
index aab8e97091..fc71073c2d 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_server.h
+++ b/library/cpp/actors/interconnect/interconnect_tcp_server.h
@@ -15,7 +15,7 @@ namespace NActors {
return INTERCONNECT_COMMON;
}
- TInterconnectListenerTCP(const TString& address, ui16 port, TInterconnectProxyCommon::TPtr common, const TMaybe<SOCKET>& socket = Nothing());
+ TInterconnectListenerTCP(const TString& address, ui16 port, TInterconnectProxyCommon::TPtr common, const TMaybe<SOCKET>& socket = Nothing());
int Bind();
private:
@@ -45,7 +45,7 @@ namespace NActors {
const NInterconnect::TAddress Address;
TIntrusivePtr<NInterconnect::TStreamSocket> Listener;
- const bool ExternalSocket;
+ const bool ExternalSocket;
TPollerToken::TPtr PollerToken;
TInterconnectProxyCommon::TPtr const ProxyCommonCtx;
};
diff --git a/library/cpp/actors/util/local_process_key.h b/library/cpp/actors/util/local_process_key.h
index 61d627bf28..172f08fc73 100644
--- a/library/cpp/actors/util/local_process_key.h
+++ b/library/cpp/actors/util/local_process_key.h
@@ -1,19 +1,19 @@
#pragma once
-#include <util/string/builder.h>
+#include <util/string/builder.h>
#include <util/generic/strbuf.h>
#include <util/generic/vector.h>
#include <util/generic/hash.h>
#include <util/generic/singleton.h>
-#include <util/generic/serialized_enum.h>
+#include <util/generic/serialized_enum.h>
template <typename T>
class TLocalProcessKeyState {
template <typename U, const char* Name>
friend class TLocalProcessKey;
-template <typename U, typename EnumT>
-friend class TEnumProcessKey;
+template <typename U, typename EnumT>
+friend class TEnumProcessKey;
public:
static TLocalProcessKeyState& GetInstance() {
@@ -21,17 +21,17 @@ public:
}
size_t GetCount() const {
- return StartIndex + Names.size();
+ return StartIndex + Names.size();
}
TStringBuf GetNameByIndex(size_t index) const {
- if (index < StartIndex) {
- return StaticNames[index];
- } else {
- index -= StartIndex;
- Y_ENSURE(index < Names.size());
- return Names[index];
- }
+ if (index < StartIndex) {
+ return StaticNames[index];
+ } else {
+ index -= StartIndex;
+ Y_ENSURE(index < Names.size());
+ return Names[index];
+ }
}
size_t GetIndexByName(TStringBuf name) const {
@@ -42,7 +42,7 @@ public:
private:
size_t Register(TStringBuf name) {
- auto x = Map.emplace(name, Names.size()+StartIndex);
+ auto x = Map.emplace(name, Names.size()+StartIndex);
if (x.second) {
Names.emplace_back(name);
}
@@ -50,29 +50,29 @@ private:
return x.first->second;
}
- size_t Register(TStringBuf name, ui32 index) {
- Y_VERIFY(index < StartIndex);
- auto x = Map.emplace(name, index);
- Y_VERIFY(x.second || x.first->second == index);
- StaticNames[index] = name;
- return x.first->second;
- }
-
+ size_t Register(TStringBuf name, ui32 index) {
+ Y_VERIFY(index < StartIndex);
+ auto x = Map.emplace(name, index);
+ Y_VERIFY(x.second || x.first->second == index);
+ StaticNames[index] = name;
+ return x.first->second;
+ }
+
private:
- static constexpr ui32 StartIndex = 2000;
-
- TVector<TString> FillStaticNames() {
- TVector<TString> staticNames;
- staticNames.reserve(StartIndex);
- for (ui32 i = 0; i < StartIndex; i++) {
- staticNames.push_back(TStringBuilder() << "Activity_" << i);
- }
- return staticNames;
- }
-
- TVector<TString> StaticNames = FillStaticNames();
- TVector<TString> Names;
- THashMap<TString, size_t> Map;
+ static constexpr ui32 StartIndex = 2000;
+
+ TVector<TString> FillStaticNames() {
+ TVector<TString> staticNames;
+ staticNames.reserve(StartIndex);
+ for (ui32 i = 0; i < StartIndex; i++) {
+ staticNames.push_back(TStringBuilder() << "Activity_" << i);
+ }
+ return staticNames;
+ }
+
+ TVector<TString> StaticNames = FillStaticNames();
+ TVector<TString> Names;
+ THashMap<TString, size_t> Map;
};
template <typename T, const char* Name>
@@ -89,44 +89,44 @@ public:
private:
inline static size_t Index = TLocalProcessKeyState<T>::GetInstance().Register(Name);
};
-
-template <typename T, typename EnumT>
-class TEnumProcessKey {
-public:
- static TStringBuf GetName(EnumT key) {
- return TLocalProcessKeyState<T>::GetInstance().GetNameByIndex(GetIndex(key));
- }
-
- static size_t GetIndex(EnumT key) {
- ui32 index = static_cast<ui32>(key);
- if (index < TLocalProcessKeyState<T>::StartIndex) {
- return index;
- }
- Y_VERIFY(index < Enum2Index.size());
- return Enum2Index[index];
- }
-
-private:
- inline static TVector<size_t> RegisterAll() {
- static_assert(std::is_enum<EnumT>::value, "Enum is required");
-
- TVector<size_t> enum2Index;
- auto names = GetEnumNames<EnumT>();
- ui32 maxId = 0;
- for (const auto& [k, v] : names) {
- maxId = Max(maxId, static_cast<ui32>(k));
- }
- enum2Index.resize(maxId+1);
- for (ui32 i = 0; i <= maxId && i < TLocalProcessKeyState<T>::StartIndex; i++) {
- enum2Index[i] = i;
- }
-
- for (const auto& [k, v] : names) {
- ui32 enumId = static_cast<ui32>(k);
- enum2Index[enumId] = TLocalProcessKeyState<T>::GetInstance().Register(v, enumId);
- }
- return enum2Index;
- }
-
- inline static TVector<size_t> Enum2Index = RegisterAll();
-};
+
+template <typename T, typename EnumT>
+class TEnumProcessKey {
+public:
+ static TStringBuf GetName(EnumT key) {
+ return TLocalProcessKeyState<T>::GetInstance().GetNameByIndex(GetIndex(key));
+ }
+
+ static size_t GetIndex(EnumT key) {
+ ui32 index = static_cast<ui32>(key);
+ if (index < TLocalProcessKeyState<T>::StartIndex) {
+ return index;
+ }
+ Y_VERIFY(index < Enum2Index.size());
+ return Enum2Index[index];
+ }
+
+private:
+ inline static TVector<size_t> RegisterAll() {
+ static_assert(std::is_enum<EnumT>::value, "Enum is required");
+
+ TVector<size_t> enum2Index;
+ auto names = GetEnumNames<EnumT>();
+ ui32 maxId = 0;
+ for (const auto& [k, v] : names) {
+ maxId = Max(maxId, static_cast<ui32>(k));
+ }
+ enum2Index.resize(maxId+1);
+ for (ui32 i = 0; i <= maxId && i < TLocalProcessKeyState<T>::StartIndex; i++) {
+ enum2Index[i] = i;
+ }
+
+ for (const auto& [k, v] : names) {
+ ui32 enumId = static_cast<ui32>(k);
+ enum2Index[enumId] = TLocalProcessKeyState<T>::GetInstance().Register(v, enumId);
+ }
+ return enum2Index;
+ }
+
+ inline static TVector<size_t> Enum2Index = RegisterAll();
+};
diff --git a/library/cpp/grpc/server/grpc_server.cpp b/library/cpp/grpc/server/grpc_server.cpp
index 8863c80832..7437b7a8f5 100644
--- a/library/cpp/grpc/server/grpc_server.cpp
+++ b/library/cpp/grpc/server/grpc_server.cpp
@@ -56,22 +56,22 @@ void TGRpcServer::Start() {
using grpc::ServerBuilder;
using grpc::ResourceQuota;
ServerBuilder builder;
- auto credentials = grpc::InsecureServerCredentials();
- if (Options_.SslData) {
+ auto credentials = grpc::InsecureServerCredentials();
+ if (Options_.SslData) {
grpc::SslServerCredentialsOptions::PemKeyCertPair keycert;
keycert.cert_chain = std::move(Options_.SslData->Cert);
keycert.private_key = std::move(Options_.SslData->Key);
grpc::SslServerCredentialsOptions sslOps;
sslOps.pem_root_certs = std::move(Options_.SslData->Root);
sslOps.pem_key_cert_pairs.push_back(keycert);
- credentials = grpc::SslServerCredentials(sslOps);
- }
- if (Options_.ExternalListener) {
- Options_.ExternalListener->Init(builder.experimental().AddExternalConnectionAcceptor(
- ServerBuilder::experimental_type::ExternalConnectionType::FROM_FD,
- credentials
- ));
- } else {
+ credentials = grpc::SslServerCredentials(sslOps);
+ }
+ if (Options_.ExternalListener) {
+ Options_.ExternalListener->Init(builder.experimental().AddExternalConnectionAcceptor(
+ ServerBuilder::experimental_type::ExternalConnectionType::FROM_FD,
+ credentials
+ ));
+ } else {
builder.AddListeningPort(server_address, credentials);
}
builder.SetMaxReceiveMessageSize(Options_.MaxMessageSize);
@@ -171,10 +171,10 @@ void TGRpcServer::Start() {
}));
}
}
-
- if (Options_.ExternalListener) {
- Options_.ExternalListener->Start();
- }
+
+ if (Options_.ExternalListener) {
+ Options_.ExternalListener->Start();
+ }
}
void TGRpcServer::Stop() {
@@ -223,10 +223,10 @@ void TGRpcServer::Stop() {
}
Ts.clear();
-
- if (Options_.ExternalListener) {
- Options_.ExternalListener->Stop();
- }
+
+ if (Options_.ExternalListener) {
+ Options_.ExternalListener->Stop();
+ }
}
ui16 TGRpcServer::GetPort() const {
diff --git a/library/cpp/grpc/server/grpc_server.h b/library/cpp/grpc/server/grpc_server.h
index 9cb55e3154..d6814a90a0 100644
--- a/library/cpp/grpc/server/grpc_server.h
+++ b/library/cpp/grpc/server/grpc_server.h
@@ -27,15 +27,15 @@ struct TSslData {
TString Root;
};
-struct IExternalListener
- : public TThrRefBase
-{
- using TPtr = TIntrusivePtr<IExternalListener>;
- virtual void Init(std::unique_ptr<grpc::experimental::ExternalConnectionAcceptor> acceptor) = 0;
- virtual void Start() = 0;
- virtual void Stop() = 0;
-};
-
+struct IExternalListener
+ : public TThrRefBase
+{
+ using TPtr = TIntrusivePtr<IExternalListener>;
+ virtual void Init(std::unique_ptr<grpc::experimental::ExternalConnectionAcceptor> acceptor) = 0;
+ virtual void Start() = 0;
+ virtual void Stop() = 0;
+};
+
//! Server's options.
struct TServerOptions {
#define DECLARE_FIELD(name, type, default) \
@@ -93,8 +93,8 @@ struct TServerOptions {
//! Custom configurator for ServerBuilder.
DECLARE_FIELD(ServerBuilderMutator, std::function<void(grpc::ServerBuilder&)>, [](grpc::ServerBuilder&){});
- DECLARE_FIELD(ExternalListener, IExternalListener::TPtr, nullptr);
-
+ DECLARE_FIELD(ExternalListener, IExternalListener::TPtr, nullptr);
+
//! Logger which will be used to write logs about requests handling (iff appropriate log level is enabled).
DECLARE_FIELD(Logger, TLoggerPtr, nullptr);