diff options
| author | aozeritsky <[email protected]> | 2022-02-10 16:46:40 +0300 | 
|---|---|---|
| committer | Daniil Cherednik <[email protected]> | 2022-02-10 16:46:40 +0300 | 
| commit | b036a557f285146e5e35d4213e29a094ab907bcf (patch) | |
| tree | 49e222ea1c5804306084bb3ae065bb702625360f /library/cpp | |
| parent | 4a6816dea1bcaee46ce29a51a5fd7d3495012858 (diff) | |
Restoring authorship annotation for <[email protected]>. Commit 2 of 2.
Diffstat (limited to 'library/cpp')
| -rw-r--r-- | library/cpp/actors/core/actor.h | 64 | ||||
| -rw-r--r-- | library/cpp/actors/core/actor_coroutine.h | 2 | ||||
| -rw-r--r-- | library/cpp/actors/core/actor_ut.cpp | 26 | ||||
| -rw-r--r-- | library/cpp/actors/core/invoke.h | 8 | ||||
| -rw-r--r-- | library/cpp/actors/core/ya.make | 2 | ||||
| -rw-r--r-- | library/cpp/actors/helpers/pool_stats_collector.h | 628 | ||||
| -rw-r--r-- | library/cpp/actors/helpers/selfping_actor.cpp | 26 | ||||
| -rw-r--r-- | library/cpp/actors/helpers/selfping_actor.h | 2 | ||||
| -rw-r--r-- | library/cpp/actors/helpers/selfping_actor_ut.cpp | 26 | ||||
| -rw-r--r-- | library/cpp/actors/helpers/ut/ya.make | 72 | ||||
| -rw-r--r-- | library/cpp/actors/helpers/ya.make | 12 | ||||
| -rw-r--r-- | library/cpp/actors/interconnect/interconnect_tcp_server.cpp | 24 | ||||
| -rw-r--r-- | library/cpp/actors/interconnect/interconnect_tcp_server.h | 4 | ||||
| -rw-r--r-- | library/cpp/actors/util/local_process_key.h | 152 | ||||
| -rw-r--r-- | library/cpp/grpc/server/grpc_server.cpp | 36 | ||||
| -rw-r--r-- | library/cpp/grpc/server/grpc_server.h | 22 | 
16 files changed, 553 insertions, 553 deletions
diff --git a/library/cpp/actors/core/actor.h b/library/cpp/actors/core/actor.h index 68177a25bf6..ed29bd14b9e 100644 --- a/library/cpp/actors/core/actor.h +++ b/library/cpp/actors/core/actor.h @@ -233,7 +233,7 @@ namespace NActors {              INTERCONNECT_PROXY_TCP = 12,              INTERCONNECT_SESSION_TCP = 13,              INTERCONNECT_COMMON = 171, -            SELF_PING_ACTOR = 207,  +            SELF_PING_ACTOR = 207,              TEST_ACTOR_RUNTIME = 283,              INTERCONNECT_HANDSHAKE = 284,              INTERCONNECT_POLLER = 285, @@ -248,11 +248,11 @@ namespace NActors {              INTERCONNECT_PROXY_WRAPPER = 546,          }; -        using EActivityType = EActorActivity;  -        ui32 ActivityType;  -  +        using EActivityType = EActorActivity; +        ui32 ActivityType; +      protected: -        IActor(TReceiveFunc stateFunc, ui32 activityType = OTHER)  +        IActor(TReceiveFunc stateFunc, ui32 activityType = OTHER)              : StateFunc(stateFunc)              , SelfActorId(TActorId())              , ElapsedTicks(0) @@ -288,7 +288,7 @@ namespace NActors {          }      protected: -        void SetActivityType(ui32 activityType) {  +        void SetActivityType(ui32 activityType) {              ActivityType = activityType;          } @@ -338,7 +338,7 @@ namespace NActors {          void AddElapsedTicks(i64 ticks) {              ElapsedTicks += ticks;          } -        auto GetActivityType() const {  +        auto GetActivityType() const {              return ActivityType;          }          ui64 GetHandledEvents() const { @@ -393,28 +393,28 @@ namespace NActors {          return TLocalProcessKeyState<TActorActivityTag>::GetInstance().GetNameByIndex(index);      } -    template <typename TDerived>  -    class TActor: public IActor {  -    private:  -        template <typename T, typename = const char*>  -        struct HasActorName: std::false_type { };  -        template <typename T>  -        struct HasActorName<T, decltype((void)T::ActorName, (const char*)nullptr)>: std::true_type { };  -  +    template <typename TDerived> +    class TActor: public IActor { +    private: +        template <typename T, typename = const char*> +        struct HasActorName: std::false_type { }; +        template <typename T> +        struct HasActorName<T, decltype((void)T::ActorName, (const char*)nullptr)>: std::true_type { }; +          static ui32 GetActivityTypeIndex() { -            if constexpr(HasActorName<TDerived>::value) {  -                return TLocalProcessKey<TActorActivityTag, TDerived::ActorName>::GetIndex();  -            } else {  -                using TActorActivity = decltype(((TDerived*)nullptr)->ActorActivityType());  -                // if constexpr(std::is_enum<TActorActivity>::value) {  -                    return TEnumProcessKey<TActorActivityTag, TActorActivity>::GetIndex(  -                    TDerived::ActorActivityType());  -                //} else {  -                    // for int, ui32, ...  -                //    return TEnumProcessKey<TActorActivityTag, IActor::EActorActivity>::GetIndex(  -                //        static_cast<IActor::EActorActivity>(TDerived::ActorActivityType()));  -                //}  -            }  +            if constexpr(HasActorName<TDerived>::value) { +                return TLocalProcessKey<TActorActivityTag, TDerived::ActorName>::GetIndex(); +            } else { +                using TActorActivity = decltype(((TDerived*)nullptr)->ActorActivityType()); +                // if constexpr(std::is_enum<TActorActivity>::value) { +                    return TEnumProcessKey<TActorActivityTag, TActorActivity>::GetIndex( +                    TDerived::ActorActivityType()); +                //} else { +                    // for int, ui32, ... +                //    return TEnumProcessKey<TActorActivityTag, IActor::EActorActivity>::GetIndex( +                //        static_cast<IActor::EActorActivity>(TDerived::ActorActivityType())); +                //} +            }          }      protected: @@ -423,17 +423,17 @@ namespace NActors {              return EActorActivity::OTHER;          } //*/ -        // static constexpr char ActorName[] = "UNNAMED";  -  +        // static constexpr char ActorName[] = "UNNAMED"; +          TActor(void (TDerived::*func)(TAutoPtr<IEventHandle>& ev, const TActorContext& ctx), ui32 activityType = GetActivityTypeIndex())              : IActor(static_cast<TReceiveFunc>(func), activityType) -        { }  +        { }      public:          typedef TDerived TThis;      }; -  +  #define STFUNC_SIG TAutoPtr< ::NActors::IEventHandle>&ev, const ::NActors::TActorContext &ctx  #define STATEFN_SIG TAutoPtr<::NActors::IEventHandle>& ev  #define STFUNC(funcName) void funcName(TAutoPtr< ::NActors::IEventHandle>& ev, const ::NActors::TActorContext& ctx) diff --git a/library/cpp/actors/core/actor_coroutine.h b/library/cpp/actors/core/actor_coroutine.h index 8f1450c4349..6bcb768eafc 100644 --- a/library/cpp/actors/core/actor_coroutine.h +++ b/library/cpp/actors/core/actor_coroutine.h @@ -154,7 +154,7 @@ namespace NActors {          THolder<TActorCoroImpl> Impl;      public: -        TActorCoro(THolder<TActorCoroImpl> impl, ui32 activityType = IActor::ACTORLIB_COMMON)  +        TActorCoro(THolder<TActorCoroImpl> impl, ui32 activityType = IActor::ACTORLIB_COMMON)              : IActor(static_cast<TReceiveFunc>(&TActorCoro::StateFunc), activityType)              , Impl(std::move(impl))          {} diff --git a/library/cpp/actors/core/actor_ut.cpp b/library/cpp/actors/core/actor_ut.cpp index a5d43b4619f..e1b765ec72a 100644 --- a/library/cpp/actors/core/actor_ut.cpp +++ b/library/cpp/actors/core/actor_ut.cpp @@ -55,7 +55,7 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) {      class TSendReceiveActor : public TActorBootstrapped<TSendReceiveActor> {      public: -        static constexpr auto ActorActivityType() {  +        static constexpr auto ActorActivityType() {              return ACTORLIB_COMMON;          } @@ -525,12 +525,12 @@ Y_UNIT_TEST_SUITE(TestDecorator) {          }      }; -    struct TTestActor : TActorBootstrapped<TTestActor> {  -        static constexpr char ActorName[] = "TestActor";  +    struct TTestActor : TActorBootstrapped<TTestActor> { +        static constexpr char ActorName[] = "TestActor";          void Bootstrap()          { -            const auto& activityTypeIndex = GetActivityType();  +            const auto& activityTypeIndex = GetActivityType();              Y_ENSURE(activityTypeIndex < GetActivityTypeCount());              Y_ENSURE(GetActivityTypeName(activityTypeIndex) == "TestActor");              PassAway(); @@ -566,13 +566,13 @@ Y_UNIT_TEST_SUITE(TestDecorator) {          actorSystem.Stop();          UNIT_ASSERT(pongCounter == 2 && pingCounter == 2);      } -  -    Y_UNIT_TEST(LocalProcessKey) {  -        static constexpr char ActorName[] = "TestActor";  -  -        UNIT_ASSERT((TEnumProcessKey<TActorActivityTag, IActor::EActorActivity>::GetName(IActor::INTERCONNECT_PROXY_TCP) == "INTERCONNECT_PROXY_TCP"));  -  -        UNIT_ASSERT((TLocalProcessKey<TActorActivityTag, ActorName>::GetName() == ActorName));  -        UNIT_ASSERT((TEnumProcessKey<TActorActivityTag, IActor::EActorActivity>::GetIndex(IActor::INTERCONNECT_PROXY_TCP) == IActor::INTERCONNECT_PROXY_TCP));  -    }  + +    Y_UNIT_TEST(LocalProcessKey) { +        static constexpr char ActorName[] = "TestActor"; + +        UNIT_ASSERT((TEnumProcessKey<TActorActivityTag, IActor::EActorActivity>::GetName(IActor::INTERCONNECT_PROXY_TCP) == "INTERCONNECT_PROXY_TCP")); + +        UNIT_ASSERT((TLocalProcessKey<TActorActivityTag, ActorName>::GetName() == ActorName)); +        UNIT_ASSERT((TEnumProcessKey<TActorActivityTag, IActor::EActorActivity>::GetIndex(IActor::INTERCONNECT_PROXY_TCP) == IActor::INTERCONNECT_PROXY_TCP)); +    }  } diff --git a/library/cpp/actors/core/invoke.h b/library/cpp/actors/core/invoke.h index 106097abc96..931a9767ddd 100644 --- a/library/cpp/actors/core/invoke.h +++ b/library/cpp/actors/core/invoke.h @@ -77,14 +77,14 @@ namespace NActors {      // actor's class. But `complete` handler is invoked in parent context and can use its contents. Do not forget to      // handle TEvInvokeResult event by calling Process/GetResult method, whichever is necessary. -    template<typename TCallback, typename TCompletion, ui32 Activity>  +    template<typename TCallback, typename TCompletion, ui32 Activity>      class TInvokeActor : public TActorBootstrapped<TInvokeActor<TCallback, TCompletion, Activity>> {          TCallback Callback;          TCompletion Complete;      public: -        static constexpr auto ActorActivityType() {  -            return static_cast<IActor::EActorActivity>(Activity);  +        static constexpr auto ActorActivityType() { +            return static_cast<IActor::EActorActivity>(Activity);          }          TInvokeActor(TCallback&& callback, TCompletion&& complete) @@ -101,7 +101,7 @@ namespace NActors {          }      }; -    template<ui32 Activity, typename TCallback, typename TCompletion>  +    template<ui32 Activity, typename TCallback, typename TCompletion>      std::unique_ptr<IActor> CreateInvokeActor(TCallback&& callback, TCompletion&& complete) {          return std::make_unique<TInvokeActor<std::decay_t<TCallback>, std::decay_t<TCompletion>, Activity>>(              std::forward<TCallback>(callback), std::forward<TCompletion>(complete)); diff --git a/library/cpp/actors/core/ya.make b/library/cpp/actors/core/ya.make index 9a5ab29e510..880a9d00dba 100644 --- a/library/cpp/actors/core/ya.make +++ b/library/cpp/actors/core/ya.make @@ -100,7 +100,7 @@ SRCS(  )  GENERATE_ENUM_SERIALIZATION(defs.h) -GENERATE_ENUM_SERIALIZATION(actor.h)  +GENERATE_ENUM_SERIALIZATION(actor.h)  PEERDIR(      library/cpp/actors/memory_log diff --git a/library/cpp/actors/helpers/pool_stats_collector.h b/library/cpp/actors/helpers/pool_stats_collector.h index 78c842900e1..61d0b457803 100644 --- a/library/cpp/actors/helpers/pool_stats_collector.h +++ b/library/cpp/actors/helpers/pool_stats_collector.h @@ -1,314 +1,314 @@ -#pragma once  -  -#include <library/cpp/actors/core/actor_bootstrapped.h>  -#include <library/cpp/actors/core/actorsystem.h>  -#include <library/cpp/actors/core/executor_thread.h>  -#include <library/cpp/actors/core/hfunc.h>  -#include <library/cpp/monlib/dynamic_counters/counters.h>  -  -#include <util/generic/vector.h>  -#include <util/generic/xrange.h>  -#include <util/string/printf.h>  -  -namespace NActors {  -  -// Periodically collects stats from executor threads and exposes them as mon counters  -class TStatsCollectingActor : public TActorBootstrapped<TStatsCollectingActor> {  -private:  -    struct THistogramCounters {  -        void Init(NMonitoring::TDynamicCounters* group, const TString& baseName, const TString& unit, ui64 maxVal) {  -            for (size_t i = 0; (1ull<<i) <= maxVal; ++i) {  -                TString bucketName = ToString(1ull<<i) + " " + unit;  -                Buckets.push_back(group->GetSubgroup("sensor", baseName)->GetNamedCounter("range", bucketName, true));  -            }  -            Buckets.push_back(group->GetSubgroup("sensor", baseName)->GetNamedCounter("range", "INF", true));  -        }  -  -        void Set(const TLogHistogram& data) {  -            ui32 i = 0;  -            for (;i < Y_ARRAY_SIZE(data.Buckets) && i < Buckets.size()-1; ++i)  -                *Buckets[i] = data.Buckets[i];  -            ui64 last = 0;  -            for (;i < Y_ARRAY_SIZE(data.Buckets); ++i)  -                last += data.Buckets[i];  -            *Buckets.back() = last;  -        }  -  -        void Set(const TLogHistogram& data, double factor) {  -            ui32 i = 0;  -            for (;i < Y_ARRAY_SIZE(data.Buckets) && i < Buckets.size()-1; ++i)  -                *Buckets[i] = data.Buckets[i]*factor;  -            ui64 last = 0;  -            for (;i < Y_ARRAY_SIZE(data.Buckets); ++i)  -                last += data.Buckets[i];  -            *Buckets.back() = last*factor;  -        }  -  -    private:  -        TVector<NMonitoring::TDynamicCounters::TCounterPtr> Buckets;  -    };  -  -    struct TActivityStats {  -        void Init(NMonitoring::TDynamicCounterPtr group) {  -            Group = group;  -  -            ElapsedMicrosecByActivityBuckets.resize(GetActivityTypeCount());  -            ReceivedEventsByActivityBuckets.resize(GetActivityTypeCount());  -            ActorsAliveByActivityBuckets.resize(GetActivityTypeCount());  -            ScheduledEventsByActivityBuckets.resize(GetActivityTypeCount());  -        }  -  -        void Set(const TExecutorThreadStats& stats) {  -            for (ui32 i : xrange(stats.MaxActivityType())) {  -                Y_VERIFY(i < GetActivityTypeCount());  -                ui64 ticks = stats.ElapsedTicksByActivity[i];  -                ui64 events = stats.ReceivedEventsByActivity[i];  -                ui64 actors = stats.ActorsAliveByActivity[i];  -                ui64 scheduled = stats.ScheduledEventsByActivity[i];  -  -                if (!ActorsAliveByActivityBuckets[i]) {  -                    if (ticks || events || actors || scheduled) {  -                        InitCountersForActivity(i);  -                    } else {  -                        continue;  -                    }  -                }  -  -                *ElapsedMicrosecByActivityBuckets[i] = ::NHPTimer::GetSeconds(ticks)*1000000;  -                *ReceivedEventsByActivityBuckets[i] = events;  -                *ActorsAliveByActivityBuckets[i] = actors;  -                *ScheduledEventsByActivityBuckets[i] = scheduled;  -            }  -        }  -  -    private:  -        void InitCountersForActivity(ui32 activityType) {  -            Y_VERIFY(activityType < GetActivityTypeCount());  -  -            auto bucketName = TString(GetActivityTypeName(activityType));  -  -            ElapsedMicrosecByActivityBuckets[activityType] =  -                Group->GetSubgroup("sensor", "ElapsedMicrosecByActivity")->GetNamedCounter("activity", bucketName, true);  -            ReceivedEventsByActivityBuckets[activityType] =  -                Group->GetSubgroup("sensor", "ReceivedEventsByActivity")->GetNamedCounter("activity", bucketName, true);  -            ActorsAliveByActivityBuckets[activityType] =  -                Group->GetSubgroup("sensor", "ActorsAliveByActivity")->GetNamedCounter("activity", bucketName, false);  -            ScheduledEventsByActivityBuckets[activityType] =  -                Group->GetSubgroup("sensor", "ScheduledEventsByActivity")->GetNamedCounter("activity", bucketName, true);  -        }  -  -    private:  -        NMonitoring::TDynamicCounterPtr Group;  -  -        TVector<NMonitoring::TDynamicCounters::TCounterPtr> ElapsedMicrosecByActivityBuckets;  -        TVector<NMonitoring::TDynamicCounters::TCounterPtr> ReceivedEventsByActivityBuckets;  -        TVector<NMonitoring::TDynamicCounters::TCounterPtr> ActorsAliveByActivityBuckets;  -        TVector<NMonitoring::TDynamicCounters::TCounterPtr> ScheduledEventsByActivityBuckets;  -    };  -  -    struct TExecutorPoolCounters {  -        TIntrusivePtr<NMonitoring::TDynamicCounters> PoolGroup;  -  -        NMonitoring::TDynamicCounters::TCounterPtr SentEvents;  -        NMonitoring::TDynamicCounters::TCounterPtr ReceivedEvents;  -        NMonitoring::TDynamicCounters::TCounterPtr PreemptedEvents;  -        NMonitoring::TDynamicCounters::TCounterPtr NonDeliveredEvents;  -        NMonitoring::TDynamicCounters::TCounterPtr DestroyedActors;  -        NMonitoring::TDynamicCounters::TCounterPtr EmptyMailboxActivation;  -        NMonitoring::TDynamicCounters::TCounterPtr CpuMicrosec;  -        NMonitoring::TDynamicCounters::TCounterPtr ElapsedMicrosec;  -        NMonitoring::TDynamicCounters::TCounterPtr ParkedMicrosec;  -        NMonitoring::TDynamicCounters::TCounterPtr ActorRegistrations;  -        NMonitoring::TDynamicCounters::TCounterPtr ActorsAlive;  -        NMonitoring::TDynamicCounters::TCounterPtr AllocatedMailboxes;  -        NMonitoring::TDynamicCounters::TCounterPtr MailboxPushedOutBySoftPreemption;  -        NMonitoring::TDynamicCounters::TCounterPtr MailboxPushedOutByTime;  -        NMonitoring::TDynamicCounters::TCounterPtr MailboxPushedOutByEventCount;  -  -        THistogramCounters LegacyActivationTimeHistogram;  -        NMonitoring::THistogramPtr ActivationTimeHistogram;  -        THistogramCounters LegacyEventDeliveryTimeHistogram;  -        NMonitoring::THistogramPtr EventDeliveryTimeHistogram;  -        THistogramCounters LegacyEventProcessingCountHistogram;  -        NMonitoring::THistogramPtr EventProcessingCountHistogram;  -        THistogramCounters LegacyEventProcessingTimeHistogram;  -        NMonitoring::THistogramPtr EventProcessingTimeHistogram;  -  -        TActivityStats ActivityStats;  -        NMonitoring::TDynamicCounters::TCounterPtr MaxUtilizationTime;  -  -        double Usage = 0;  -        double LastElapsedSeconds = 0;  -        THPTimer UsageTimer;  -        TString Name;  -        ui32 Threads;  -  -        void Init(NMonitoring::TDynamicCounters* group, const TString& poolName, ui32 threads) {  -            LastElapsedSeconds = 0;  -            Usage = 0;  -            UsageTimer.Reset();  -            Name = poolName;  -            Threads = threads;  -  -            PoolGroup = group->GetSubgroup("execpool", poolName);  -  -            SentEvents          = PoolGroup->GetCounter("SentEvents", true);  -            ReceivedEvents      = PoolGroup->GetCounter("ReceivedEvents", true);  -            PreemptedEvents     = PoolGroup->GetCounter("PreemptedEvents", true);  -            NonDeliveredEvents  = PoolGroup->GetCounter("NonDeliveredEvents", true);  -            DestroyedActors     = PoolGroup->GetCounter("DestroyedActors", true);  -            CpuMicrosec         = PoolGroup->GetCounter("CpuMicrosec", true);  -            ElapsedMicrosec     = PoolGroup->GetCounter("ElapsedMicrosec", true);  -            ParkedMicrosec      = PoolGroup->GetCounter("ParkedMicrosec", true);  -            EmptyMailboxActivation = PoolGroup->GetCounter("EmptyMailboxActivation", true);  -            ActorRegistrations  = PoolGroup->GetCounter("ActorRegistrations", true);  -            ActorsAlive         = PoolGroup->GetCounter("ActorsAlive", false);  -            AllocatedMailboxes  = PoolGroup->GetCounter("AllocatedMailboxes", false);  -            MailboxPushedOutBySoftPreemption = PoolGroup->GetCounter("MailboxPushedOutBySoftPreemption", true);  -            MailboxPushedOutByTime = PoolGroup->GetCounter("MailboxPushedOutByTime", true);  -            MailboxPushedOutByEventCount = PoolGroup->GetCounter("MailboxPushedOutByEventCount", true);  -  -            LegacyActivationTimeHistogram.Init(PoolGroup.Get(), "ActivationTime", "usec", 5*1000*1000);  -            ActivationTimeHistogram = PoolGroup->GetHistogram(  -                "ActivationTimeUs", NMonitoring::ExponentialHistogram(24, 2, 1));  -            LegacyEventDeliveryTimeHistogram.Init(PoolGroup.Get(), "EventDeliveryTime", "usec", 5*1000*1000);  -            EventDeliveryTimeHistogram = PoolGroup->GetHistogram(  -                "EventDeliveryTimeUs", NMonitoring::ExponentialHistogram(24, 2, 1));  -            LegacyEventProcessingCountHistogram.Init(PoolGroup.Get(), "EventProcessingCount", "usec", 5*1000*1000);  -            EventProcessingCountHistogram = PoolGroup->GetHistogram(  -                "EventProcessingCountUs", NMonitoring::ExponentialHistogram(24, 2, 1));  -            LegacyEventProcessingTimeHistogram.Init(PoolGroup.Get(), "EventProcessingTime", "usec", 5*1000*1000);  -            EventProcessingTimeHistogram = PoolGroup->GetHistogram(  -                "EventProcessingTimeUs", NMonitoring::ExponentialHistogram(24, 2, 1));  -  -            ActivityStats.Init(PoolGroup.Get());  -  -            MaxUtilizationTime = PoolGroup->GetCounter("MaxUtilizationTime", true);  -        }  -  -        void Set(const TExecutorPoolStats& poolStats, const TExecutorThreadStats& stats, ui32 numThreads) {  -#ifdef ACTORSLIB_COLLECT_EXEC_STATS  -            *SentEvents         = stats.SentEvents;  -            *ReceivedEvents     = stats.ReceivedEvents;  -            *PreemptedEvents     = stats.PreemptedEvents;  -            *NonDeliveredEvents = stats.NonDeliveredEvents;  -            *DestroyedActors    = stats.PoolDestroyedActors;  -            *EmptyMailboxActivation = stats.EmptyMailboxActivation;  -            *CpuMicrosec        = stats.CpuNs / 1000;  -            *ElapsedMicrosec    = ::NHPTimer::GetSeconds(stats.ElapsedTicks)*1000000;  -            *ParkedMicrosec     = ::NHPTimer::GetSeconds(stats.ParkedTicks)*1000000;  -            *ActorRegistrations = stats.PoolActorRegistrations;  -            *ActorsAlive        = stats.PoolActorRegistrations - stats.PoolDestroyedActors;  -            *AllocatedMailboxes = stats.PoolAllocatedMailboxes;  -            *MailboxPushedOutBySoftPreemption = stats.MailboxPushedOutBySoftPreemption;  -            *MailboxPushedOutByTime = stats.MailboxPushedOutByTime;  -            *MailboxPushedOutByEventCount = stats.MailboxPushedOutByEventCount;  -  -            LegacyActivationTimeHistogram.Set(stats.ActivationTimeHistogram);  -            ActivationTimeHistogram->Reset();  -            ActivationTimeHistogram->Collect(stats.ActivationTimeHistogram);  -  -            LegacyEventDeliveryTimeHistogram.Set(stats.EventDeliveryTimeHistogram);  -            EventDeliveryTimeHistogram->Reset();  -            EventDeliveryTimeHistogram->Collect(stats.EventDeliveryTimeHistogram);  -  -            LegacyEventProcessingCountHistogram.Set(stats.EventProcessingCountHistogram);  -            EventProcessingCountHistogram->Reset();  -            EventProcessingCountHistogram->Collect(stats.EventProcessingCountHistogram);  -  -            double toMicrosec = 1000000 / NHPTimer::GetClockRate();  -            LegacyEventProcessingTimeHistogram.Set(stats.EventProcessingTimeHistogram, toMicrosec);  -            EventProcessingTimeHistogram->Reset();  -            for (ui32 i = 0; i < stats.EventProcessingTimeHistogram.Count(); ++i) {  -                EventProcessingTimeHistogram->Collect(  -                    stats.EventProcessingTimeHistogram.UpperBound(i),  -                    stats.EventProcessingTimeHistogram.Value(i) * toMicrosec);  -            }  -  -            ActivityStats.Set(stats);  -  -            *MaxUtilizationTime = poolStats.MaxUtilizationTime;  -  -            double seconds = UsageTimer.PassedReset();  -  -            // TODO[serxa]: It doesn't account for contention. Use 1 - parkedTicksDelta / seconds / numThreads KIKIMR-11916  -            const double elapsed = NHPTimer::GetSeconds(stats.ElapsedTicks);  -            const double currentUsage = numThreads > 0 ? ((elapsed - LastElapsedSeconds) / seconds / numThreads) : 0;  -            LastElapsedSeconds = elapsed;  -  -            // update usage factor according to smoothness  -            const double smoothness = 0.5;  -            Usage = currentUsage * smoothness + Usage * (1.0 - smoothness);  -#else  -            Y_UNUSED(poolStats);  -            Y_UNUSED(stats);  -            Y_UNUSED(numThreads);  -#endif  -        }  -    };  -  -public:  -    static constexpr IActor::EActivityType ActorActivityType() {  -        return IActor::ACTORLIB_STATS;  -    }  -  -    TStatsCollectingActor(  -            ui32 intervalSec,  -            const TActorSystemSetup& setup,  -            NMonitoring::TDynamicCounterPtr counters)  -        : IntervalSec(intervalSec)  -        , Counters(counters)  -    {  -        PoolCounters.resize(setup.GetExecutorsCount());  -        for (size_t poolId = 0; poolId < PoolCounters.size(); ++poolId) {  -            PoolCounters[poolId].Init(Counters.Get(), setup.GetPoolName(poolId), setup.GetThreads(poolId));  -        }  -    }  -  -    void Bootstrap(const TActorContext& ctx) {  -        ctx.Schedule(TDuration::Seconds(IntervalSec), new TEvents::TEvWakeup());  -        Become(&TThis::StateWork);  -    }  -  -    STFUNC(StateWork) {  -        switch (ev->GetTypeRewrite()) {  -            CFunc(TEvents::TSystem::Wakeup, Wakeup);  -        }  -    }  -  -private:  -    virtual void OnWakeup(const TActorContext &ctx) {  -        Y_UNUSED(ctx);  -    }  -  -    void Wakeup(const TActorContext &ctx) {  -        for (size_t poolId = 0; poolId < PoolCounters.size(); ++poolId) {  -            TVector<TExecutorThreadStats> stats;  -            TExecutorPoolStats poolStats;  -            ctx.ExecutorThread.ActorSystem->GetPoolStats(poolId, poolStats, stats);  -            SetAggregatedCounters(PoolCounters[poolId], poolStats, stats);  -        }  -  -        OnWakeup(ctx);  -  -        ctx.Schedule(TDuration::Seconds(IntervalSec), new TEvents::TEvWakeup());  -    }  -  -    void SetAggregatedCounters(TExecutorPoolCounters& poolCounters, TExecutorPoolStats& poolStats, TVector<TExecutorThreadStats>& stats) {  -        // Sum all per-thread counters into the 0th element  -        for (ui32 idx = 1; idx < stats.size(); ++idx) {  -            stats[0].Aggregate(stats[idx]);  -        }  -        if (stats.size()) {  -            poolCounters.Set(poolStats, stats[0], stats.size() - 1);  -        }  -    }  -  -protected:  -    const ui32 IntervalSec;  -    NMonitoring::TDynamicCounterPtr Counters;  -  -    TVector<TExecutorPoolCounters> PoolCounters;  -};  -  -} // NActors  +#pragma once + +#include <library/cpp/actors/core/actor_bootstrapped.h> +#include <library/cpp/actors/core/actorsystem.h> +#include <library/cpp/actors/core/executor_thread.h> +#include <library/cpp/actors/core/hfunc.h> +#include <library/cpp/monlib/dynamic_counters/counters.h> + +#include <util/generic/vector.h> +#include <util/generic/xrange.h> +#include <util/string/printf.h> + +namespace NActors { + +// Periodically collects stats from executor threads and exposes them as mon counters +class TStatsCollectingActor : public TActorBootstrapped<TStatsCollectingActor> { +private: +    struct THistogramCounters { +        void Init(NMonitoring::TDynamicCounters* group, const TString& baseName, const TString& unit, ui64 maxVal) { +            for (size_t i = 0; (1ull<<i) <= maxVal; ++i) { +                TString bucketName = ToString(1ull<<i) + " " + unit; +                Buckets.push_back(group->GetSubgroup("sensor", baseName)->GetNamedCounter("range", bucketName, true)); +            } +            Buckets.push_back(group->GetSubgroup("sensor", baseName)->GetNamedCounter("range", "INF", true)); +        } + +        void Set(const TLogHistogram& data) { +            ui32 i = 0; +            for (;i < Y_ARRAY_SIZE(data.Buckets) && i < Buckets.size()-1; ++i) +                *Buckets[i] = data.Buckets[i]; +            ui64 last = 0; +            for (;i < Y_ARRAY_SIZE(data.Buckets); ++i) +                last += data.Buckets[i]; +            *Buckets.back() = last; +        } + +        void Set(const TLogHistogram& data, double factor) { +            ui32 i = 0; +            for (;i < Y_ARRAY_SIZE(data.Buckets) && i < Buckets.size()-1; ++i) +                *Buckets[i] = data.Buckets[i]*factor; +            ui64 last = 0; +            for (;i < Y_ARRAY_SIZE(data.Buckets); ++i) +                last += data.Buckets[i]; +            *Buckets.back() = last*factor; +        } + +    private: +        TVector<NMonitoring::TDynamicCounters::TCounterPtr> Buckets; +    }; + +    struct TActivityStats { +        void Init(NMonitoring::TDynamicCounterPtr group) { +            Group = group; + +            ElapsedMicrosecByActivityBuckets.resize(GetActivityTypeCount()); +            ReceivedEventsByActivityBuckets.resize(GetActivityTypeCount()); +            ActorsAliveByActivityBuckets.resize(GetActivityTypeCount()); +            ScheduledEventsByActivityBuckets.resize(GetActivityTypeCount()); +        } + +        void Set(const TExecutorThreadStats& stats) { +            for (ui32 i : xrange(stats.MaxActivityType())) { +                Y_VERIFY(i < GetActivityTypeCount()); +                ui64 ticks = stats.ElapsedTicksByActivity[i]; +                ui64 events = stats.ReceivedEventsByActivity[i]; +                ui64 actors = stats.ActorsAliveByActivity[i]; +                ui64 scheduled = stats.ScheduledEventsByActivity[i]; + +                if (!ActorsAliveByActivityBuckets[i]) { +                    if (ticks || events || actors || scheduled) { +                        InitCountersForActivity(i); +                    } else { +                        continue; +                    } +                } + +                *ElapsedMicrosecByActivityBuckets[i] = ::NHPTimer::GetSeconds(ticks)*1000000; +                *ReceivedEventsByActivityBuckets[i] = events; +                *ActorsAliveByActivityBuckets[i] = actors; +                *ScheduledEventsByActivityBuckets[i] = scheduled; +            } +        } + +    private: +        void InitCountersForActivity(ui32 activityType) { +            Y_VERIFY(activityType < GetActivityTypeCount()); + +            auto bucketName = TString(GetActivityTypeName(activityType)); + +            ElapsedMicrosecByActivityBuckets[activityType] = +                Group->GetSubgroup("sensor", "ElapsedMicrosecByActivity")->GetNamedCounter("activity", bucketName, true); +            ReceivedEventsByActivityBuckets[activityType] = +                Group->GetSubgroup("sensor", "ReceivedEventsByActivity")->GetNamedCounter("activity", bucketName, true); +            ActorsAliveByActivityBuckets[activityType] = +                Group->GetSubgroup("sensor", "ActorsAliveByActivity")->GetNamedCounter("activity", bucketName, false); +            ScheduledEventsByActivityBuckets[activityType] = +                Group->GetSubgroup("sensor", "ScheduledEventsByActivity")->GetNamedCounter("activity", bucketName, true); +        } + +    private: +        NMonitoring::TDynamicCounterPtr Group; + +        TVector<NMonitoring::TDynamicCounters::TCounterPtr> ElapsedMicrosecByActivityBuckets; +        TVector<NMonitoring::TDynamicCounters::TCounterPtr> ReceivedEventsByActivityBuckets; +        TVector<NMonitoring::TDynamicCounters::TCounterPtr> ActorsAliveByActivityBuckets; +        TVector<NMonitoring::TDynamicCounters::TCounterPtr> ScheduledEventsByActivityBuckets; +    }; + +    struct TExecutorPoolCounters { +        TIntrusivePtr<NMonitoring::TDynamicCounters> PoolGroup; + +        NMonitoring::TDynamicCounters::TCounterPtr SentEvents; +        NMonitoring::TDynamicCounters::TCounterPtr ReceivedEvents; +        NMonitoring::TDynamicCounters::TCounterPtr PreemptedEvents; +        NMonitoring::TDynamicCounters::TCounterPtr NonDeliveredEvents; +        NMonitoring::TDynamicCounters::TCounterPtr DestroyedActors; +        NMonitoring::TDynamicCounters::TCounterPtr EmptyMailboxActivation; +        NMonitoring::TDynamicCounters::TCounterPtr CpuMicrosec; +        NMonitoring::TDynamicCounters::TCounterPtr ElapsedMicrosec; +        NMonitoring::TDynamicCounters::TCounterPtr ParkedMicrosec; +        NMonitoring::TDynamicCounters::TCounterPtr ActorRegistrations; +        NMonitoring::TDynamicCounters::TCounterPtr ActorsAlive; +        NMonitoring::TDynamicCounters::TCounterPtr AllocatedMailboxes; +        NMonitoring::TDynamicCounters::TCounterPtr MailboxPushedOutBySoftPreemption; +        NMonitoring::TDynamicCounters::TCounterPtr MailboxPushedOutByTime; +        NMonitoring::TDynamicCounters::TCounterPtr MailboxPushedOutByEventCount; + +        THistogramCounters LegacyActivationTimeHistogram; +        NMonitoring::THistogramPtr ActivationTimeHistogram; +        THistogramCounters LegacyEventDeliveryTimeHistogram; +        NMonitoring::THistogramPtr EventDeliveryTimeHistogram; +        THistogramCounters LegacyEventProcessingCountHistogram; +        NMonitoring::THistogramPtr EventProcessingCountHistogram; +        THistogramCounters LegacyEventProcessingTimeHistogram; +        NMonitoring::THistogramPtr EventProcessingTimeHistogram; + +        TActivityStats ActivityStats; +        NMonitoring::TDynamicCounters::TCounterPtr MaxUtilizationTime; + +        double Usage = 0; +        double LastElapsedSeconds = 0; +        THPTimer UsageTimer; +        TString Name; +        ui32 Threads; + +        void Init(NMonitoring::TDynamicCounters* group, const TString& poolName, ui32 threads) { +            LastElapsedSeconds = 0; +            Usage = 0; +            UsageTimer.Reset(); +            Name = poolName; +            Threads = threads; + +            PoolGroup = group->GetSubgroup("execpool", poolName); + +            SentEvents          = PoolGroup->GetCounter("SentEvents", true); +            ReceivedEvents      = PoolGroup->GetCounter("ReceivedEvents", true); +            PreemptedEvents     = PoolGroup->GetCounter("PreemptedEvents", true); +            NonDeliveredEvents  = PoolGroup->GetCounter("NonDeliveredEvents", true); +            DestroyedActors     = PoolGroup->GetCounter("DestroyedActors", true); +            CpuMicrosec         = PoolGroup->GetCounter("CpuMicrosec", true); +            ElapsedMicrosec     = PoolGroup->GetCounter("ElapsedMicrosec", true); +            ParkedMicrosec      = PoolGroup->GetCounter("ParkedMicrosec", true); +            EmptyMailboxActivation = PoolGroup->GetCounter("EmptyMailboxActivation", true); +            ActorRegistrations  = PoolGroup->GetCounter("ActorRegistrations", true); +            ActorsAlive         = PoolGroup->GetCounter("ActorsAlive", false); +            AllocatedMailboxes  = PoolGroup->GetCounter("AllocatedMailboxes", false); +            MailboxPushedOutBySoftPreemption = PoolGroup->GetCounter("MailboxPushedOutBySoftPreemption", true); +            MailboxPushedOutByTime = PoolGroup->GetCounter("MailboxPushedOutByTime", true); +            MailboxPushedOutByEventCount = PoolGroup->GetCounter("MailboxPushedOutByEventCount", true); + +            LegacyActivationTimeHistogram.Init(PoolGroup.Get(), "ActivationTime", "usec", 5*1000*1000); +            ActivationTimeHistogram = PoolGroup->GetHistogram( +                "ActivationTimeUs", NMonitoring::ExponentialHistogram(24, 2, 1)); +            LegacyEventDeliveryTimeHistogram.Init(PoolGroup.Get(), "EventDeliveryTime", "usec", 5*1000*1000); +            EventDeliveryTimeHistogram = PoolGroup->GetHistogram( +                "EventDeliveryTimeUs", NMonitoring::ExponentialHistogram(24, 2, 1)); +            LegacyEventProcessingCountHistogram.Init(PoolGroup.Get(), "EventProcessingCount", "usec", 5*1000*1000); +            EventProcessingCountHistogram = PoolGroup->GetHistogram( +                "EventProcessingCountUs", NMonitoring::ExponentialHistogram(24, 2, 1)); +            LegacyEventProcessingTimeHistogram.Init(PoolGroup.Get(), "EventProcessingTime", "usec", 5*1000*1000); +            EventProcessingTimeHistogram = PoolGroup->GetHistogram( +                "EventProcessingTimeUs", NMonitoring::ExponentialHistogram(24, 2, 1)); + +            ActivityStats.Init(PoolGroup.Get()); + +            MaxUtilizationTime = PoolGroup->GetCounter("MaxUtilizationTime", true); +        } + +        void Set(const TExecutorPoolStats& poolStats, const TExecutorThreadStats& stats, ui32 numThreads) { +#ifdef ACTORSLIB_COLLECT_EXEC_STATS +            *SentEvents         = stats.SentEvents; +            *ReceivedEvents     = stats.ReceivedEvents; +            *PreemptedEvents     = stats.PreemptedEvents; +            *NonDeliveredEvents = stats.NonDeliveredEvents; +            *DestroyedActors    = stats.PoolDestroyedActors; +            *EmptyMailboxActivation = stats.EmptyMailboxActivation; +            *CpuMicrosec        = stats.CpuNs / 1000; +            *ElapsedMicrosec    = ::NHPTimer::GetSeconds(stats.ElapsedTicks)*1000000; +            *ParkedMicrosec     = ::NHPTimer::GetSeconds(stats.ParkedTicks)*1000000; +            *ActorRegistrations = stats.PoolActorRegistrations; +            *ActorsAlive        = stats.PoolActorRegistrations - stats.PoolDestroyedActors; +            *AllocatedMailboxes = stats.PoolAllocatedMailboxes; +            *MailboxPushedOutBySoftPreemption = stats.MailboxPushedOutBySoftPreemption; +            *MailboxPushedOutByTime = stats.MailboxPushedOutByTime; +            *MailboxPushedOutByEventCount = stats.MailboxPushedOutByEventCount; + +            LegacyActivationTimeHistogram.Set(stats.ActivationTimeHistogram); +            ActivationTimeHistogram->Reset(); +            ActivationTimeHistogram->Collect(stats.ActivationTimeHistogram); + +            LegacyEventDeliveryTimeHistogram.Set(stats.EventDeliveryTimeHistogram); +            EventDeliveryTimeHistogram->Reset(); +            EventDeliveryTimeHistogram->Collect(stats.EventDeliveryTimeHistogram); + +            LegacyEventProcessingCountHistogram.Set(stats.EventProcessingCountHistogram); +            EventProcessingCountHistogram->Reset(); +            EventProcessingCountHistogram->Collect(stats.EventProcessingCountHistogram); + +            double toMicrosec = 1000000 / NHPTimer::GetClockRate(); +            LegacyEventProcessingTimeHistogram.Set(stats.EventProcessingTimeHistogram, toMicrosec); +            EventProcessingTimeHistogram->Reset(); +            for (ui32 i = 0; i < stats.EventProcessingTimeHistogram.Count(); ++i) { +                EventProcessingTimeHistogram->Collect( +                    stats.EventProcessingTimeHistogram.UpperBound(i), +                    stats.EventProcessingTimeHistogram.Value(i) * toMicrosec); +            } + +            ActivityStats.Set(stats); + +            *MaxUtilizationTime = poolStats.MaxUtilizationTime; + +            double seconds = UsageTimer.PassedReset(); + +            // TODO[serxa]: It doesn't account for contention. Use 1 - parkedTicksDelta / seconds / numThreads KIKIMR-11916 +            const double elapsed = NHPTimer::GetSeconds(stats.ElapsedTicks); +            const double currentUsage = numThreads > 0 ? ((elapsed - LastElapsedSeconds) / seconds / numThreads) : 0; +            LastElapsedSeconds = elapsed; + +            // update usage factor according to smoothness +            const double smoothness = 0.5; +            Usage = currentUsage * smoothness + Usage * (1.0 - smoothness); +#else +            Y_UNUSED(poolStats); +            Y_UNUSED(stats); +            Y_UNUSED(numThreads); +#endif +        } +    }; + +public: +    static constexpr IActor::EActivityType ActorActivityType() { +        return IActor::ACTORLIB_STATS; +    } + +    TStatsCollectingActor( +            ui32 intervalSec, +            const TActorSystemSetup& setup, +            NMonitoring::TDynamicCounterPtr counters) +        : IntervalSec(intervalSec) +        , Counters(counters) +    { +        PoolCounters.resize(setup.GetExecutorsCount()); +        for (size_t poolId = 0; poolId < PoolCounters.size(); ++poolId) { +            PoolCounters[poolId].Init(Counters.Get(), setup.GetPoolName(poolId), setup.GetThreads(poolId)); +        } +    } + +    void Bootstrap(const TActorContext& ctx) { +        ctx.Schedule(TDuration::Seconds(IntervalSec), new TEvents::TEvWakeup()); +        Become(&TThis::StateWork); +    } + +    STFUNC(StateWork) { +        switch (ev->GetTypeRewrite()) { +            CFunc(TEvents::TSystem::Wakeup, Wakeup); +        } +    } + +private: +    virtual void OnWakeup(const TActorContext &ctx) { +        Y_UNUSED(ctx); +    } + +    void Wakeup(const TActorContext &ctx) { +        for (size_t poolId = 0; poolId < PoolCounters.size(); ++poolId) { +            TVector<TExecutorThreadStats> stats; +            TExecutorPoolStats poolStats; +            ctx.ExecutorThread.ActorSystem->GetPoolStats(poolId, poolStats, stats); +            SetAggregatedCounters(PoolCounters[poolId], poolStats, stats); +        } + +        OnWakeup(ctx); + +        ctx.Schedule(TDuration::Seconds(IntervalSec), new TEvents::TEvWakeup()); +    } + +    void SetAggregatedCounters(TExecutorPoolCounters& poolCounters, TExecutorPoolStats& poolStats, TVector<TExecutorThreadStats>& stats) { +        // Sum all per-thread counters into the 0th element +        for (ui32 idx = 1; idx < stats.size(); ++idx) { +            stats[0].Aggregate(stats[idx]); +        } +        if (stats.size()) { +            poolCounters.Set(poolStats, stats[0], stats.size() - 1); +        } +    } + +protected: +    const ui32 IntervalSec; +    NMonitoring::TDynamicCounterPtr Counters; + +    TVector<TExecutorPoolCounters> PoolCounters; +}; + +} // NActors diff --git a/library/cpp/actors/helpers/selfping_actor.cpp b/library/cpp/actors/helpers/selfping_actor.cpp index 7902ff68373..f9bfaf8dc09 100644 --- a/library/cpp/actors/helpers/selfping_actor.cpp +++ b/library/cpp/actors/helpers/selfping_actor.cpp @@ -10,14 +10,14 @@ namespace NActors {  namespace { -struct TEvPing: public TEventLocal<TEvPing, TEvents::THelloWorld::Ping> {  -    TEvPing(double timeStart)  -        : TimeStart(timeStart)  -    {}  -  -    const double TimeStart;  -};  -  +struct TEvPing: public TEventLocal<TEvPing, TEvents::THelloWorld::Ping> { +    TEvPing(double timeStart) +        : TimeStart(timeStart) +    {} + +    const double TimeStart; +}; +  template <class TValueType_>  struct TAvgOperation {      struct TValueType { @@ -70,8 +70,8 @@ private:      THPTimer Timer;  public: -    static constexpr auto ActorActivityType() {  -        return SELF_PING_ACTOR;  +    static constexpr auto ActorActivityType() { +        return SELF_PING_ACTOR;      }      TSelfPingActor(TDuration sendInterval, const NMonitoring::TDynamicCounters::TCounterPtr& counter, @@ -93,7 +93,7 @@ public:      STFUNC(RunningState)      {          switch (ev->GetTypeRewrite()) { -            HFunc(TEvPing, HandlePing);  +            HFunc(TEvPing, HandlePing);          default:              Y_FAIL("TSelfPingActor::RunningState: unexpected event 0x%08" PRIx32, ev->GetTypeRewrite());          } @@ -146,7 +146,7 @@ public:          return (ui64)d;      } -    void HandlePing(TEvPing::TPtr &ev, const TActorContext &ctx)  +    void HandlePing(TEvPing::TPtr &ev, const TActorContext &ctx)      {          const auto now = ctx.Now();          const double hpNow = Timer.Passed(); @@ -166,7 +166,7 @@ public:  private:      void SchedulePing(const TActorContext &ctx, double hpNow) const      { -        ctx.Schedule(SendInterval, new TEvPing(hpNow));  +        ctx.Schedule(SendInterval, new TEvPing(hpNow));      }  }; diff --git a/library/cpp/actors/helpers/selfping_actor.h b/library/cpp/actors/helpers/selfping_actor.h index 4217fa42930..d7d07f9fa8b 100644 --- a/library/cpp/actors/helpers/selfping_actor.h +++ b/library/cpp/actors/helpers/selfping_actor.h @@ -1,6 +1,6 @@  #pragma once -#include <library/cpp/actors/core/actor.h>  +#include <library/cpp/actors/core/actor.h>  #include <library/cpp/monlib/dynamic_counters/counters.h>  namespace NActors { diff --git a/library/cpp/actors/helpers/selfping_actor_ut.cpp b/library/cpp/actors/helpers/selfping_actor_ut.cpp index 3643397c0b2..459635fa24a 100644 --- a/library/cpp/actors/helpers/selfping_actor_ut.cpp +++ b/library/cpp/actors/helpers/selfping_actor_ut.cpp @@ -1,24 +1,24 @@  #include "selfping_actor.h"  #include <library/cpp/testing/unittest/registar.h> -#include <library/cpp/actors/testlib/test_runtime.h>  +#include <library/cpp/actors/testlib/test_runtime.h>  namespace NActors {  namespace Tests { -THolder<TTestActorRuntimeBase> CreateRuntime() {  -    auto runtime = MakeHolder<TTestActorRuntimeBase>();  -    runtime->SetScheduledEventFilter([](auto&&, auto&&, auto&&, auto&&) { return false; });  -    runtime->Initialize();  -    return runtime;  -}  -  +THolder<TTestActorRuntimeBase> CreateRuntime() { +    auto runtime = MakeHolder<TTestActorRuntimeBase>(); +    runtime->SetScheduledEventFilter([](auto&&, auto&&, auto&&, auto&&) { return false; }); +    runtime->Initialize(); +    return runtime; +} +  Y_UNIT_TEST_SUITE(TSelfPingTest) {      Y_UNIT_TEST(Basic)      { -        auto runtime = CreateRuntime();  +        auto runtime = CreateRuntime(); -        //const TActorId sender = runtime.AllocateEdgeActor();  +        //const TActorId sender = runtime.AllocateEdgeActor();          NMonitoring::TDynamicCounters::TCounterPtr counter(new NMonitoring::TCounterForPtr());          NMonitoring::TDynamicCounters::TCounterPtr counter2(new NMonitoring::TCounterForPtr()); @@ -30,10 +30,10 @@ Y_UNIT_TEST_SUITE(TSelfPingTest) {          UNIT_ASSERT_VALUES_EQUAL(counter->Val(), 0);          UNIT_ASSERT_VALUES_EQUAL(counter2->Val(), 0); -        const TActorId actorId = runtime->Register(actor);  -        Y_UNUSED(actorId);  +        const TActorId actorId = runtime->Register(actor); +        Y_UNUSED(actorId); -        //runtime.Send(new IEventHandle(actorId, sender, new TEvSelfPing::TEvPing(0.0)));  +        //runtime.Send(new IEventHandle(actorId, sender, new TEvSelfPing::TEvPing(0.0)));          // TODO check after events are handled          //Sleep(TDuration::Seconds(1)); diff --git a/library/cpp/actors/helpers/ut/ya.make b/library/cpp/actors/helpers/ut/ya.make index 933544d0e76..cba4d6d1d96 100644 --- a/library/cpp/actors/helpers/ut/ya.make +++ b/library/cpp/actors/helpers/ut/ya.make @@ -1,36 +1,36 @@ -UNITTEST_FOR(library/cpp/actors/helpers)  -  -OWNER(  -    alexvru  -    g:kikimr  -)  -  -FORK_SUBTESTS()  -IF (SANITIZER_TYPE)  -    SIZE(LARGE)  -    TIMEOUT(1200)  -    TAG(ya:fat)  -    SPLIT_FACTOR(20)  -    REQUIREMENTS(  -        ram:32  -    )  -ELSE()  -    SIZE(MEDIUM)  -    TIMEOUT(600)  -    REQUIREMENTS(  -        ram:16  -    )  -ENDIF()  -  -  -PEERDIR(  -    library/cpp/actors/interconnect  -    library/cpp/actors/testlib  -    library/cpp/actors/core  -)  -  -SRCS(  -    selfping_actor_ut.cpp  -)  -  -END()  +UNITTEST_FOR(library/cpp/actors/helpers) + +OWNER( +    alexvru +    g:kikimr +) + +FORK_SUBTESTS() +IF (SANITIZER_TYPE) +    SIZE(LARGE) +    TIMEOUT(1200) +    TAG(ya:fat) +    SPLIT_FACTOR(20) +    REQUIREMENTS( +        ram:32 +    ) +ELSE() +    SIZE(MEDIUM) +    TIMEOUT(600) +    REQUIREMENTS( +        ram:16 +    ) +ENDIF() + + +PEERDIR( +    library/cpp/actors/interconnect +    library/cpp/actors/testlib +    library/cpp/actors/core +) + +SRCS( +    selfping_actor_ut.cpp +) + +END() diff --git a/library/cpp/actors/helpers/ya.make b/library/cpp/actors/helpers/ya.make index 10cf704d827..d8771179de8 100644 --- a/library/cpp/actors/helpers/ya.make +++ b/library/cpp/actors/helpers/ya.make @@ -9,7 +9,7 @@ SRCS(      flow_controlled_queue.h      future_callback.h      mon_histogram_helper.h -    selfping_actor.cpp  +    selfping_actor.cpp  )  PEERDIR( @@ -18,8 +18,8 @@ PEERDIR(  )  END() -  -RECURSE_FOR_TESTS(  -    ut  -)  -  + +RECURSE_FOR_TESTS( +    ut +) + diff --git a/library/cpp/actors/interconnect/interconnect_tcp_server.cpp b/library/cpp/actors/interconnect/interconnect_tcp_server.cpp index fae41db0b3a..b95c994598d 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_server.cpp +++ b/library/cpp/actors/interconnect/interconnect_tcp_server.cpp @@ -7,21 +7,21 @@  #include "interconnect_common.h"  namespace NActors { -    TInterconnectListenerTCP::TInterconnectListenerTCP(const TString& address, ui16 port, TInterconnectProxyCommon::TPtr common, const TMaybe<SOCKET>& socket)  +    TInterconnectListenerTCP::TInterconnectListenerTCP(const TString& address, ui16 port, TInterconnectProxyCommon::TPtr common, const TMaybe<SOCKET>& socket)          : TActor(&TThis::Initial)          , TInterconnectLoggingBase(Sprintf("ICListener: %s", SelfId().ToString().data()))          , Address(address.c_str(), port) -        , Listener(  -            socket  -            ? new NInterconnect::TStreamSocket(*socket)  -            : nullptr)  -        , ExternalSocket(!!Listener)  +        , Listener( +            socket +            ? new NInterconnect::TStreamSocket(*socket) +            : nullptr) +        , ExternalSocket(!!Listener)          , ProxyCommonCtx(std::move(common)) -    {  -        if (ExternalSocket) {  -            SetNonBlock(*Listener);  -        }  -    }  +    { +        if (ExternalSocket) { +            SetNonBlock(*Listener); +        } +    }      TAutoPtr<IEventHandle> TInterconnectListenerTCP::AfterRegister(const TActorId& self, const TActorId& parentId) {          return new IEventHandle(self, parentId, new TEvents::TEvBootstrap, 0); @@ -102,7 +102,7 @@ namespace NActors {                  ctx.Register(CreateIncomingHandshakeActor(ProxyCommonCtx, std::move(socket)));                  continue;              } else if (-r != EAGAIN && -r != EWOULDBLOCK) { -                Y_VERIFY(-r != ENFILE && -r != EMFILE && !ExternalSocket);  +                Y_VERIFY(-r != ENFILE && -r != EMFILE && !ExternalSocket);                  LOG_ERROR_IC("ICL06", "Listen failed: %s (%s)", strerror(-r), Address.ToString().data());                  Listener.Reset();                  PollerToken.Reset(); diff --git a/library/cpp/actors/interconnect/interconnect_tcp_server.h b/library/cpp/actors/interconnect/interconnect_tcp_server.h index aab8e970916..fc71073c2df 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_server.h +++ b/library/cpp/actors/interconnect/interconnect_tcp_server.h @@ -15,7 +15,7 @@ namespace NActors {              return INTERCONNECT_COMMON;          } -        TInterconnectListenerTCP(const TString& address, ui16 port, TInterconnectProxyCommon::TPtr common, const TMaybe<SOCKET>& socket = Nothing());  +        TInterconnectListenerTCP(const TString& address, ui16 port, TInterconnectProxyCommon::TPtr common, const TMaybe<SOCKET>& socket = Nothing());          int Bind();      private: @@ -45,7 +45,7 @@ namespace NActors {          const NInterconnect::TAddress Address;          TIntrusivePtr<NInterconnect::TStreamSocket> Listener; -        const bool ExternalSocket;  +        const bool ExternalSocket;          TPollerToken::TPtr PollerToken;          TInterconnectProxyCommon::TPtr const ProxyCommonCtx;      }; diff --git a/library/cpp/actors/util/local_process_key.h b/library/cpp/actors/util/local_process_key.h index 61d627bf28b..172f08fc735 100644 --- a/library/cpp/actors/util/local_process_key.h +++ b/library/cpp/actors/util/local_process_key.h @@ -1,19 +1,19 @@  #pragma once -#include <util/string/builder.h>  +#include <util/string/builder.h>  #include <util/generic/strbuf.h>  #include <util/generic/vector.h>  #include <util/generic/hash.h>  #include <util/generic/singleton.h> -#include <util/generic/serialized_enum.h>  +#include <util/generic/serialized_enum.h>  template <typename T>  class TLocalProcessKeyState {  template <typename U, const char* Name>  friend class TLocalProcessKey; -template <typename U, typename EnumT>  -friend class TEnumProcessKey;  +template <typename U, typename EnumT> +friend class TEnumProcessKey;  public:      static TLocalProcessKeyState& GetInstance() { @@ -21,17 +21,17 @@ public:      }      size_t GetCount() const { -        return StartIndex + Names.size();  +        return StartIndex + Names.size();      }      TStringBuf GetNameByIndex(size_t index) const { -        if (index < StartIndex) {  -            return StaticNames[index];  -        } else {  -            index -= StartIndex;  -            Y_ENSURE(index < Names.size());  -            return Names[index];  -        }  +        if (index < StartIndex) { +            return StaticNames[index]; +        } else { +            index -= StartIndex; +            Y_ENSURE(index < Names.size()); +            return Names[index]; +        }      }      size_t GetIndexByName(TStringBuf name) const { @@ -42,7 +42,7 @@ public:  private:      size_t Register(TStringBuf name) { -        auto x = Map.emplace(name, Names.size()+StartIndex);  +        auto x = Map.emplace(name, Names.size()+StartIndex);          if (x.second) {              Names.emplace_back(name);          } @@ -50,29 +50,29 @@ private:          return x.first->second;      } -    size_t Register(TStringBuf name, ui32 index) {  -        Y_VERIFY(index < StartIndex);  -        auto x = Map.emplace(name, index);  -        Y_VERIFY(x.second || x.first->second == index);  -        StaticNames[index] = name;  -        return x.first->second;  -    }  -  +    size_t Register(TStringBuf name, ui32 index) { +        Y_VERIFY(index < StartIndex); +        auto x = Map.emplace(name, index); +        Y_VERIFY(x.second || x.first->second == index); +        StaticNames[index] = name; +        return x.first->second; +    } +  private: -    static constexpr ui32 StartIndex = 2000;  -  -    TVector<TString> FillStaticNames() {  -        TVector<TString> staticNames;  -        staticNames.reserve(StartIndex);  -        for (ui32 i = 0; i < StartIndex; i++) {  -            staticNames.push_back(TStringBuilder() << "Activity_" << i);  -        }  -        return staticNames;  -    }  -  -    TVector<TString> StaticNames = FillStaticNames();  -    TVector<TString> Names;  -    THashMap<TString, size_t> Map;  +    static constexpr ui32 StartIndex = 2000; + +    TVector<TString> FillStaticNames() { +        TVector<TString> staticNames; +        staticNames.reserve(StartIndex); +        for (ui32 i = 0; i < StartIndex; i++) { +            staticNames.push_back(TStringBuilder() << "Activity_" << i); +        } +        return staticNames; +    } + +    TVector<TString> StaticNames = FillStaticNames(); +    TVector<TString> Names; +    THashMap<TString, size_t> Map;  };  template <typename T, const char* Name> @@ -89,44 +89,44 @@ public:  private:      inline static size_t Index = TLocalProcessKeyState<T>::GetInstance().Register(Name);  }; -  -template <typename T, typename EnumT>  -class TEnumProcessKey {  -public:  -    static TStringBuf GetName(EnumT key) {  -        return TLocalProcessKeyState<T>::GetInstance().GetNameByIndex(GetIndex(key));  -    }  -  -    static size_t GetIndex(EnumT key) {  -        ui32 index = static_cast<ui32>(key);  -        if (index < TLocalProcessKeyState<T>::StartIndex) {  -            return index;  -        }  -        Y_VERIFY(index < Enum2Index.size());  -        return Enum2Index[index];  -    }  -  -private:  -    inline static TVector<size_t> RegisterAll() {  -        static_assert(std::is_enum<EnumT>::value, "Enum is required");  -  -        TVector<size_t> enum2Index;  -        auto names = GetEnumNames<EnumT>();  -        ui32 maxId = 0;  -        for (const auto& [k, v] : names) {  -            maxId = Max(maxId, static_cast<ui32>(k));  -        }  -        enum2Index.resize(maxId+1);  -        for (ui32 i = 0; i <= maxId && i < TLocalProcessKeyState<T>::StartIndex; i++) {  -            enum2Index[i] = i;  -        }  -  -        for (const auto& [k, v] : names) {  -            ui32 enumId = static_cast<ui32>(k);  -            enum2Index[enumId] = TLocalProcessKeyState<T>::GetInstance().Register(v, enumId);  -        }  -        return enum2Index;  -    }  -  -    inline static TVector<size_t> Enum2Index = RegisterAll();  -};  + +template <typename T, typename EnumT> +class TEnumProcessKey { +public: +    static TStringBuf GetName(EnumT key) { +        return TLocalProcessKeyState<T>::GetInstance().GetNameByIndex(GetIndex(key)); +    } + +    static size_t GetIndex(EnumT key) { +        ui32 index = static_cast<ui32>(key); +        if (index < TLocalProcessKeyState<T>::StartIndex) { +            return index; +        } +        Y_VERIFY(index < Enum2Index.size()); +        return Enum2Index[index]; +    } + +private: +    inline static TVector<size_t> RegisterAll() { +        static_assert(std::is_enum<EnumT>::value, "Enum is required"); + +        TVector<size_t> enum2Index; +        auto names = GetEnumNames<EnumT>(); +        ui32 maxId = 0; +        for (const auto& [k, v] : names) { +            maxId = Max(maxId, static_cast<ui32>(k)); +        } +        enum2Index.resize(maxId+1); +        for (ui32 i = 0; i <= maxId && i < TLocalProcessKeyState<T>::StartIndex; i++) { +            enum2Index[i] = i; +        } + +        for (const auto& [k, v] : names) { +            ui32 enumId = static_cast<ui32>(k); +            enum2Index[enumId] = TLocalProcessKeyState<T>::GetInstance().Register(v, enumId); +        } +        return enum2Index; +    } + +    inline static TVector<size_t> Enum2Index = RegisterAll(); +}; diff --git a/library/cpp/grpc/server/grpc_server.cpp b/library/cpp/grpc/server/grpc_server.cpp index 8863c80832d..7437b7a8f5e 100644 --- a/library/cpp/grpc/server/grpc_server.cpp +++ b/library/cpp/grpc/server/grpc_server.cpp @@ -56,22 +56,22 @@ void TGRpcServer::Start() {      using grpc::ServerBuilder;      using grpc::ResourceQuota;      ServerBuilder builder; -    auto credentials = grpc::InsecureServerCredentials();  -    if (Options_.SslData) {  +    auto credentials = grpc::InsecureServerCredentials(); +    if (Options_.SslData) {          grpc::SslServerCredentialsOptions::PemKeyCertPair keycert;          keycert.cert_chain = std::move(Options_.SslData->Cert);          keycert.private_key = std::move(Options_.SslData->Key);          grpc::SslServerCredentialsOptions sslOps;          sslOps.pem_root_certs = std::move(Options_.SslData->Root);          sslOps.pem_key_cert_pairs.push_back(keycert); -        credentials = grpc::SslServerCredentials(sslOps);  -    }  -    if (Options_.ExternalListener) {  -        Options_.ExternalListener->Init(builder.experimental().AddExternalConnectionAcceptor(  -            ServerBuilder::experimental_type::ExternalConnectionType::FROM_FD,  -            credentials  -        ));  -    } else {  +        credentials = grpc::SslServerCredentials(sslOps); +    } +    if (Options_.ExternalListener) { +        Options_.ExternalListener->Init(builder.experimental().AddExternalConnectionAcceptor( +            ServerBuilder::experimental_type::ExternalConnectionType::FROM_FD, +            credentials +        )); +    } else {          builder.AddListeningPort(server_address, credentials);      }      builder.SetMaxReceiveMessageSize(Options_.MaxMessageSize); @@ -171,10 +171,10 @@ void TGRpcServer::Start() {              }));          }      } -  -    if (Options_.ExternalListener) {  -        Options_.ExternalListener->Start();  -    }  + +    if (Options_.ExternalListener) { +        Options_.ExternalListener->Start(); +    }  }  void TGRpcServer::Stop() { @@ -223,10 +223,10 @@ void TGRpcServer::Stop() {      }      Ts.clear(); -  -    if (Options_.ExternalListener) {  -        Options_.ExternalListener->Stop();  -    }  + +    if (Options_.ExternalListener) { +        Options_.ExternalListener->Stop(); +    }  }  ui16 TGRpcServer::GetPort() const { diff --git a/library/cpp/grpc/server/grpc_server.h b/library/cpp/grpc/server/grpc_server.h index 9cb55e3154a..d6814a90a0d 100644 --- a/library/cpp/grpc/server/grpc_server.h +++ b/library/cpp/grpc/server/grpc_server.h @@ -27,15 +27,15 @@ struct TSslData {      TString Root;  }; -struct IExternalListener  -    : public TThrRefBase  -{  -    using TPtr = TIntrusivePtr<IExternalListener>;  -    virtual void Init(std::unique_ptr<grpc::experimental::ExternalConnectionAcceptor> acceptor) = 0;  -    virtual void Start() = 0;  -    virtual void Stop() = 0;  -};  -  +struct IExternalListener +    : public TThrRefBase +{ +    using TPtr = TIntrusivePtr<IExternalListener>; +    virtual void Init(std::unique_ptr<grpc::experimental::ExternalConnectionAcceptor> acceptor) = 0; +    virtual void Start() = 0; +    virtual void Stop() = 0; +}; +  //! Server's options.  struct TServerOptions {  #define DECLARE_FIELD(name, type, default) \ @@ -93,8 +93,8 @@ struct TServerOptions {      //! Custom configurator for ServerBuilder.      DECLARE_FIELD(ServerBuilderMutator, std::function<void(grpc::ServerBuilder&)>, [](grpc::ServerBuilder&){}); -    DECLARE_FIELD(ExternalListener, IExternalListener::TPtr, nullptr);  -  +    DECLARE_FIELD(ExternalListener, IExternalListener::TPtr, nullptr); +      //! Logger which will be used to write logs about requests handling (iff appropriate log level is enabled).      DECLARE_FIELD(Logger, TLoggerPtr, nullptr);  | 
