diff options
author | kruall <kruall@ydb.tech> | 2023-06-19 13:29:25 +0300 |
---|---|---|
committer | kruall <kruall@ydb.tech> | 2023-06-19 13:29:25 +0300 |
commit | 82d3095647b506337bd3cfdf275a09889854c1bf (patch) | |
tree | a5f8a61bbb79bdf1c485dfcae49f10d9403e76c2 /library/cpp | |
parent | 3dfef4338422fb797915ee5c172b8cbf6b4dd335 (diff) | |
download | ydb-82d3095647b506337bd3cfdf275a09889854c1bf.tar.gz |
Add exchanging threads when CPU usage is around the limit
Diffstat (limited to 'library/cpp')
-rw-r--r-- | library/cpp/actors/core/executor_pool_basic.cpp | 2 | ||||
-rw-r--r-- | library/cpp/actors/core/harmonizer.cpp | 81 | ||||
-rw-r--r-- | library/cpp/actors/core/harmonizer.h | 2 | ||||
-rw-r--r-- | library/cpp/actors/core/mon_stats.h | 2 | ||||
-rw-r--r-- | library/cpp/actors/helpers/pool_stats_collector.h | 6 |
5 files changed, 85 insertions, 8 deletions
diff --git a/library/cpp/actors/core/executor_pool_basic.cpp b/library/cpp/actors/core/executor_pool_basic.cpp index 8e103c3911b..ae106564c00 100644 --- a/library/cpp/actors/core/executor_pool_basic.cpp +++ b/library/cpp/actors/core/executor_pool_basic.cpp @@ -342,8 +342,10 @@ namespace NActors { poolStats.IsStarved = stats.IsStarved; poolStats.IsHoggish = stats.IsHoggish; poolStats.IncreasingThreadsByNeedyState = stats.IncreasingThreadsByNeedyState; + poolStats.IncreasingThreadsByExchange = stats.IncreasingThreadsByExchange; poolStats.DecreasingThreadsByStarvedState = stats.DecreasingThreadsByStarvedState; poolStats.DecreasingThreadsByHoggishState = stats.DecreasingThreadsByHoggishState; + poolStats.DecreasingThreadsByExchange = stats.DecreasingThreadsByExchange; poolStats.PotentialMaxThreadCount = stats.PotentialMaxThreadCount; poolStats.MaxConsumedCpuUs = stats.MaxConsumedCpu; poolStats.MinConsumedCpuUs = stats.MinConsumedCpu; diff --git a/library/cpp/actors/core/harmonizer.cpp b/library/cpp/actors/core/harmonizer.cpp index e729ffd08d9..7842a471262 100644 --- a/library/cpp/actors/core/harmonizer.cpp +++ b/library/cpp/actors/core/harmonizer.cpp @@ -176,8 +176,10 @@ struct TPoolInfo { TAtomic LastFlags = 0; // 0 - isNeedy; 1 - isStarved; 2 - isHoggish TAtomic IncreasingThreadsByNeedyState = 0; + TAtomic IncreasingThreadsByExchange = 0; TAtomic DecreasingThreadsByStarvedState = 0; TAtomic DecreasingThreadsByHoggishState = 0; + TAtomic DecreasingThreadsByExchange = 0; TAtomic PotentialMaxThreadCount = 0; TValueHistory<16> Consumed; @@ -347,9 +349,17 @@ void THarmonizer::HarmonizeImpl(ui64 ts) { i64 total = 0; TStackVec<size_t, 8> needyPools; TStackVec<size_t, 8> hoggishPools; + TStackVec<bool, 8> isNeedyByPool; + + size_t sumOfAdditionalThreads = 0; + for (size_t poolIdx = 0; poolIdx < Pools.size(); ++poolIdx) { TPoolInfo& pool = Pools[poolIdx]; total += pool.DefaultThreadCount; + + ui32 currentThreadCount = pool.GetThreadCount(); + sumOfAdditionalThreads += 
currentThreadCount - pool.DefaultThreadCount; + double poolBooked = 0.0; double poolConsumed = 0.0; double lastSecondPoolBooked = 0.0; @@ -365,7 +375,7 @@ void THarmonizer::HarmonizeImpl(ui64 ts) { if (isStarved) { isStarvedPresent = true; } - ui32 currentThreadCount = pool.GetThreadCount(); + bool isNeedy = (pool.IsAvgPingGood() || pool.NewNotEnoughCpuExecutions) && poolBooked >= currentThreadCount; if (pool.AvgPingCounter) { if (pool.LastUpdateTs + Us2Ts(3'000'000ull) > ts) { @@ -374,6 +384,7 @@ void THarmonizer::HarmonizeImpl(ui64 ts) { pool.LastUpdateTs = ts; } } + isNeedyByPool.push_back(isNeedy); if (isNeedy) { needyPools.push_back(poolIdx); } @@ -400,6 +411,16 @@ void THarmonizer::HarmonizeImpl(ui64 ts) { if (overbooked < 0) { isStarvedPresent = false; } + + if (needyPools.size()) { + Sort(needyPools.begin(), needyPools.end(), [&] (i16 lhs, i16 rhs) { + if (Pools[lhs].Priority != Pools[rhs].Priority) { + return Pools[lhs].Priority > Pools[rhs].Priority; + } + return Pools[lhs].Pool->PoolId < Pools[rhs].Pool->PoolId; + }); + } + if (isStarvedPresent) { // last_starved_at_consumed_value = сумма по всем пулам consumed; // TODO(cthulhu): использовать как лимит планвно устремлять этот лимит к total, @@ -407,10 +428,6 @@ void THarmonizer::HarmonizeImpl(ui64 ts) { if (beingStopped && beingStopped >= overbooked) { // do nothing } else { - TStackVec<size_t> reorder; - for (size_t i = 0; i < Pools.size(); ++i) { - reorder.push_back(i); - } for (ui16 poolIdx : PriorityOrder) { TPoolInfo &pool = Pools[poolIdx]; i64 threadCount = pool.GetThreadCount(); @@ -418,7 +435,9 @@ void THarmonizer::HarmonizeImpl(ui64 ts) { pool.SetThreadCount(--threadCount); AtomicIncrement(pool.DecreasingThreadsByStarvedState); overbooked--; - LWPROBE(HarmonizeOperation, poolIdx, pool.Pool->GetName(), "decrease", threadCount - 1, pool.DefaultThreadCount, pool.MaxThreadCount); + sumOfAdditionalThreads--; + + LWPROBE(HarmonizeOperation, poolIdx, pool.Pool->GetName(), "decrease by starving", 
threadCount - 1, pool.DefaultThreadCount, pool.MaxThreadCount); if (overbooked < 1) { break; } @@ -435,19 +454,63 @@ void THarmonizer::HarmonizeImpl(ui64 ts) { i64 threadCount = pool.GetThreadCount(); if (threadCount + 1 <= pool.MaxThreadCount) { AtomicIncrement(pool.IncreasingThreadsByNeedyState); + isNeedyByPool[needyPoolIdx] = false; + sumOfAdditionalThreads++; pool.SetThreadCount(threadCount + 1); budget -= 1.0; - LWPROBE(HarmonizeOperation, needyPoolIdx, pool.Pool->GetName(), "increase", threadCount + 1, pool.DefaultThreadCount, pool.MaxThreadCount); + LWPROBE(HarmonizeOperation, needyPoolIdx, pool.Pool->GetName(), "increase by needs", threadCount + 1, pool.DefaultThreadCount, pool.MaxThreadCount); } } } } + + if (budget < 1.0) { + size_t takingAwayThreads = 0; + for (size_t needyPoolIdx : needyPools) { + TPoolInfo &pool = Pools[needyPoolIdx]; + i64 threadCount = pool.GetThreadCount(); + sumOfAdditionalThreads -= threadCount - pool.DefaultThreadCount; + if (sumOfAdditionalThreads < takingAwayThreads + 1) { + break; + } + if (!isNeedyByPool[needyPoolIdx]) { + continue; + } + AtomicIncrement(pool.IncreasingThreadsByExchange); + isNeedyByPool[needyPoolIdx] = false; + takingAwayThreads++; + pool.SetThreadCount(threadCount + 1); + + LWPROBE(HarmonizeOperation, needyPoolIdx, pool.Pool->GetName(), "increase by exchanging", threadCount + 1, pool.DefaultThreadCount, pool.MaxThreadCount); + } + + for (ui16 poolIdx : PriorityOrder) { + if (takingAwayThreads <= 0) { + break; + } + + TPoolInfo &pool = Pools[poolIdx]; + i64 threadCount = pool.GetThreadCount(); + size_t additionalThreadsCount = Max(0L, threadCount - pool.DefaultThreadCount); + size_t currentTakingAwayThreads = Min(additionalThreadsCount, currentTakingAwayThreads); + + if (!currentTakingAwayThreads) { + continue; + } + takingAwayThreads -= currentTakingAwayThreads; + pool.SetThreadCount(threadCount - currentTakingAwayThreads); + + AtomicAdd(pool.DecreasingThreadsByExchange, takingAwayThreads); + 
LWPROBE(HarmonizeOperation, poolIdx, pool.Pool->GetName(), "decrease by exchanging", threadCount - currentTakingAwayThreads, pool.DefaultThreadCount, pool.MaxThreadCount); + } + } + for (size_t hoggishPoolIdx : hoggishPools) { TPoolInfo &pool = Pools[hoggishPoolIdx]; i64 threadCount = pool.GetThreadCount(); if (threadCount > pool.MinThreadCount) { AtomicIncrement(pool.DecreasingThreadsByHoggishState); - LWPROBE(HarmonizeOperation, hoggishPoolIdx, pool.Pool->GetName(), "decrease", threadCount - 1, pool.DefaultThreadCount, pool.MaxThreadCount); + LWPROBE(HarmonizeOperation, hoggishPoolIdx, pool.Pool->GetName(), "decrease by hoggish", threadCount - 1, pool.DefaultThreadCount, pool.MaxThreadCount); pool.SetThreadCount(threadCount - 1); } } @@ -526,8 +589,10 @@ TPoolHarmonizerStats THarmonizer::GetPoolStats(i16 poolId) const { ui64 flags = RelaxedLoad(&pool.LastFlags); return TPoolHarmonizerStats{ .IncreasingThreadsByNeedyState = static_cast<ui64>(RelaxedLoad(&pool.IncreasingThreadsByNeedyState)), + .IncreasingThreadsByExchange = static_cast<ui64>(RelaxedLoad(&pool.IncreasingThreadsByExchange)), .DecreasingThreadsByStarvedState = static_cast<ui64>(RelaxedLoad(&pool.DecreasingThreadsByStarvedState)), .DecreasingThreadsByHoggishState = static_cast<ui64>(RelaxedLoad(&pool.DecreasingThreadsByHoggishState)), + .DecreasingThreadsByExchange = static_cast<ui64>(RelaxedLoad(&pool.DecreasingThreadsByExchange)), .MaxConsumedCpu = static_cast<i64>(RelaxedLoad(&pool.MaxConsumedCpu)), .MinConsumedCpu = static_cast<i64>(RelaxedLoad(&pool.MinConsumedCpu)), .MaxBookedCpu = static_cast<i64>(RelaxedLoad(&pool.MaxBookedCpu)), diff --git a/library/cpp/actors/core/harmonizer.h b/library/cpp/actors/core/harmonizer.h index 7c66ff54c67..04eb616b152 100644 --- a/library/cpp/actors/core/harmonizer.h +++ b/library/cpp/actors/core/harmonizer.h @@ -8,8 +8,10 @@ namespace NActors { struct TPoolHarmonizerStats { ui64 IncreasingThreadsByNeedyState = 0; + ui64 IncreasingThreadsByExchange = 0; ui64 
DecreasingThreadsByStarvedState = 0; ui64 DecreasingThreadsByHoggishState = 0; + ui64 DecreasingThreadsByExchange = 0; i64 MaxConsumedCpu = 0.0; i64 MinConsumedCpu = 0.0; i64 MaxBookedCpu = 0.0; diff --git a/library/cpp/actors/core/mon_stats.h b/library/cpp/actors/core/mon_stats.h index 40e5f651e91..709dce99265 100644 --- a/library/cpp/actors/core/mon_stats.h +++ b/library/cpp/actors/core/mon_stats.h @@ -61,8 +61,10 @@ namespace NActors { struct TExecutorPoolStats { ui64 MaxUtilizationTime = 0; ui64 IncreasingThreadsByNeedyState = 0; + ui64 IncreasingThreadsByExchange = 0; ui64 DecreasingThreadsByStarvedState = 0; ui64 DecreasingThreadsByHoggishState = 0; + ui64 DecreasingThreadsByExchange = 0; i64 MaxConsumedCpuUs = 0; i64 MinConsumedCpuUs = 0; i64 MaxBookedCpuUs = 0; diff --git a/library/cpp/actors/helpers/pool_stats_collector.h b/library/cpp/actors/helpers/pool_stats_collector.h index 16a64581efa..a122f7f2f0b 100644 --- a/library/cpp/actors/helpers/pool_stats_collector.h +++ b/library/cpp/actors/helpers/pool_stats_collector.h @@ -139,8 +139,10 @@ private: NMonitoring::TDynamicCounters::TCounterPtr IsStarved; NMonitoring::TDynamicCounters::TCounterPtr IsHoggish; NMonitoring::TDynamicCounters::TCounterPtr IncreasingThreadsByNeedyState; + NMonitoring::TDynamicCounters::TCounterPtr IncreasingThreadsByExchange; NMonitoring::TDynamicCounters::TCounterPtr DecreasingThreadsByStarvedState; NMonitoring::TDynamicCounters::TCounterPtr DecreasingThreadsByHoggishState; + NMonitoring::TDynamicCounters::TCounterPtr DecreasingThreadsByExchange; NMonitoring::TDynamicCounters::TCounterPtr NotEnoughCpuExecutions; NMonitoring::TDynamicCounters::TCounterPtr MaxConsumedCpu; NMonitoring::TDynamicCounters::TCounterPtr MinConsumedCpu; @@ -199,8 +201,10 @@ private: IsStarved = PoolGroup->GetCounter("IsStarved", false); IsHoggish = PoolGroup->GetCounter("IsHoggish", false); IncreasingThreadsByNeedyState = PoolGroup->GetCounter("IncreasingThreadsByNeedyState", true); + 
IncreasingThreadsByExchange = PoolGroup->GetCounter("IncreasingThreadsByExchange", true); DecreasingThreadsByStarvedState = PoolGroup->GetCounter("DecreasingThreadsByStarvedState", true); DecreasingThreadsByHoggishState = PoolGroup->GetCounter("DecreasingThreadsByHoggishState", true); + DecreasingThreadsByExchange = PoolGroup->GetCounter("DecreasingThreadsByExchange", true); NotEnoughCpuExecutions = PoolGroup->GetCounter("NotEnoughCpuExecutions", true); MaxConsumedCpu = PoolGroup->GetCounter("MaxConsumedCpuByPool", false); MinConsumedCpu = PoolGroup->GetCounter("MinConsumedCpuByPool", false); @@ -251,8 +255,10 @@ private: *IsStarved = poolStats.IsStarved; *IsHoggish = poolStats.IsHoggish; *IncreasingThreadsByNeedyState = poolStats.IncreasingThreadsByNeedyState; + *IncreasingThreadsByExchange = poolStats.IncreasingThreadsByExchange; *DecreasingThreadsByStarvedState = poolStats.DecreasingThreadsByStarvedState; *DecreasingThreadsByHoggishState = poolStats.DecreasingThreadsByHoggishState; + *DecreasingThreadsByExchange = poolStats.DecreasingThreadsByExchange; *NotEnoughCpuExecutions = stats.NotEnoughCpuExecutions; LegacyActivationTimeHistogram.Set(stats.ActivationTimeHistogram); |