aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp
diff options
context:
space:
mode:
authorkruall <kruall@ydb.tech>2023-06-19 13:29:25 +0300
committerkruall <kruall@ydb.tech>2023-06-19 13:29:25 +0300
commit82d3095647b506337bd3cfdf275a09889854c1bf (patch)
treea5f8a61bbb79bdf1c485dfcae49f10d9403e76c2 /library/cpp
parent3dfef4338422fb797915ee5c172b8cbf6b4dd335 (diff)
downloadydb-82d3095647b506337bd3cfdf275a09889854c1bf.tar.gz
Add exchangin threads when cpu usage around limit,
Diffstat (limited to 'library/cpp')
-rw-r--r--library/cpp/actors/core/executor_pool_basic.cpp2
-rw-r--r--library/cpp/actors/core/harmonizer.cpp81
-rw-r--r--library/cpp/actors/core/harmonizer.h2
-rw-r--r--library/cpp/actors/core/mon_stats.h2
-rw-r--r--library/cpp/actors/helpers/pool_stats_collector.h6
5 files changed, 85 insertions, 8 deletions
diff --git a/library/cpp/actors/core/executor_pool_basic.cpp b/library/cpp/actors/core/executor_pool_basic.cpp
index 8e103c3911b..ae106564c00 100644
--- a/library/cpp/actors/core/executor_pool_basic.cpp
+++ b/library/cpp/actors/core/executor_pool_basic.cpp
@@ -342,8 +342,10 @@ namespace NActors {
poolStats.IsStarved = stats.IsStarved;
poolStats.IsHoggish = stats.IsHoggish;
poolStats.IncreasingThreadsByNeedyState = stats.IncreasingThreadsByNeedyState;
+ poolStats.IncreasingThreadsByExchange = stats.IncreasingThreadsByExchange;
poolStats.DecreasingThreadsByStarvedState = stats.DecreasingThreadsByStarvedState;
poolStats.DecreasingThreadsByHoggishState = stats.DecreasingThreadsByHoggishState;
+ poolStats.DecreasingThreadsByExchange = stats.DecreasingThreadsByExchange;
poolStats.PotentialMaxThreadCount = stats.PotentialMaxThreadCount;
poolStats.MaxConsumedCpuUs = stats.MaxConsumedCpu;
poolStats.MinConsumedCpuUs = stats.MinConsumedCpu;
diff --git a/library/cpp/actors/core/harmonizer.cpp b/library/cpp/actors/core/harmonizer.cpp
index e729ffd08d9..7842a471262 100644
--- a/library/cpp/actors/core/harmonizer.cpp
+++ b/library/cpp/actors/core/harmonizer.cpp
@@ -176,8 +176,10 @@ struct TPoolInfo {
TAtomic LastFlags = 0; // 0 - isNeedy; 1 - isStarved; 2 - isHoggish
TAtomic IncreasingThreadsByNeedyState = 0;
+ TAtomic IncreasingThreadsByExchange = 0;
TAtomic DecreasingThreadsByStarvedState = 0;
TAtomic DecreasingThreadsByHoggishState = 0;
+ TAtomic DecreasingThreadsByExchange = 0;
TAtomic PotentialMaxThreadCount = 0;
TValueHistory<16> Consumed;
@@ -347,9 +349,17 @@ void THarmonizer::HarmonizeImpl(ui64 ts) {
i64 total = 0;
TStackVec<size_t, 8> needyPools;
TStackVec<size_t, 8> hoggishPools;
+ TStackVec<bool, 8> isNeedyByPool;
+
+ size_t sumOfAdditionalThreads = 0;
+
for (size_t poolIdx = 0; poolIdx < Pools.size(); ++poolIdx) {
TPoolInfo& pool = Pools[poolIdx];
total += pool.DefaultThreadCount;
+
+ ui32 currentThreadCount = pool.GetThreadCount();
+ sumOfAdditionalThreads += currentThreadCount - pool.DefaultThreadCount;
+
double poolBooked = 0.0;
double poolConsumed = 0.0;
double lastSecondPoolBooked = 0.0;
@@ -365,7 +375,7 @@ void THarmonizer::HarmonizeImpl(ui64 ts) {
if (isStarved) {
isStarvedPresent = true;
}
- ui32 currentThreadCount = pool.GetThreadCount();
+
bool isNeedy = (pool.IsAvgPingGood() || pool.NewNotEnoughCpuExecutions) && poolBooked >= currentThreadCount;
if (pool.AvgPingCounter) {
if (pool.LastUpdateTs + Us2Ts(3'000'000ull) > ts) {
@@ -374,6 +384,7 @@ void THarmonizer::HarmonizeImpl(ui64 ts) {
pool.LastUpdateTs = ts;
}
}
+ isNeedyByPool.push_back(isNeedy);
if (isNeedy) {
needyPools.push_back(poolIdx);
}
@@ -400,6 +411,16 @@ void THarmonizer::HarmonizeImpl(ui64 ts) {
if (overbooked < 0) {
isStarvedPresent = false;
}
+
+ if (needyPools.size()) {
+ Sort(needyPools.begin(), needyPools.end(), [&] (i16 lhs, i16 rhs) {
+ if (Pools[lhs].Priority != Pools[rhs].Priority) {
+ return Pools[lhs].Priority > Pools[rhs].Priority;
+ }
+ return Pools[lhs].Pool->PoolId < Pools[rhs].Pool->PoolId;
+ });
+ }
+
if (isStarvedPresent) {
// last_starved_at_consumed_value = сумма по всем пулам consumed;
// TODO(cthulhu): использовать как лимит планвно устремлять этот лимит к total,
@@ -407,10 +428,6 @@ void THarmonizer::HarmonizeImpl(ui64 ts) {
if (beingStopped && beingStopped >= overbooked) {
// do nothing
} else {
- TStackVec<size_t> reorder;
- for (size_t i = 0; i < Pools.size(); ++i) {
- reorder.push_back(i);
- }
for (ui16 poolIdx : PriorityOrder) {
TPoolInfo &pool = Pools[poolIdx];
i64 threadCount = pool.GetThreadCount();
@@ -418,7 +435,9 @@ void THarmonizer::HarmonizeImpl(ui64 ts) {
pool.SetThreadCount(--threadCount);
AtomicIncrement(pool.DecreasingThreadsByStarvedState);
overbooked--;
- LWPROBE(HarmonizeOperation, poolIdx, pool.Pool->GetName(), "decrease", threadCount - 1, pool.DefaultThreadCount, pool.MaxThreadCount);
+ sumOfAdditionalThreads--;
+
+ LWPROBE(HarmonizeOperation, poolIdx, pool.Pool->GetName(), "decrease by starving", threadCount - 1, pool.DefaultThreadCount, pool.MaxThreadCount);
if (overbooked < 1) {
break;
}
@@ -435,19 +454,63 @@ void THarmonizer::HarmonizeImpl(ui64 ts) {
i64 threadCount = pool.GetThreadCount();
if (threadCount + 1 <= pool.MaxThreadCount) {
AtomicIncrement(pool.IncreasingThreadsByNeedyState);
+ isNeedyByPool[needyPoolIdx] = false;
+ sumOfAdditionalThreads++;
pool.SetThreadCount(threadCount + 1);
budget -= 1.0;
- LWPROBE(HarmonizeOperation, needyPoolIdx, pool.Pool->GetName(), "increase", threadCount + 1, pool.DefaultThreadCount, pool.MaxThreadCount);
+ LWPROBE(HarmonizeOperation, needyPoolIdx, pool.Pool->GetName(), "increase by needs", threadCount + 1, pool.DefaultThreadCount, pool.MaxThreadCount);
}
}
}
}
+
+ if (budget < 1.0) {
+ size_t takingAwayThreads = 0;
+ for (size_t needyPoolIdx : needyPools) {
+ TPoolInfo &pool = Pools[needyPoolIdx];
+ i64 threadCount = pool.GetThreadCount();
+ sumOfAdditionalThreads -= threadCount - pool.DefaultThreadCount;
+ if (sumOfAdditionalThreads < takingAwayThreads + 1) {
+ break;
+ }
+ if (!isNeedyByPool[needyPoolIdx]) {
+ continue;
+ }
+ AtomicIncrement(pool.IncreasingThreadsByExchange);
+ isNeedyByPool[needyPoolIdx] = false;
+ takingAwayThreads++;
+ pool.SetThreadCount(threadCount + 1);
+
+ LWPROBE(HarmonizeOperation, needyPoolIdx, pool.Pool->GetName(), "increase by exchanging", threadCount + 1, pool.DefaultThreadCount, pool.MaxThreadCount);
+ }
+
+ for (ui16 poolIdx : PriorityOrder) {
+ if (takingAwayThreads <= 0) {
+ break;
+ }
+
+ TPoolInfo &pool = Pools[poolIdx];
+ i64 threadCount = pool.GetThreadCount();
+ size_t additionalThreadsCount = Max(0L, threadCount - pool.DefaultThreadCount);
+ size_t currentTakingAwayThreads = Min(additionalThreadsCount, currentTakingAwayThreads);
+
+ if (!currentTakingAwayThreads) {
+ continue;
+ }
+ takingAwayThreads -= currentTakingAwayThreads;
+ pool.SetThreadCount(threadCount - currentTakingAwayThreads);
+
+ AtomicAdd(pool.DecreasingThreadsByExchange, takingAwayThreads);
+ LWPROBE(HarmonizeOperation, poolIdx, pool.Pool->GetName(), "decrease by exchanging", threadCount - currentTakingAwayThreads, pool.DefaultThreadCount, pool.MaxThreadCount);
+ }
+ }
+
for (size_t hoggishPoolIdx : hoggishPools) {
TPoolInfo &pool = Pools[hoggishPoolIdx];
i64 threadCount = pool.GetThreadCount();
if (threadCount > pool.MinThreadCount) {
AtomicIncrement(pool.DecreasingThreadsByHoggishState);
- LWPROBE(HarmonizeOperation, hoggishPoolIdx, pool.Pool->GetName(), "decrease", threadCount - 1, pool.DefaultThreadCount, pool.MaxThreadCount);
+ LWPROBE(HarmonizeOperation, hoggishPoolIdx, pool.Pool->GetName(), "decrease by hoggish", threadCount - 1, pool.DefaultThreadCount, pool.MaxThreadCount);
pool.SetThreadCount(threadCount - 1);
}
}
@@ -526,8 +589,10 @@ TPoolHarmonizerStats THarmonizer::GetPoolStats(i16 poolId) const {
ui64 flags = RelaxedLoad(&pool.LastFlags);
return TPoolHarmonizerStats{
.IncreasingThreadsByNeedyState = static_cast<ui64>(RelaxedLoad(&pool.IncreasingThreadsByNeedyState)),
+ .IncreasingThreadsByExchange = static_cast<ui64>(RelaxedLoad(&pool.IncreasingThreadsByExchange)),
.DecreasingThreadsByStarvedState = static_cast<ui64>(RelaxedLoad(&pool.DecreasingThreadsByStarvedState)),
.DecreasingThreadsByHoggishState = static_cast<ui64>(RelaxedLoad(&pool.DecreasingThreadsByHoggishState)),
+ .DecreasingThreadsByExchange = static_cast<ui64>(RelaxedLoad(&pool.DecreasingThreadsByExchange)),
.MaxConsumedCpu = static_cast<i64>(RelaxedLoad(&pool.MaxConsumedCpu)),
.MinConsumedCpu = static_cast<i64>(RelaxedLoad(&pool.MinConsumedCpu)),
.MaxBookedCpu = static_cast<i64>(RelaxedLoad(&pool.MaxBookedCpu)),
diff --git a/library/cpp/actors/core/harmonizer.h b/library/cpp/actors/core/harmonizer.h
index 7c66ff54c67..04eb616b152 100644
--- a/library/cpp/actors/core/harmonizer.h
+++ b/library/cpp/actors/core/harmonizer.h
@@ -8,8 +8,10 @@ namespace NActors {
struct TPoolHarmonizerStats {
ui64 IncreasingThreadsByNeedyState = 0;
+ ui64 IncreasingThreadsByExchange = 0;
ui64 DecreasingThreadsByStarvedState = 0;
ui64 DecreasingThreadsByHoggishState = 0;
+ ui64 DecreasingThreadsByExchange = 0;
i64 MaxConsumedCpu = 0.0;
i64 MinConsumedCpu = 0.0;
i64 MaxBookedCpu = 0.0;
diff --git a/library/cpp/actors/core/mon_stats.h b/library/cpp/actors/core/mon_stats.h
index 40e5f651e91..709dce99265 100644
--- a/library/cpp/actors/core/mon_stats.h
+++ b/library/cpp/actors/core/mon_stats.h
@@ -61,8 +61,10 @@ namespace NActors {
struct TExecutorPoolStats {
ui64 MaxUtilizationTime = 0;
ui64 IncreasingThreadsByNeedyState = 0;
+ ui64 IncreasingThreadsByExchange = 0;
ui64 DecreasingThreadsByStarvedState = 0;
ui64 DecreasingThreadsByHoggishState = 0;
+ ui64 DecreasingThreadsByExchange = 0;
i64 MaxConsumedCpuUs = 0;
i64 MinConsumedCpuUs = 0;
i64 MaxBookedCpuUs = 0;
diff --git a/library/cpp/actors/helpers/pool_stats_collector.h b/library/cpp/actors/helpers/pool_stats_collector.h
index 16a64581efa..a122f7f2f0b 100644
--- a/library/cpp/actors/helpers/pool_stats_collector.h
+++ b/library/cpp/actors/helpers/pool_stats_collector.h
@@ -139,8 +139,10 @@ private:
NMonitoring::TDynamicCounters::TCounterPtr IsStarved;
NMonitoring::TDynamicCounters::TCounterPtr IsHoggish;
NMonitoring::TDynamicCounters::TCounterPtr IncreasingThreadsByNeedyState;
+ NMonitoring::TDynamicCounters::TCounterPtr IncreasingThreadsByExchange;
NMonitoring::TDynamicCounters::TCounterPtr DecreasingThreadsByStarvedState;
NMonitoring::TDynamicCounters::TCounterPtr DecreasingThreadsByHoggishState;
+ NMonitoring::TDynamicCounters::TCounterPtr DecreasingThreadsByExchange;
NMonitoring::TDynamicCounters::TCounterPtr NotEnoughCpuExecutions;
NMonitoring::TDynamicCounters::TCounterPtr MaxConsumedCpu;
NMonitoring::TDynamicCounters::TCounterPtr MinConsumedCpu;
@@ -199,8 +201,10 @@ private:
IsStarved = PoolGroup->GetCounter("IsStarved", false);
IsHoggish = PoolGroup->GetCounter("IsHoggish", false);
IncreasingThreadsByNeedyState = PoolGroup->GetCounter("IncreasingThreadsByNeedyState", true);
+ IncreasingThreadsByExchange = PoolGroup->GetCounter("IncreasingThreadsByExchange", true);
DecreasingThreadsByStarvedState = PoolGroup->GetCounter("DecreasingThreadsByStarvedState", true);
DecreasingThreadsByHoggishState = PoolGroup->GetCounter("DecreasingThreadsByHoggishState", true);
+ DecreasingThreadsByExchange = PoolGroup->GetCounter("DecreasingThreadsByExchange", true);
NotEnoughCpuExecutions = PoolGroup->GetCounter("NotEnoughCpuExecutions", true);
MaxConsumedCpu = PoolGroup->GetCounter("MaxConsumedCpuByPool", false);
MinConsumedCpu = PoolGroup->GetCounter("MinConsumedCpuByPool", false);
@@ -251,8 +255,10 @@ private:
*IsStarved = poolStats.IsStarved;
*IsHoggish = poolStats.IsHoggish;
*IncreasingThreadsByNeedyState = poolStats.IncreasingThreadsByNeedyState;
+ *IncreasingThreadsByExchange = poolStats.IncreasingThreadsByExchange;
*DecreasingThreadsByStarvedState = poolStats.DecreasingThreadsByStarvedState;
*DecreasingThreadsByHoggishState = poolStats.DecreasingThreadsByHoggishState;
+ *DecreasingThreadsByExchange = poolStats.DecreasingThreadsByExchange;
*NotEnoughCpuExecutions = stats.NotEnoughCpuExecutions;
LegacyActivationTimeHistogram.Set(stats.ActivationTimeHistogram);