diff options
author | aozeritsky <aozeritsky@yandex-team.ru> | 2022-02-10 16:46:39 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:46:39 +0300 |
commit | 4a6816dea1bcaee46ce29a51a5fd7d3495012858 (patch) | |
tree | d98d6003e190b9e761bd77a83cd98428f9657b35 /library | |
parent | 7aa4cf700385ff96999c5cc301171ff157974773 (diff) | |
download | ydb-4a6816dea1bcaee46ce29a51a5fd7d3495012858.tar.gz |
Restoring authorship annotation for <aozeritsky@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'library')
-rw-r--r-- | library/cpp/actors/core/actor.h | 64 | ||||
-rw-r--r-- | library/cpp/actors/core/actor_coroutine.h | 2 | ||||
-rw-r--r-- | library/cpp/actors/core/actor_ut.cpp | 26 | ||||
-rw-r--r-- | library/cpp/actors/core/invoke.h | 8 | ||||
-rw-r--r-- | library/cpp/actors/core/ya.make | 2 | ||||
-rw-r--r-- | library/cpp/actors/helpers/pool_stats_collector.h | 628 | ||||
-rw-r--r-- | library/cpp/actors/helpers/selfping_actor.cpp | 26 | ||||
-rw-r--r-- | library/cpp/actors/helpers/selfping_actor.h | 2 | ||||
-rw-r--r-- | library/cpp/actors/helpers/selfping_actor_ut.cpp | 26 | ||||
-rw-r--r-- | library/cpp/actors/helpers/ut/ya.make | 72 | ||||
-rw-r--r-- | library/cpp/actors/helpers/ya.make | 12 | ||||
-rw-r--r-- | library/cpp/actors/interconnect/interconnect_tcp_server.cpp | 24 | ||||
-rw-r--r-- | library/cpp/actors/interconnect/interconnect_tcp_server.h | 4 | ||||
-rw-r--r-- | library/cpp/actors/util/local_process_key.h | 152 | ||||
-rw-r--r-- | library/cpp/grpc/server/grpc_server.cpp | 36 | ||||
-rw-r--r-- | library/cpp/grpc/server/grpc_server.h | 22 |
16 files changed, 553 insertions, 553 deletions
diff --git a/library/cpp/actors/core/actor.h b/library/cpp/actors/core/actor.h index ed29bd14b9..68177a25bf 100644 --- a/library/cpp/actors/core/actor.h +++ b/library/cpp/actors/core/actor.h @@ -233,7 +233,7 @@ namespace NActors { INTERCONNECT_PROXY_TCP = 12, INTERCONNECT_SESSION_TCP = 13, INTERCONNECT_COMMON = 171, - SELF_PING_ACTOR = 207, + SELF_PING_ACTOR = 207, TEST_ACTOR_RUNTIME = 283, INTERCONNECT_HANDSHAKE = 284, INTERCONNECT_POLLER = 285, @@ -248,11 +248,11 @@ namespace NActors { INTERCONNECT_PROXY_WRAPPER = 546, }; - using EActivityType = EActorActivity; - ui32 ActivityType; - + using EActivityType = EActorActivity; + ui32 ActivityType; + protected: - IActor(TReceiveFunc stateFunc, ui32 activityType = OTHER) + IActor(TReceiveFunc stateFunc, ui32 activityType = OTHER) : StateFunc(stateFunc) , SelfActorId(TActorId()) , ElapsedTicks(0) @@ -288,7 +288,7 @@ namespace NActors { } protected: - void SetActivityType(ui32 activityType) { + void SetActivityType(ui32 activityType) { ActivityType = activityType; } @@ -338,7 +338,7 @@ namespace NActors { void AddElapsedTicks(i64 ticks) { ElapsedTicks += ticks; } - auto GetActivityType() const { + auto GetActivityType() const { return ActivityType; } ui64 GetHandledEvents() const { @@ -393,28 +393,28 @@ namespace NActors { return TLocalProcessKeyState<TActorActivityTag>::GetInstance().GetNameByIndex(index); } - template <typename TDerived> - class TActor: public IActor { - private: - template <typename T, typename = const char*> - struct HasActorName: std::false_type { }; - template <typename T> - struct HasActorName<T, decltype((void)T::ActorName, (const char*)nullptr)>: std::true_type { }; - + template <typename TDerived> + class TActor: public IActor { + private: + template <typename T, typename = const char*> + struct HasActorName: std::false_type { }; + template <typename T> + struct HasActorName<T, decltype((void)T::ActorName, (const char*)nullptr)>: std::true_type { }; + static ui32 GetActivityTypeIndex() { - if constexpr(HasActorName<TDerived>::value) { - return TLocalProcessKey<TActorActivityTag, TDerived::ActorName>::GetIndex(); - } else { - using TActorActivity = decltype(((TDerived*)nullptr)->ActorActivityType()); - // if constexpr(std::is_enum<TActorActivity>::value) { - return TEnumProcessKey<TActorActivityTag, TActorActivity>::GetIndex( - TDerived::ActorActivityType()); - //} else { - // for int, ui32, ... - // return TEnumProcessKey<TActorActivityTag, IActor::EActorActivity>::GetIndex( - // static_cast<IActor::EActorActivity>(TDerived::ActorActivityType())); - //} - } + if constexpr(HasActorName<TDerived>::value) { + return TLocalProcessKey<TActorActivityTag, TDerived::ActorName>::GetIndex(); + } else { + using TActorActivity = decltype(((TDerived*)nullptr)->ActorActivityType()); + // if constexpr(std::is_enum<TActorActivity>::value) { + return TEnumProcessKey<TActorActivityTag, TActorActivity>::GetIndex( + TDerived::ActorActivityType()); + //} else { + // for int, ui32, ... + // return TEnumProcessKey<TActorActivityTag, IActor::EActorActivity>::GetIndex( + // static_cast<IActor::EActorActivity>(TDerived::ActorActivityType())); + //} + } } protected: @@ -423,17 +423,17 @@ namespace NActors { return EActorActivity::OTHER; } //*/ - // static constexpr char ActorName[] = "UNNAMED"; - + // static constexpr char ActorName[] = "UNNAMED"; + TActor(void (TDerived::*func)(TAutoPtr<IEventHandle>& ev, const TActorContext& ctx), ui32 activityType = GetActivityTypeIndex()) : IActor(static_cast<TReceiveFunc>(func), activityType) - { } + { } public: typedef TDerived TThis; }; - + #define STFUNC_SIG TAutoPtr< ::NActors::IEventHandle>&ev, const ::NActors::TActorContext &ctx #define STATEFN_SIG TAutoPtr<::NActors::IEventHandle>& ev #define STFUNC(funcName) void funcName(TAutoPtr< ::NActors::IEventHandle>& ev, const ::NActors::TActorContext& ctx) diff --git a/library/cpp/actors/core/actor_coroutine.h b/library/cpp/actors/core/actor_coroutine.h index 6bcb768eaf..8f1450c434 100644 --- a/library/cpp/actors/core/actor_coroutine.h +++ b/library/cpp/actors/core/actor_coroutine.h @@ -154,7 +154,7 @@ namespace NActors { THolder<TActorCoroImpl> Impl; public: - TActorCoro(THolder<TActorCoroImpl> impl, ui32 activityType = IActor::ACTORLIB_COMMON) + TActorCoro(THolder<TActorCoroImpl> impl, ui32 activityType = IActor::ACTORLIB_COMMON) : IActor(static_cast<TReceiveFunc>(&TActorCoro::StateFunc), activityType) , Impl(std::move(impl)) {} diff --git a/library/cpp/actors/core/actor_ut.cpp b/library/cpp/actors/core/actor_ut.cpp index e1b765ec72..a5d43b4619 100644 --- a/library/cpp/actors/core/actor_ut.cpp +++ b/library/cpp/actors/core/actor_ut.cpp @@ -55,7 +55,7 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) { class TSendReceiveActor : public TActorBootstrapped<TSendReceiveActor> { public: - static constexpr auto ActorActivityType() { + static constexpr auto ActorActivityType() { return ACTORLIB_COMMON; } @@ -525,12 +525,12 @@ Y_UNIT_TEST_SUITE(TestDecorator) { } }; - struct TTestActor : TActorBootstrapped<TTestActor> { - static constexpr char ActorName[] = "TestActor"; + struct TTestActor : TActorBootstrapped<TTestActor> { + static constexpr char ActorName[] = "TestActor"; void Bootstrap() { - const auto& activityTypeIndex = GetActivityType(); + const auto& activityTypeIndex = GetActivityType(); Y_ENSURE(activityTypeIndex < GetActivityTypeCount()); Y_ENSURE(GetActivityTypeName(activityTypeIndex) == "TestActor"); PassAway(); @@ -566,13 +566,13 @@ Y_UNIT_TEST_SUITE(TestDecorator) { actorSystem.Stop(); UNIT_ASSERT(pongCounter == 2 && pingCounter == 2); } - - Y_UNIT_TEST(LocalProcessKey) { - static constexpr char ActorName[] = "TestActor"; - - UNIT_ASSERT((TEnumProcessKey<TActorActivityTag, IActor::EActorActivity>::GetName(IActor::INTERCONNECT_PROXY_TCP) == "INTERCONNECT_PROXY_TCP")); - - UNIT_ASSERT((TLocalProcessKey<TActorActivityTag, ActorName>::GetName() == ActorName)); - UNIT_ASSERT((TEnumProcessKey<TActorActivityTag, IActor::EActorActivity>::GetIndex(IActor::INTERCONNECT_PROXY_TCP) == IActor::INTERCONNECT_PROXY_TCP)); - } + + Y_UNIT_TEST(LocalProcessKey) { + static constexpr char ActorName[] = "TestActor"; + + UNIT_ASSERT((TEnumProcessKey<TActorActivityTag, IActor::EActorActivity>::GetName(IActor::INTERCONNECT_PROXY_TCP) == "INTERCONNECT_PROXY_TCP")); + + UNIT_ASSERT((TLocalProcessKey<TActorActivityTag, ActorName>::GetName() == ActorName)); + UNIT_ASSERT((TEnumProcessKey<TActorActivityTag, IActor::EActorActivity>::GetIndex(IActor::INTERCONNECT_PROXY_TCP) == IActor::INTERCONNECT_PROXY_TCP)); + } } diff --git a/library/cpp/actors/core/invoke.h b/library/cpp/actors/core/invoke.h index 931a9767dd..106097abc9 100644 --- a/library/cpp/actors/core/invoke.h +++ b/library/cpp/actors/core/invoke.h @@ -77,14 +77,14 @@ namespace NActors { // actor's class. But `complete` handler is invoked in parent context and can use its contents. Do not forget to // handle TEvInvokeResult event by calling Process/GetResult method, whichever is necessary. - template<typename TCallback, typename TCompletion, ui32 Activity> + template<typename TCallback, typename TCompletion, ui32 Activity> class TInvokeActor : public TActorBootstrapped<TInvokeActor<TCallback, TCompletion, Activity>> { TCallback Callback; TCompletion Complete; public: - static constexpr auto ActorActivityType() { - return static_cast<IActor::EActorActivity>(Activity); + static constexpr auto ActorActivityType() { + return static_cast<IActor::EActorActivity>(Activity); } TInvokeActor(TCallback&& callback, TCompletion&& complete) @@ -101,7 +101,7 @@ namespace NActors { } }; - template<ui32 Activity, typename TCallback, typename TCompletion> + template<ui32 Activity, typename TCallback, typename TCompletion> std::unique_ptr<IActor> CreateInvokeActor(TCallback&& callback, TCompletion&& complete) { return std::make_unique<TInvokeActor<std::decay_t<TCallback>, std::decay_t<TCompletion>, Activity>>( std::forward<TCallback>(callback), std::forward<TCompletion>(complete)); diff --git a/library/cpp/actors/core/ya.make b/library/cpp/actors/core/ya.make index 880a9d00db..9a5ab29e51 100644 --- a/library/cpp/actors/core/ya.make +++ b/library/cpp/actors/core/ya.make @@ -100,7 +100,7 @@ SRCS( ) GENERATE_ENUM_SERIALIZATION(defs.h) -GENERATE_ENUM_SERIALIZATION(actor.h) +GENERATE_ENUM_SERIALIZATION(actor.h) PEERDIR( library/cpp/actors/memory_log diff --git a/library/cpp/actors/helpers/pool_stats_collector.h b/library/cpp/actors/helpers/pool_stats_collector.h index 61d0b45780..78c842900e 100644 --- a/library/cpp/actors/helpers/pool_stats_collector.h +++ b/library/cpp/actors/helpers/pool_stats_collector.h @@ -1,314 +1,314 @@ -#pragma once - -#include <library/cpp/actors/core/actor_bootstrapped.h> -#include <library/cpp/actors/core/actorsystem.h> -#include <library/cpp/actors/core/executor_thread.h> -#include <library/cpp/actors/core/hfunc.h> -#include <library/cpp/monlib/dynamic_counters/counters.h> - -#include <util/generic/vector.h> -#include <util/generic/xrange.h> -#include <util/string/printf.h> - -namespace NActors { - -// Periodically collects stats from executor threads and exposes them as mon counters -class TStatsCollectingActor : public TActorBootstrapped<TStatsCollectingActor> { -private: - struct THistogramCounters { - void Init(NMonitoring::TDynamicCounters* group, const TString& baseName, const TString& unit, ui64 maxVal) { - for (size_t i = 0; (1ull<<i) <= maxVal; ++i) { - TString bucketName = ToString(1ull<<i) + " " + unit; - Buckets.push_back(group->GetSubgroup("sensor", baseName)->GetNamedCounter("range", bucketName, true)); - } - Buckets.push_back(group->GetSubgroup("sensor", baseName)->GetNamedCounter("range", "INF", true)); - } - - void Set(const TLogHistogram& data) { - ui32 i = 0; - for (;i < Y_ARRAY_SIZE(data.Buckets) && i < Buckets.size()-1; ++i) - *Buckets[i] = data.Buckets[i]; - ui64 last = 0; - for (;i < Y_ARRAY_SIZE(data.Buckets); ++i) - last += data.Buckets[i]; - *Buckets.back() = last; - } - - void Set(const TLogHistogram& data, double factor) { - ui32 i = 0; - for (;i < Y_ARRAY_SIZE(data.Buckets) && i < Buckets.size()-1; ++i) - *Buckets[i] = data.Buckets[i]*factor; - ui64 last = 0; - for (;i < Y_ARRAY_SIZE(data.Buckets); ++i) - last += data.Buckets[i]; - *Buckets.back() = last*factor; - } - - private: - TVector<NMonitoring::TDynamicCounters::TCounterPtr> Buckets; - }; - - struct TActivityStats { - void Init(NMonitoring::TDynamicCounterPtr group) { - Group = group; - - ElapsedMicrosecByActivityBuckets.resize(GetActivityTypeCount()); - ReceivedEventsByActivityBuckets.resize(GetActivityTypeCount()); - ActorsAliveByActivityBuckets.resize(GetActivityTypeCount()); - ScheduledEventsByActivityBuckets.resize(GetActivityTypeCount()); - } - - void Set(const TExecutorThreadStats& stats) { - for (ui32 i : xrange(stats.MaxActivityType())) { - Y_VERIFY(i < GetActivityTypeCount()); - ui64 ticks = stats.ElapsedTicksByActivity[i]; - ui64 events = stats.ReceivedEventsByActivity[i]; - ui64 actors = stats.ActorsAliveByActivity[i]; - ui64 scheduled = stats.ScheduledEventsByActivity[i]; - - if (!ActorsAliveByActivityBuckets[i]) { - if (ticks || events || actors || scheduled) { - InitCountersForActivity(i); - } else { - continue; - } - } - - *ElapsedMicrosecByActivityBuckets[i] = ::NHPTimer::GetSeconds(ticks)*1000000; - *ReceivedEventsByActivityBuckets[i] = events; - *ActorsAliveByActivityBuckets[i] = actors; - *ScheduledEventsByActivityBuckets[i] = scheduled; - } - } - - private: - void InitCountersForActivity(ui32 activityType) { - Y_VERIFY(activityType < GetActivityTypeCount()); - - auto bucketName = TString(GetActivityTypeName(activityType)); - - ElapsedMicrosecByActivityBuckets[activityType] = - Group->GetSubgroup("sensor", "ElapsedMicrosecByActivity")->GetNamedCounter("activity", bucketName, true); - ReceivedEventsByActivityBuckets[activityType] = - Group->GetSubgroup("sensor", "ReceivedEventsByActivity")->GetNamedCounter("activity", bucketName, true); - ActorsAliveByActivityBuckets[activityType] = - Group->GetSubgroup("sensor", "ActorsAliveByActivity")->GetNamedCounter("activity", bucketName, false); - ScheduledEventsByActivityBuckets[activityType] = - Group->GetSubgroup("sensor", "ScheduledEventsByActivity")->GetNamedCounter("activity", bucketName, true); - } - - private: - NMonitoring::TDynamicCounterPtr Group; - - TVector<NMonitoring::TDynamicCounters::TCounterPtr> ElapsedMicrosecByActivityBuckets; - TVector<NMonitoring::TDynamicCounters::TCounterPtr> ReceivedEventsByActivityBuckets; - TVector<NMonitoring::TDynamicCounters::TCounterPtr> ActorsAliveByActivityBuckets; - TVector<NMonitoring::TDynamicCounters::TCounterPtr> ScheduledEventsByActivityBuckets; - }; - - struct TExecutorPoolCounters { - TIntrusivePtr<NMonitoring::TDynamicCounters> PoolGroup; - - NMonitoring::TDynamicCounters::TCounterPtr SentEvents; - NMonitoring::TDynamicCounters::TCounterPtr ReceivedEvents; - NMonitoring::TDynamicCounters::TCounterPtr PreemptedEvents; - NMonitoring::TDynamicCounters::TCounterPtr NonDeliveredEvents; - NMonitoring::TDynamicCounters::TCounterPtr DestroyedActors; - NMonitoring::TDynamicCounters::TCounterPtr EmptyMailboxActivation; - NMonitoring::TDynamicCounters::TCounterPtr CpuMicrosec; - NMonitoring::TDynamicCounters::TCounterPtr ElapsedMicrosec; - NMonitoring::TDynamicCounters::TCounterPtr ParkedMicrosec; - NMonitoring::TDynamicCounters::TCounterPtr ActorRegistrations; - NMonitoring::TDynamicCounters::TCounterPtr ActorsAlive; - NMonitoring::TDynamicCounters::TCounterPtr AllocatedMailboxes; - NMonitoring::TDynamicCounters::TCounterPtr MailboxPushedOutBySoftPreemption; - NMonitoring::TDynamicCounters::TCounterPtr MailboxPushedOutByTime; - NMonitoring::TDynamicCounters::TCounterPtr MailboxPushedOutByEventCount; - - THistogramCounters LegacyActivationTimeHistogram; - NMonitoring::THistogramPtr ActivationTimeHistogram; - THistogramCounters LegacyEventDeliveryTimeHistogram; - NMonitoring::THistogramPtr EventDeliveryTimeHistogram; - THistogramCounters LegacyEventProcessingCountHistogram; - NMonitoring::THistogramPtr EventProcessingCountHistogram; - THistogramCounters LegacyEventProcessingTimeHistogram; - NMonitoring::THistogramPtr EventProcessingTimeHistogram; - - TActivityStats ActivityStats; - NMonitoring::TDynamicCounters::TCounterPtr MaxUtilizationTime; - - double Usage = 0; - double LastElapsedSeconds = 0; - THPTimer UsageTimer; - TString Name; - ui32 Threads; - - void Init(NMonitoring::TDynamicCounters* group, const TString& poolName, ui32 threads) { - LastElapsedSeconds = 0; - Usage = 0; - UsageTimer.Reset(); - Name = poolName; - Threads = threads; - - PoolGroup = group->GetSubgroup("execpool", poolName); - - SentEvents = PoolGroup->GetCounter("SentEvents", true); - ReceivedEvents = PoolGroup->GetCounter("ReceivedEvents", true); - PreemptedEvents = PoolGroup->GetCounter("PreemptedEvents", true); - NonDeliveredEvents = PoolGroup->GetCounter("NonDeliveredEvents", true); - DestroyedActors = PoolGroup->GetCounter("DestroyedActors", true); - CpuMicrosec = PoolGroup->GetCounter("CpuMicrosec", true); - ElapsedMicrosec = PoolGroup->GetCounter("ElapsedMicrosec", true); - ParkedMicrosec = PoolGroup->GetCounter("ParkedMicrosec", true); - EmptyMailboxActivation = PoolGroup->GetCounter("EmptyMailboxActivation", true); - ActorRegistrations = PoolGroup->GetCounter("ActorRegistrations", true); - ActorsAlive = PoolGroup->GetCounter("ActorsAlive", false); - AllocatedMailboxes = PoolGroup->GetCounter("AllocatedMailboxes", false); - MailboxPushedOutBySoftPreemption = PoolGroup->GetCounter("MailboxPushedOutBySoftPreemption", true); - MailboxPushedOutByTime = PoolGroup->GetCounter("MailboxPushedOutByTime", true); - MailboxPushedOutByEventCount = PoolGroup->GetCounter("MailboxPushedOutByEventCount", true); - - LegacyActivationTimeHistogram.Init(PoolGroup.Get(), "ActivationTime", "usec", 5*1000*1000); - ActivationTimeHistogram = PoolGroup->GetHistogram( - "ActivationTimeUs", NMonitoring::ExponentialHistogram(24, 2, 1)); - LegacyEventDeliveryTimeHistogram.Init(PoolGroup.Get(), "EventDeliveryTime", "usec", 5*1000*1000); - EventDeliveryTimeHistogram = PoolGroup->GetHistogram( - "EventDeliveryTimeUs", NMonitoring::ExponentialHistogram(24, 2, 1)); - LegacyEventProcessingCountHistogram.Init(PoolGroup.Get(), "EventProcessingCount", "usec", 5*1000*1000); - EventProcessingCountHistogram = PoolGroup->GetHistogram( - "EventProcessingCountUs", NMonitoring::ExponentialHistogram(24, 2, 1)); - LegacyEventProcessingTimeHistogram.Init(PoolGroup.Get(), "EventProcessingTime", "usec", 5*1000*1000); - EventProcessingTimeHistogram = PoolGroup->GetHistogram( - "EventProcessingTimeUs", NMonitoring::ExponentialHistogram(24, 2, 1)); - - ActivityStats.Init(PoolGroup.Get()); - - MaxUtilizationTime = PoolGroup->GetCounter("MaxUtilizationTime", true); - } - - void Set(const TExecutorPoolStats& poolStats, const TExecutorThreadStats& stats, ui32 numThreads) { -#ifdef ACTORSLIB_COLLECT_EXEC_STATS - *SentEvents = stats.SentEvents; - *ReceivedEvents = stats.ReceivedEvents; - *PreemptedEvents = stats.PreemptedEvents; - *NonDeliveredEvents = stats.NonDeliveredEvents; - *DestroyedActors = stats.PoolDestroyedActors; - *EmptyMailboxActivation = stats.EmptyMailboxActivation; - *CpuMicrosec = stats.CpuNs / 1000; - *ElapsedMicrosec = ::NHPTimer::GetSeconds(stats.ElapsedTicks)*1000000; - *ParkedMicrosec = ::NHPTimer::GetSeconds(stats.ParkedTicks)*1000000; - *ActorRegistrations = stats.PoolActorRegistrations; - *ActorsAlive = stats.PoolActorRegistrations - stats.PoolDestroyedActors; - *AllocatedMailboxes = stats.PoolAllocatedMailboxes; - *MailboxPushedOutBySoftPreemption = stats.MailboxPushedOutBySoftPreemption; - *MailboxPushedOutByTime = stats.MailboxPushedOutByTime; - *MailboxPushedOutByEventCount = stats.MailboxPushedOutByEventCount; - - LegacyActivationTimeHistogram.Set(stats.ActivationTimeHistogram); - ActivationTimeHistogram->Reset(); - ActivationTimeHistogram->Collect(stats.ActivationTimeHistogram); - - LegacyEventDeliveryTimeHistogram.Set(stats.EventDeliveryTimeHistogram); - EventDeliveryTimeHistogram->Reset(); - EventDeliveryTimeHistogram->Collect(stats.EventDeliveryTimeHistogram); - - LegacyEventProcessingCountHistogram.Set(stats.EventProcessingCountHistogram); - EventProcessingCountHistogram->Reset(); - EventProcessingCountHistogram->Collect(stats.EventProcessingCountHistogram); - - double toMicrosec = 1000000 / NHPTimer::GetClockRate(); - LegacyEventProcessingTimeHistogram.Set(stats.EventProcessingTimeHistogram, toMicrosec); - EventProcessingTimeHistogram->Reset(); - for (ui32 i = 0; i < stats.EventProcessingTimeHistogram.Count(); ++i) { - EventProcessingTimeHistogram->Collect( - stats.EventProcessingTimeHistogram.UpperBound(i), - stats.EventProcessingTimeHistogram.Value(i) * toMicrosec); - } - - ActivityStats.Set(stats); - - *MaxUtilizationTime = poolStats.MaxUtilizationTime; - - double seconds = UsageTimer.PassedReset(); - - // TODO[serxa]: It doesn't account for contention. Use 1 - parkedTicksDelta / seconds / numThreads KIKIMR-11916 - const double elapsed = NHPTimer::GetSeconds(stats.ElapsedTicks); - const double currentUsage = numThreads > 0 ? ((elapsed - LastElapsedSeconds) / seconds / numThreads) : 0; - LastElapsedSeconds = elapsed; - - // update usage factor according to smoothness - const double smoothness = 0.5; - Usage = currentUsage * smoothness + Usage * (1.0 - smoothness); -#else - Y_UNUSED(poolStats); - Y_UNUSED(stats); - Y_UNUSED(numThreads); -#endif - } - }; - -public: - static constexpr IActor::EActivityType ActorActivityType() { - return IActor::ACTORLIB_STATS; - } - - TStatsCollectingActor( - ui32 intervalSec, - const TActorSystemSetup& setup, - NMonitoring::TDynamicCounterPtr counters) - : IntervalSec(intervalSec) - , Counters(counters) - { - PoolCounters.resize(setup.GetExecutorsCount()); - for (size_t poolId = 0; poolId < PoolCounters.size(); ++poolId) { - PoolCounters[poolId].Init(Counters.Get(), setup.GetPoolName(poolId), setup.GetThreads(poolId)); - } - } - - void Bootstrap(const TActorContext& ctx) { - ctx.Schedule(TDuration::Seconds(IntervalSec), new TEvents::TEvWakeup()); - Become(&TThis::StateWork); - } - - STFUNC(StateWork) { - switch (ev->GetTypeRewrite()) { - CFunc(TEvents::TSystem::Wakeup, Wakeup); - } - } - -private: - virtual void OnWakeup(const TActorContext &ctx) { - Y_UNUSED(ctx); - } - - void Wakeup(const TActorContext &ctx) { - for (size_t poolId = 0; poolId < PoolCounters.size(); ++poolId) { - TVector<TExecutorThreadStats> stats; - TExecutorPoolStats poolStats; - ctx.ExecutorThread.ActorSystem->GetPoolStats(poolId, poolStats, stats); - SetAggregatedCounters(PoolCounters[poolId], poolStats, stats); - } - - OnWakeup(ctx); - - ctx.Schedule(TDuration::Seconds(IntervalSec), new TEvents::TEvWakeup()); - } - - void SetAggregatedCounters(TExecutorPoolCounters& poolCounters, TExecutorPoolStats& poolStats, TVector<TExecutorThreadStats>& stats) { - // Sum all per-thread counters into the 0th element - for (ui32 idx = 1; idx < stats.size(); ++idx) { - stats[0].Aggregate(stats[idx]); - } - if (stats.size()) { - poolCounters.Set(poolStats, stats[0], stats.size() - 1); - } - } - -protected: - const ui32 IntervalSec; - NMonitoring::TDynamicCounterPtr Counters; - - TVector<TExecutorPoolCounters> PoolCounters; -}; - -} // NActors +#pragma once + +#include <library/cpp/actors/core/actor_bootstrapped.h> +#include <library/cpp/actors/core/actorsystem.h> +#include <library/cpp/actors/core/executor_thread.h> +#include <library/cpp/actors/core/hfunc.h> +#include <library/cpp/monlib/dynamic_counters/counters.h> + +#include <util/generic/vector.h> +#include <util/generic/xrange.h> +#include <util/string/printf.h> + +namespace NActors { + +// Periodically collects stats from executor threads and exposes them as mon counters +class TStatsCollectingActor : public TActorBootstrapped<TStatsCollectingActor> { +private: + struct THistogramCounters { + void Init(NMonitoring::TDynamicCounters* group, const TString& baseName, const TString& unit, ui64 maxVal) { + for (size_t i = 0; (1ull<<i) <= maxVal; ++i) { + TString bucketName = ToString(1ull<<i) + " " + unit; + Buckets.push_back(group->GetSubgroup("sensor", baseName)->GetNamedCounter("range", bucketName, true)); + } + Buckets.push_back(group->GetSubgroup("sensor", baseName)->GetNamedCounter("range", "INF", true)); + } + + void Set(const TLogHistogram& data) { + ui32 i = 0; + for (;i < Y_ARRAY_SIZE(data.Buckets) && i < Buckets.size()-1; ++i) + *Buckets[i] = data.Buckets[i]; + ui64 last = 0; + for (;i < Y_ARRAY_SIZE(data.Buckets); ++i) + last += data.Buckets[i]; + *Buckets.back() = last; + } + + void Set(const TLogHistogram& data, double factor) { + ui32 i = 0; + for (;i < Y_ARRAY_SIZE(data.Buckets) && i < Buckets.size()-1; ++i) + *Buckets[i] = data.Buckets[i]*factor; + ui64 last = 0; + for (;i < Y_ARRAY_SIZE(data.Buckets); ++i) + last += data.Buckets[i]; + *Buckets.back() = last*factor; + } + + private: + TVector<NMonitoring::TDynamicCounters::TCounterPtr> Buckets; + }; + + struct TActivityStats { + void Init(NMonitoring::TDynamicCounterPtr group) { + Group = group; + + ElapsedMicrosecByActivityBuckets.resize(GetActivityTypeCount()); + ReceivedEventsByActivityBuckets.resize(GetActivityTypeCount()); + ActorsAliveByActivityBuckets.resize(GetActivityTypeCount()); + ScheduledEventsByActivityBuckets.resize(GetActivityTypeCount()); + } + + void Set(const TExecutorThreadStats& stats) { + for (ui32 i : xrange(stats.MaxActivityType())) { + Y_VERIFY(i < GetActivityTypeCount()); + ui64 ticks = stats.ElapsedTicksByActivity[i]; + ui64 events = stats.ReceivedEventsByActivity[i]; + ui64 actors = stats.ActorsAliveByActivity[i]; + ui64 scheduled = stats.ScheduledEventsByActivity[i]; + + if (!ActorsAliveByActivityBuckets[i]) { + if (ticks || events || actors || scheduled) { + InitCountersForActivity(i); + } else { + continue; + } + } + + *ElapsedMicrosecByActivityBuckets[i] = ::NHPTimer::GetSeconds(ticks)*1000000; + *ReceivedEventsByActivityBuckets[i] = events; + *ActorsAliveByActivityBuckets[i] = actors; + *ScheduledEventsByActivityBuckets[i] = scheduled; + } + } + + private: + void InitCountersForActivity(ui32 activityType) { + Y_VERIFY(activityType < GetActivityTypeCount()); + + auto bucketName = TString(GetActivityTypeName(activityType)); + + ElapsedMicrosecByActivityBuckets[activityType] = + Group->GetSubgroup("sensor", "ElapsedMicrosecByActivity")->GetNamedCounter("activity", bucketName, true); + ReceivedEventsByActivityBuckets[activityType] = + Group->GetSubgroup("sensor", "ReceivedEventsByActivity")->GetNamedCounter("activity", bucketName, true); + ActorsAliveByActivityBuckets[activityType] = + Group->GetSubgroup("sensor", "ActorsAliveByActivity")->GetNamedCounter("activity", bucketName, false); + ScheduledEventsByActivityBuckets[activityType] = + Group->GetSubgroup("sensor", "ScheduledEventsByActivity")->GetNamedCounter("activity", bucketName, true); + } + + private: + NMonitoring::TDynamicCounterPtr Group; + + TVector<NMonitoring::TDynamicCounters::TCounterPtr> ElapsedMicrosecByActivityBuckets; + TVector<NMonitoring::TDynamicCounters::TCounterPtr> ReceivedEventsByActivityBuckets; + TVector<NMonitoring::TDynamicCounters::TCounterPtr> ActorsAliveByActivityBuckets; + TVector<NMonitoring::TDynamicCounters::TCounterPtr> ScheduledEventsByActivityBuckets; + }; + + struct TExecutorPoolCounters { + TIntrusivePtr<NMonitoring::TDynamicCounters> PoolGroup; + + NMonitoring::TDynamicCounters::TCounterPtr SentEvents; + NMonitoring::TDynamicCounters::TCounterPtr ReceivedEvents; + NMonitoring::TDynamicCounters::TCounterPtr PreemptedEvents; + NMonitoring::TDynamicCounters::TCounterPtr NonDeliveredEvents; + NMonitoring::TDynamicCounters::TCounterPtr DestroyedActors; + NMonitoring::TDynamicCounters::TCounterPtr EmptyMailboxActivation; + NMonitoring::TDynamicCounters::TCounterPtr CpuMicrosec; + NMonitoring::TDynamicCounters::TCounterPtr ElapsedMicrosec; + NMonitoring::TDynamicCounters::TCounterPtr ParkedMicrosec; + NMonitoring::TDynamicCounters::TCounterPtr ActorRegistrations; + NMonitoring::TDynamicCounters::TCounterPtr ActorsAlive; + NMonitoring::TDynamicCounters::TCounterPtr AllocatedMailboxes; + NMonitoring::TDynamicCounters::TCounterPtr MailboxPushedOutBySoftPreemption; + NMonitoring::TDynamicCounters::TCounterPtr MailboxPushedOutByTime; + NMonitoring::TDynamicCounters::TCounterPtr MailboxPushedOutByEventCount; + + THistogramCounters LegacyActivationTimeHistogram; + NMonitoring::THistogramPtr ActivationTimeHistogram; + THistogramCounters LegacyEventDeliveryTimeHistogram; + NMonitoring::THistogramPtr EventDeliveryTimeHistogram; + THistogramCounters LegacyEventProcessingCountHistogram; + NMonitoring::THistogramPtr EventProcessingCountHistogram; + THistogramCounters LegacyEventProcessingTimeHistogram; + NMonitoring::THistogramPtr EventProcessingTimeHistogram; + + TActivityStats ActivityStats; + NMonitoring::TDynamicCounters::TCounterPtr MaxUtilizationTime; + + double Usage = 0; + double LastElapsedSeconds = 0; + THPTimer UsageTimer; + TString Name; + ui32 Threads; + + void Init(NMonitoring::TDynamicCounters* group, const TString& poolName, ui32 threads) { + LastElapsedSeconds = 0; + Usage = 0; + UsageTimer.Reset(); + Name = poolName; + Threads = threads; + + PoolGroup = group->GetSubgroup("execpool", poolName); + + SentEvents = PoolGroup->GetCounter("SentEvents", true); + ReceivedEvents = PoolGroup->GetCounter("ReceivedEvents", true); + PreemptedEvents = PoolGroup->GetCounter("PreemptedEvents", true); + NonDeliveredEvents = PoolGroup->GetCounter("NonDeliveredEvents", true); + DestroyedActors = PoolGroup->GetCounter("DestroyedActors", true); + CpuMicrosec = PoolGroup->GetCounter("CpuMicrosec", true); + ElapsedMicrosec = PoolGroup->GetCounter("ElapsedMicrosec", true); + ParkedMicrosec = PoolGroup->GetCounter("ParkedMicrosec", true); + EmptyMailboxActivation = PoolGroup->GetCounter("EmptyMailboxActivation", true); + ActorRegistrations = PoolGroup->GetCounter("ActorRegistrations", true); + ActorsAlive = PoolGroup->GetCounter("ActorsAlive", false); + AllocatedMailboxes = PoolGroup->GetCounter("AllocatedMailboxes", false); + MailboxPushedOutBySoftPreemption = PoolGroup->GetCounter("MailboxPushedOutBySoftPreemption", true); + MailboxPushedOutByTime = PoolGroup->GetCounter("MailboxPushedOutByTime", true); + MailboxPushedOutByEventCount = PoolGroup->GetCounter("MailboxPushedOutByEventCount", true); + + LegacyActivationTimeHistogram.Init(PoolGroup.Get(), "ActivationTime", "usec", 5*1000*1000); + ActivationTimeHistogram = PoolGroup->GetHistogram( + "ActivationTimeUs", NMonitoring::ExponentialHistogram(24, 2, 1)); + LegacyEventDeliveryTimeHistogram.Init(PoolGroup.Get(), "EventDeliveryTime", "usec", 5*1000*1000); + EventDeliveryTimeHistogram = PoolGroup->GetHistogram( + "EventDeliveryTimeUs", NMonitoring::ExponentialHistogram(24, 2, 1)); + LegacyEventProcessingCountHistogram.Init(PoolGroup.Get(), "EventProcessingCount", "usec", 5*1000*1000); + EventProcessingCountHistogram = PoolGroup->GetHistogram( + "EventProcessingCountUs", NMonitoring::ExponentialHistogram(24, 2, 1)); + LegacyEventProcessingTimeHistogram.Init(PoolGroup.Get(), "EventProcessingTime", "usec", 5*1000*1000); + EventProcessingTimeHistogram = PoolGroup->GetHistogram( + "EventProcessingTimeUs", NMonitoring::ExponentialHistogram(24, 2, 1)); + + ActivityStats.Init(PoolGroup.Get()); + + MaxUtilizationTime = PoolGroup->GetCounter("MaxUtilizationTime", true); + } + + void Set(const TExecutorPoolStats& poolStats, const TExecutorThreadStats& stats, ui32 numThreads) { +#ifdef ACTORSLIB_COLLECT_EXEC_STATS + *SentEvents = stats.SentEvents; + *ReceivedEvents = stats.ReceivedEvents; + *PreemptedEvents = stats.PreemptedEvents; + *NonDeliveredEvents = stats.NonDeliveredEvents; + *DestroyedActors = stats.PoolDestroyedActors; + *EmptyMailboxActivation = stats.EmptyMailboxActivation; + *CpuMicrosec = stats.CpuNs / 1000; + *ElapsedMicrosec = ::NHPTimer::GetSeconds(stats.ElapsedTicks)*1000000; + *ParkedMicrosec = ::NHPTimer::GetSeconds(stats.ParkedTicks)*1000000; + *ActorRegistrations = stats.PoolActorRegistrations; + *ActorsAlive = stats.PoolActorRegistrations - stats.PoolDestroyedActors; + *AllocatedMailboxes = stats.PoolAllocatedMailboxes; + *MailboxPushedOutBySoftPreemption = stats.MailboxPushedOutBySoftPreemption; + *MailboxPushedOutByTime = stats.MailboxPushedOutByTime; + *MailboxPushedOutByEventCount = stats.MailboxPushedOutByEventCount; + + LegacyActivationTimeHistogram.Set(stats.ActivationTimeHistogram); + ActivationTimeHistogram->Reset(); + ActivationTimeHistogram->Collect(stats.ActivationTimeHistogram); + + LegacyEventDeliveryTimeHistogram.Set(stats.EventDeliveryTimeHistogram); + EventDeliveryTimeHistogram->Reset(); + EventDeliveryTimeHistogram->Collect(stats.EventDeliveryTimeHistogram); + + LegacyEventProcessingCountHistogram.Set(stats.EventProcessingCountHistogram); + EventProcessingCountHistogram->Reset(); + EventProcessingCountHistogram->Collect(stats.EventProcessingCountHistogram); + + double toMicrosec = 1000000 / NHPTimer::GetClockRate(); + LegacyEventProcessingTimeHistogram.Set(stats.EventProcessingTimeHistogram, toMicrosec); + EventProcessingTimeHistogram->Reset(); + for (ui32 i = 0; i < stats.EventProcessingTimeHistogram.Count(); ++i) { + EventProcessingTimeHistogram->Collect( + stats.EventProcessingTimeHistogram.UpperBound(i), + stats.EventProcessingTimeHistogram.Value(i) * toMicrosec); + } + + ActivityStats.Set(stats); + + *MaxUtilizationTime = poolStats.MaxUtilizationTime; + + double seconds = UsageTimer.PassedReset(); + + // TODO[serxa]: It doesn't account for contention. Use 1 - parkedTicksDelta / seconds / numThreads KIKIMR-11916 + const double elapsed = NHPTimer::GetSeconds(stats.ElapsedTicks); + const double currentUsage = numThreads > 0 ? ((elapsed - LastElapsedSeconds) / seconds / numThreads) : 0; + LastElapsedSeconds = elapsed; + + // update usage factor according to smoothness + const double smoothness = 0.5; + Usage = currentUsage * smoothness + Usage * (1.0 - smoothness); +#else + Y_UNUSED(poolStats); + Y_UNUSED(stats); + Y_UNUSED(numThreads); +#endif + } + }; + +public: + static constexpr IActor::EActivityType ActorActivityType() { + return IActor::ACTORLIB_STATS; + } + + TStatsCollectingActor( + ui32 intervalSec, + const TActorSystemSetup& setup, + NMonitoring::TDynamicCounterPtr counters) + : IntervalSec(intervalSec) + , Counters(counters) + { + PoolCounters.resize(setup.GetExecutorsCount()); + for (size_t poolId = 0; poolId < PoolCounters.size(); ++poolId) { + PoolCounters[poolId].Init(Counters.Get(), setup.GetPoolName(poolId), setup.GetThreads(poolId)); + } + } + + void Bootstrap(const TActorContext& ctx) { + ctx.Schedule(TDuration::Seconds(IntervalSec), new TEvents::TEvWakeup()); + Become(&TThis::StateWork); + } + + STFUNC(StateWork) { + switch (ev->GetTypeRewrite()) { + CFunc(TEvents::TSystem::Wakeup, Wakeup); + } + } + +private: + virtual void OnWakeup(const TActorContext &ctx) { + Y_UNUSED(ctx); + } + + void Wakeup(const TActorContext &ctx) { + for (size_t poolId = 0; poolId < PoolCounters.size(); ++poolId) { + TVector<TExecutorThreadStats> stats; + TExecutorPoolStats poolStats; + ctx.ExecutorThread.ActorSystem->GetPoolStats(poolId, poolStats, stats); + SetAggregatedCounters(PoolCounters[poolId], poolStats, stats); + } + + OnWakeup(ctx); + + ctx.Schedule(TDuration::Seconds(IntervalSec), new TEvents::TEvWakeup()); + } + + void SetAggregatedCounters(TExecutorPoolCounters& poolCounters, TExecutorPoolStats& poolStats, TVector<TExecutorThreadStats>& stats) { + // Sum all per-thread counters into the 0th element + for (ui32 idx = 1; idx < stats.size(); ++idx) { + stats[0].Aggregate(stats[idx]); + } + if (stats.size()) { + poolCounters.Set(poolStats, stats[0], stats.size() - 1); + } + } + +protected: + const ui32 IntervalSec; + NMonitoring::TDynamicCounterPtr Counters; + + TVector<TExecutorPoolCounters> PoolCounters; +}; + +} // NActors diff --git a/library/cpp/actors/helpers/selfping_actor.cpp b/library/cpp/actors/helpers/selfping_actor.cpp index f9bfaf8dc0..7902ff6837 100644 --- a/library/cpp/actors/helpers/selfping_actor.cpp +++ b/library/cpp/actors/helpers/selfping_actor.cpp @@ -10,14 +10,14 @@ namespace NActors { namespace { -struct TEvPing: public TEventLocal<TEvPing, TEvents::THelloWorld::Ping> { - TEvPing(double timeStart) - : TimeStart(timeStart) - {} - - const double TimeStart; -}; - +struct TEvPing: public TEventLocal<TEvPing, TEvents::THelloWorld::Ping> { + TEvPing(double timeStart) + : TimeStart(timeStart) + {} + + const double TimeStart; +}; + template <class TValueType_> struct TAvgOperation { struct TValueType { @@ -70,8 +70,8 @@ private: THPTimer Timer; public: - static constexpr auto ActorActivityType() { - return SELF_PING_ACTOR; + static constexpr auto ActorActivityType() { + return SELF_PING_ACTOR; } TSelfPingActor(TDuration sendInterval, const NMonitoring::TDynamicCounters::TCounterPtr& counter, @@ -93,7 +93,7 @@ public: STFUNC(RunningState) { switch (ev->GetTypeRewrite()) { - HFunc(TEvPing, HandlePing); + HFunc(TEvPing, HandlePing); default: Y_FAIL("TSelfPingActor::RunningState: unexpected event 0x%08" PRIx32, ev->GetTypeRewrite()); } @@ -146,7 +146,7 @@ public: return (ui64)d; } - void HandlePing(TEvPing::TPtr &ev, const TActorContext &ctx) + void HandlePing(TEvPing::TPtr &ev, const TActorContext &ctx) { const auto now = ctx.Now(); const double hpNow = Timer.Passed(); @@ -166,7 +166,7 @@ public: private: void SchedulePing(const TActorContext &ctx, double hpNow) const { - ctx.Schedule(SendInterval, new TEvPing(hpNow)); + ctx.Schedule(SendInterval, new TEvPing(hpNow)); } }; diff --git a/library/cpp/actors/helpers/selfping_actor.h b/library/cpp/actors/helpers/selfping_actor.h index d7d07f9fa8..4217fa4293 100644 --- a/library/cpp/actors/helpers/selfping_actor.h +++ b/library/cpp/actors/helpers/selfping_actor.h @@ -1,6 +1,6 @@ #pragma once -#include <library/cpp/actors/core/actor.h> +#include <library/cpp/actors/core/actor.h> #include <library/cpp/monlib/dynamic_counters/counters.h> namespace NActors { diff --git a/library/cpp/actors/helpers/selfping_actor_ut.cpp b/library/cpp/actors/helpers/selfping_actor_ut.cpp index 459635fa24..3643397c0b 100644 --- a/library/cpp/actors/helpers/selfping_actor_ut.cpp +++ b/library/cpp/actors/helpers/selfping_actor_ut.cpp @@ -1,24 +1,24 @@ #include "selfping_actor.h" #include <library/cpp/testing/unittest/registar.h> -#include <library/cpp/actors/testlib/test_runtime.h> +#include <library/cpp/actors/testlib/test_runtime.h> namespace NActors { namespace Tests { -THolder<TTestActorRuntimeBase> CreateRuntime() { - auto runtime = MakeHolder<TTestActorRuntimeBase>(); - runtime->SetScheduledEventFilter([](auto&&, auto&&, auto&&, auto&&) { return false; }); - runtime->Initialize(); - return runtime; -} - +THolder<TTestActorRuntimeBase> CreateRuntime() { + auto runtime = MakeHolder<TTestActorRuntimeBase>(); + runtime->SetScheduledEventFilter([](auto&&, auto&&, auto&&, auto&&) { return false; }); + runtime->Initialize(); + return runtime; +} + Y_UNIT_TEST_SUITE(TSelfPingTest) { Y_UNIT_TEST(Basic) { - auto runtime = CreateRuntime(); + auto runtime = CreateRuntime(); - //const TActorId sender = runtime.AllocateEdgeActor(); + //const TActorId sender = runtime.AllocateEdgeActor(); NMonitoring::TDynamicCounters::TCounterPtr counter(new NMonitoring::TCounterForPtr()); NMonitoring::TDynamicCounters::TCounterPtr counter2(new NMonitoring::TCounterForPtr()); @@ -30,10 +30,10 @@ Y_UNIT_TEST_SUITE(TSelfPingTest) { UNIT_ASSERT_VALUES_EQUAL(counter->Val(), 0); UNIT_ASSERT_VALUES_EQUAL(counter2->Val(), 0); - const TActorId actorId = runtime->Register(actor); - Y_UNUSED(actorId); + const TActorId actorId = runtime->Register(actor); + Y_UNUSED(actorId); - //runtime.Send(new IEventHandle(actorId, sender, new TEvSelfPing::TEvPing(0.0))); + //runtime.Send(new IEventHandle(actorId, sender, new TEvSelfPing::TEvPing(0.0))); // TODO check after events are handled //Sleep(TDuration::Seconds(1)); diff --git a/library/cpp/actors/helpers/ut/ya.make b/library/cpp/actors/helpers/ut/ya.make index cba4d6d1d9..933544d0e7 100644 --- a/library/cpp/actors/helpers/ut/ya.make +++ b/library/cpp/actors/helpers/ut/ya.make @@ -1,36 +1,36 @@ -UNITTEST_FOR(library/cpp/actors/helpers) - -OWNER( - alexvru - g:kikimr -) - -FORK_SUBTESTS() -IF (SANITIZER_TYPE) - SIZE(LARGE) - TIMEOUT(1200) - TAG(ya:fat) - SPLIT_FACTOR(20) - REQUIREMENTS( - ram:32 - ) -ELSE() - SIZE(MEDIUM) - TIMEOUT(600) - REQUIREMENTS( - ram:16 - ) -ENDIF() - - -PEERDIR( - library/cpp/actors/interconnect - library/cpp/actors/testlib - library/cpp/actors/core -) - -SRCS( - selfping_actor_ut.cpp -) - -END() +UNITTEST_FOR(library/cpp/actors/helpers) + +OWNER( + alexvru + g:kikimr +) + +FORK_SUBTESTS() +IF (SANITIZER_TYPE) + SIZE(LARGE) + TIMEOUT(1200) + TAG(ya:fat) + SPLIT_FACTOR(20) + REQUIREMENTS( + ram:32 + ) +ELSE() + SIZE(MEDIUM) + TIMEOUT(600) + REQUIREMENTS( + ram:16 + ) +ENDIF() + + +PEERDIR( + library/cpp/actors/interconnect + library/cpp/actors/testlib + library/cpp/actors/core +) + +SRCS( + selfping_actor_ut.cpp +) + +END() diff --git a/library/cpp/actors/helpers/ya.make b/library/cpp/actors/helpers/ya.make index d8771179de..10cf704d82 100644 --- a/library/cpp/actors/helpers/ya.make +++ b/library/cpp/actors/helpers/ya.make @@ -9,7 +9,7 @@ SRCS( flow_controlled_queue.h future_callback.h mon_histogram_helper.h - selfping_actor.cpp + selfping_actor.cpp ) PEERDIR( @@ -18,8 +18,8 @@ PEERDIR( ) END() - -RECURSE_FOR_TESTS( - ut -) - + +RECURSE_FOR_TESTS( + ut +) + diff --git a/library/cpp/actors/interconnect/interconnect_tcp_server.cpp b/library/cpp/actors/interconnect/interconnect_tcp_server.cpp index b95c994598..fae41db0b3 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_server.cpp +++ b/library/cpp/actors/interconnect/interconnect_tcp_server.cpp @@ -7,21 +7,21 @@ #include "interconnect_common.h" namespace NActors { - TInterconnectListenerTCP::TInterconnectListenerTCP(const TString& address, ui16 port, TInterconnectProxyCommon::TPtr common, const TMaybe<SOCKET>& socket) + TInterconnectListenerTCP::TInterconnectListenerTCP(const TString& address, ui16 port, TInterconnectProxyCommon::TPtr common, const TMaybe<SOCKET>& socket) : TActor(&TThis::Initial) , TInterconnectLoggingBase(Sprintf("ICListener: %s", SelfId().ToString().data())) , Address(address.c_str(), port) - , Listener( - socket - ? new NInterconnect::TStreamSocket(*socket) - : nullptr) - , ExternalSocket(!!Listener) + , Listener( + socket + ? new NInterconnect::TStreamSocket(*socket) + : nullptr) + , ExternalSocket(!!Listener) , ProxyCommonCtx(std::move(common)) - { - if (ExternalSocket) { - SetNonBlock(*Listener); - } - } + { + if (ExternalSocket) { + SetNonBlock(*Listener); + } + } TAutoPtr<IEventHandle> TInterconnectListenerTCP::AfterRegister(const TActorId& self, const TActorId& parentId) { return new IEventHandle(self, parentId, new TEvents::TEvBootstrap, 0); @@ -102,7 +102,7 @@ namespace NActors { ctx.Register(CreateIncomingHandshakeActor(ProxyCommonCtx, std::move(socket))); continue; } else if (-r != EAGAIN && -r != EWOULDBLOCK) { - Y_VERIFY(-r != ENFILE && -r != EMFILE && !ExternalSocket); + Y_VERIFY(-r != ENFILE && -r != EMFILE && !ExternalSocket); LOG_ERROR_IC("ICL06", "Listen failed: %s (%s)", strerror(-r), Address.ToString().data()); Listener.Reset(); PollerToken.Reset(); diff --git a/library/cpp/actors/interconnect/interconnect_tcp_server.h b/library/cpp/actors/interconnect/interconnect_tcp_server.h index fc71073c2d..aab8e97091 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_server.h +++ b/library/cpp/actors/interconnect/interconnect_tcp_server.h @@ -15,7 +15,7 @@ namespace NActors { return INTERCONNECT_COMMON; } - TInterconnectListenerTCP(const TString& address, ui16 port, TInterconnectProxyCommon::TPtr common, const TMaybe<SOCKET>& socket = Nothing()); + TInterconnectListenerTCP(const TString& address, ui16 port, TInterconnectProxyCommon::TPtr common, const TMaybe<SOCKET>& socket = Nothing()); int Bind(); private: @@ -45,7 +45,7 @@ namespace NActors { const NInterconnect::TAddress Address; TIntrusivePtr<NInterconnect::TStreamSocket> Listener; - const bool ExternalSocket; + const bool ExternalSocket; TPollerToken::TPtr PollerToken; TInterconnectProxyCommon::TPtr const ProxyCommonCtx; }; diff --git a/library/cpp/actors/util/local_process_key.h b/library/cpp/actors/util/local_process_key.h index 172f08fc73..61d627bf28 100644 --- a/library/cpp/actors/util/local_process_key.h +++ b/library/cpp/actors/util/local_process_key.h @@ -1,19 +1,19 @@ #pragma once -#include <util/string/builder.h> +#include <util/string/builder.h> #include <util/generic/strbuf.h> #include <util/generic/vector.h> #include <util/generic/hash.h> #include <util/generic/singleton.h> -#include <util/generic/serialized_enum.h> +#include <util/generic/serialized_enum.h> template <typename T> class TLocalProcessKeyState { template <typename U, const char* Name> friend class TLocalProcessKey; -template <typename U, typename EnumT> -friend class TEnumProcessKey; +template <typename U, typename EnumT> +friend class TEnumProcessKey; public: static TLocalProcessKeyState& GetInstance() { @@ -21,17 +21,17 @@ public: } size_t GetCount() const { - return StartIndex + Names.size(); + return StartIndex + Names.size(); } TStringBuf GetNameByIndex(size_t index) const { - if (index < StartIndex) { - return StaticNames[index]; - } else { - index -= StartIndex; - Y_ENSURE(index < Names.size()); - return Names[index]; - } + if (index < StartIndex) { + return StaticNames[index]; + } else { + index -= StartIndex; + Y_ENSURE(index < Names.size()); + return Names[index]; + } } size_t GetIndexByName(TStringBuf name) const { @@ -42,7 +42,7 @@ public: private: size_t Register(TStringBuf name) { - auto x = Map.emplace(name, Names.size()+StartIndex); + auto x = Map.emplace(name, Names.size()+StartIndex); if (x.second) { Names.emplace_back(name); } @@ -50,29 +50,29 @@ private: return x.first->second; } - size_t Register(TStringBuf name, ui32 index) { - Y_VERIFY(index < StartIndex); - auto x = Map.emplace(name, index); - Y_VERIFY(x.second || x.first->second == index); - StaticNames[index] = name; - return x.first->second; - } - + size_t Register(TStringBuf name, ui32 index) { + Y_VERIFY(index < StartIndex); + auto x = Map.emplace(name, index); + Y_VERIFY(x.second || x.first->second == index); + StaticNames[index] = name; + return x.first->second; + } + private: - static constexpr ui32 StartIndex = 2000; - - TVector<TString> FillStaticNames() { - TVector<TString> staticNames; - staticNames.reserve(StartIndex); - for (ui32 i = 0; i < StartIndex; i++) { - staticNames.push_back(TStringBuilder() << "Activity_" << i); - } - return staticNames; - } - - TVector<TString> StaticNames = FillStaticNames(); - TVector<TString> Names; - THashMap<TString, size_t> Map; + static constexpr ui32 StartIndex = 2000; + + TVector<TString> FillStaticNames() { + TVector<TString> staticNames; + staticNames.reserve(StartIndex); + for (ui32 i = 0; i < StartIndex; i++) { + staticNames.push_back(TStringBuilder() << "Activity_" << i); + } + return staticNames; + } + + TVector<TString> StaticNames = FillStaticNames(); + TVector<TString> Names; + THashMap<TString, size_t> Map; }; template <typename T, const char* Name> @@ -89,44 +89,44 @@ public: private: inline static size_t Index = TLocalProcessKeyState<T>::GetInstance().Register(Name); }; - -template <typename T, typename EnumT> -class TEnumProcessKey { -public: - static TStringBuf GetName(EnumT key) { - return TLocalProcessKeyState<T>::GetInstance().GetNameByIndex(GetIndex(key)); - } - - static size_t GetIndex(EnumT key) { - ui32 index = static_cast<ui32>(key); - if (index < TLocalProcessKeyState<T>::StartIndex) { - return index; - } - Y_VERIFY(index < Enum2Index.size()); - return Enum2Index[index]; - } - -private: - inline static TVector<size_t> RegisterAll() { - static_assert(std::is_enum<EnumT>::value, "Enum is required"); - - TVector<size_t> enum2Index; - auto names = GetEnumNames<EnumT>(); - ui32 maxId = 0; - for (const auto& [k, v] : names) { - maxId = Max(maxId, static_cast<ui32>(k)); - } - enum2Index.resize(maxId+1); - for (ui32 i = 0; i <= maxId && i < TLocalProcessKeyState<T>::StartIndex; i++) { - enum2Index[i] = i; - } - - for (const auto& [k, v] : names) { - ui32 enumId = static_cast<ui32>(k); - enum2Index[enumId] = TLocalProcessKeyState<T>::GetInstance().Register(v, enumId); - } - return enum2Index; - } - - inline static TVector<size_t> Enum2Index = RegisterAll(); -}; + +template <typename T, typename EnumT> +class TEnumProcessKey { +public: + static TStringBuf GetName(EnumT key) { + return TLocalProcessKeyState<T>::GetInstance().GetNameByIndex(GetIndex(key)); + } + + static size_t GetIndex(EnumT key) { + ui32 index = static_cast<ui32>(key); + if (index < TLocalProcessKeyState<T>::StartIndex) { + return index; + } + Y_VERIFY(index < Enum2Index.size()); + return Enum2Index[index]; + } + +private: + inline static TVector<size_t> RegisterAll() { + static_assert(std::is_enum<EnumT>::value, "Enum is required"); + + TVector<size_t> enum2Index; + auto names = GetEnumNames<EnumT>(); + ui32 maxId = 0; + for (const auto& [k, v] : names) { + maxId = Max(maxId, static_cast<ui32>(k)); + } + enum2Index.resize(maxId+1); + for (ui32 i = 0; i <= maxId && i < TLocalProcessKeyState<T>::StartIndex; i++) { + enum2Index[i] = i; + } + + for (const auto& [k, v] : names) { + ui32 enumId = static_cast<ui32>(k); + enum2Index[enumId] = TLocalProcessKeyState<T>::GetInstance().Register(v, enumId); + } + return enum2Index; + } + + inline static TVector<size_t> Enum2Index = RegisterAll(); +}; diff --git a/library/cpp/grpc/server/grpc_server.cpp b/library/cpp/grpc/server/grpc_server.cpp index 7437b7a8f5..8863c80832 100644 --- a/library/cpp/grpc/server/grpc_server.cpp +++ b/library/cpp/grpc/server/grpc_server.cpp @@ -56,22 +56,22 @@ void TGRpcServer::Start() { using grpc::ServerBuilder; using grpc::ResourceQuota; ServerBuilder builder; - auto credentials = grpc::InsecureServerCredentials(); - if (Options_.SslData) { + auto credentials = grpc::InsecureServerCredentials(); + if (Options_.SslData) { grpc::SslServerCredentialsOptions::PemKeyCertPair keycert; keycert.cert_chain = std::move(Options_.SslData->Cert); keycert.private_key = std::move(Options_.SslData->Key); grpc::SslServerCredentialsOptions sslOps; sslOps.pem_root_certs = std::move(Options_.SslData->Root); sslOps.pem_key_cert_pairs.push_back(keycert); - credentials = grpc::SslServerCredentials(sslOps); - } - if (Options_.ExternalListener) { - Options_.ExternalListener->Init(builder.experimental().AddExternalConnectionAcceptor( - ServerBuilder::experimental_type::ExternalConnectionType::FROM_FD, - credentials - )); - } else { + credentials = grpc::SslServerCredentials(sslOps); + } + if (Options_.ExternalListener) { + Options_.ExternalListener->Init(builder.experimental().AddExternalConnectionAcceptor( + ServerBuilder::experimental_type::ExternalConnectionType::FROM_FD, + credentials + )); + } else { builder.AddListeningPort(server_address, credentials); } builder.SetMaxReceiveMessageSize(Options_.MaxMessageSize); @@ -171,10 +171,10 @@ void TGRpcServer::Start() { })); } } - - if (Options_.ExternalListener) { - Options_.ExternalListener->Start(); - } + + if (Options_.ExternalListener) { + Options_.ExternalListener->Start(); + } } void TGRpcServer::Stop() { @@ -223,10 +223,10 @@ void TGRpcServer::Stop() { } Ts.clear(); - - if (Options_.ExternalListener) { - Options_.ExternalListener->Stop(); - } + + if (Options_.ExternalListener) { + Options_.ExternalListener->Stop(); + } } ui16 TGRpcServer::GetPort() const { diff --git a/library/cpp/grpc/server/grpc_server.h b/library/cpp/grpc/server/grpc_server.h index d6814a90a0..9cb55e3154 100644 --- a/library/cpp/grpc/server/grpc_server.h +++ b/library/cpp/grpc/server/grpc_server.h @@ -27,15 +27,15 @@ struct TSslData { TString Root; }; -struct IExternalListener - : public TThrRefBase -{ - using TPtr = TIntrusivePtr<IExternalListener>; - virtual void Init(std::unique_ptr<grpc::experimental::ExternalConnectionAcceptor> acceptor) = 0; - virtual void Start() = 0; - virtual void Stop() = 0; -}; - +struct IExternalListener + : public TThrRefBase +{ + using TPtr = TIntrusivePtr<IExternalListener>; + virtual void Init(std::unique_ptr<grpc::experimental::ExternalConnectionAcceptor> acceptor) = 0; + virtual void Start() = 0; + virtual void Stop() = 0; +}; + //! Server's options. struct TServerOptions { #define DECLARE_FIELD(name, type, default) \ @@ -93,8 +93,8 @@ struct TServerOptions { //! Custom configurator for ServerBuilder. DECLARE_FIELD(ServerBuilderMutator, std::function<void(grpc::ServerBuilder&)>, [](grpc::ServerBuilder&){}); - DECLARE_FIELD(ExternalListener, IExternalListener::TPtr, nullptr); - + DECLARE_FIELD(ExternalListener, IExternalListener::TPtr, nullptr); + //! Logger which will be used to write logs about requests handling (iff appropriate log level is enabled). DECLARE_FIELD(Logger, TLoggerPtr, nullptr); |