aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp
diff options
context:
space:
mode:
authorkruall <kruall@ydb.tech>2023-03-14 21:12:48 +0300
committerkruall <kruall@ydb.tech>2023-03-14 21:12:48 +0300
commitb8e6cf3ea513ef24dadcfab960f0f0adc1ff4caf (patch)
treece7204f05842fecafa0e381ad3b9cb2d35dd188d /library/cpp
parentf11568d26a8fadd2cf7f2460f8ee6e148391b15c (diff)
downloadydb-b8e6cf3ea513ef24dadcfab960f0f0adc1ff4caf.tar.gz
Fix overbooking reaction,
Diffstat (limited to 'library/cpp')
-rw-r--r--library/cpp/actors/core/harmonizer.cpp51
1 files changed, 27 insertions, 24 deletions
diff --git a/library/cpp/actors/core/harmonizer.cpp b/library/cpp/actors/core/harmonizer.cpp
index 1b547a9ad6..e2fd0c5f24 100644
--- a/library/cpp/actors/core/harmonizer.cpp
+++ b/library/cpp/actors/core/harmonizer.cpp
@@ -305,30 +305,33 @@ void THarmonizer::HarmonizeImpl(ui64 ts) {
}
double overbooked = consumed - booked;
if (isStarvedPresent) {
- // last_starved_at_consumed_value = сумма по всем пулам consumed;
- // TODO(cthulhu): использовать как лимит планвно устремлять этот лимит к total,
- // использовать вместо total
- if (beingStopped && beingStopped >= overbooked) {
- // do nothing
- } else {
- TStackVec<size_t> reorder;
- for (size_t i = 0; i < Pools.size(); ++i) {
- reorder.push_back(i);
- }
- for (ui16 poolIdx : PriorityOrder) {
- TPoolInfo &pool = Pools[poolIdx];
- i64 threadCount = pool.GetThreadCount();
- if (threadCount > pool.DefaultThreadCount) {
- pool.SetThreadCount(threadCount - 1);
- AtomicIncrement(pool.DecreasingThreadsByStarvedState);
- overbooked--;
- LWPROBE(HarmonizeOperation, poolIdx, pool.Pool->GetName(), "decrease", threadCount - 1, pool.DefaultThreadCount, pool.MaxThreadCount);
- if (overbooked < 1) {
- break;
- }
- }
- }
- }
+ // last_starved_at_consumed_value = сумма по всем пулам consumed;
+ // TODO(cthulhu): использовать как лимит планвно устремлять этот лимит к total,
+ // использовать вместо total
+ if (beingStopped && beingStopped >= overbooked) {
+ // do nothing
+ } else {
+ TStackVec<size_t> reorder;
+ for (size_t i = 0; i < Pools.size(); ++i) {
+ reorder.push_back(i);
+ }
+ for (ui16 poolIdx : PriorityOrder) {
+ TPoolInfo &pool = Pools[poolIdx];
+ i64 threadCount = pool.GetThreadCount();
+ while (threadCount > pool.DefaultThreadCount) {
+ pool.SetThreadCount(threadCount - 1);
+ AtomicIncrement(pool.DecreasingThreadsByStarvedState);
+ overbooked--;
+ LWPROBE(HarmonizeOperation, poolIdx, pool.Pool->GetName(), "decrease", threadCount - 1, pool.DefaultThreadCount, pool.MaxThreadCount);
+ if (overbooked < 1) {
+ break;
+ }
+ }
+ if (overbooked < 1) {
+ break;
+ }
+ }
+ }
} else {
for (size_t needyPoolIdx : needyPools) {
TPoolInfo &pool = Pools[needyPoolIdx];