diff options
author | Alexander Gololobov <davenger@yandex-team.com> | 2022-02-10 16:47:38 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:47:38 +0300 |
commit | fccc62e9bfdce9be2fe7e0f23479da3a5512211a (patch) | |
tree | c0748b5dcbade83af788c0abfa89c0383d6b779c /library/cpp/actors | |
parent | 39608cdb86363c75ce55b2b9a69841c3b71f22cf (diff) | |
download | ydb-fccc62e9bfdce9be2fe7e0f23479da3a5512211a.tar.gz |
Restoring authorship annotation for Alexander Gololobov <davenger@yandex-team.com>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/actors')
38 files changed, 373 insertions, 373 deletions
diff --git a/library/cpp/actors/core/actor.h b/library/cpp/actors/core/actor.h index 7eabe6fbd1..ed29bd14b9 100644 --- a/library/cpp/actors/core/actor.h +++ b/library/cpp/actors/core/actor.h @@ -218,7 +218,7 @@ namespace NActors { TActorIdentity SelfActorId; i64 ElapsedTicks; ui64 HandledEvents; - + friend void DoActorInit(TActorSystem*, IActor*, const TActorId&, const TActorId&); friend class TDecorator; @@ -235,10 +235,10 @@ namespace NActors { INTERCONNECT_COMMON = 171, SELF_PING_ACTOR = 207, TEST_ACTOR_RUNTIME = 283, - INTERCONNECT_HANDSHAKE = 284, - INTERCONNECT_POLLER = 285, - INTERCONNECT_SESSION_KILLER = 286, - ACTOR_SYSTEM_SCHEDULER_ACTOR = 312, + INTERCONNECT_HANDSHAKE = 284, + INTERCONNECT_POLLER = 285, + INTERCONNECT_SESSION_KILLER = 286, + ACTOR_SYSTEM_SCHEDULER_ACTOR = 312, ACTOR_FUTURE_CALLBACK = 337, INTERCONNECT_MONACTOR = 362, INTERCONNECT_LOAD_ACTOR = 376, @@ -418,10 +418,10 @@ namespace NActors { } protected: - //* Comment this function to find unmarked activities + //* Comment this function to find unmarked activities static constexpr IActor::EActivityType ActorActivityType() { return EActorActivity::OTHER; - } //*/ + } //*/ // static constexpr char ActorName[] = "UNNAMED"; diff --git a/library/cpp/actors/core/actorsystem.h b/library/cpp/actors/core/actorsystem.h index 242794ac6f..40499d7586 100644 --- a/library/cpp/actors/core/actorsystem.h +++ b/library/cpp/actors/core/actorsystem.h @@ -8,7 +8,7 @@ #include "event.h" #include "log_settings.h" #include "scheduler_cookie.h" -#include "mon_stats.h" +#include "mon_stats.h" #include <library/cpp/threading/future/future.h> #include <library/cpp/actors/util/ticket_lock.h> @@ -58,7 +58,7 @@ namespace NActors { , DestroyedActors(0) { } - + virtual ~IExecutorPool() { } @@ -348,7 +348,7 @@ namespace NActors { T* AppData() const { return (T*)AppData0; } - + NLog::TSettings* LoggerSettings() const { return LoggerSettings0.Get(); } diff --git a/library/cpp/actors/core/defs.h b/library/cpp/actors/core/defs.h index 6c50ab677c..980b7d767b 100644 --- a/library/cpp/actors/core/defs.h +++ b/library/cpp/actors/core/defs.h @@ -6,12 +6,12 @@ #include <util/generic/hash.h> #include <util/string/printf.h> -// Enables collection of -// event send/receive counts -// activation time histograms -// event processing time histograms -#define ACTORSLIB_COLLECT_EXEC_STATS - +// Enables collection of +// event send/receive counts +// activation time histograms +// event processing time histograms +#define ACTORSLIB_COLLECT_EXEC_STATS + namespace NActors { using TPoolId = ui8; using TPoolsMask = ui64; diff --git a/library/cpp/actors/core/event.h b/library/cpp/actors/core/event.h index 9d2b694cb2..6ff02aaf94 100644 --- a/library/cpp/actors/core/event.h +++ b/library/cpp/actors/core/event.h @@ -7,9 +7,9 @@ #include <library/cpp/actors/wilson/wilson_trace.h> -#include <util/system/hp_timer.h> +#include <util/system/hp_timer.h> #include <util/generic/maybe.h> - + namespace NActors { class TChunkSerializer; @@ -110,10 +110,10 @@ namespace NActors { // filled if feeded by interconnect session const TActorId InterconnectSession; -#ifdef ACTORSLIB_COLLECT_EXEC_STATS +#ifdef ACTORSLIB_COLLECT_EXEC_STATS ::NHPTimer::STime SendTime; -#endif - +#endif + static const size_t ChannelBits = 12; static const size_t ChannelShift = (sizeof(ui32) << 3) - ChannelBits; @@ -174,9 +174,9 @@ namespace NActors { , Sender(sender) , Cookie(cookie) , TraceId(std::move(traceId)) -#ifdef ACTORSLIB_COLLECT_EXEC_STATS +#ifdef ACTORSLIB_COLLECT_EXEC_STATS , SendTime(0) -#endif +#endif , Event(ev) , RewriteRecipient(Recipient) , RewriteType(Type) @@ -199,9 +199,9 @@ namespace NActors { , Sender(sender) , Cookie(cookie) , TraceId(std::move(traceId)) -#ifdef ACTORSLIB_COLLECT_EXEC_STATS +#ifdef ACTORSLIB_COLLECT_EXEC_STATS , SendTime(0) -#endif +#endif , Buffer(std::move(buffer)) , RewriteRecipient(Recipient) , RewriteType(Type) @@ -228,9 +228,9 @@ namespace NActors { , OriginScopeId(originScopeId) , TraceId(std::move(traceId)) , InterconnectSession(session) -#ifdef ACTORSLIB_COLLECT_EXEC_STATS +#ifdef ACTORSLIB_COLLECT_EXEC_STATS , SendTime(0) -#endif +#endif , Buffer(std::move(buffer)) , RewriteRecipient(Recipient) , RewriteType(Type) diff --git a/library/cpp/actors/core/event_pb.h b/library/cpp/actors/core/event_pb.h index d7ff4a3cbe..d7546b901a 100644 --- a/library/cpp/actors/core/event_pb.h +++ b/library/cpp/actors/core/event_pb.h @@ -127,27 +127,27 @@ namespace NActors { static const size_t EventMaxByteSize = 67108000; #endif - template <typename TEv, typename TRecord /*protobuf record*/, ui32 TEventType, typename TRecHolder> - class TEventPBBase: public TEventBase<TEv, TEventType> , public TRecHolder { + template <typename TEv, typename TRecord /*protobuf record*/, ui32 TEventType, typename TRecHolder> + class TEventPBBase: public TEventBase<TEv, TEventType> , public TRecHolder { // a vector of data buffers referenced by record; if filled, then extended serialization mechanism applies TVector<TRope> Payload; public: - using TRecHolder::Record; - - public: + using TRecHolder::Record; + + public: using ProtoRecordType = TRecord; - TEventPBBase() = default; + TEventPBBase() = default; - explicit TEventPBBase(const TRecord& rec) + explicit TEventPBBase(const TRecord& rec) { - Record = rec; + Record = rec; } - explicit TEventPBBase(TRecord&& rec) + explicit TEventPBBase(TRecord&& rec) { - Record = std::move(rec); + Record = std::move(rec); } TString ToStringHeader() const override { @@ -231,7 +231,7 @@ namespace NActors { } static IEventBase* Load(TIntrusivePtr<TEventSerializedData> input) { - THolder<TEventPBBase> ev(new TEv()); + THolder<TEventPBBase> ev(new TEv()); if (!input->GetSize()) { Y_PROTOBUF_SUPPRESS_NODISCARD ev->Record.ParseFromString(TString()); } else { @@ -273,7 +273,7 @@ namespace NActors { } ev->CachedByteSize = input->GetSize(); return ev.Release(); - } + } size_t GetCachedByteSize() const { if (CachedByteSize == 0) { @@ -369,43 +369,43 @@ namespace NActors { } }; - // Protobuf record not using arena - template <typename TRecord> - struct TRecordHolder { - TRecord Record; - }; - - // Protobuf arena and a record allocated on it - template <typename TRecord, size_t InitialBlockSize, size_t MaxBlockSize> - struct TArenaRecordHolder { - google::protobuf::Arena PbArena; - TRecord& Record; - - static const google::protobuf::ArenaOptions GetArenaOptions() { - google::protobuf::ArenaOptions opts; - opts.initial_block_size = InitialBlockSize; - opts.max_block_size = MaxBlockSize; - return opts; - } - - TArenaRecordHolder() - : PbArena(GetArenaOptions()) - , Record(*google::protobuf::Arena::CreateMessage<TRecord>(&PbArena)) - {} - }; - + // Protobuf record not using arena + template <typename TRecord> + struct TRecordHolder { + TRecord Record; + }; + + // Protobuf arena and a record allocated on it + template <typename TRecord, size_t InitialBlockSize, size_t MaxBlockSize> + struct TArenaRecordHolder { + google::protobuf::Arena PbArena; + TRecord& Record; + + static const google::protobuf::ArenaOptions GetArenaOptions() { + google::protobuf::ArenaOptions opts; + opts.initial_block_size = InitialBlockSize; + opts.max_block_size = MaxBlockSize; + return opts; + } + + TArenaRecordHolder() + : PbArena(GetArenaOptions()) + , Record(*google::protobuf::Arena::CreateMessage<TRecord>(&PbArena)) + {} + }; + + template <typename TEv, typename TRecord, ui32 TEventType> + class TEventPB : public TEventPBBase<TEv, TRecord, TEventType, TRecordHolder<TRecord> > { + typedef TEventPBBase<TEv, TRecord, TEventType, TRecordHolder<TRecord> > TPbBase; + // NOTE: No extra fields allowed: TEventPB must be a "template typedef" + public: + using TPbBase::TPbBase; + }; + + template <typename TEv, typename TRecord, ui32 TEventType, size_t InitialBlockSize = 512, size_t MaxBlockSize = 16*1024> + using TEventPBWithArena = TEventPBBase<TEv, TRecord, TEventType, TArenaRecordHolder<TRecord, InitialBlockSize, MaxBlockSize> >; + template <typename TEv, typename TRecord, ui32 TEventType> - class TEventPB : public TEventPBBase<TEv, TRecord, TEventType, TRecordHolder<TRecord> > { - typedef TEventPBBase<TEv, TRecord, TEventType, TRecordHolder<TRecord> > TPbBase; - // NOTE: No extra fields allowed: TEventPB must be a "template typedef" - public: - using TPbBase::TPbBase; - }; - - template <typename TEv, typename TRecord, ui32 TEventType, size_t InitialBlockSize = 512, size_t MaxBlockSize = 16*1024> - using TEventPBWithArena = TEventPBBase<TEv, TRecord, TEventType, TArenaRecordHolder<TRecord, InitialBlockSize, MaxBlockSize> >; - - template <typename TEv, typename TRecord, ui32 TEventType> class TEventShortDebugPB: public TEventPB<TEv, TRecord, TEventType> { public: using TBase = TEventPB<TEv, TRecord, TEventType>; @@ -455,7 +455,7 @@ namespace NActors { base.Swap(©); PreSerializedData.clear(); } - return TBase::Record; + return TBase::Record; } const TRecord& GetRecord() const { @@ -463,7 +463,7 @@ namespace NActors { } TRecord* MutableRecord() { - GetRecord(); // Make sure PreSerializedData is parsed + GetRecord(); // Make sure PreSerializedData is parsed return &(TBase::Record); } diff --git a/library/cpp/actors/core/event_pb_payload_ut.cpp b/library/cpp/actors/core/event_pb_payload_ut.cpp index 4dcc958bb1..eab007bc15 100644 --- a/library/cpp/actors/core/event_pb_payload_ut.cpp +++ b/library/cpp/actors/core/event_pb_payload_ut.cpp @@ -8,7 +8,7 @@ using namespace NActors; enum { EvMessageWithPayload = EventSpaceBegin(TEvents::ES_PRIVATE), - EvArenaMessage, + EvArenaMessage, EvArenaMessageBig, EvMessageWithPayloadPreSerialized }; @@ -38,81 +38,81 @@ TString MakeString(size_t len) { Y_UNIT_TEST_SUITE(TEventProtoWithPayload) { - template <class TEventFrom, class TEventTo> - void TestSerializeDeserialize(size_t size1, size_t size2) { - static_assert(TEventFrom::EventType == TEventTo::EventType, "Must be same event type"); + template <class TEventFrom, class TEventTo> + void TestSerializeDeserialize(size_t size1, size_t size2) { + static_assert(TEventFrom::EventType == TEventTo::EventType, "Must be same event type"); - TEventFrom msg; - msg.Record.SetMeta("hello, world!"); - msg.Record.AddPayloadId(msg.AddPayload(MakeStringRope(MakeString(size1)))); - msg.Record.AddPayloadId(msg.AddPayload(MakeStringRope(MakeString(size2)))); - msg.Record.AddSomeData(MakeString((size1 + size2) % 50 + 11)); + TEventFrom msg; + msg.Record.SetMeta("hello, world!"); + msg.Record.AddPayloadId(msg.AddPayload(MakeStringRope(MakeString(size1)))); + msg.Record.AddPayloadId(msg.AddPayload(MakeStringRope(MakeString(size2)))); + msg.Record.AddSomeData(MakeString((size1 + size2) % 50 + 11)); - auto serializer = MakeHolder<TAllocChunkSerializer>(); + auto serializer = MakeHolder<TAllocChunkSerializer>(); msg.SerializeToArcadiaStream(serializer.Get()); - auto buffers = serializer->Release(msg.IsExtendedFormat()); - UNIT_ASSERT_VALUES_EQUAL(buffers->GetSize(), msg.CalculateSerializedSize()); - TString ser = buffers->GetString(); - - TString chunkerRes; - TCoroutineChunkSerializer chunker; - chunker.SetSerializingEvent(&msg); - while (!chunker.IsComplete()) { - char buffer[4096]; + auto buffers = serializer->Release(msg.IsExtendedFormat()); + UNIT_ASSERT_VALUES_EQUAL(buffers->GetSize(), msg.CalculateSerializedSize()); + TString ser = buffers->GetString(); + + TString chunkerRes; + TCoroutineChunkSerializer chunker; + chunker.SetSerializingEvent(&msg); + while (!chunker.IsComplete()) { + char buffer[4096]; auto range = chunker.FeedBuf(buffer, sizeof(buffer)); for (auto p = range.first; p != range.second; ++p) { chunkerRes += TString(p->first, p->second); } } - UNIT_ASSERT_VALUES_EQUAL(chunkerRes, ser); - + UNIT_ASSERT_VALUES_EQUAL(chunkerRes, ser); + THolder<IEventBase> ev2 = THolder(TEventTo::Load(buffers)); - TEventTo& msg2 = static_cast<TEventTo&>(*ev2); - UNIT_ASSERT_VALUES_EQUAL(msg2.Record.GetMeta(), msg.Record.GetMeta()); - UNIT_ASSERT_EQUAL(msg2.GetPayload(msg2.Record.GetPayloadId(0)), msg.GetPayload(msg.Record.GetPayloadId(0))); - UNIT_ASSERT_EQUAL(msg2.GetPayload(msg2.Record.GetPayloadId(1)), msg.GetPayload(msg.Record.GetPayloadId(1))); + TEventTo& msg2 = static_cast<TEventTo&>(*ev2); + UNIT_ASSERT_VALUES_EQUAL(msg2.Record.GetMeta(), msg.Record.GetMeta()); + UNIT_ASSERT_EQUAL(msg2.GetPayload(msg2.Record.GetPayloadId(0)), msg.GetPayload(msg.Record.GetPayloadId(0))); + UNIT_ASSERT_EQUAL(msg2.GetPayload(msg2.Record.GetPayloadId(1)), msg.GetPayload(msg.Record.GetPayloadId(1))); + } + + template <class TEvent> + void TestAllSizes(size_t step1 = 100, size_t step2 = 111) { + for (size_t size1 = 0; size1 < 10000; size1 += step1) { + for (size_t size2 = 0; size2 < 10000; size2 += step2) { + TestSerializeDeserialize<TEvent, TEvent>(size1, size2); + } + } } - - template <class TEvent> - void TestAllSizes(size_t step1 = 100, size_t step2 = 111) { - for (size_t size1 = 0; size1 < 10000; size1 += step1) { - for (size_t size2 = 0; size2 < 10000; size2 += step2) { - TestSerializeDeserialize<TEvent, TEvent>(size1, size2); - } - } - } - + #if (!defined(_tsan_enabled_)) - Y_UNIT_TEST(SerializeDeserialize) { - TestAllSizes<TEvMessageWithPayload>(); - } + Y_UNIT_TEST(SerializeDeserialize) { + TestAllSizes<TEvMessageWithPayload>(); + } #endif - - - struct TEvArenaMessage : TEventPBWithArena<TEvArenaMessage, TMessageWithPayload, EvArenaMessage> { - }; - - Y_UNIT_TEST(SerializeDeserializeArena) { - TestAllSizes<TEvArenaMessage>(500, 111); - } - - - struct TEvArenaMessageBig : TEventPBWithArena<TEvArenaMessageBig, TMessageWithPayload, EvArenaMessageBig, 4000, 32000> { - }; - - Y_UNIT_TEST(SerializeDeserializeArenaBig) { - TestAllSizes<TEvArenaMessageBig>(111, 500); - } - - - // Compatible with TEvArenaMessage but doesn't use arenas - struct TEvArenaMessageWithoutArena : TEventPB<TEvArenaMessageWithoutArena, TMessageWithPayload, EvArenaMessage> { - }; - - Y_UNIT_TEST(Compatibility) { - TestSerializeDeserialize<TEvArenaMessage, TEvArenaMessageWithoutArena>(200, 14010); - TestSerializeDeserialize<TEvArenaMessageWithoutArena, TEvArenaMessage>(2000, 4010); - } + + + struct TEvArenaMessage : TEventPBWithArena<TEvArenaMessage, TMessageWithPayload, EvArenaMessage> { + }; + + Y_UNIT_TEST(SerializeDeserializeArena) { + TestAllSizes<TEvArenaMessage>(500, 111); + } + + + struct TEvArenaMessageBig : TEventPBWithArena<TEvArenaMessageBig, TMessageWithPayload, EvArenaMessageBig, 4000, 32000> { + }; + + Y_UNIT_TEST(SerializeDeserializeArenaBig) { + TestAllSizes<TEvArenaMessageBig>(111, 500); + } + + + // Compatible with TEvArenaMessage but doesn't use arenas + struct TEvArenaMessageWithoutArena : TEventPB<TEvArenaMessageWithoutArena, TMessageWithPayload, EvArenaMessage> { + }; + + Y_UNIT_TEST(Compatibility) { + TestSerializeDeserialize<TEvArenaMessage, TEvArenaMessageWithoutArena>(200, 14010); + TestSerializeDeserialize<TEvArenaMessageWithoutArena, TEvArenaMessage>(2000, 4010); + } Y_UNIT_TEST(PreSerializedCompatibility) { // ensure TEventPreSerializedPB and TEventPB are interchangable with no compatibility issues diff --git a/library/cpp/actors/core/executelater.h b/library/cpp/actors/core/executelater.h index a2e380e466..e7a13c1005 100644 --- a/library/cpp/actors/core/executelater.h +++ b/library/cpp/actors/core/executelater.h @@ -8,10 +8,10 @@ namespace NActors { template <typename TCallback> class TExecuteLater: public TActorBootstrapped<TExecuteLater<TCallback>> { public: - static constexpr IActor::EActivityType ActorActivityType() { - return IActor::ACTORLIB_COMMON; - } - + static constexpr IActor::EActivityType ActorActivityType() { + return IActor::ACTORLIB_COMMON; + } + TExecuteLater( TCallback&& callback, IActor::EActivityType activityType, diff --git a/library/cpp/actors/core/executor_pool_base.cpp b/library/cpp/actors/core/executor_pool_base.cpp index e3852eaf8d..c3b9999168 100644 --- a/library/cpp/actors/core/executor_pool_base.cpp +++ b/library/cpp/actors/core/executor_pool_base.cpp @@ -16,9 +16,9 @@ namespace NActors { : IExecutorPool(poolId) , ActorSystem(nullptr) , MailboxTable(new TMailboxTable) -#ifdef ACTORSLIB_COLLECT_EXEC_STATS - , Stats(maxActivityType) -#endif +#ifdef ACTORSLIB_COLLECT_EXEC_STATS + , Stats(maxActivityType) +#endif {} TExecutorPoolBaseMailboxed::~TExecutorPoolBaseMailboxed() { @@ -47,9 +47,9 @@ namespace NActors { bool TExecutorPoolBaseMailboxed::Send(TAutoPtr<IEventHandle>& ev) { Y_VERIFY_DEBUG(ev->GetRecipientRewrite().PoolID() == PoolId); -#ifdef ACTORSLIB_COLLECT_EXEC_STATS +#ifdef ACTORSLIB_COLLECT_EXEC_STATS RelaxedStore(&ev->SendTime, (::NHPTimer::STime)GetCycleCountFast()); -#endif +#endif return MailboxTable->SendTo(ev, this); } @@ -59,21 +59,21 @@ namespace NActors { TActorId TExecutorPoolBaseMailboxed::Register(IActor* actor, TMailboxType::EType mailboxType, ui64 revolvingWriteCounter, const TActorId& parentId) { NHPTimer::STime hpstart = GetCycleCountFast(); -#ifdef ACTORSLIB_COLLECT_EXEC_STATS +#ifdef ACTORSLIB_COLLECT_EXEC_STATS ui32 at = actor->GetActivityType(); if (at >= Stats.MaxActivityType()) at = 0; AtomicIncrement(Stats.ActorsAliveByActivity[at]); -#endif +#endif AtomicIncrement(ActorRegistrations); - + // first step - find good enough mailbox ui32 hint = 0; TMailboxHeader* mailbox = nullptr; if (revolvingWriteCounter == 0) revolvingWriteCounter = AtomicIncrement(RegisterRevolvingCounter); - + { ui32 hintBackoff = 0; @@ -122,7 +122,7 @@ namespace NActors { default: Y_FAIL(); } - + NHPTimer::STime elapsed = GetCycleCountFast() - hpstart; if (elapsed > 1000000) { LWPROBE(SlowRegisterNew, PoolId, NHPTimer::GetSeconds(elapsed) * 1000.0); @@ -133,14 +133,14 @@ namespace NActors { TActorId TExecutorPoolBaseMailboxed::Register(IActor* actor, TMailboxHeader* mailbox, ui32 hint, const TActorId& parentId) { NHPTimer::STime hpstart = GetCycleCountFast(); -#ifdef ACTORSLIB_COLLECT_EXEC_STATS +#ifdef ACTORSLIB_COLLECT_EXEC_STATS ui32 at = actor->GetActivityType(); if (at >= Stats.MaxActivityType()) at = 0; AtomicIncrement(Stats.ActorsAliveByActivity[at]); -#endif +#endif AtomicIncrement(ActorRegistrations); - + const ui64 localActorId = AllocateID(); mailbox->AttachActor(localActorId, actor); diff --git a/library/cpp/actors/core/executor_pool_base.h b/library/cpp/actors/core/executor_pool_base.h index 49129e8a8d..c84ce1af77 100644 --- a/library/cpp/actors/core/executor_pool_base.h +++ b/library/cpp/actors/core/executor_pool_base.h @@ -12,11 +12,11 @@ namespace NActors { protected: TActorSystem* ActorSystem; THolder<TMailboxTable> MailboxTable; -#ifdef ACTORSLIB_COLLECT_EXEC_STATS +#ifdef ACTORSLIB_COLLECT_EXEC_STATS // Need to have per pool object to collect stats like actor registrations (because // registrations might be done in threads from other pools) TExecutorThreadStats Stats; -#endif +#endif TAtomic RegisterRevolvingCounter = 0; ui64 AllocateID(); public: diff --git a/library/cpp/actors/core/executor_pool_basic.cpp b/library/cpp/actors/core/executor_pool_basic.cpp index fdd07ef84f..4dce16939a 100644 --- a/library/cpp/actors/core/executor_pool_basic.cpp +++ b/library/cpp/actors/core/executor_pool_basic.cpp @@ -21,8 +21,8 @@ namespace NActors { TAffinity* affinity, TDuration timePerMailbox, ui32 eventsPerMailbox, - int realtimePriority, - ui32 maxActivityType) + int realtimePriority, + ui32 maxActivityType) : TExecutorPoolBase(poolId, threads, affinity, maxActivityType) , SpinThreshold(spinThreshold) , SpinThresholdCycles(spinThreshold * NHPTimer::GetCyclesPerSecond() * 0.000001) // convert microseconds to cycles @@ -195,7 +195,7 @@ namespace NActors { return activation; } SpinLockPause(); - } + } // stopping, die! return 0; @@ -245,8 +245,8 @@ namespace NActors { for (size_t i = 0; i < PoolThreads; ++i) { Threads[i].Thread->GetCurrentStats(statsCopy[i + 1]); } - } - + } + void TBasicExecutorPool::Prepare(TActorSystem* actorSystem, NSchedulerQueue::TReader** scheduleReaders, ui32* scheduleSz) { TAffinityGuard affinityGuard(Affinity()); diff --git a/library/cpp/actors/core/executor_pool_basic.h b/library/cpp/actors/core/executor_pool_basic.h index 2a663ab3ca..023190f7fe 100644 --- a/library/cpp/actors/core/executor_pool_basic.h +++ b/library/cpp/actors/core/executor_pool_basic.h @@ -77,8 +77,8 @@ namespace NActors { TAffinity* affinity = nullptr, TDuration timePerMailbox = DEFAULT_TIME_PER_MAILBOX, ui32 eventsPerMailbox = DEFAULT_EVENTS_PER_MAILBOX, - int realtimePriority = 0, - ui32 maxActivityType = 1); + int realtimePriority = 0, + ui32 maxActivityType = 1); explicit TBasicExecutorPool(const TBasicExecutorPoolConfig& cfg); ~TBasicExecutorPool(); diff --git a/library/cpp/actors/core/executor_pool_io.h b/library/cpp/actors/core/executor_pool_io.h index b4f1472731..e576d642a1 100644 --- a/library/cpp/actors/core/executor_pool_io.h +++ b/library/cpp/actors/core/executor_pool_io.h @@ -26,7 +26,7 @@ namespace NActors { public: TIOExecutorPool(ui32 poolId, ui32 threads, const TString& poolName = "", TAffinity* affinity = nullptr, - ui32 maxActivityType = 1); + ui32 maxActivityType = 1); explicit TIOExecutorPool(const TIOExecutorPoolConfig& cfg); ~TIOExecutorPool(); diff --git a/library/cpp/actors/core/executor_pool_united.cpp b/library/cpp/actors/core/executor_pool_united.cpp index c8c2b3af54..dac6245635 100644 --- a/library/cpp/actors/core/executor_pool_united.cpp +++ b/library/cpp/actors/core/executor_pool_united.cpp @@ -741,7 +741,7 @@ namespace NActors { #endif } } - } + } void WakeFastWorker() { #ifdef _linux_ diff --git a/library/cpp/actors/core/executor_thread.cpp b/library/cpp/actors/core/executor_thread.cpp index 437f1d04b9..446b651efd 100644 --- a/library/cpp/actors/core/executor_thread.cpp +++ b/library/cpp/actors/core/executor_thread.cpp @@ -21,9 +21,9 @@ #include <util/system/type_name.h> #include <util/system/datetime.h> - -LWTRACE_USING(ACTORLIB_PROVIDER) - + +LWTRACE_USING(ACTORLIB_PROVIDER) + namespace NActors { constexpr TDuration TExecutorThread::DEFAULT_TIME_PER_MAILBOX; @@ -98,10 +98,10 @@ namespace NActors { } } - inline TString ActorTypeName(const IActor* actor, ui32 activityType) { - return actor ? SafeTypeName(actor) : ("activityType_" + ToString(activityType) + " (destroyed)"); - } - + inline TString ActorTypeName(const IActor* actor, ui32 activityType) { + return actor ? SafeTypeName(actor) : ("activityType_" + ToString(activityType) + " (destroyed)"); + } + inline void LwTraceSlowDelivery(IEventHandle* ev, const IActor* actor, ui32 poolId, const TActorId& currentRecipient, double delivMs, double sinceActivationMs, ui32 eventsExecutedBefore) { const auto baseEv = (ev && ev->HasEvent()) ? ev->GetBase() : nullptr; @@ -124,13 +124,13 @@ namespace NActors { eventMs, baseEv ? SafeTypeName(baseEv) : ToString(evTypeForTracing), currentRecipient.ToString(), - ActorTypeName(actor, activityType)); + ActorTypeName(actor, activityType)); } template <typename TMailbox> void TExecutorThread::Execute(TMailbox* mailbox, ui32 hint) { Y_VERIFY_DEBUG(DyingActors.empty()); - + bool reclaimAsFree = false; NHPTimer::STime hpstart = GetCycleCountFast(); @@ -167,9 +167,9 @@ namespace NActors { double sinceActivationMs = NHPTimer::GetSeconds(hpprev - hpstart) * 1000.0; LwTraceSlowDelivery(ev.Get(), actor, Ctx.PoolId, CurrentRecipient, NHPTimer::GetSeconds(hpprev - ev->SendTime) * 1000.0, sinceActivationMs, executed); } - + ui32 evTypeForTracing = ev->Type; - + ui32 activityType = actor->GetActivityType(); if (activityType != prevActivityType) { prevActivityType = activityType; @@ -184,10 +184,10 @@ namespace NActors { DropUnregistered(); actor = nullptr; } - + if (mailbox->IsEmpty()) // was not-free and become free, we must reclaim mailbox reclaimAsFree = true; - + hpnow = GetCycleCountFast(); NHPTimer::STime elapsed = Ctx.AddEventProcessingStats(hpprev, hpnow, activityType, CurrentActorScheduledEventsCounter); if (elapsed > 1000000) { @@ -197,9 +197,9 @@ namespace NActors { // The actor might have been destroyed if (actor) actor->AddElapsedTicks(elapsed); - + CurrentRecipient = TActorId(); - } else { + } else { TAutoPtr<IEventHandle> nonDelivered = ev->ForwardOnNondelivery(TEvents::TEvUndelivered::ReasonActorUnknown); if (nonDelivered.Get()) { ActorSystem->Send(nonDelivered); @@ -207,7 +207,7 @@ namespace NActors { Ctx.IncrementNonDeliveredEvents(); } hpnow = GetCycleCountFast(); - } + } hpprev = hpnow; @@ -241,8 +241,8 @@ namespace NActors { recipient.ToString(), SafeTypeName(actor)); break; - } - + } + if (executed + 1 == Ctx.EventsPerMailbox) { AtomicStore(&mailbox->ScheduleMoment, hpnow); Ctx.IncrementMailboxPushedOutByEventCount(); @@ -255,8 +255,8 @@ namespace NActors { Ctx.WorkerId, recipient.ToString(), SafeTypeName(actor)); - break; - } + break; + } } else { if (executed == 0) Ctx.IncrementEmptyMailboxActivation(); @@ -271,7 +271,7 @@ namespace NActors { SafeTypeName(actor)); break; // empty queue, leave } - } + } NProfiling::TMemoryTagScope::Reset(0); TlsActivationContext = nullptr; diff --git a/library/cpp/actors/core/executor_thread.h b/library/cpp/actors/core/executor_thread.h index 6d6e527f9e..9d3c573f0d 100644 --- a/library/cpp/actors/core/executor_thread.h +++ b/library/cpp/actors/core/executor_thread.h @@ -54,15 +54,15 @@ namespace NActors { #ifdef USE_ACTOR_CALLSTACK ev->Callstack = TCallstack::GetTlsCallstack(); ev->Callstack.Trace(); -#endif +#endif Ctx.IncrementSentEvents(); return ActorSystem->Send(ev); } - + void GetCurrentStats(TExecutorThreadStats& statsCopy) const { Ctx.GetCurrentStats(statsCopy); } - + TThreadId GetThreadId() const; // blocks, must be called after Start() TWorkerId GetWorkerId() const { return Ctx.WorkerId; } diff --git a/library/cpp/actors/core/log.cpp b/library/cpp/actors/core/log.cpp index 3edc42012c..5f63b5af58 100644 --- a/library/cpp/actors/core/log.cpp +++ b/library/cpp/actors/core/log.cpp @@ -164,8 +164,8 @@ namespace NActors { std::shared_ptr<NMonitoring::TMetricRegistry> Metrics; }; - TAtomic TLoggerActor::IsOverflow = 0; - + TAtomic TLoggerActor::IsOverflow = 0; + TLoggerActor::TLoggerActor(TIntrusivePtr<NLog::TSettings> settings, TAutoPtr<TLogBackend> logBackend, TIntrusivePtr<NMonitoring::TDynamicCounters> counters) @@ -224,10 +224,10 @@ namespace NActors { } void TLoggerActor::Throttle(const NLog::TSettings& settings) { - if (AtomicGet(IsOverflow)) + if (AtomicGet(IsOverflow)) Sleep(settings.ThrottleDelay); - } - + } + void TLoggerActor::LogIgnoredCount(TInstant now) { TString message = Sprintf("Ignored IgnoredCount# %" PRIu64 " log records due to logger overflow!", IgnoredCount); if (!OutputRecord(now, NActors::NLog::EPrio::Error, Settings->LoggerComponent, message)) { @@ -250,7 +250,7 @@ namespace NActors { Metrics->IncActorMsgs(); const auto prio = ev.Level.ToPrio(); - + switch (prio) { case ::NActors::NLog::EPrio::Alert: Metrics->IncAlertMsgs(); @@ -267,28 +267,28 @@ namespace NActors { void TLoggerActor::HandleLogEvent(NLog::TEvLog::TPtr& ev, const NActors::TActorContext& ctx) { i64 delayMillisec = (ctx.Now() - ev->Get()->Stamp).MilliSeconds(); WriteMessageStat(*ev->Get()); - if (Settings->AllowDrop) { - // Disable throttling if it was enabled previously - if (AtomicGet(IsOverflow)) - AtomicSet(IsOverflow, 0); - - // Check if some records have to be dropped + if (Settings->AllowDrop) { + // Disable throttling if it was enabled previously + if (AtomicGet(IsOverflow)) + AtomicSet(IsOverflow, 0); + + // Check if some records have to be dropped if ((PassedCount > 10 && delayMillisec > (i64)Settings->TimeThresholdMs) || IgnoredCount > 0) { Metrics->IncIgnoredMsgs(); - if (IgnoredCount == 0) { - ctx.Send(ctx.SelfID, new TLogIgnored()); - } - ++IgnoredCount; - PassedCount = 0; - return; + if (IgnoredCount == 0) { + ctx.Send(ctx.SelfID, new TLogIgnored()); + } + ++IgnoredCount; + PassedCount = 0; + return; } - PassedCount++; - } else { - // Enable of disable throttling depending on the load - if (delayMillisec > (i64)Settings->TimeThresholdMs && !AtomicGet(IsOverflow)) - AtomicSet(IsOverflow, 1); - else if (delayMillisec <= (i64)Settings->TimeThresholdMs && AtomicGet(IsOverflow)) - AtomicSet(IsOverflow, 0); + PassedCount++; + } else { + // Enable of disable throttling depending on the load + if (delayMillisec > (i64)Settings->TimeThresholdMs && !AtomicGet(IsOverflow)) + AtomicSet(IsOverflow, 1); + else if (delayMillisec <= (i64)Settings->TimeThresholdMs && AtomicGet(IsOverflow)) + AtomicSet(IsOverflow, 0); } const auto prio = ev->Get()->Level.ToPrio(); @@ -376,8 +376,8 @@ namespace NActors { bool hasPriority = false; bool hasSamplingPriority = false; bool hasSamplingRate = false; - bool hasAllowDrop = false; - int allowDrop = 0; + bool hasAllowDrop = false; + int allowDrop = 0; if (params.Has("c")) { if (TryFromString(params.Get("c"), component) && (component == NLog::InvalidComponent || Settings->IsValidComponent(component))) { hasComponent = true; @@ -402,11 +402,11 @@ namespace NActors { } } } - if (params.Has("allowdrop")) { - if (TryFromString(params.Get("allowdrop"), allowDrop)) { - hasAllowDrop = true; - } - } + if (params.Has("allowdrop")) { + if (TryFromString(params.Get("allowdrop"), allowDrop)) { + hasAllowDrop = true; + } + } TStringStream str; if (hasComponent && !hasPriority && !hasSamplingPriority && !hasSamplingRate) { @@ -485,9 +485,9 @@ namespace NActors { if (hasComponent && hasSamplingRate) { Settings->SetSamplingRate(samplingRate, component, explanation); } - if (hasAllowDrop) { - Settings->SetAllowDrop(allowDrop); - } + if (hasAllowDrop) { + Settings->SetAllowDrop(allowDrop); + } HTML(str) { if (!explanation.empty()) { @@ -559,10 +559,10 @@ namespace NActors { str << "Drop log entries in case of overflow: " << (Settings->AllowDrop ? "Enabled" : "Disabled"); } - str << "<form method=\"GET\">" << Endl; + str << "<form method=\"GET\">" << Endl; str << "<input type=\"hidden\" name=\"allowdrop\" value=\"" << (Settings->AllowDrop ? "0" : "1") << "\"/>" << Endl; str << "<input class=\"btn btn-primary\" type=\"submit\" value=\"" << (Settings->AllowDrop ? "Disable" : "Enable") << "\"/>" << Endl; - str << "</form>" << Endl; + str << "</form>" << Endl; } } Metrics->GetOutputHtml(str); @@ -588,7 +588,7 @@ namespace NActors { logRecord << time; } logRecord - << Settings->MessagePrefix + << Settings->MessagePrefix << " :" << Settings->ComponentName(component) << " " << PriorityToString(priority) << ": " << formatted; diff --git a/library/cpp/actors/core/log.h b/library/cpp/actors/core/log.h index 4219194faa..c11a7cf3c1 100644 --- a/library/cpp/actors/core/log.h +++ b/library/cpp/actors/core/log.h @@ -233,13 +233,13 @@ namespace NActors { void Log(TInstant time, NLog::EPriority priority, NLog::EComponent component, const char* c, ...); static void Throttle(const NLog::TSettings& settings); - + private: TIntrusivePtr<NLog::TSettings> Settings; std::shared_ptr<TLogBackend> LogBackend; ui64 IgnoredCount = 0; ui64 PassedCount = 0; - static TAtomic IsOverflow; + static TAtomic IsOverflow; TDuration WakeupInterval{TDuration::Seconds(5)}; std::unique_ptr<ILoggerMetrics> Metrics; diff --git a/library/cpp/actors/core/log_settings.cpp b/library/cpp/actors/core/log_settings.cpp index e22760d149..f52f2fc5d2 100644 --- a/library/cpp/actors/core/log_settings.cpp +++ b/library/cpp/actors/core/log_settings.cpp @@ -11,7 +11,7 @@ namespace NActors { : LoggerActorId(loggerActorId) , LoggerComponent(loggerComponent) , TimeThresholdMs(timeThresholdMs) - , AllowDrop(true) + , AllowDrop(true) , ThrottleDelay(TDuration::MilliSeconds(100)) , MinVal(0) , MaxVal(0) @@ -33,7 +33,7 @@ namespace NActors { : LoggerActorId(loggerActorId) , LoggerComponent(loggerComponent) , TimeThresholdMs(timeThresholdMs) - , AllowDrop(true) + , AllowDrop(true) , ThrottleDelay(TDuration::MilliSeconds(100)) , MinVal(0) , MaxVal(0) @@ -201,10 +201,10 @@ namespace NActors { return (MinVal <= component) && (component <= MaxVal) && !ComponentNames[component].empty(); } - void TSettings::SetAllowDrop(bool val) { - AllowDrop = val; - } - + void TSettings::SetAllowDrop(bool val) { + AllowDrop = val; + } + void TSettings::SetThrottleDelay(TDuration value) { ThrottleDelay = value; } diff --git a/library/cpp/actors/core/log_settings.h b/library/cpp/actors/core/log_settings.h index acab6bb93e..7fe4504edd 100644 --- a/library/cpp/actors/core/log_settings.h +++ b/library/cpp/actors/core/log_settings.h @@ -72,7 +72,7 @@ namespace NActors { TActorId LoggerActorId; EComponent LoggerComponent; ui64 TimeThresholdMs; - bool AllowDrop; + bool AllowDrop; TDuration ThrottleDelay; TArrayHolder<TAtomic> ComponentInfo; TVector<TString> ComponentNames; @@ -92,7 +92,7 @@ namespace NActors { ELogFormat Format; TString ShortHostName; TString ClusterName; - TString MessagePrefix; + TString MessagePrefix; // The best way to provide minVal, maxVal and func is to have // protobuf enumeration of components. In this case protoc @@ -161,7 +161,7 @@ namespace NActors { static int PowerOf2Mask(int val); static bool IsValidPriority(EPriority priority); bool IsValidComponent(EComponent component); - void SetAllowDrop(bool val); + void SetAllowDrop(bool val); void SetThrottleDelay(TDuration value); void SetUseLocalTimestamps(bool value); diff --git a/library/cpp/actors/core/mailbox.cpp b/library/cpp/actors/core/mailbox.cpp index 87337bfa9e..d84b4f9e46 100644 --- a/library/cpp/actors/core/mailbox.cpp +++ b/library/cpp/actors/core/mailbox.cpp @@ -180,7 +180,7 @@ namespace NActors { TSimpleMailbox* const mailbox = TSimpleMailbox::Get(lineHint, x); #if (!defined(_tsan_enabled_)) Y_VERIFY_DEBUG(mailbox->Type == (ui32)x->MailboxType); -#endif +#endif mailbox->Queue.Push(ev.Release()); if (mailbox->MarkForSchedule()) { RelaxedStore<NHPTimer::STime>(&mailbox->ScheduleMoment, GetCycleCountFast()); @@ -200,11 +200,11 @@ namespace NActors { "We expect that one line can store more simple mailboxes than revolving mailboxes"); if (lineHint > TRevolvingMailbox::MaxMailboxesInLine()) return false; - + TRevolvingMailbox* const mailbox = TRevolvingMailbox::Get(lineHint, x); #if (!defined(_tsan_enabled_)) Y_VERIFY_DEBUG(mailbox->Type == (ui32)x->MailboxType); -#endif +#endif mailbox->QueueWriter.Push(ev.Release()); if (mailbox->MarkForSchedule()) { RelaxedStore<NHPTimer::STime>(&mailbox->ScheduleMoment, GetCycleCountFast()); diff --git a/library/cpp/actors/core/mailbox.h b/library/cpp/actors/core/mailbox.h index 1879a8aea6..0bd9c4d314 100644 --- a/library/cpp/actors/core/mailbox.h +++ b/library/cpp/actors/core/mailbox.h @@ -370,7 +370,7 @@ namespace NActors { TRevolvingMailbox(); ~TRevolvingMailbox(); - + IEventHandle* Pop() { return QueueReader.Pop(); } diff --git a/library/cpp/actors/core/mon_stats.h b/library/cpp/actors/core/mon_stats.h index 2415537e71..d55552af0c 100644 --- a/library/cpp/actors/core/mon_stats.h +++ b/library/cpp/actors/core/mon_stats.h @@ -1,16 +1,16 @@ -#pragma once - -#include "defs.h" -#include "actor.h" +#pragma once + +#include "defs.h" +#include "actor.h" #include <library/cpp/monlib/metrics/histogram_snapshot.h> -#include <util/system/hp_timer.h> - -namespace NActors { +#include <util/system/hp_timer.h> + +namespace NActors { struct TLogHistogram : public NMonitoring::IHistogramSnapshot { TLogHistogram() { memset(Buckets, 0, sizeof(Buckets)); } - + inline void Add(ui64 val, ui64 inc = 1) { size_t ind = 0; #if defined(__clang__) && __clang_major__ == 3 && __clang_minor__ == 7 @@ -27,15 +27,15 @@ namespace NActors { RelaxedStore(&TotalSamples, RelaxedLoad(&TotalSamples) + inc); RelaxedStore(&Buckets[ind], RelaxedLoad(&Buckets[ind]) + inc); } - + void Aggregate(const TLogHistogram& other) { const ui64 inc = RelaxedLoad(&other.TotalSamples); RelaxedStore(&TotalSamples, RelaxedLoad(&TotalSamples) + inc); for (size_t i = 0; i < Y_ARRAY_SIZE(Buckets); ++i) { Buckets[i] += RelaxedLoad(&other.Buckets[i]); } - } - + } + // IHistogramSnapshot ui32 Count() const override { return Y_ARRAY_SIZE(Buckets); @@ -57,7 +57,7 @@ namespace NActors { ui64 TotalSamples = 0; ui64 Buckets[65]; }; - + struct TExecutorPoolStats { ui64 MaxUtilizationTime = 0; }; @@ -86,7 +86,7 @@ namespace NActors { ui64 MailboxPushedOutBySoftPreemption = 0; ui64 MailboxPushedOutByTime = 0; ui64 MailboxPushedOutByEventCount = 0; - + TExecutorThreadStats(size_t activityVecSize = 1) // must be not empty as 0 used as default : ElapsedTicksByActivity(activityVecSize) , ReceivedEventsByActivity(activityVecSize) @@ -103,7 +103,7 @@ namespace NActors { for (size_t at = 0; at < otherSize; ++at) self[at] += RelaxedLoad(&other[at]); } - + void Aggregate(const TExecutorThreadStats& other) { SentEvents += RelaxedLoad(&other.SentEvents); ReceivedEvents += RelaxedLoad(&other.ReceivedEvents); @@ -115,9 +115,9 @@ namespace NActors { ParkedTicks += RelaxedLoad(&other.ParkedTicks); BlockedTicks += RelaxedLoad(&other.BlockedTicks); MailboxPushedOutBySoftPreemption += RelaxedLoad(&other.MailboxPushedOutBySoftPreemption); - MailboxPushedOutByTime += RelaxedLoad(&other.MailboxPushedOutByTime); - MailboxPushedOutByEventCount += RelaxedLoad(&other.MailboxPushedOutByEventCount); - + MailboxPushedOutByTime += RelaxedLoad(&other.MailboxPushedOutByTime); + MailboxPushedOutByEventCount += RelaxedLoad(&other.MailboxPushedOutByEventCount); + ActivationTimeHistogram.Aggregate(other.ActivationTimeHistogram); EventDeliveryTimeHistogram.Aggregate(other.EventDeliveryTimeHistogram); EventProcessingCountHistogram.Aggregate(other.EventProcessingCountHistogram); @@ -143,5 +143,5 @@ namespace NActors { return ActorsAliveByActivity.size(); } }; - + } diff --git a/library/cpp/actors/core/probes.cpp b/library/cpp/actors/core/probes.cpp index ae475bb59f..7ace83e102 100644 --- a/library/cpp/actors/core/probes.cpp +++ b/library/cpp/actors/core/probes.cpp @@ -1,10 +1,10 @@ -#include "probes.h" - +#include "probes.h" + #include "actorsystem.h" #include <util/string/builder.h> -LWTRACE_DEFINE_PROVIDER(ACTORLIB_PROVIDER); +LWTRACE_DEFINE_PROVIDER(ACTORLIB_PROVIDER); namespace NActors { TVector<NLWTrace::TDashboard> LWTraceDashboards(TActorSystemSetup* setup) { diff --git a/library/cpp/actors/core/probes.h b/library/cpp/actors/core/probes.h index 536200b977..4912d6dd26 100644 --- a/library/cpp/actors/core/probes.h +++ b/library/cpp/actors/core/probes.h @@ -1,8 +1,8 @@ -#pragma once - +#pragma once + #include <library/cpp/lwtrace/all.h> #include <util/generic/vector.h> - + #define LWACTORID(x) (x).RawX1(), (x).RawX2(), (x).NodeId(), (x).PoolID() #define LWTYPE_ACTORID ui64, ui64, ui32, ui32 #define LWNAME_ACTORID(n) n "Raw1", n "Raw2", n "NodeId", n "PoolId" @@ -167,8 +167,8 @@ TYPES(ui32, ui64, TString, TString, ui32), \ NAMES("fromPoolId", "toPoolId", "fromPool", "toPool", "cpu")) \ /**/ - -LWTRACE_DECLARE_PROVIDER(ACTORLIB_PROVIDER) + +LWTRACE_DECLARE_PROVIDER(ACTORLIB_PROVIDER) namespace NActors { struct TActorSystemSetup; diff --git a/library/cpp/actors/core/process_stats.cpp b/library/cpp/actors/core/process_stats.cpp index f3a7341980..0e1dbd0031 100644 --- a/library/cpp/actors/core/process_stats.cpp +++ b/library/cpp/actors/core/process_stats.cpp @@ -1,25 +1,25 @@ -#include "actorsystem.h" -#include "actor_bootstrapped.h" -#include "hfunc.h" +#include "actorsystem.h" +#include "actor_bootstrapped.h" +#include "hfunc.h" #include "process_stats.h" - + #include <library/cpp/monlib/dynamic_counters/counters.h> #include <library/cpp/monlib/metrics/metric_registry.h> - + #include <util/datetime/uptime.h> -#include <util/system/defaults.h> -#include <util/stream/file.h> -#include <util/string/vector.h> -#include <util/string/split.h> - -#ifndef _win_ +#include <util/system/defaults.h> +#include <util/stream/file.h> +#include <util/string/vector.h> +#include <util/string/split.h> + +#ifndef _win_ #include <sys/user.h> #include <sys/sysctl.h> -#endif - -namespace NActors { -#ifdef _linux_ - +#endif + +namespace NActors { +#ifdef _linux_ + namespace { template <typename TVal> static bool ExtractVal(const TString& str, const TString& name, TVal& res) { @@ -32,16 +32,16 @@ namespace NActors { res = atol(str.data() + pos); return true; } - + float TicksPerMillisec() { -#ifdef _SC_CLK_TCK +#ifdef _SC_CLK_TCK return sysconf(_SC_CLK_TCK) / 1000.0; -#else +#else return 1.f; -#endif - } +#endif + } } - + bool TProcStat::Fill(pid_t pid) { try { TString strPid(ToString(pid)); @@ -57,9 +57,9 @@ namespace NActors { } // Convert from kB to bytes Rss *= 1024; - + float tickPerMillisec = TicksPerMillisec(); - + TFileInput procStat("/proc/" + strPid + "/stat"); procStat.ReadLine(str); if (!str.empty()) { @@ -114,28 +114,28 @@ namespace NActors { } catch (...) { return false; - } + } return true; - } - + } + long TProcStat::ObtainPageSize() { long sz = sysconf(_SC_PAGESIZE); return sz; } -#else - +#else + bool TProcStat::Fill(pid_t pid) { Y_UNUSED(pid); return false; } - + long TProcStat::ObtainPageSize() { return 0; } -#endif - +#endif + namespace { // Periodically collects process stats and exposes them as mon counters template <typename TDerived> @@ -144,7 +144,7 @@ namespace { static constexpr IActor::EActivityType ActorActivityType() { return IActor::ACTORLIB_STATS; } - + TProcStatCollectingActor(TDuration interval) : Interval(interval) { @@ -154,19 +154,19 @@ namespace { ctx.Schedule(Interval, new TEvents::TEvWakeup()); Self()->Become(&TDerived::StateWork); } - + STFUNC(StateWork) { switch (ev->GetTypeRewrite()) { CFunc(TEvents::TSystem::Wakeup, Wakeup); } } - + private: void Wakeup(const TActorContext& ctx) { Self()->UpdateCounters(ProcStat); ctx.Schedule(Interval, new TEvents::TEvWakeup()); - } - + } + TDerived* Self() { ProcStat.Fill(getpid()); return static_cast<TDerived*>(this); @@ -176,7 +176,7 @@ namespace { const TDuration Interval; TProcStat ProcStat; }; - + // Periodically collects process stats and exposes them as mon counters class TDynamicCounterCollector: public TProcStatCollectingActor<TDynamicCounterCollector> { using TBase = TProcStatCollectingActor<TDynamicCounterCollector>; @@ -295,8 +295,8 @@ namespace { IActor* CreateProcStatCollector(ui32 intervalSec, NMonitoring::TDynamicCounterPtr counters) { return new TDynamicCounterCollector(intervalSec, counters); - } - + } + IActor* CreateProcStatCollector(TDuration interval, NMonitoring::TMetricRegistry& registry) { return new TRegistryCollector(interval, registry); } diff --git a/library/cpp/actors/core/process_stats.h b/library/cpp/actors/core/process_stats.h index 454c04e6f3..66346d0b5a 100644 --- a/library/cpp/actors/core/process_stats.h +++ b/library/cpp/actors/core/process_stats.h @@ -1,15 +1,15 @@ -#pragma once - -#include "defs.h" -#include "actor.h" - +#pragma once + +#include "defs.h" +#include "actor.h" + #include <library/cpp/monlib/dynamic_counters/counters.h> - + namespace NMonitoring { class TMetricRegistry; } -namespace NActors { +namespace NActors { struct TProcStat { ui64 Rss; ui64 VolCtxSwtch; diff --git a/library/cpp/actors/core/scheduler_actor.cpp b/library/cpp/actors/core/scheduler_actor.cpp index 60d972d9aa..febc5e40dd 100644 --- a/library/cpp/actors/core/scheduler_actor.cpp +++ b/library/cpp/actors/core/scheduler_actor.cpp @@ -61,7 +61,7 @@ namespace NActors { public: static constexpr IActor::EActivityType ActorActivityType() { - return IActor::ACTOR_SYSTEM_SCHEDULER_ACTOR; + return IActor::ACTOR_SYSTEM_SCHEDULER_ACTOR; } TSchedulerActor(const TSchedulerConfig& cfg) diff --git a/library/cpp/actors/core/ya.make b/library/cpp/actors/core/ya.make index 5a3a6c7a35..880a9d00db 100644 --- a/library/cpp/actors/core/ya.make +++ b/library/cpp/actors/core/ya.make @@ -80,15 +80,15 @@ SRCS( memory_tracker.cpp memory_tracker.h mon.h - mon_stats.h + mon_stats.h monotonic.cpp monotonic.h worker_context.cpp worker_context.h probes.cpp - probes.h - process_stats.cpp - process_stats.h + probes.h + process_stats.cpp + process_stats.h scheduler_actor.cpp scheduler_actor.h scheduler_basic.cpp diff --git a/library/cpp/actors/helpers/selfping_actor.cpp b/library/cpp/actors/helpers/selfping_actor.cpp index 2fa2ce3a45..f9bfaf8dc0 100644 --- a/library/cpp/actors/helpers/selfping_actor.cpp +++ b/library/cpp/actors/helpers/selfping_actor.cpp @@ -72,8 +72,8 @@ private: public: static constexpr auto ActorActivityType() { return SELF_PING_ACTOR; - } - + } + TSelfPingActor(TDuration sendInterval, const NMonitoring::TDynamicCounters::TCounterPtr& counter, const NMonitoring::TDynamicCounters::TCounterPtr& calculationTimeCounter) : SendInterval(sendInterval) diff --git a/library/cpp/actors/helpers/selfping_actor_ut.cpp b/library/cpp/actors/helpers/selfping_actor_ut.cpp index 9df99d1949..459635fa24 100644 --- a/library/cpp/actors/helpers/selfping_actor_ut.cpp +++ b/library/cpp/actors/helpers/selfping_actor_ut.cpp @@ -27,9 +27,9 @@ Y_UNIT_TEST_SUITE(TSelfPingTest) { TDuration::MilliSeconds(100), // sendInterval (unused in test) counter, counter2); - UNIT_ASSERT_VALUES_EQUAL(counter->Val(), 0); + UNIT_ASSERT_VALUES_EQUAL(counter->Val(), 0); UNIT_ASSERT_VALUES_EQUAL(counter2->Val(), 0); - + const TActorId actorId = runtime->Register(actor); Y_UNUSED(actorId); diff --git a/library/cpp/actors/interconnect/interconnect_handshake.cpp b/library/cpp/actors/interconnect/interconnect_handshake.cpp index ea09913bdd..9ede998d8e 100644 --- a/library/cpp/actors/interconnect/interconnect_handshake.cpp +++ b/library/cpp/actors/interconnect/interconnect_handshake.cpp @@ -98,10 +98,10 @@ namespace NActors { TInstant Deadline; public: - static constexpr IActor::EActivityType ActorActivityType() { - return IActor::INTERCONNECT_HANDSHAKE; - } - + static constexpr IActor::EActivityType ActorActivityType() { + return IActor::INTERCONNECT_HANDSHAKE; + } + THandshakeActor(TInterconnectProxyCommon::TPtr common, const TActorId& self, const TActorId& peer, ui32 nodeId, ui64 nextPacket, TString peerHostName, TSessionParams params) : TActorCoroImpl(StackSize, true, true) // allow unhandled poison pills and dtors diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.h b/library/cpp/actors/interconnect/interconnect_tcp_session.h index 846388edfc..7fc00dbcc5 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_session.h +++ b/library/cpp/actors/interconnect/interconnect_tcp_session.h @@ -518,10 +518,10 @@ namespace NActors { TInterconnectProxyCommon::TPtr Common; public: - static constexpr EActivityType ActorActivityType() { - return INTERCONNECT_SESSION_KILLER; - } - + static constexpr EActivityType ActorActivityType() { + return INTERCONNECT_SESSION_KILLER; + } + TInterconnectSessionKiller(TInterconnectProxyCommon::TPtr common) : Common(common) { diff --git a/library/cpp/actors/interconnect/load.cpp b/library/cpp/actors/interconnect/load.cpp index b40a01b3e8..2a8443da71 100644 --- a/library/cpp/actors/interconnect/load.cpp +++ b/library/cpp/actors/interconnect/load.cpp @@ -100,10 +100,10 @@ namespace NInterconnect { } public: - static constexpr IActor::EActivityType ActorActivityType() { + static constexpr IActor::EActivityType ActorActivityType() { return IActor::INTERCONNECT_LOAD_RESPONDER; - } - + } + TLoadResponderMasterActor() {} @@ -150,10 +150,10 @@ namespace NInterconnect { std::shared_ptr<std::atomic_uint64_t> Traffic; public: - static constexpr IActor::EActivityType ActorActivityType() { + static constexpr IActor::EActivityType ActorActivityType() { return IActor::INTERCONNECT_LOAD_ACTOR; - } - + } + TLoadActor(const TLoadParams& params) : Params(params) {} diff --git a/library/cpp/actors/interconnect/poller_actor.cpp b/library/cpp/actors/interconnect/poller_actor.cpp index 9bf3bd1d8d..e75cbcaef4 100644 --- a/library/cpp/actors/interconnect/poller_actor.cpp +++ b/library/cpp/actors/interconnect/poller_actor.cpp @@ -246,10 +246,10 @@ namespace NActors { std::shared_ptr<TPollerThread> PollerThread; public: - static constexpr IActor::EActivityType ActorActivityType() { - return IActor::INTERCONNECT_POLLER; - } - + static constexpr IActor::EActivityType ActorActivityType() { + return IActor::INTERCONNECT_POLLER; + } + void Bootstrap() { PollerThread = std::make_shared<TPollerThread>(TlsActivationContext->ExecutorThread.ActorSystem); Become(&TPollerActor::StateFunc); diff --git a/library/cpp/actors/interconnect/ut/lib/node.h b/library/cpp/actors/interconnect/ut/lib/node.h index 95666d3f7a..ff30b1445e 100644 --- a/library/cpp/actors/interconnect/ut/lib/node.h +++ b/library/cpp/actors/interconnect/ut/lib/node.h @@ -21,10 +21,10 @@ public: TChannelsConfig channelsSettings = TChannelsConfig(), ui32 numDynamicNodes = 0, ui32 numThreads = 1) { TActorSystemSetup setup; - setup.NodeId = nodeId; - setup.ExecutorsCount = 1; + setup.NodeId = nodeId; + setup.ExecutorsCount = 1; setup.Executors.Reset(new TAutoPtr<IExecutorPool>[setup.ExecutorsCount]); - for (ui32 i = 0; i < setup.ExecutorsCount; ++i) { + for (ui32 i = 0; i < setup.ExecutorsCount; ++i) { setup.Executors[i].Reset(new TBasicExecutorPool(i, numThreads, 20 /* magic number */)); } setup.Scheduler.Reset(new TBasicSchedulerThread()); diff --git a/library/cpp/actors/protos/unittests.proto b/library/cpp/actors/protos/unittests.proto index 64503bc1c1..a856b0942a 100644 --- a/library/cpp/actors/protos/unittests.proto +++ b/library/cpp/actors/protos/unittests.proto @@ -1,5 +1,5 @@ -option cc_enable_arenas = true; - +option cc_enable_arenas = true; + message TSimple { required string Str1 = 1; optional string Str2 = 2; @@ -16,5 +16,5 @@ message TBigMessage { message TMessageWithPayload { optional string Meta = 1; repeated uint32 PayloadId = 2; - repeated string SomeData = 3; + repeated string SomeData = 3; } diff --git a/library/cpp/actors/testlib/test_runtime.cpp b/library/cpp/actors/testlib/test_runtime.cpp index fd61e4c720..6fa25b9965 100644 --- a/library/cpp/actors/testlib/test_runtime.cpp +++ b/library/cpp/actors/testlib/test_runtime.cpp @@ -19,7 +19,7 @@ #include <util/string/printf.h> #include <typeinfo> -bool VERBOSE = false; +bool VERBOSE = false; const bool PRINT_EVENT_BODY = false; namespace { @@ -86,8 +86,8 @@ namespace NActors { public: static constexpr EActivityType ActorActivityType() { return TEST_ACTOR_RUNTIME; - } - + } + TEdgeActor(TTestActorRuntimeBase* runtime) : TActor(&TEdgeActor::StateFunc) , Runtime(runtime) @@ -722,9 +722,9 @@ namespace NActors { } void TTestActorRuntimeBase::SetVerbose(bool verbose) { - VERBOSE = verbose; - } - + VERBOSE = verbose; + } + void TTestActorRuntimeBase::AddLocalService(const TActorId& actorId, const TActorSetupCmd& cmd, ui32 nodeIndex) { Y_VERIFY(!IsInitialized); Y_VERIFY(nodeIndex < NodeCount); @@ -1038,10 +1038,10 @@ namespace NActors { bool TTestActorRuntimeBase::DispatchEvents(const TDispatchOptions& options, TInstant simDeadline) { TGuard<TMutex> guard(Mutex); - return DispatchEventsInternal(options, simDeadline); - } - - // Mutex must be locked by caller! + return DispatchEventsInternal(options, simDeadline); + } + + // Mutex must be locked by caller! bool TTestActorRuntimeBase::DispatchEventsInternal(const TDispatchOptions& options, TInstant simDeadline) { TDispatchContext localContext; localContext.Options = &options; @@ -1253,9 +1253,9 @@ namespace NActors { if (!localContext.FoundNonEmptyMailboxes.empty()) return true; - if (options.CustomFinalCondition && options.CustomFinalCondition()) - return true; - + if (options.CustomFinalCondition && options.CustomFinalCondition()) + return true; + if (options.FinalEvents.empty()) { for (auto& mbox : currentMailboxes) { if (!mbox.second->IsActive(TInstant::MicroSeconds(CurrentTimestamp))) @@ -1755,8 +1755,8 @@ namespace NActors { public: static constexpr EActivityType ActorActivityType() { return TEST_ACTOR_RUNTIME; - } - + } + TReplyActor(TStrandingActorDecorator* owner) : TActor(&TReplyActor::StateFunc) , Owner(owner) @@ -1771,8 +1771,8 @@ namespace NActors { static constexpr EActivityType ActorActivityType() { return TEST_ACTOR_RUNTIME; - } - + } + TStrandingActorDecorator(const TActorId& delegatee, bool isSync, const TVector<TActorId>& additionalActors, TSimpleSharedPtr<TStrandingActorDecoratorContext> context, TTestActorRuntimeBase* runtime, TReplyCheckerCreator createReplyChecker) diff --git a/library/cpp/actors/testlib/test_runtime.h b/library/cpp/actors/testlib/test_runtime.h index 90de87b5ac..26e3b45c98 100644 --- a/library/cpp/actors/testlib/test_runtime.h +++ b/library/cpp/actors/testlib/test_runtime.h @@ -93,7 +93,7 @@ namespace NActors { TVector<TFinalEventCondition> FinalEvents; TVector<TEventMailboxId> NonEmptyMailboxes; TVector<TEventMailboxId> OnlyMailboxes; - std::function<bool()> CustomFinalCondition; + std::function<bool()> CustomFinalCondition; bool Quiet = false; }; @@ -219,8 +219,8 @@ namespace NActors { TEventFilter SetEventFilter(TEventFilter filterFunc); TScheduledEventFilter SetScheduledEventFilter(TScheduledEventFilter filterFunc); TRegistrationObserver SetRegistrationObserverFunc(TRegistrationObserver observerFunc); - static bool IsVerbose(); - static void SetVerbose(bool verbose); + static bool IsVerbose(); + static void SetVerbose(bool verbose); TDuration SetDispatchTimeout(TDuration timeout); void SetDispatchedEventsLimit(ui64 limit) { DispatchedEventsLimit = limit; @@ -499,7 +499,7 @@ namespace NActors { void ClearMailbox(ui32 nodeId, ui32 hint); void HandleNonEmptyMailboxesForEachContext(TEventMailboxId mboxId); void UpdateFinalEventsStatsForEachContext(IEventHandle& ev); - bool DispatchEventsInternal(const TDispatchOptions& options, TInstant simDeadline); + bool DispatchEventsInternal(const TDispatchOptions& options, TInstant simDeadline); private: ui64 ScheduledCount; |