aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/actors/core
diff options
context:
space:
mode:
authorDaniil Cherednik <dan.cherednik@gmail.com>2023-03-31 10:54:08 +0300
committerDaniil Cherednik <dan.cherednik@gmail.com>2023-03-31 12:28:07 +0300
commitfc1cffcfa7f0497a1f97b384a24bcbf23362f3be (patch)
treec15f7ab5b9e9b20fd0ef8fc07d598d28e8b32004 /library/cpp/actors/core
parent8a749596d40e91c896a1907afcd108d9221fbde1 (diff)
downloadydb-fc1cffcfa7f0497a1f97b384a24bcbf23362f3be.tar.gz
Ydb stable 23-1-1923.1.19
x-stable-origin-commit: c5d5a396e89d0a72e0267a55e93d8404d4fb54fe
Diffstat (limited to 'library/cpp/actors/core')
-rw-r--r--library/cpp/actors/core/actorsystem.h16
-rw-r--r--library/cpp/actors/core/config.h12
-rw-r--r--library/cpp/actors/core/executor_pool.h14
-rw-r--r--library/cpp/actors/core/executor_pool_basic.cpp19
-rw-r--r--library/cpp/actors/core/executor_pool_basic.h3
-rw-r--r--library/cpp/actors/core/executor_pool_basic_ut.cpp2
-rw-r--r--library/cpp/actors/core/executor_pool_united_ut.cpp2
-rw-r--r--library/cpp/actors/core/harmonizer.cpp65
-rw-r--r--library/cpp/actors/core/harmonizer.h1
-rw-r--r--library/cpp/actors/core/mon_stats.h9
-rw-r--r--library/cpp/actors/core/worker_context.h3
11 files changed, 88 insertions, 58 deletions
diff --git a/library/cpp/actors/core/actorsystem.h b/library/cpp/actors/core/actorsystem.h
index 8051f5ee57..cd2cfda1bb 100644
--- a/library/cpp/actors/core/actorsystem.h
+++ b/library/cpp/actors/core/actorsystem.h
@@ -122,7 +122,21 @@ namespace NActors {
}
ui32 GetThreads(ui32 poolId) const {
- return Executors ? Executors[poolId]->GetThreads() : CpuManager.GetThreads(poolId);
+ auto result = GetThreadsOptional(poolId);
+ Y_VERIFY(result, "undefined pool id: %" PRIu32, (ui32)poolId);
+ return *result;
+ }
+
+ std::optional<ui32> GetThreadsOptional(const ui32 poolId) const {
+ if (Y_LIKELY(Executors)) {
+ if (Y_LIKELY(poolId < ExecutorsCount)) {
+ return Executors[poolId]->GetDefaultThreadCount();
+ } else {
+ return {};
+ }
+ } else {
+ return CpuManager.GetThreadsOptional(poolId);
+ }
}
};
diff --git a/library/cpp/actors/core/config.h b/library/cpp/actors/core/config.h
index 0bf4b871d7..650b1f39f5 100644
--- a/library/cpp/actors/core/config.h
+++ b/library/cpp/actors/core/config.h
@@ -128,10 +128,10 @@ namespace NActors {
Y_FAIL("undefined pool id: %" PRIu32, (ui32)poolId);
}
- ui32 GetThreads(ui32 poolId) const {
+ std::optional<ui32> GetThreadsOptional(ui32 poolId) const {
for (const auto& p : Basic) {
if (p.PoolId == poolId) {
- return p.Threads;
+ return p.DefaultThreadCount;
}
}
for (const auto& p : IO) {
@@ -144,7 +144,13 @@ namespace NActors {
return p.Concurrency ? p.Concurrency : UnitedWorkers.CpuCount;
}
}
- Y_FAIL("undefined pool id: %" PRIu32, (ui32)poolId);
+ return {};
+ }
+
+ ui32 GetThreads(ui32 poolId) const {
+ auto result = GetThreadsOptional(poolId);
+ Y_VERIFY(result, "undefined pool id: %" PRIu32, (ui32)poolId);
+ return *result;
}
};
diff --git a/library/cpp/actors/core/executor_pool.h b/library/cpp/actors/core/executor_pool.h
index f39415c7e2..c7c85e61fd 100644
--- a/library/cpp/actors/core/executor_pool.h
+++ b/library/cpp/actors/core/executor_pool.h
@@ -10,6 +10,11 @@ namespace NActors {
struct TWorkerContext;
class ISchedulerCookie;
+ struct TCpuConsumption {
+ double ConsumedUs = 0;
+ double BookedUs = 0;
+ };
+
class IExecutorPool : TNonCopyable {
public:
const ui32 PoolId;
@@ -131,14 +136,9 @@ namespace NActors {
return false;
}
- virtual double GetThreadConsumedUs(i16 threadIdx) {
- Y_UNUSED(threadIdx);
- return 0.0;
- }
-
- virtual double GetThreadBookedUs(i16 threadIdx) {
+ virtual TCpuConsumption GetThreadCpuConsumption(i16 threadIdx) {
Y_UNUSED(threadIdx);
- return 0.0;
+ return TCpuConsumption{0.0, 0.0};
}
};
diff --git a/library/cpp/actors/core/executor_pool_basic.cpp b/library/cpp/actors/core/executor_pool_basic.cpp
index 0c984f8fb0..de04105991 100644
--- a/library/cpp/actors/core/executor_pool_basic.cpp
+++ b/library/cpp/actors/core/executor_pool_basic.cpp
@@ -334,6 +334,8 @@ namespace NActors {
poolStats.MaxUtilizationTime = RelaxedLoad(&MaxUtilizationAccumulator) / (i64)(NHPTimer::GetCyclesPerSecond() / 1000);
poolStats.WrongWakenedThreadCount = RelaxedLoad(&WrongWakenedThreadCount);
poolStats.CurrentThreadCount = RelaxedLoad(&ThreadCount);
+ poolStats.DefaultThreadCount = DefaultThreadCount;
+ poolStats.MaxThreadCount = MaxThreadCount;
if (Harmonizer) {
TPoolHarmonizedStats stats = Harmonizer->GetPoolStats(PoolId);
poolStats.IsNeedy = stats.IsNeedy;
@@ -342,6 +344,7 @@ namespace NActors {
poolStats.IncreasingThreadsByNeedyState = stats.IncreasingThreadsByNeedyState;
poolStats.DecreasingThreadsByStarvedState = stats.DecreasingThreadsByStarvedState;
poolStats.DecreasingThreadsByHoggishState = stats.DecreasingThreadsByHoggishState;
+ poolStats.PotentialMaxThreadCount = stats.PotentialMaxThreadCount;
}
statsCopy.resize(PoolThreads + 1);
@@ -490,24 +493,14 @@ namespace NActors {
return false;
}
- double TBasicExecutorPool::GetThreadConsumedUs(i16 threadIdx) {
+ TCpuConsumption TBasicExecutorPool::GetThreadCpuConsumption(i16 threadIdx) {
if ((ui32)threadIdx >= PoolThreads) {
- return 0;
+ return {0.0, 0.0};
}
TThreadCtx& threadCtx = Threads[threadIdx];
TExecutorThreadStats stats;
threadCtx.Thread->GetCurrentStats(stats);
- return Ts2Us(stats.ElapsedTicks);
- }
-
- double TBasicExecutorPool::GetThreadBookedUs(i16 threadIdx) {
- if ((ui32)threadIdx >= PoolThreads) {
- return 0;
- }
- TThreadCtx& threadCtx = Threads[threadIdx];
- TExecutorThreadStats stats;
- threadCtx.Thread->GetCurrentStats(stats);
- return stats.CpuNs / 1000.0;
+ return {Ts2Us(stats.SafeElapsedTicks), static_cast<double>(stats.CpuUs)};
}
i16 TBasicExecutorPool::GetBlockingThreadCount() const {
diff --git a/library/cpp/actors/core/executor_pool_basic.h b/library/cpp/actors/core/executor_pool_basic.h
index cd94a998f1..813f91dc9a 100644
--- a/library/cpp/actors/core/executor_pool_basic.h
+++ b/library/cpp/actors/core/executor_pool_basic.h
@@ -153,8 +153,7 @@ namespace NActors {
i16 GetMinThreadCount() const override;
i16 GetMaxThreadCount() const override;
bool IsThreadBeingStopped(i16 threadIdx) const override;
- double GetThreadConsumedUs(i16 threadIdx) override;
- double GetThreadBookedUs(i16 threadIdx) override;
+ TCpuConsumption GetThreadCpuConsumption(i16 threadIdx) override;
i16 GetBlockingThreadCount() const override;
i16 GetPriority() const override;
diff --git a/library/cpp/actors/core/executor_pool_basic_ut.cpp b/library/cpp/actors/core/executor_pool_basic_ut.cpp
index 6361bc6662..f96f65931a 100644
--- a/library/cpp/actors/core/executor_pool_basic_ut.cpp
+++ b/library/cpp/actors/core/executor_pool_basic_ut.cpp
@@ -339,7 +339,7 @@ Y_UNIT_TEST_SUITE(BasicExecutorPool) {
UNIT_ASSERT_VALUES_EQUAL(stats[0].PreemptedEvents, 0);
UNIT_ASSERT_VALUES_EQUAL(stats[0].NonDeliveredEvents, 0);
UNIT_ASSERT_VALUES_EQUAL(stats[0].EmptyMailboxActivation, 0);
- //UNIT_ASSERT_VALUES_EQUAL(stats[0].CpuNs, 0); // depends on total duration of test, so undefined
+ //UNIT_ASSERT_VALUES_EQUAL(stats[0].CpuUs, 0); // depends on total duration of test, so undefined
UNIT_ASSERT(stats[0].ElapsedTicks > 0);
UNIT_ASSERT(stats[0].ParkedTicks > 0);
UNIT_ASSERT_VALUES_EQUAL(stats[0].BlockedTicks, 0);
diff --git a/library/cpp/actors/core/executor_pool_united_ut.cpp b/library/cpp/actors/core/executor_pool_united_ut.cpp
index 133e9c5f2a..a7c7399d73 100644
--- a/library/cpp/actors/core/executor_pool_united_ut.cpp
+++ b/library/cpp/actors/core/executor_pool_united_ut.cpp
@@ -171,7 +171,7 @@ Y_UNIT_TEST_SUITE(UnitedExecutorPool) {
//UNIT_ASSERT_VALUES_EQUAL(stats[0].PreemptedEvents, 0); // depends on execution time and system load, so may be non-zero
UNIT_ASSERT_VALUES_EQUAL(stats[0].NonDeliveredEvents, 0);
UNIT_ASSERT_VALUES_EQUAL(stats[0].EmptyMailboxActivation, 0);
- //UNIT_ASSERT_VALUES_EQUAL(stats[0].CpuNs, 0); // depends on total duration of test, so undefined
+ //UNIT_ASSERT_VALUES_EQUAL(stats[0].CpuUs, 0); // depends on total duration of test, so undefined
UNIT_ASSERT(stats[0].ElapsedTicks > 0);
//UNIT_ASSERT(stats[0].ParkedTicks == 0); // per-pool parked time does not make sense for united pools
UNIT_ASSERT_VALUES_EQUAL(stats[0].BlockedTicks, 0);
diff --git a/library/cpp/actors/core/harmonizer.cpp b/library/cpp/actors/core/harmonizer.cpp
index f318d8909c..e2fd0c5f24 100644
--- a/library/cpp/actors/core/harmonizer.cpp
+++ b/library/cpp/actors/core/harmonizer.cpp
@@ -121,6 +121,7 @@ struct TPoolInfo {
TAtomic IncreasingThreadsByNeedyState = 0;
TAtomic DecreasingThreadsByStarvedState = 0;
TAtomic DecreasingThreadsByHoggishState = 0;
+ TAtomic PotentialMaxThreadCount = 0;
bool IsBeingStopped(i16 threadIdx);
double GetBooked(i16 threadIdx);
@@ -169,9 +170,10 @@ double TPoolInfo::GetlastSecondPoolConsumed(i16 threadIdx) {
void TPoolInfo::PullStats(ui64 ts) {
for (i16 threadIdx = 0; threadIdx < MaxThreadCount; ++threadIdx) {
TThreadInfo &threadInfo = ThreadInfo[threadIdx];
- threadInfo.Consumed.Register(ts, Pool->GetThreadConsumedUs(threadIdx));
+ TCpuConsumption cpuConsumption = Pool->GetThreadCpuConsumption(threadIdx);
+ threadInfo.Consumed.Register(ts, cpuConsumption.ConsumedUs);
LWPROBE(SavedValues, Pool->PoolId, Pool->GetName(), "consumed", UNROLL_HISTORY(threadInfo.Consumed.History));
- threadInfo.Booked.Register(ts, Pool->GetThreadBookedUs(threadIdx));
+ threadInfo.Booked.Register(ts, cpuConsumption.BookedUs);
LWPROBE(SavedValues, Pool->PoolId, Pool->GetName(), "booked", UNROLL_HISTORY(threadInfo.Booked.History));
}
}
@@ -236,7 +238,7 @@ void THarmonizer::PullStats(ui64 ts) {
}
Y_FORCE_INLINE bool IsStarved(double consumed, double booked) {
- return Max(consumed, booked) > 0.1 && consumed < booked * 0.7;
+ return consumed < booked * 0.7;
}
Y_FORCE_INLINE bool IsHoggish(double booked, ui16 currentThreadCount) {
@@ -293,35 +295,43 @@ void THarmonizer::HarmonizeImpl(ui64 ts) {
LWPROBE(HarmonizeCheckPool, poolIdx, pool.Pool->GetName(), poolBooked, poolConsumed, lastSecondPoolBooked, lastSecondPoolConsumed, pool.GetThreadCount(), pool.MaxThreadCount, isStarved, isNeedy, isHoggish);
}
double budget = total - Max(booked, lastSecondBooked);
+ i16 budgetInt = static_cast<i16>(Max(budget, 0.0));
if (budget < -0.1) {
isStarvedPresent = true;
}
+ for (size_t poolIdx = 0; poolIdx < Pools.size(); ++poolIdx) {
+ TPoolInfo& pool = Pools[poolIdx];
+ AtomicSet(pool.PotentialMaxThreadCount, Min(pool.MaxThreadCount, budgetInt));
+ }
double overbooked = consumed - booked;
if (isStarvedPresent) {
- // last_starved_at_consumed_value = сумма по всем пулам consumed;
- // TODO(cthulhu): использовать как лимит планвно устремлять этот лимит к total,
- // использовать вместо total
- if (beingStopped && beingStopped >= overbooked) {
- // do nothing
- } else {
- TStackVec<size_t> reorder;
- for (size_t i = 0; i < Pools.size(); ++i) {
- reorder.push_back(i);
- }
- for (ui16 poolIdx : PriorityOrder) {
- TPoolInfo &pool = Pools[poolIdx];
- i64 threadCount = pool.GetThreadCount();
- if (threadCount > pool.DefaultThreadCount) {
- pool.SetThreadCount(threadCount - 1);
- AtomicIncrement(pool.DecreasingThreadsByStarvedState);
- overbooked--;
- LWPROBE(HarmonizeOperation, poolIdx, pool.Pool->GetName(), "decrease", threadCount - 1, pool.DefaultThreadCount, pool.MaxThreadCount);
- if (overbooked < 1) {
- break;
- }
- }
- }
- }
+ // last_starved_at_consumed_value = сумма по всем пулам consumed;
+ // TODO(cthulhu): использовать как лимит планвно устремлять этот лимит к total,
+ // использовать вместо total
+ if (beingStopped && beingStopped >= overbooked) {
+ // do nothing
+ } else {
+ TStackVec<size_t> reorder;
+ for (size_t i = 0; i < Pools.size(); ++i) {
+ reorder.push_back(i);
+ }
+ for (ui16 poolIdx : PriorityOrder) {
+ TPoolInfo &pool = Pools[poolIdx];
+ i64 threadCount = pool.GetThreadCount();
+ while (threadCount > pool.DefaultThreadCount) {
+ pool.SetThreadCount(threadCount - 1);
+ AtomicIncrement(pool.DecreasingThreadsByStarvedState);
+ overbooked--;
+ LWPROBE(HarmonizeOperation, poolIdx, pool.Pool->GetName(), "decrease", threadCount - 1, pool.DefaultThreadCount, pool.MaxThreadCount);
+ if (overbooked < 1) {
+ break;
+ }
+ }
+ if (overbooked < 1) {
+ break;
+ }
+ }
+ }
} else {
for (size_t needyPoolIdx : needyPools) {
TPoolInfo &pool = Pools[needyPoolIdx];
@@ -422,6 +432,7 @@ TPoolHarmonizedStats THarmonizer::GetPoolStats(i16 poolId) const {
.IncreasingThreadsByNeedyState = static_cast<ui64>(RelaxedLoad(&pool.IncreasingThreadsByNeedyState)),
.DecreasingThreadsByStarvedState = static_cast<ui64>(RelaxedLoad(&pool.DecreasingThreadsByStarvedState)),
.DecreasingThreadsByHoggishState = static_cast<ui64>(RelaxedLoad(&pool.DecreasingThreadsByHoggishState)),
+ .PotentialMaxThreadCount = static_cast<i16>(RelaxedLoad(&pool.PotentialMaxThreadCount)),
.IsNeedy = static_cast<bool>(flags & 1),
.IsStarved = static_cast<bool>(flags & 2),
.IsHoggish = static_cast<bool>(flags & 4),
diff --git a/library/cpp/actors/core/harmonizer.h b/library/cpp/actors/core/harmonizer.h
index 61f13e43ac..bc6b938fe8 100644
--- a/library/cpp/actors/core/harmonizer.h
+++ b/library/cpp/actors/core/harmonizer.h
@@ -10,6 +10,7 @@ namespace NActors {
ui64 IncreasingThreadsByNeedyState = 0;
ui64 DecreasingThreadsByStarvedState = 0;
ui64 DecreasingThreadsByHoggishState = 0;
+ i16 PotentialMaxThreadCount = 0;
bool IsNeedy = false;
bool IsStarved = false;
bool IsHoggish = false;
diff --git a/library/cpp/actors/core/mon_stats.h b/library/cpp/actors/core/mon_stats.h
index 38629e2aa1..4c664a964a 100644
--- a/library/cpp/actors/core/mon_stats.h
+++ b/library/cpp/actors/core/mon_stats.h
@@ -65,6 +65,9 @@ namespace NActors {
ui64 DecreasingThreadsByHoggishState = 0;
i16 WrongWakenedThreadCount = 0;
i16 CurrentThreadCount = 0;
+ i16 PotentialMaxThreadCount = 0;
+ i16 DefaultThreadCount = 0;
+ i16 MaxThreadCount = 0;
bool IsNeedy = false;
bool IsStarved = false;
bool IsHoggish = false;
@@ -76,7 +79,8 @@ namespace NActors {
ui64 PreemptedEvents = 0; // Number of events experienced hard preemption
ui64 NonDeliveredEvents = 0;
ui64 EmptyMailboxActivation = 0;
- ui64 CpuNs = 0; // nanoseconds thread was executing on CPU (accounts for preemtion)
+ ui64 CpuUs = 0; // microseconds thread was executing on CPU (accounts for preemtion)
+ ui64 SafeElapsedTicks = 0;
ui64 WorstActivationTimeUs = 0;
NHPTimer::STime ElapsedTicks = 0;
NHPTimer::STime ParkedTicks = 0;
@@ -120,7 +124,8 @@ namespace NActors {
PreemptedEvents += RelaxedLoad(&other.PreemptedEvents);
NonDeliveredEvents += RelaxedLoad(&other.NonDeliveredEvents);
EmptyMailboxActivation += RelaxedLoad(&other.EmptyMailboxActivation);
- CpuNs += RelaxedLoad(&other.CpuNs);
+ CpuUs += RelaxedLoad(&other.CpuUs);
+ SafeElapsedTicks += RelaxedLoad(&other.SafeElapsedTicks);
RelaxedStore(
&WorstActivationTimeUs,
std::max(RelaxedLoad(&WorstActivationTimeUs), RelaxedLoad(&other.WorstActivationTimeUs)));
diff --git a/library/cpp/actors/core/worker_context.h b/library/cpp/actors/core/worker_context.h
index 2179771fb6..c3a2947df1 100644
--- a/library/cpp/actors/core/worker_context.h
+++ b/library/cpp/actors/core/worker_context.h
@@ -137,7 +137,8 @@ namespace NActors {
}
void UpdateThreadTime() {
- RelaxedStore(&WorkerStats.CpuNs, ThreadCPUTime() * 1000);
+ RelaxedStore(&WorkerStats.SafeElapsedTicks, (ui64)RelaxedLoad(&WorkerStats.ElapsedTicks));
+ RelaxedStore(&WorkerStats.CpuUs, ThreadCPUTime());
}
#else
void GetCurrentStats(TExecutorThreadStats&) const {}