aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp
diff options
context:
space:
mode:
authoralexvru <alexvru@ydb.tech>2023-04-14 17:02:27 +0300
committeralexvru <alexvru@ydb.tech>2023-04-14 17:02:27 +0300
commit736b3322b1251e1eeee862edb467fb3bf3ce1659 (patch)
tree4cbc43c59d5165007ebf12b12bc38789cc0ed804 /library/cpp
parent88ed50008f008d06269417d8d9fe2fa166288d17 (diff)
downloadydb-736b3322b1251e1eeee862edb467fb3bf3ce1659.tar.gz
Report stuck actors
Diffstat (limited to 'library/cpp')
-rw-r--r--library/cpp/actors/core/actor.h8
-rw-r--r--library/cpp/actors/core/actorsystem.h4
-rw-r--r--library/cpp/actors/core/executor_pool_base.cpp41
-rw-r--r--library/cpp/actors/core/executor_pool_base.h6
-rw-r--r--library/cpp/actors/core/executor_pool_basic.cpp3
-rw-r--r--library/cpp/actors/core/executor_thread.cpp16
-rw-r--r--library/cpp/actors/core/mon_stats.h3
-rw-r--r--library/cpp/actors/helpers/pool_stats_collector.h6
8 files changed, 87 insertions, 0 deletions
diff --git a/library/cpp/actors/core/actor.h b/library/cpp/actors/core/actor.h
index b91618a5f5..4a674f2108 100644
--- a/library/cpp/actors/core/actor.h
+++ b/library/cpp/actors/core/actor.h
@@ -313,6 +313,13 @@ namespace NActors {
i64 ElapsedTicks;
friend void DoActorInit(TActorSystem*, IActor*, const TActorId&, const TActorId&);
friend class TDecorator;
+
+ private: // stuck actor monitoring
+ TMonotonic LastReceiveTimestamp;
+ size_t StuckIndex = Max<size_t>();
+ friend class TExecutorPoolBaseMailboxed;
+ friend class TExecutorThread;
+
protected:
TActorCallbackBehaviour CImpl;
public:
@@ -494,6 +501,7 @@ namespace NActors {
void Receive(TAutoPtr<IEventHandle>& ev, const TActorContext& /*ctx*/) {
++HandledEvents;
+ LastReceiveTimestamp = TActivationContext::Monotonic();
if (CImpl.Initialized()) {
CImpl.Receive(this, ev);
} else {
diff --git a/library/cpp/actors/core/actorsystem.h b/library/cpp/actors/core/actorsystem.h
index 73fb317707..9db3075ccf 100644
--- a/library/cpp/actors/core/actorsystem.h
+++ b/library/cpp/actors/core/actorsystem.h
@@ -110,6 +110,8 @@ namespace NActors {
TInterconnectSetup Interconnect;
+ bool MonitorStuckActors = false;
+
using TLocalServices = TVector<std::pair<TActorId, TActorSetupCmd>>;
TLocalServices LocalServices;
@@ -190,6 +192,8 @@ namespace NActors {
TActorId Register(IActor* actor, TMailboxType::EType mailboxType = TMailboxType::HTSwap, ui32 executorPool = 0,
ui64 revolvingCounter = 0, const TActorId& parentId = TActorId());
+ bool MonitorStuckActors() const { return SystemSetup->MonitorStuckActors; }
+
private:
typedef bool (IExecutorPool::*TEPSendFunction)(TAutoPtr<IEventHandle>& ev);
diff --git a/library/cpp/actors/core/executor_pool_base.cpp b/library/cpp/actors/core/executor_pool_base.cpp
index a997cc21f9..047503b78e 100644
--- a/library/cpp/actors/core/executor_pool_base.cpp
+++ b/library/cpp/actors/core/executor_pool_base.cpp
@@ -27,6 +27,33 @@ namespace NActors {
MailboxTable.Destroy();
}
+#if defined(ACTORSLIB_COLLECT_EXEC_STATS)
+ void TExecutorPoolBaseMailboxed::RecalculateStuckActors(TExecutorThreadStats& stats) const {
+ if (!ActorSystem || !ActorSystem->MonitorStuckActors()) {
+ return;
+ }
+
+ const TMonotonic now = ActorSystem->Monotonic();
+
+ std::vector<ui32> stuckActors;
+ stuckActors.reserve(Actors.size());
+
+ with_lock (StuckObserverMutex) {
+ for (IActor *actor : Actors) {
+ const TDuration delta = now - actor->LastReceiveTimestamp;
+ if (delta > TDuration::Seconds(30)) {
+ stuckActors.push_back(actor->GetActivityType());
+ }
+ }
+ }
+
+ std::fill(stats.StuckActorsByActivity.begin(), stats.StuckActorsByActivity.end(), 0);
+ for (ui32 activity : stuckActors) {
+ ++stats.StuckActorsByActivity[activity];
+ }
+ }
+#endif
+
TExecutorPoolBase::TExecutorPoolBase(ui32 poolId, ui32 threads, TAffinity* affinity, ui32 maxActivityType)
: TExecutorPoolBaseMailboxed(poolId, maxActivityType)
, PoolThreads(threads)
@@ -95,6 +122,13 @@ namespace NActors {
if (at >= Stats.MaxActivityType())
at = 0;
AtomicIncrement(Stats.ActorsAliveByActivity[at]);
+ if (ActorSystem->MonitorStuckActors()) {
+ with_lock (StuckObserverMutex) {
+ Y_VERIFY(actor->StuckIndex == Max<size_t>());
+ actor->StuckIndex = Actors.size();
+ Actors.push_back(actor);
+ }
+ }
#endif
AtomicIncrement(ActorRegistrations);
@@ -169,6 +203,13 @@ namespace NActors {
if (at >= Stats.MaxActivityType())
at = 0;
AtomicIncrement(Stats.ActorsAliveByActivity[at]);
+ if (ActorSystem->MonitorStuckActors()) {
+ with_lock (StuckObserverMutex) {
+ Y_VERIFY(actor->StuckIndex == Max<size_t>());
+ actor->StuckIndex = Actors.size();
+ Actors.push_back(actor);
+ }
+ }
#endif
AtomicIncrement(ActorRegistrations);
diff --git a/library/cpp/actors/core/executor_pool_base.h b/library/cpp/actors/core/executor_pool_base.h
index 959b271e89..d9286f6b11 100644
--- a/library/cpp/actors/core/executor_pool_base.h
+++ b/library/cpp/actors/core/executor_pool_base.h
@@ -18,6 +18,12 @@ namespace NActors {
// Need to have per pool object to collect stats like actor registrations (because
// registrations might be done in threads from other pools)
TExecutorThreadStats Stats;
+
+ // Stuck actor monitoring
+ TMutex StuckObserverMutex;
+ std::vector<IActor*> Actors;
+ friend class TExecutorThread;
+ void RecalculateStuckActors(TExecutorThreadStats& stats) const;
#endif
TAtomic RegisterRevolvingCounter = 0;
ui64 AllocateID();
diff --git a/library/cpp/actors/core/executor_pool_basic.cpp b/library/cpp/actors/core/executor_pool_basic.cpp
index fc87daed77..8e103c3911 100644
--- a/library/cpp/actors/core/executor_pool_basic.cpp
+++ b/library/cpp/actors/core/executor_pool_basic.cpp
@@ -355,6 +355,9 @@ namespace NActors {
// Save counters from the pool object
statsCopy[0] = TExecutorThreadStats();
statsCopy[0].Aggregate(Stats);
+#if defined(ACTORSLIB_COLLECT_EXEC_STATS)
+ RecalculateStuckActors(statsCopy[0]);
+#endif
// Per-thread stats
for (size_t i = 0; i < PoolThreads; ++i) {
Threads[i].Thread->GetCurrentStats(statsCopy[i + 1]);
diff --git a/library/cpp/actors/core/executor_thread.cpp b/library/cpp/actors/core/executor_thread.cpp
index 611899c6ef..cd6e8c6d7b 100644
--- a/library/cpp/actors/core/executor_thread.cpp
+++ b/library/cpp/actors/core/executor_thread.cpp
@@ -5,6 +5,7 @@
#include "mailbox.h"
#include "event.h"
#include "events.h"
+#include "executor_pool_base.h"
#include <library/cpp/actors/prof/tag.h>
#include <library/cpp/actors/util/affinity.h>
@@ -64,6 +65,21 @@ namespace NActors {
}
void TExecutorThread::DropUnregistered() {
+#if defined(ACTORSLIB_COLLECT_EXEC_STATS)
+ if (ActorSystem->MonitorStuckActors()) {
+ if (auto *pool = dynamic_cast<TExecutorPoolBaseMailboxed*>(ExecutorPool)) {
+ with_lock (pool->StuckObserverMutex) {
+ for (const auto& actor : DyingActors) {
+ const size_t i = actor->StuckIndex;
+ auto& actorPtr = pool->Actors[i];
+ actorPtr = pool->Actors.back();
+ actorPtr->StuckIndex = i;
+ pool->Actors.pop_back();
+ }
+ }
+ }
+ }
+#endif
DyingActors.clear(); // here is actual destruction of actors
}
diff --git a/library/cpp/actors/core/mon_stats.h b/library/cpp/actors/core/mon_stats.h
index 5d61c9f87c..40e5f651e9 100644
--- a/library/cpp/actors/core/mon_stats.h
+++ b/library/cpp/actors/core/mon_stats.h
@@ -97,6 +97,7 @@ namespace NActors {
TVector<ui64> ReceivedEventsByActivity;
TVector<i64> ActorsAliveByActivity; // the sum should be positive, but per-thread might be negative
TVector<ui64> ScheduledEventsByActivity;
+ TVector<ui64> StuckActorsByActivity;
ui64 PoolActorRegistrations = 0;
ui64 PoolDestroyedActors = 0;
ui64 PoolAllocatedMailboxes = 0;
@@ -111,6 +112,7 @@ namespace NActors {
, ReceivedEventsByActivity(activityVecSize)
, ActorsAliveByActivity(activityVecSize)
, ScheduledEventsByActivity(activityVecSize)
+ , StuckActorsByActivity(activityVecSize)
{}
template <typename T>
@@ -152,6 +154,7 @@ namespace NActors {
AggregateOne(ReceivedEventsByActivity, other.ReceivedEventsByActivity);
AggregateOne(ActorsAliveByActivity, other.ActorsAliveByActivity);
AggregateOne(ScheduledEventsByActivity, other.ScheduledEventsByActivity);
+ AggregateOne(StuckActorsByActivity, other.StuckActorsByActivity);
RelaxedStore(
&PoolActorRegistrations,
diff --git a/library/cpp/actors/helpers/pool_stats_collector.h b/library/cpp/actors/helpers/pool_stats_collector.h
index 51ace0e3cc..16a64581ef 100644
--- a/library/cpp/actors/helpers/pool_stats_collector.h
+++ b/library/cpp/actors/helpers/pool_stats_collector.h
@@ -56,6 +56,7 @@ private:
ReceivedEventsByActivityBuckets.resize(GetActivityTypeCount());
ActorsAliveByActivityBuckets.resize(GetActivityTypeCount());
ScheduledEventsByActivityBuckets.resize(GetActivityTypeCount());
+ StuckActorsByActivityBuckets.resize(GetActivityTypeCount());
}
void Set(const TExecutorThreadStats& stats) {
@@ -65,6 +66,7 @@ private:
ui64 events = stats.ReceivedEventsByActivity[i];
ui64 actors = stats.ActorsAliveByActivity[i];
ui64 scheduled = stats.ScheduledEventsByActivity[i];
+ ui64 stuck = stats.StuckActorsByActivity[i];
if (!ActorsAliveByActivityBuckets[i]) {
if (ticks || events || actors || scheduled) {
@@ -78,6 +80,7 @@ private:
*ReceivedEventsByActivityBuckets[i] = events;
*ActorsAliveByActivityBuckets[i] = actors;
*ScheduledEventsByActivityBuckets[i] = scheduled;
+ *StuckActorsByActivityBuckets[i] = stuck;
}
}
@@ -95,6 +98,8 @@ private:
Group->GetSubgroup("sensor", "ActorsAliveByActivity")->GetNamedCounter("activity", bucketName, false);
ScheduledEventsByActivityBuckets[activityType] =
Group->GetSubgroup("sensor", "ScheduledEventsByActivity")->GetNamedCounter("activity", bucketName, true);
+ StuckActorsByActivityBuckets[activityType] =
+ Group->GetSubgroup("sensor", "StuckActorsByActivity")->GetNamedCounter("activity", bucketName, false);
}
private:
@@ -104,6 +109,7 @@ private:
TVector<NMonitoring::TDynamicCounters::TCounterPtr> ReceivedEventsByActivityBuckets;
TVector<NMonitoring::TDynamicCounters::TCounterPtr> ActorsAliveByActivityBuckets;
TVector<NMonitoring::TDynamicCounters::TCounterPtr> ScheduledEventsByActivityBuckets;
+ TVector<NMonitoring::TDynamicCounters::TCounterPtr> StuckActorsByActivityBuckets;
};
struct TExecutorPoolCounters {