diff options
author | alexvru <alexvru@ydb.tech> | 2023-08-07 19:44:47 +0300 |
---|---|---|
committer | alexvru <alexvru@ydb.tech> | 2023-08-07 21:49:19 +0300 |
commit | 39be171bb25ea0ee82a970b3ddacd128fbb95845 (patch) | |
tree | 31bfcbff36bbc411f16e849cc7ec9fa357646dc4 /library/cpp | |
parent | 2a7d26c691d3f5c8f056443dd190679bcc293f84 (diff) | |
download | ydb-39be171bb25ea0ee82a970b3ddacd128fbb95845.tar.gz |
Report actor usage activity KIKIMR-11082
Diffstat (limited to 'library/cpp')
-rw-r--r-- | library/cpp/actors/core/actor.cpp | 56 | ||||
-rw-r--r-- | library/cpp/actors/core/actor.h | 30 | ||||
-rw-r--r-- | library/cpp/actors/core/defs.h | 2 | ||||
-rw-r--r-- | library/cpp/actors/core/executor_pool_base.cpp | 64 | ||||
-rw-r--r-- | library/cpp/actors/core/executor_pool_base.h | 1 | ||||
-rw-r--r-- | library/cpp/actors/core/executor_thread.cpp | 6 | ||||
-rw-r--r-- | library/cpp/actors/core/mailbox.cpp | 23 | ||||
-rw-r--r-- | library/cpp/actors/core/mailbox.h | 48 | ||||
-rw-r--r-- | library/cpp/actors/core/mon_stats.h | 11 | ||||
-rw-r--r-- | library/cpp/actors/helpers/pool_stats_collector.h | 10 |
10 files changed, 208 insertions, 43 deletions
diff --git a/library/cpp/actors/core/actor.cpp b/library/cpp/actors/core/actor.cpp index 5e28d30e5e..73edd82067 100644 --- a/library/cpp/actors/core/actor.cpp +++ b/library/cpp/actors/core/actor.cpp @@ -8,6 +8,62 @@ namespace NActors { Y_POD_THREAD(TThreadContext*) TlsThreadContext(nullptr); Y_POD_THREAD(TActivationContext*) TlsActivationContext(nullptr); + template<i64 Increment> + static void UpdateQueueSizeAndTimestamp(TActorUsageImpl<true>& impl, ui64 time) { + ui64 usedTimeIncrement = 0; + using T = TActorUsageImpl<true>; + + for (;;) { + uint64_t value = impl.QueueSizeAndTimestamp.load(); + ui64 count = value >> T::TimestampBits; + + count += Increment; + Y_VERIFY((count & ~T::CountMask) == 0); + + ui64 timestamp = value; + if (Increment == 1 && count == 1) { + timestamp = time; + } else if (Increment == -1 && count == 0) { + usedTimeIncrement = (static_cast<ui64>(time) - timestamp) & T::TimestampMask; + timestamp = 0; // reset timestamp to some zero value + } + + const ui64 updated = (timestamp & T::TimestampMask) | (count << T::TimestampBits); + if (impl.QueueSizeAndTimestamp.compare_exchange_weak(value, updated)) { + break; + } + } + + if (usedTimeIncrement && impl.LastUsageTimestamp <= time) { + impl.UsedTime += usedTimeIncrement; + } + } + + void TActorUsageImpl<true>::OnEnqueueEvent(ui64 time) { + UpdateQueueSizeAndTimestamp<+1>(*this, time); + } + + void TActorUsageImpl<true>::OnDequeueEvent() { + UpdateQueueSizeAndTimestamp<-1>(*this, GetCycleCountFast()); + } + + double TActorUsageImpl<true>::GetUsage(ui64 time) { + ui64 used = UsedTime.exchange(0); + if (const ui64 value = QueueSizeAndTimestamp.load(); value >> TimestampBits) { + used += (static_cast<ui64>(time) - value) & TimestampMask; + } + + Y_VERIFY(LastUsageTimestamp <= time); + ui64 passed = time - LastUsageTimestamp; + LastUsageTimestamp = time; + + if (!passed) { + return 0; + } + + return (double)Min(passed, used) / passed; + } + void IActor::Describe(IOutputStream &out) const noexcept { SelfActorId.Out(out); } diff --git a/library/cpp/actors/core/actor.h b/library/cpp/actors/core/actor.h index 9fbb942ed2..ac6aef7864 100644 --- a/library/cpp/actors/core/actor.h +++ b/library/cpp/actors/core/actor.h @@ -309,7 +309,35 @@ namespace NActors { }; - class IActor: protected IActorOps { + template<bool> + struct TActorUsageImpl { + void OnEnqueueEvent(ui64 /*time*/) {} // called asynchronously when event is put in the mailbox + void OnDequeueEvent() {} // called when processed by Executor + double GetUsage(ui64 /*time*/) { return 0; } // called from collector thread + void DoActorInit() {} + }; + + template<> + struct TActorUsageImpl<true> { + static constexpr int TimestampBits = 40; + static constexpr int CountBits = 24; + static constexpr ui64 TimestampMask = ((ui64)1 << TimestampBits) - 1; + static constexpr ui64 CountMask = ((ui64)1 << CountBits) - 1; + + std::atomic_uint64_t QueueSizeAndTimestamp = 0; + std::atomic_uint64_t UsedTime = 0; // how much time did we consume since last GetUsage() call + ui64 LastUsageTimestamp = 0; // when GetUsage() was called the last time + + void OnEnqueueEvent(ui64 time); + void OnDequeueEvent(); + double GetUsage(ui64 time); + void DoActorInit() { LastUsageTimestamp = GetCycleCountFast(); } + }; + + class IActor + : protected IActorOps + , public TActorUsageImpl<ActorLibCollectUsageStats> + { private: TActorIdentity SelfActorId; i64 ElapsedTicks; diff --git a/library/cpp/actors/core/defs.h b/library/cpp/actors/core/defs.h index 276ebef219..64b90e995d 100644 --- a/library/cpp/actors/core/defs.h +++ b/library/cpp/actors/core/defs.h @@ -12,6 +12,8 @@ // event processing time histograms #define ACTORSLIB_COLLECT_EXEC_STATS +static constexpr bool ActorLibCollectUsageStats = false; + namespace NActors { using TPoolId = ui8; using TPoolsMask = ui64; diff --git a/library/cpp/actors/core/executor_pool_base.cpp b/library/cpp/actors/core/executor_pool_base.cpp index 1671190f5c..10362a7a2b 100644 --- a/library/cpp/actors/core/executor_pool_base.cpp +++ b/library/cpp/actors/core/executor_pool_base.cpp @@ -11,6 +11,7 @@ namespace NActors { void DoActorInit(TActorSystem* sys, IActor* actor, const TActorId& self, const TActorId& owner) { actor->SelfActorId = self; + actor->DoActorInit(); actor->Registered(sys, owner); } @@ -32,21 +33,33 @@ namespace NActors { const TMonotonic now = ActorSystem->Monotonic(); - std::vector<ui32> stuckActors; - stuckActors.reserve(Actors.size()); + for (auto& u : stats.UsageByActivity) { + u.fill(0); + } + + auto accountUsage = [&](ui32 activityType, double usage) { + Y_VERIFY(0 <= usage); + Y_VERIFY(usage <= 1); + int bin = Min<int>(9, usage * 10); + ++stats.UsageByActivity[activityType][bin]; + }; + + std::fill(stats.StuckActorsByActivity.begin(), stats.StuckActorsByActivity.end(), 0); with_lock (StuckObserverMutex) { - for (IActor *actor : Actors) { + for (size_t i = 0; i < Actors.size(); ++i) { + IActor *actor = Actors[i]; + Y_VERIFY(actor->StuckIndex == i); const TDuration delta = now - actor->LastReceiveTimestamp; if (delta > TDuration::Seconds(30)) { - stuckActors.push_back(actor->GetActivityType()); + ++stats.StuckActorsByActivity[actor->GetActivityType()]; } + accountUsage(actor->GetActivityType(), actor->GetUsage(GetCycleCountFast())); } - } - - std::fill(stats.StuckActorsByActivity.begin(), stats.StuckActorsByActivity.end(), 0); - for (ui32 activity : stuckActors) { - ++stats.StuckActorsByActivity[activity]; + for (const auto& [activityType, usage] : DeadActorsUsage) { + accountUsage(activityType, usage); + } + DeadActorsUsage.clear(); } } #endif @@ -122,13 +135,6 @@ namespace NActors { Y_VERIFY(at < Stats.ActorsAliveByActivity.size()); } AtomicIncrement(Stats.ActorsAliveByActivity[at]); - if (ActorSystem->MonitorStuckActors()) { - with_lock (StuckObserverMutex) { - Y_VERIFY(actor->StuckIndex == Max<size_t>()); - actor->StuckIndex = Actors.size(); - Actors.push_back(actor); - } - } #endif AtomicIncrement(ActorRegistrations); @@ -165,6 +171,16 @@ namespace NActors { const TActorId actorId(ActorSystem->NodeId, PoolId, localActorId, hint); DoActorInit(ActorSystem, actor, actorId, parentId); +#ifdef ACTORSLIB_COLLECT_EXEC_STATS + if (ActorSystem->MonitorStuckActors()) { + with_lock (StuckObserverMutex) { + Y_VERIFY(actor->StuckIndex == Max<size_t>()); + actor->StuckIndex = Actors.size(); + Actors.push_back(actor); + } + } +#endif + // Once we unlock the mailbox the actor starts running and we cannot use the pointer any more actor = nullptr; @@ -203,6 +219,16 @@ namespace NActors { if (at >= Stats.MaxActivityType()) at = 0; AtomicIncrement(Stats.ActorsAliveByActivity[at]); +#endif + AtomicIncrement(ActorRegistrations); + + const ui64 localActorId = AllocateID(); + mailbox->AttachActor(localActorId, actor); + + const TActorId actorId(ActorSystem->NodeId, PoolId, localActorId, hint); + DoActorInit(ActorSystem, actor, actorId, parentId); + +#ifdef ACTORSLIB_COLLECT_EXEC_STATS if (ActorSystem->MonitorStuckActors()) { with_lock (StuckObserverMutex) { Y_VERIFY(actor->StuckIndex == Max<size_t>()); @@ -211,13 +237,7 @@ namespace NActors { } } #endif - AtomicIncrement(ActorRegistrations); - const ui64 localActorId = AllocateID(); - mailbox->AttachActor(localActorId, actor); - - const TActorId actorId(ActorSystem->NodeId, PoolId, localActorId, hint); - DoActorInit(ActorSystem, actor, actorId, parentId); NHPTimer::STime elapsed = GetCycleCountFast() - hpstart; if (elapsed > 1000000) { LWPROBE(SlowRegisterAdd, PoolId, NHPTimer::GetSeconds(elapsed) * 1000.0); diff --git a/library/cpp/actors/core/executor_pool_base.h b/library/cpp/actors/core/executor_pool_base.h index 60714f8441..e94ffdbad9 100644 --- a/library/cpp/actors/core/executor_pool_base.h +++ b/library/cpp/actors/core/executor_pool_base.h @@ -22,6 +22,7 @@ namespace NActors { // Stuck actor monitoring TMutex StuckObserverMutex; std::vector<IActor*> Actors; + mutable std::vector<std::tuple<ui32, double>> DeadActorsUsage; friend class TExecutorThread; void RecalculateStuckActors(TExecutorThreadStats& stats) const; #endif diff --git a/library/cpp/actors/core/executor_thread.cpp b/library/cpp/actors/core/executor_thread.cpp index e8fff6857c..d5323785be 100644 --- a/library/cpp/actors/core/executor_thread.cpp +++ b/library/cpp/actors/core/executor_thread.cpp @@ -93,6 +93,7 @@ namespace NActors { actorPtr = pool->Actors.back(); actorPtr->StuckIndex = i; pool->Actors.pop_back(); + pool->DeadActorsUsage.emplace_back(actor->GetActivityType(), actor->GetUsage(GetCycleCountFast())); } } } @@ -171,6 +172,7 @@ namespace NActors { bool preempted = false; for (; Ctx.ExecutedEvents < Ctx.EventsPerMailbox; ++Ctx.ExecutedEvents) { if (TAutoPtr<IEventHandle> evExt = mailbox->Pop()) { + mailbox->ProcessEvents(mailbox); NHPTimer::STime hpnow; recipient = evExt->GetRecipientRewrite(); TActorContext ctx(*mailbox, *this, hpprev, recipient); @@ -178,7 +180,6 @@ namespace NActors { // move for destruct before ctx; auto ev = std::move(evExt); if (actor = mailbox->FindActor(recipient.LocalId())) { - // Since actor is not null there should be no exceptions actorType = &typeid(*actor); @@ -212,11 +213,14 @@ namespace NActors { } actor->Receive(ev); + mailbox->ProcessEvents(mailbox); + actor->OnDequeueEvent(); size_t dyingActorsCnt = DyingActors.size(); Ctx.UpdateActorsStats(dyingActorsCnt); if (dyingActorsCnt) { DropUnregistered(); + mailbox->ProcessEvents(mailbox); actor = nullptr; } diff --git a/library/cpp/actors/core/mailbox.cpp b/library/cpp/actors/core/mailbox.cpp index df63480d56..b28fdc0771 100644 --- a/library/cpp/actors/core/mailbox.cpp +++ b/library/cpp/actors/core/mailbox.cpp @@ -180,6 +180,7 @@ namespace NActors { switch (x->MailboxType) { case TMailboxType::Simple: { TSimpleMailbox* const mailbox = TSimpleMailbox::Get(lineHint, x); + mailbox->Push(recipient.LocalId()); #if (!defined(_tsan_enabled_)) Y_VERIFY_DEBUG(mailbox->Type == (ui32)x->MailboxType); #endif @@ -204,6 +205,7 @@ namespace NActors { return false; TRevolvingMailbox* const mailbox = TRevolvingMailbox::Get(lineHint, x); + mailbox->Push(recipient.LocalId()); #if (!defined(_tsan_enabled_)) Y_VERIFY_DEBUG(mailbox->Type == (ui32)x->MailboxType); #endif @@ -216,6 +218,7 @@ namespace NActors { return true; case TMailboxType::HTSwap: { THTSwapMailbox* const mailbox = THTSwapMailbox::Get(lineHint, x); + mailbox->Push(recipient.LocalId()); #if (!defined(_tsan_enabled_)) Y_VERIFY_DEBUG(mailbox->Type == (ui32)x->MailboxType); #endif @@ -231,6 +234,7 @@ namespace NActors { return false; TReadAsFilledMailbox* const mailbox = TReadAsFilledMailbox::Get(lineHint, x); + mailbox->Push(recipient.LocalId()); #if (!defined(_tsan_enabled_)) Y_VERIFY_DEBUG(mailbox->Type == (ui32)x->MailboxType); #endif @@ -246,6 +250,7 @@ namespace NActors { return false; TTinyReadAsFilledMailbox* const mailbox = TTinyReadAsFilledMailbox::Get(lineHint, x); + mailbox->Push(recipient.LocalId()); #if (!defined(_tsan_enabled_)) Y_VERIFY_DEBUG(mailbox->Type == (ui32)x->MailboxType); #endif @@ -412,6 +417,24 @@ namespace NActors { } } + TMailboxUsageImpl<true>::~TMailboxUsageImpl() { + while (auto *e = PendingEventQueue.Pop()) { + delete e; + } + } + + void TMailboxUsageImpl<true>::Push(ui64 localId) { + PendingEventQueue.Push(new TPendingEvent{localId, GetCycleCountFast()}); + } + + void TMailboxUsageImpl<true>::ProcessEvents(TMailboxHeader *mailbox) { + while (std::unique_ptr<TPendingEvent> e{PendingEventQueue.Pop()}) { + if (IActor *actor = mailbox->FindActor(e->LocalId)) { + actor->OnEnqueueEvent(e->Timestamp); + } + } + } + TMailboxTable::TSimpleMailbox::TSimpleMailbox() : TMailboxHeader(TMailboxType::Simple) , ScheduleMoment(0) diff --git a/library/cpp/actors/core/mailbox.h b/library/cpp/actors/core/mailbox.h index 6339b6fc23..0f1c3abc10 100644 --- a/library/cpp/actors/core/mailbox.h +++ b/library/cpp/actors/core/mailbox.h @@ -25,7 +25,30 @@ namespace NActors { // 17 bits: line // 12 bits: index of mailbox inside of line - struct TMailboxHeader { + struct TMailboxHeader; + + template<bool> + struct TMailboxUsageImpl { + void Push(ui64 /*localId*/) {} + void ProcessEvents(TMailboxHeader* /*mailbox*/) {} + }; + + template<> + struct TMailboxUsageImpl<true> { + struct TPendingEvent { + ui64 LocalId; + ui64 Timestamp; + }; + NThreading::TReadAsFilledQueue<TPendingEvent> PendingEventQueue; + + ~TMailboxUsageImpl(); + void Push(ui64 localId); + void ProcessEvents(TMailboxHeader *mailbox); + }; + + struct TMailboxHeader + : TMailboxUsageImpl<ActorLibCollectUsageStats> + { struct TMailboxActorPack { enum EType { Simple = 0, @@ -351,7 +374,7 @@ namespace NActors { } static TSimpleMailbox* Get(ui32 hint, void* line) { - return (TSimpleMailbox*)((ui8*)line + hint * 64); // + return (TSimpleMailbox*)((ui8*)line + 64 + (hint - 1) * AlignedSize()); // } static const TMailboxType::EType MailboxType = TMailboxType::Simple; constexpr static ui32 AlignedSize() { @@ -362,8 +385,6 @@ namespace NActors { bool CleanupEvents(); }; - static_assert(sizeof(TSimpleMailbox) == 64, "expect sizeof(TSimpleMailbox) == 64"); - struct TRevolvingMailbox: public TMailboxHeader { // 4 bytes - state // 4 bytes - knobs @@ -387,7 +408,7 @@ namespace NActors { } static TRevolvingMailbox* Get(ui32 hint, void* line) { - return (TRevolvingMailbox*)((ui8*)line + 64 + (hint - 1) * 128); + return (TRevolvingMailbox*)((ui8*)line + 64 + (hint - 1) * AlignedSize()); } constexpr static ui64 MaxMailboxesInLine() { @@ -402,8 +423,6 @@ namespace NActors { bool CleanupEvents(); }; - static_assert(sizeof(TRevolvingMailbox) == 128, "expect sizeof(TRevolvingMailbox) == 128"); - struct THTSwapMailbox: public TMailboxHeader { using TQueueType = NThreading::THTSwapQueue<IEventHandle*>; @@ -430,7 +449,7 @@ namespace NActors { } static THTSwapMailbox* Get(ui32 hint, void* line) { - return (THTSwapMailbox*)((ui8*)line + 64 + (hint - 1) * 64); + return (THTSwapMailbox*)((ui8*)line + 64 + (hint - 1) * AlignedSize()); } constexpr static ui64 MaxMailboxesInLine() { @@ -451,9 +470,6 @@ namespace NActors { } }; - static_assert(sizeof(THTSwapMailbox) == 64, - "expect sizeof(THTSwapMailbox) == 64"); - struct TReadAsFilledMailbox: public TMailboxHeader { using TQueueType = NThreading::TReadAsFilledQueue<IEventHandle>; @@ -480,7 +496,7 @@ namespace NActors { } static TReadAsFilledMailbox* Get(ui32 hint, void* line) { - return (TReadAsFilledMailbox*)((ui8*)line + 64 + (hint - 1) * 192); + return (TReadAsFilledMailbox*)((ui8*)line + 64 + (hint - 1) * AlignedSize()); } constexpr static ui64 MaxMailboxesInLine() { @@ -502,9 +518,6 @@ namespace NActors { } }; - static_assert(sizeof(TReadAsFilledMailbox) == 192, - "expect sizeof(TReadAsFilledMailbox) == 192"); - struct TTinyReadAsFilledMailbox: public TMailboxHeader { using TQueueType = NThreading::TReadAsFilledQueue< IEventHandle, @@ -533,7 +546,7 @@ namespace NActors { } static TTinyReadAsFilledMailbox* Get(ui32 hint, void* line) { - return (TTinyReadAsFilledMailbox*)((ui8*)line + 64 + (hint - 1) * 192); + return (TTinyReadAsFilledMailbox*)((ui8*)line + 64 + (hint - 1) * AlignedSize()); } constexpr static ui64 MaxMailboxesInLine() { @@ -554,8 +567,5 @@ namespace NActors { return done; } }; - - static_assert(sizeof(TTinyReadAsFilledMailbox) == 192, - "expect sizeof(TTinyReadAsFilledMailbox) == 192"); }; } diff --git a/library/cpp/actors/core/mon_stats.h b/library/cpp/actors/core/mon_stats.h index 536628b518..30db3ac787 100644 --- a/library/cpp/actors/core/mon_stats.h +++ b/library/cpp/actors/core/mon_stats.h @@ -101,6 +101,7 @@ namespace NActors { TVector<i64> ActorsAliveByActivity; // the sum should be positive, but per-thread might be negative TVector<ui64> ScheduledEventsByActivity; TVector<ui64> StuckActorsByActivity; + TVector<std::array<ui64, 10>> UsageByActivity; ui64 PoolActorRegistrations = 0; ui64 PoolDestroyedActors = 0; ui64 PoolAllocatedMailboxes = 0; @@ -116,6 +117,7 @@ namespace NActors { , ActorsAliveByActivity(TLocalProcessKeyStateIndexLimiter::GetMaxKeysCount()) , ScheduledEventsByActivity(TLocalProcessKeyStateIndexLimiter::GetMaxKeysCount()) , StuckActorsByActivity(TLocalProcessKeyStateIndexLimiter::GetMaxKeysCount()) + , UsageByActivity(TLocalProcessKeyStateIndexLimiter::GetMaxKeysCount()) {} template <typename T> @@ -159,6 +161,15 @@ namespace NActors { AggregateOne(ScheduledEventsByActivity, other.ScheduledEventsByActivity); AggregateOne(StuckActorsByActivity, other.StuckActorsByActivity); + if (UsageByActivity.size() < other.UsageByActivity.size()) { + UsageByActivity.resize(other.UsageByActivity.size()); + } + for (size_t i = 0; i < UsageByActivity.size(); ++i) { + for (size_t j = 0; j < 10; ++j) { + UsageByActivity[i][j] += RelaxedLoad(&other.UsageByActivity[i][j]); + } + } + RelaxedStore( &PoolActorRegistrations, std::max(RelaxedLoad(&PoolActorRegistrations), RelaxedLoad(&other.PoolActorRegistrations))); diff --git a/library/cpp/actors/helpers/pool_stats_collector.h b/library/cpp/actors/helpers/pool_stats_collector.h index 471fd7211c..9e6550dace 100644 --- a/library/cpp/actors/helpers/pool_stats_collector.h +++ b/library/cpp/actors/helpers/pool_stats_collector.h @@ -57,6 +57,7 @@ private: ActorsAliveByActivityBuckets.resize(GetActivityTypeCount()); ScheduledEventsByActivityBuckets.resize(GetActivityTypeCount()); StuckActorsByActivityBuckets.resize(GetActivityTypeCount()); + UsageByActivityBuckets.resize(GetActivityTypeCount()); } void Set(const TExecutorThreadStats& stats) { @@ -81,6 +82,10 @@ private: *ActorsAliveByActivityBuckets[i] = actors; *ScheduledEventsByActivityBuckets[i] = scheduled; *StuckActorsByActivityBuckets[i] = stuck; + + for (ui32 j = 0; j < 10; ++j) { + *UsageByActivityBuckets[i][j] = stats.UsageByActivity[i][j]; + } } } @@ -100,6 +105,10 @@ private: Group->GetSubgroup("sensor", "ScheduledEventsByActivity")->GetNamedCounter("activity", bucketName, true); StuckActorsByActivityBuckets[activityType] = Group->GetSubgroup("sensor", "StuckActorsByActivity")->GetNamedCounter("activity", bucketName, false); + + for (ui32 i = 0; i < 10; ++i) { + UsageByActivityBuckets[activityType][i] = Group->GetSubgroup("sensor", "UsageByActivity")->GetSubgroup("bin", ToString(i))->GetNamedCounter("activity", bucketName, false); + } } private: @@ -110,6 +119,7 @@ private: TVector<NMonitoring::TDynamicCounters::TCounterPtr> ActorsAliveByActivityBuckets; TVector<NMonitoring::TDynamicCounters::TCounterPtr> ScheduledEventsByActivityBuckets; TVector<NMonitoring::TDynamicCounters::TCounterPtr> StuckActorsByActivityBuckets; + TVector<std::array<NMonitoring::TDynamicCounters::TCounterPtr, 10>> UsageByActivityBuckets; }; struct TExecutorPoolCounters { |