diff options
author | Daniil Cherednik <dan.cherednik@gmail.com> | 2023-03-31 10:54:08 +0300 |
---|---|---|
committer | Daniil Cherednik <dan.cherednik@gmail.com> | 2023-03-31 12:28:07 +0300 |
commit | fc1cffcfa7f0497a1f97b384a24bcbf23362f3be (patch) | |
tree | c15f7ab5b9e9b20fd0ef8fc07d598d28e8b32004 /library/cpp/actors/core | |
parent | 8a749596d40e91c896a1907afcd108d9221fbde1 (diff) | |
download | ydb-fc1cffcfa7f0497a1f97b384a24bcbf23362f3be.tar.gz |
Ydb stable 23-1-1923.1.19
x-stable-origin-commit: c5d5a396e89d0a72e0267a55e93d8404d4fb54fe
Diffstat (limited to 'library/cpp/actors/core')
-rw-r--r-- | library/cpp/actors/core/actorsystem.h | 16 | ||||
-rw-r--r-- | library/cpp/actors/core/config.h | 12 | ||||
-rw-r--r-- | library/cpp/actors/core/executor_pool.h | 14 | ||||
-rw-r--r-- | library/cpp/actors/core/executor_pool_basic.cpp | 19 | ||||
-rw-r--r-- | library/cpp/actors/core/executor_pool_basic.h | 3 | ||||
-rw-r--r-- | library/cpp/actors/core/executor_pool_basic_ut.cpp | 2 | ||||
-rw-r--r-- | library/cpp/actors/core/executor_pool_united_ut.cpp | 2 | ||||
-rw-r--r-- | library/cpp/actors/core/harmonizer.cpp | 65 | ||||
-rw-r--r-- | library/cpp/actors/core/harmonizer.h | 1 | ||||
-rw-r--r-- | library/cpp/actors/core/mon_stats.h | 9 | ||||
-rw-r--r-- | library/cpp/actors/core/worker_context.h | 3 |
11 files changed, 88 insertions, 58 deletions
diff --git a/library/cpp/actors/core/actorsystem.h b/library/cpp/actors/core/actorsystem.h index 8051f5ee57..cd2cfda1bb 100644 --- a/library/cpp/actors/core/actorsystem.h +++ b/library/cpp/actors/core/actorsystem.h @@ -122,7 +122,21 @@ namespace NActors { } ui32 GetThreads(ui32 poolId) const { - return Executors ? Executors[poolId]->GetThreads() : CpuManager.GetThreads(poolId); + auto result = GetThreadsOptional(poolId); + Y_VERIFY(result, "undefined pool id: %" PRIu32, (ui32)poolId); + return *result; + } + + std::optional<ui32> GetThreadsOptional(const ui32 poolId) const { + if (Y_LIKELY(Executors)) { + if (Y_LIKELY(poolId < ExecutorsCount)) { + return Executors[poolId]->GetDefaultThreadCount(); + } else { + return {}; + } + } else { + return CpuManager.GetThreadsOptional(poolId); + } } }; diff --git a/library/cpp/actors/core/config.h b/library/cpp/actors/core/config.h index 0bf4b871d7..650b1f39f5 100644 --- a/library/cpp/actors/core/config.h +++ b/library/cpp/actors/core/config.h @@ -128,10 +128,10 @@ namespace NActors { Y_FAIL("undefined pool id: %" PRIu32, (ui32)poolId); } - ui32 GetThreads(ui32 poolId) const { + std::optional<ui32> GetThreadsOptional(ui32 poolId) const { for (const auto& p : Basic) { if (p.PoolId == poolId) { - return p.Threads; + return p.DefaultThreadCount; } } for (const auto& p : IO) { @@ -144,7 +144,13 @@ namespace NActors { return p.Concurrency ? p.Concurrency : UnitedWorkers.CpuCount; } } - Y_FAIL("undefined pool id: %" PRIu32, (ui32)poolId); + return {}; + } + + ui32 GetThreads(ui32 poolId) const { + auto result = GetThreadsOptional(poolId); + Y_VERIFY(result, "undefined pool id: %" PRIu32, (ui32)poolId); + return *result; } }; diff --git a/library/cpp/actors/core/executor_pool.h b/library/cpp/actors/core/executor_pool.h index f39415c7e2..c7c85e61fd 100644 --- a/library/cpp/actors/core/executor_pool.h +++ b/library/cpp/actors/core/executor_pool.h @@ -10,6 +10,11 @@ namespace NActors { struct TWorkerContext; class ISchedulerCookie; + struct TCpuConsumption { + double ConsumedUs = 0; + double BookedUs = 0; + }; + class IExecutorPool : TNonCopyable { public: const ui32 PoolId; @@ -131,14 +136,9 @@ namespace NActors { return false; } - virtual double GetThreadConsumedUs(i16 threadIdx) { - Y_UNUSED(threadIdx); - return 0.0; - } - - virtual double GetThreadBookedUs(i16 threadIdx) { + virtual TCpuConsumption GetThreadCpuConsumption(i16 threadIdx) { Y_UNUSED(threadIdx); - return 0.0; + return TCpuConsumption{0.0, 0.0}; } }; diff --git a/library/cpp/actors/core/executor_pool_basic.cpp b/library/cpp/actors/core/executor_pool_basic.cpp index 0c984f8fb0..de04105991 100644 --- a/library/cpp/actors/core/executor_pool_basic.cpp +++ b/library/cpp/actors/core/executor_pool_basic.cpp @@ -334,6 +334,8 @@ namespace NActors { poolStats.MaxUtilizationTime = RelaxedLoad(&MaxUtilizationAccumulator) / (i64)(NHPTimer::GetCyclesPerSecond() / 1000); poolStats.WrongWakenedThreadCount = RelaxedLoad(&WrongWakenedThreadCount); poolStats.CurrentThreadCount = RelaxedLoad(&ThreadCount); + poolStats.DefaultThreadCount = DefaultThreadCount; + poolStats.MaxThreadCount = MaxThreadCount; if (Harmonizer) { TPoolHarmonizedStats stats = Harmonizer->GetPoolStats(PoolId); poolStats.IsNeedy = stats.IsNeedy; @@ -342,6 +344,7 @@ namespace NActors { poolStats.IncreasingThreadsByNeedyState = stats.IncreasingThreadsByNeedyState; poolStats.DecreasingThreadsByStarvedState = stats.DecreasingThreadsByStarvedState; poolStats.DecreasingThreadsByHoggishState = stats.DecreasingThreadsByHoggishState; + poolStats.PotentialMaxThreadCount = stats.PotentialMaxThreadCount; } statsCopy.resize(PoolThreads + 1); @@ -490,24 +493,14 @@ namespace NActors { return false; } - double TBasicExecutorPool::GetThreadConsumedUs(i16 threadIdx) { + TCpuConsumption TBasicExecutorPool::GetThreadCpuConsumption(i16 threadIdx) { if ((ui32)threadIdx >= PoolThreads) { - return 0; + return {0.0, 0.0}; } TThreadCtx& threadCtx = Threads[threadIdx]; TExecutorThreadStats stats; threadCtx.Thread->GetCurrentStats(stats); - return Ts2Us(stats.ElapsedTicks); - } - - double TBasicExecutorPool::GetThreadBookedUs(i16 threadIdx) { - if ((ui32)threadIdx >= PoolThreads) { - return 0; - } - TThreadCtx& threadCtx = Threads[threadIdx]; - TExecutorThreadStats stats; - threadCtx.Thread->GetCurrentStats(stats); - return stats.CpuNs / 1000.0; + return {Ts2Us(stats.SafeElapsedTicks), static_cast<double>(stats.CpuUs)}; } i16 TBasicExecutorPool::GetBlockingThreadCount() const { diff --git a/library/cpp/actors/core/executor_pool_basic.h b/library/cpp/actors/core/executor_pool_basic.h index cd94a998f1..813f91dc9a 100644 --- a/library/cpp/actors/core/executor_pool_basic.h +++ b/library/cpp/actors/core/executor_pool_basic.h @@ -153,8 +153,7 @@ namespace NActors { i16 GetMinThreadCount() const override; i16 GetMaxThreadCount() const override; bool IsThreadBeingStopped(i16 threadIdx) const override; - double GetThreadConsumedUs(i16 threadIdx) override; - double GetThreadBookedUs(i16 threadIdx) override; + TCpuConsumption GetThreadCpuConsumption(i16 threadIdx) override; i16 GetBlockingThreadCount() const override; i16 GetPriority() const override; diff --git a/library/cpp/actors/core/executor_pool_basic_ut.cpp b/library/cpp/actors/core/executor_pool_basic_ut.cpp index 6361bc6662..f96f65931a 100644 --- a/library/cpp/actors/core/executor_pool_basic_ut.cpp +++ b/library/cpp/actors/core/executor_pool_basic_ut.cpp @@ -339,7 +339,7 @@ Y_UNIT_TEST_SUITE(BasicExecutorPool) { UNIT_ASSERT_VALUES_EQUAL(stats[0].PreemptedEvents, 0); UNIT_ASSERT_VALUES_EQUAL(stats[0].NonDeliveredEvents, 0); UNIT_ASSERT_VALUES_EQUAL(stats[0].EmptyMailboxActivation, 0); - //UNIT_ASSERT_VALUES_EQUAL(stats[0].CpuNs, 0); // depends on total duration of test, so undefined + //UNIT_ASSERT_VALUES_EQUAL(stats[0].CpuUs, 0); // depends on total duration of test, so undefined UNIT_ASSERT(stats[0].ElapsedTicks > 0); UNIT_ASSERT(stats[0].ParkedTicks > 0); UNIT_ASSERT_VALUES_EQUAL(stats[0].BlockedTicks, 0); diff --git a/library/cpp/actors/core/executor_pool_united_ut.cpp b/library/cpp/actors/core/executor_pool_united_ut.cpp index 133e9c5f2a..a7c7399d73 100644 --- a/library/cpp/actors/core/executor_pool_united_ut.cpp +++ b/library/cpp/actors/core/executor_pool_united_ut.cpp @@ -171,7 +171,7 @@ Y_UNIT_TEST_SUITE(UnitedExecutorPool) { //UNIT_ASSERT_VALUES_EQUAL(stats[0].PreemptedEvents, 0); // depends on execution time and system load, so may be non-zero UNIT_ASSERT_VALUES_EQUAL(stats[0].NonDeliveredEvents, 0); UNIT_ASSERT_VALUES_EQUAL(stats[0].EmptyMailboxActivation, 0); - //UNIT_ASSERT_VALUES_EQUAL(stats[0].CpuNs, 0); // depends on total duration of test, so undefined + //UNIT_ASSERT_VALUES_EQUAL(stats[0].CpuUs, 0); // depends on total duration of test, so undefined UNIT_ASSERT(stats[0].ElapsedTicks > 0); //UNIT_ASSERT(stats[0].ParkedTicks == 0); // per-pool parked time does not make sense for united pools UNIT_ASSERT_VALUES_EQUAL(stats[0].BlockedTicks, 0); diff --git a/library/cpp/actors/core/harmonizer.cpp b/library/cpp/actors/core/harmonizer.cpp index f318d8909c..e2fd0c5f24 100644 --- a/library/cpp/actors/core/harmonizer.cpp +++ b/library/cpp/actors/core/harmonizer.cpp @@ -121,6 +121,7 @@ struct TPoolInfo { TAtomic IncreasingThreadsByNeedyState = 0; TAtomic DecreasingThreadsByStarvedState = 0; TAtomic DecreasingThreadsByHoggishState = 0; + TAtomic PotentialMaxThreadCount = 0; bool IsBeingStopped(i16 threadIdx); double GetBooked(i16 threadIdx); @@ -169,9 +170,10 @@ double TPoolInfo::GetlastSecondPoolConsumed(i16 threadIdx) { void TPoolInfo::PullStats(ui64 ts) { for (i16 threadIdx = 0; threadIdx < MaxThreadCount; ++threadIdx) { TThreadInfo &threadInfo = ThreadInfo[threadIdx]; - threadInfo.Consumed.Register(ts, Pool->GetThreadConsumedUs(threadIdx)); + TCpuConsumption cpuConsumption = Pool->GetThreadCpuConsumption(threadIdx); + threadInfo.Consumed.Register(ts, cpuConsumption.ConsumedUs); LWPROBE(SavedValues, Pool->PoolId, Pool->GetName(), "consumed", UNROLL_HISTORY(threadInfo.Consumed.History)); - threadInfo.Booked.Register(ts, Pool->GetThreadBookedUs(threadIdx)); + threadInfo.Booked.Register(ts, cpuConsumption.BookedUs); LWPROBE(SavedValues, Pool->PoolId, Pool->GetName(), "booked", UNROLL_HISTORY(threadInfo.Booked.History)); } } @@ -236,7 +238,7 @@ void THarmonizer::PullStats(ui64 ts) { } Y_FORCE_INLINE bool IsStarved(double consumed, double booked) { - return Max(consumed, booked) > 0.1 && consumed < booked * 0.7; + return consumed < booked * 0.7; } Y_FORCE_INLINE bool IsHoggish(double booked, ui16 currentThreadCount) { @@ -293,35 +295,43 @@ void THarmonizer::HarmonizeImpl(ui64 ts) { LWPROBE(HarmonizeCheckPool, poolIdx, pool.Pool->GetName(), poolBooked, poolConsumed, lastSecondPoolBooked, lastSecondPoolConsumed, pool.GetThreadCount(), pool.MaxThreadCount, isStarved, isNeedy, isHoggish); } double budget = total - Max(booked, lastSecondBooked); + i16 budgetInt = static_cast<i16>(Max(budget, 0.0)); if (budget < -0.1) { isStarvedPresent = true; } + for (size_t poolIdx = 0; poolIdx < Pools.size(); ++poolIdx) { + TPoolInfo& pool = Pools[poolIdx]; + AtomicSet(pool.PotentialMaxThreadCount, Min(pool.MaxThreadCount, budgetInt)); + } double overbooked = consumed - booked; if (isStarvedPresent) { - // last_starved_at_consumed_value = сумма по всем пулам consumed; - // TODO(cthulhu): использовать как лимит планвно устремлять этот лимит к total, - // использовать вместо total - if (beingStopped && beingStopped >= overbooked) { - // do nothing - } else { - TStackVec<size_t> reorder; - for (size_t i = 0; i < Pools.size(); ++i) { - reorder.push_back(i); - } - for (ui16 poolIdx : PriorityOrder) { - TPoolInfo &pool = Pools[poolIdx]; - i64 threadCount = pool.GetThreadCount(); - if (threadCount > pool.DefaultThreadCount) { - pool.SetThreadCount(threadCount - 1); - AtomicIncrement(pool.DecreasingThreadsByStarvedState); - overbooked--; - LWPROBE(HarmonizeOperation, poolIdx, pool.Pool->GetName(), "decrease", threadCount - 1, pool.DefaultThreadCount, pool.MaxThreadCount); - if (overbooked < 1) { - break; - } - } - } - } + // last_starved_at_consumed_value = сумма по всем пулам consumed; + // TODO(cthulhu): использовать как лимит планвно устремлять этот лимит к total, + // использовать вместо total + if (beingStopped && beingStopped >= overbooked) { + // do nothing + } else { + TStackVec<size_t> reorder; + for (size_t i = 0; i < Pools.size(); ++i) { + reorder.push_back(i); + } + for (ui16 poolIdx : PriorityOrder) { + TPoolInfo &pool = Pools[poolIdx]; + i64 threadCount = pool.GetThreadCount(); + while (threadCount > pool.DefaultThreadCount) { + pool.SetThreadCount(threadCount - 1); + AtomicIncrement(pool.DecreasingThreadsByStarvedState); + overbooked--; + LWPROBE(HarmonizeOperation, poolIdx, pool.Pool->GetName(), "decrease", threadCount - 1, pool.DefaultThreadCount, pool.MaxThreadCount); + if (overbooked < 1) { + break; + } + } + if (overbooked < 1) { + break; + } + } + } } else { for (size_t needyPoolIdx : needyPools) { TPoolInfo &pool = Pools[needyPoolIdx]; @@ -422,6 +432,7 @@ TPoolHarmonizedStats THarmonizer::GetPoolStats(i16 poolId) const { .IncreasingThreadsByNeedyState = static_cast<ui64>(RelaxedLoad(&pool.IncreasingThreadsByNeedyState)), .DecreasingThreadsByStarvedState = static_cast<ui64>(RelaxedLoad(&pool.DecreasingThreadsByStarvedState)), .DecreasingThreadsByHoggishState = static_cast<ui64>(RelaxedLoad(&pool.DecreasingThreadsByHoggishState)), + .PotentialMaxThreadCount = static_cast<i16>(RelaxedLoad(&pool.PotentialMaxThreadCount)), .IsNeedy = static_cast<bool>(flags & 1), .IsStarved = static_cast<bool>(flags & 2), .IsHoggish = static_cast<bool>(flags & 4), diff --git a/library/cpp/actors/core/harmonizer.h b/library/cpp/actors/core/harmonizer.h index 61f13e43ac..bc6b938fe8 100644 --- a/library/cpp/actors/core/harmonizer.h +++ b/library/cpp/actors/core/harmonizer.h @@ -10,6 +10,7 @@ namespace NActors { ui64 IncreasingThreadsByNeedyState = 0; ui64 DecreasingThreadsByStarvedState = 0; ui64 DecreasingThreadsByHoggishState = 0; + i16 PotentialMaxThreadCount = 0; bool IsNeedy = false; bool IsStarved = false; bool IsHoggish = false; diff --git a/library/cpp/actors/core/mon_stats.h b/library/cpp/actors/core/mon_stats.h index 38629e2aa1..4c664a964a 100644 --- a/library/cpp/actors/core/mon_stats.h +++ b/library/cpp/actors/core/mon_stats.h @@ -65,6 +65,9 @@ namespace NActors { ui64 DecreasingThreadsByHoggishState = 0; i16 WrongWakenedThreadCount = 0; i16 CurrentThreadCount = 0; + i16 PotentialMaxThreadCount = 0; + i16 DefaultThreadCount = 0; + i16 MaxThreadCount = 0; bool IsNeedy = false; bool IsStarved = false; bool IsHoggish = false; @@ -76,7 +79,8 @@ namespace NActors { ui64 PreemptedEvents = 0; // Number of events experienced hard preemption ui64 NonDeliveredEvents = 0; ui64 EmptyMailboxActivation = 0; - ui64 CpuNs = 0; // nanoseconds thread was executing on CPU (accounts for preemtion) + ui64 CpuUs = 0; // microseconds thread was executing on CPU (accounts for preemtion) + ui64 SafeElapsedTicks = 0; ui64 WorstActivationTimeUs = 0; NHPTimer::STime ElapsedTicks = 0; NHPTimer::STime ParkedTicks = 0; @@ -120,7 +124,8 @@ namespace NActors { PreemptedEvents += RelaxedLoad(&other.PreemptedEvents); NonDeliveredEvents += RelaxedLoad(&other.NonDeliveredEvents); EmptyMailboxActivation += RelaxedLoad(&other.EmptyMailboxActivation); - CpuNs += RelaxedLoad(&other.CpuNs); + CpuUs += RelaxedLoad(&other.CpuUs); + SafeElapsedTicks += RelaxedLoad(&other.SafeElapsedTicks); RelaxedStore( &WorstActivationTimeUs, std::max(RelaxedLoad(&WorstActivationTimeUs), RelaxedLoad(&other.WorstActivationTimeUs))); diff --git a/library/cpp/actors/core/worker_context.h b/library/cpp/actors/core/worker_context.h index 2179771fb6..c3a2947df1 100644 --- a/library/cpp/actors/core/worker_context.h +++ b/library/cpp/actors/core/worker_context.h @@ -137,7 +137,8 @@ namespace NActors { } void UpdateThreadTime() { - RelaxedStore(&WorkerStats.CpuNs, ThreadCPUTime() * 1000); + RelaxedStore(&WorkerStats.SafeElapsedTicks, (ui64)RelaxedLoad(&WorkerStats.ElapsedTicks)); + RelaxedStore(&WorkerStats.CpuUs, ThreadCPUTime()); } #else void GetCurrentStats(TExecutorThreadStats&) const {} |