aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorkruall <kruall@ydb.tech>2025-07-15 17:08:05 +0300
committerGitHub <noreply@github.com>2025-07-15 17:08:05 +0300
commit7ca01aaa004d67bfb3e4e97266a45b25cb3024df (patch)
tree86914a7f7d15c29f895961069bffbe94c0f411b2
parent34938e8080238fd512465aa6f9c57d7cd8fb54c4 (diff)
downloadydb-7ca01aaa004d67bfb3e4e97266a45b25cb3024df.tar.gz
Add test for pulling stats when threads were blocked (#21144)
-rw-r--r--ydb/library/actors/core/execution_stats.h8
-rw-r--r--ydb/library/actors/core/executor_thread.cpp1
-rw-r--r--ydb/library/actors/core/ut/actor_ut.cpp90
3 files changed, 97 insertions, 2 deletions
diff --git a/ydb/library/actors/core/execution_stats.h b/ydb/library/actors/core/execution_stats.h
index 44f59f29255..03de5491070 100644
--- a/ydb/library/actors/core/execution_stats.h
+++ b/ydb/library/actors/core/execution_stats.h
@@ -154,8 +154,7 @@ namespace NActors {
} else {
RelaxedStore(&Stats->CpuUs, (ui64)RelaxedLoad(&Stats->CpuUs) + cpuUs);
}
- RelaxedStore(&Stats->SafeElapsedTicks, (ui64)RelaxedLoad(&Stats->ElapsedTicks));
- RelaxedStore(&Stats->SafeParkedTicks, (ui64)RelaxedLoad(&Stats->ParkedTicks));
+ CopySafeTicks();
}
void IncreaseNotEnoughCpuExecutions() {
@@ -188,5 +187,10 @@ namespace NActors {
{
Stats = stats;
}
+
+ void CopySafeTicks() {
+ RelaxedStore(&Stats->SafeElapsedTicks, (ui64)RelaxedLoad(&Stats->ElapsedTicks));
+ RelaxedStore(&Stats->SafeParkedTicks, (ui64)RelaxedLoad(&Stats->ParkedTicks));
+ }
};
}
diff --git a/ydb/library/actors/core/executor_thread.cpp b/ydb/library/actors/core/executor_thread.cpp
index 42ec51d28d9..2b0b20354b2 100644
--- a/ydb/library/actors/core/executor_thread.cpp
+++ b/ydb/library/actors/core/executor_thread.cpp
@@ -554,6 +554,7 @@ namespace NActors {
NHPTimer::STime passedTime = Max<i64>(hpnow - activationStart, 0);
ExecutionStats.SetCurrentActivationTime(activityType, Ts2Us(passedTime));
}
+ ExecutionStats.CopySafeTicks();
}
void TExecutorThread::GetCurrentStats(TExecutorThreadStats& statsCopy) {
diff --git a/ydb/library/actors/core/ut/actor_ut.cpp b/ydb/library/actors/core/ut/actor_ut.cpp
index 76b698eafb3..98a0eff901c 100644
--- a/ydb/library/actors/core/ut/actor_ut.cpp
+++ b/ydb/library/actors/core/ut/actor_ut.cpp
@@ -24,6 +24,96 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) {
using TSettings = TActorBenchmark::TSettings;
using TSendReceiveActorParams = TActorBenchmark::TSendReceiveActorParams;
+ class TLongRunningActor : public TActorBootstrapped<TLongRunningActor> {
+ public:
+ TLongRunningActor(std::atomic<bool>& stop, ui64 seconds, TThreadParkPad* startPad)
+ : Seconds(seconds)
+ , StartPad(startPad)
+ , StopFlag(stop)
+ {}
+
+ void Bootstrap() {
+ Cerr << "Unpark startPad" << Endl;
+ StartPad->Unpark();
+ constexpr ui64 BufferSize = 10000;
+ TInstant start = TActivationContext::Now();
+ ui64 counter = 0;
+ Buffer.resize(BufferSize);
+ while (!StopFlag && TActivationContext::Now() - start < TDuration::Seconds(Seconds)) {
+ counter++;
+ ui64 sum = counter;
+ for (ui64 i = 0; i < BufferSize; i++) {
+ sum += Buffer[i];
+ }
+ Buffer[counter % BufferSize] = sum;
+ }
+ PassAway();
+ }
+ private:
+ TVector<ui64> Buffer;
+ ui64 Seconds;
+ TThreadParkPad* StartPad;
+ std::atomic<bool>& StopFlag;
+ };
+
+ Y_UNIT_TEST(GetStatisticsWithLongRunningActor) {
+ THolder<TActorSystemSetup> setup = TActorBenchmark::GetActorSystemSetup();
+ TActorBenchmark::AddBasicPool(setup, 1, true, false);
+
+ TActorSystem actorSystem(setup);
+ actorSystem.Start();
+
+ TThreadParkPad startPad;
+ TThreadParkPad stopPad;
+ TAtomic actorsAlive = 0;
+ std::atomic<bool> stop = false;
+
+ THolder<IActor> longRunningActor{
+ new TTestEndDecorator(THolder(new TLongRunningActor(stop, 1, &startPad)), &stopPad, &actorsAlive)
+ };
+ actorSystem.Register(longRunningActor.Release(), TMailboxType::HTSwap, 0);
+
+ Cerr << "Park startPad" << Endl;
+ startPad.Park();
+
+ ui64 CpuUs = 0;
+ ui64 ElapsedUs = 0;
+ ui64 SafeElapsedUs= 0;
+ for (ui32 i = 0; i < 10; ++i) {
+ NanoSleep(1'000'000);
+ TVector<TExecutorThreadStats> executorThreadStats;
+ TExecutorPoolStats poolStats;
+ actorSystem.GetPoolStats(0, poolStats, executorThreadStats);
+
+ ui64 newCpuUs = 0;
+ ui64 newElapsedUs = 0;
+ ui64 newSafeElapsedUs = 0;
+ for (auto &thread : executorThreadStats) {
+ newCpuUs += thread.CpuUs;
+ newElapsedUs += Ts2Us(thread.ElapsedTicks);
+ newSafeElapsedUs += Ts2Us(thread.SafeElapsedTicks);
+ }
+
+ Cerr << "Iteration " << i << " completed" << Endl;
+ Cerr << "CpuUs: " << CpuUs << " new: " << newCpuUs << Endl;
+ Cerr << "ElapsedUs: " << ElapsedUs << " new: " << newElapsedUs << Endl;
+ Cerr << "SafeElapsedUs: " << SafeElapsedUs << " new: " << newSafeElapsedUs << Endl;
+
+ UNIT_ASSERT_VALUES_UNEQUAL(CpuUs, newCpuUs);
+ UNIT_ASSERT_VALUES_UNEQUAL(ElapsedUs, newElapsedUs);
+ UNIT_ASSERT_VALUES_UNEQUAL(SafeElapsedUs, newSafeElapsedUs);
+
+ CpuUs = newCpuUs;
+ ElapsedUs = newElapsedUs;
+ SafeElapsedUs = newSafeElapsedUs;
+ }
+ stop = true;
+
+ Cerr << "Park stopPad" << Endl;
+ stopPad.Park();
+ actorSystem.Stop();
+ }
+
Y_UNIT_TEST(WithOnlyOneSharedExecutors) {
THolder<TActorSystemSetup> setup = TActorBenchmark::GetActorSystemSetup();
TActorBenchmark::AddBasicPool(setup, 1, 1, true);