diff options
author | kruall <kruall@ydb.tech> | 2023-03-14 21:12:48 +0300 |
---|---|---|
committer | kruall <kruall@ydb.tech> | 2023-03-14 21:12:48 +0300 |
commit | b8e6cf3ea513ef24dadcfab960f0f0adc1ff4caf (patch) | |
tree | ce7204f05842fecafa0e381ad3b9cb2d35dd188d /library/cpp | |
parent | f11568d26a8fadd2cf7f2460f8ee6e148391b15c (diff) | |
download | ydb-b8e6cf3ea513ef24dadcfab960f0f0adc1ff4caf.tar.gz |
Fix overbooking reaction,
Diffstat (limited to 'library/cpp')
-rw-r--r-- | library/cpp/actors/core/harmonizer.cpp | 51 |
1 files changed, 27 insertions, 24 deletions
diff --git a/library/cpp/actors/core/harmonizer.cpp b/library/cpp/actors/core/harmonizer.cpp index 1b547a9ad6..e2fd0c5f24 100644 --- a/library/cpp/actors/core/harmonizer.cpp +++ b/library/cpp/actors/core/harmonizer.cpp @@ -305,30 +305,33 @@ void THarmonizer::HarmonizeImpl(ui64 ts) { } double overbooked = consumed - booked; if (isStarvedPresent) { - // last_starved_at_consumed_value = сумма по всем пулам consumed; - // TODO(cthulhu): использовать как лимит планвно устремлять этот лимит к total, - // использовать вместо total - if (beingStopped && beingStopped >= overbooked) { - // do nothing - } else { - TStackVec<size_t> reorder; - for (size_t i = 0; i < Pools.size(); ++i) { - reorder.push_back(i); - } - for (ui16 poolIdx : PriorityOrder) { - TPoolInfo &pool = Pools[poolIdx]; - i64 threadCount = pool.GetThreadCount(); - if (threadCount > pool.DefaultThreadCount) { - pool.SetThreadCount(threadCount - 1); - AtomicIncrement(pool.DecreasingThreadsByStarvedState); - overbooked--; - LWPROBE(HarmonizeOperation, poolIdx, pool.Pool->GetName(), "decrease", threadCount - 1, pool.DefaultThreadCount, pool.MaxThreadCount); - if (overbooked < 1) { - break; - } - } - } - } + // last_starved_at_consumed_value = сумма по всем пулам consumed; + // TODO(cthulhu): использовать как лимит планвно устремлять этот лимит к total, + // использовать вместо total + if (beingStopped && beingStopped >= overbooked) { + // do nothing + } else { + TStackVec<size_t> reorder; + for (size_t i = 0; i < Pools.size(); ++i) { + reorder.push_back(i); + } + for (ui16 poolIdx : PriorityOrder) { + TPoolInfo &pool = Pools[poolIdx]; + i64 threadCount = pool.GetThreadCount(); + while (threadCount > pool.DefaultThreadCount) { + pool.SetThreadCount(threadCount - 1); + AtomicIncrement(pool.DecreasingThreadsByStarvedState); + overbooked--; + LWPROBE(HarmonizeOperation, poolIdx, pool.Pool->GetName(), "decrease", threadCount - 1, pool.DefaultThreadCount, pool.MaxThreadCount); + if (overbooked < 1) { + break; + } + } + if (overbooked < 1) { + break; + } + } + } } else { for (size_t needyPoolIdx : needyPools) { TPoolInfo &pool = Pools[needyPoolIdx]; |