diff options
author | kruall <kruall@ydb.tech> | 2022-12-08 18:14:12 +0300 |
---|---|---|
committer | kruall <kruall@ydb.tech> | 2022-12-08 18:14:12 +0300 |
commit | 88bfc0de583a421fc9582fc016b543cad6b30e84 (patch) | |
tree | fb0bdca7804cead088934831cc9432262f32c354 | |
parent | f82e9035da59f4625a16b5c3c03a9818f5be1398 (diff) | |
download | ydb-88bfc0de583a421fc9582fc016b543cad6b30e84.tar.gz |
Improve AS14,
-rw-r--r-- | library/cpp/actors/core/actorsystem.h | 4 | ||||
-rw-r--r-- | library/cpp/actors/core/config.h | 8 | ||||
-rw-r--r-- | library/cpp/actors/core/cpu_manager.cpp | 6 | ||||
-rw-r--r-- | library/cpp/actors/core/executor_pool_basic.cpp | 11 | ||||
-rw-r--r-- | library/cpp/actors/core/executor_pool_basic.h | 6 | ||||
-rw-r--r-- | library/cpp/actors/core/harmonizer.cpp | 159 | ||||
-rw-r--r-- | library/cpp/actors/core/harmonizer.h | 2 | ||||
-rw-r--r-- | library/cpp/actors/core/probes.h | 24 | ||||
-rw-r--r-- | library/cpp/actors/helpers/selfping_actor.cpp | 47 | ||||
-rw-r--r-- | library/cpp/actors/helpers/selfping_actor.h | 4 | ||||
-rw-r--r-- | library/cpp/actors/helpers/selfping_actor_ut.cpp | 6 | ||||
-rw-r--r-- | ydb/core/driver_lib/run/kikimr_services_initializers.cpp | 34 | ||||
-rw-r--r-- | ydb/core/protos/config.proto | 4 |
13 files changed, 247 insertions, 68 deletions
diff --git a/library/cpp/actors/core/actorsystem.h b/library/cpp/actors/core/actorsystem.h index 6cb0f93792..fd635e86c6 100644 --- a/library/cpp/actors/core/actorsystem.h +++ b/library/cpp/actors/core/actorsystem.h @@ -126,6 +126,10 @@ namespace NActors { return 1; } + virtual i16 GetPriority() const { + return 0; + } + // generic virtual TAffinity* Affinity() const = 0; diff --git a/library/cpp/actors/core/config.h b/library/cpp/actors/core/config.h index 1d875f4c12..0bf4b871d7 100644 --- a/library/cpp/actors/core/config.h +++ b/library/cpp/actors/core/config.h @@ -44,6 +44,7 @@ namespace NActors { i16 MinThreadCount = 0; i16 MaxThreadCount = 0; i16 DefaultThreadCount = 0; + i16 Priority = 0; }; struct TIOExecutorPoolConfig { @@ -91,11 +92,18 @@ namespace NActors { TBalancerConfig Balancer; }; + struct TSelfPingInfo { + NMonitoring::TDynamicCounters::TCounterPtr AvgPingCounter; + NMonitoring::TDynamicCounters::TCounterPtr AvgPingCounterWithSmallWindow; + ui32 MaxAvgPingUs; + }; + struct TCpuManagerConfig { TUnitedWorkersConfig UnitedWorkers; TVector<TBasicExecutorPoolConfig> Basic; TVector<TIOExecutorPoolConfig> IO; TVector<TUnitedExecutorPoolConfig> United; + TVector<TSelfPingInfo> PingInfoByPool; ui32 GetExecutorsCount() const { return Basic.size() + IO.size() + United.size(); diff --git a/library/cpp/actors/core/cpu_manager.cpp b/library/cpp/actors/core/cpu_manager.cpp index d9672272a0..0736caa539 100644 --- a/library/cpp/actors/core/cpu_manager.cpp +++ b/library/cpp/actors/core/cpu_manager.cpp @@ -23,7 +23,11 @@ namespace NActors { for (ui32 excIdx = 0; excIdx != ExecutorPoolCount; ++excIdx) { Executors[excIdx].Reset(CreateExecutorPool(excIdx)); - Harmonizer->AddPool(Executors[excIdx].Get()); + if (excIdx < Config.PingInfoByPool.size()) { + Harmonizer->AddPool(Executors[excIdx].Get(), &Config.PingInfoByPool[excIdx]); + } else { + Harmonizer->AddPool(Executors[excIdx].Get()); + } } } diff --git a/library/cpp/actors/core/executor_pool_basic.cpp b/library/cpp/actors/core/executor_pool_basic.cpp index 36e295c231..d4209ef205 100644 --- a/library/cpp/actors/core/executor_pool_basic.cpp +++ b/library/cpp/actors/core/executor_pool_basic.cpp @@ -26,7 +26,8 @@ namespace NActors { ui32 maxActivityType, i16 minThreadCount, i16 maxThreadCount, - i16 defaultThreadCount) + i16 defaultThreadCount, + i16 priority) : TExecutorPoolBase(poolId, threads, affinity, maxActivityType) , SpinThreshold(spinThreshold) , SpinThresholdCycles(spinThreshold * NHPTimer::GetCyclesPerSecond() * 0.000001) // convert microseconds to cycles @@ -43,6 +44,7 @@ namespace NActors { , MaxThreadCount(maxThreadCount) , DefaultThreadCount(defaultThreadCount) , Harmonizer(harmonizer) + , Priority(priority) { i16 limit = Min(threads, (ui32)Max<i16>()); if (DefaultThreadCount) { @@ -77,7 +79,8 @@ namespace NActors { cfg.MaxActivityType, cfg.MinThreadCount, cfg.MaxThreadCount, - cfg.DefaultThreadCount + cfg.DefaultThreadCount, + cfg.Priority ) {} @@ -493,4 +496,8 @@ namespace NActors { TSemaphore semaphore = TSemaphore::GetSemaphore(x); return -Min<i16>(semaphore.CurrentSleepThreadCount, 0); } + + i16 TBasicExecutorPool::GetPriority() const { + return Priority; + } } diff --git a/library/cpp/actors/core/executor_pool_basic.h b/library/cpp/actors/core/executor_pool_basic.h index 9185ed18f1..7bdf2349ac 100644 --- a/library/cpp/actors/core/executor_pool_basic.h +++ b/library/cpp/actors/core/executor_pool_basic.h @@ -80,6 +80,8 @@ namespace NActors { i16 DefaultThreadCount; IHarmonizer *Harmonizer; + const i16 Priority = 0; + public: struct TSemaphore { i64 OldSemaphore = 0; // 34 bits @@ -121,7 +123,8 @@ namespace NActors { ui32 maxActivityType = 1, i16 minThreadCount = 0, i16 maxThreadCount = 0, - i16 defaultThreadCount = 0); + i16 defaultThreadCount = 0, + i16 priority = 0); explicit TBasicExecutorPool(const TBasicExecutorPoolConfig& cfg, IHarmonizer *harmonizer); ~TBasicExecutorPool(); @@ -154,6 +157,7 @@ namespace NActors { double GetThreadConsumedUs(i16 threadIdx) override; double GetThreadBookedUs(i16 threadIdx) override; i16 GetBlockingThreadCount() const override; + i16 GetPriority() const override; private: void WakeUpLoop(); diff --git a/library/cpp/actors/core/harmonizer.cpp b/library/cpp/actors/core/harmonizer.cpp index 3a0f4109bb..75f3179a27 100644 --- a/library/cpp/actors/core/harmonizer.cpp +++ b/library/cpp/actors/core/harmonizer.cpp @@ -15,24 +15,48 @@ namespace NActors { LWTRACE_USING(ACTORLIB_PROVIDER); +constexpr bool CheckBinaryPower(ui64 value) { + return !(value & (value - 1)); +} + struct TValueHistory { - double History[8] = {0.0}; + static constexpr ui64 HistoryBufferSize = 8; + static_assert(CheckBinaryPower(HistoryBufferSize)); + + double History[HistoryBufferSize] = {0.0}; ui64 HistoryIdx = 0; ui64 LastTs = Max<ui64>(); double LastUs = 0.0; double AccumulatedUs = 0.0; ui64 AccumulatedTs = 0; - double GetAvgPart() { + template <bool WithTail=false> + double GetAvgPartForLastSeconds(ui8 seconds) { double sum = AccumulatedUs; - for (size_t idx = 0; idx < (sizeof(History) / sizeof(History[0])); ++idx) { - sum += History[idx]; - } - double duration = 1'000'000.0 * (sizeof(History) / sizeof(History[0])) + Ts2Us(AccumulatedTs); + size_t idx = HistoryIdx; + ui8 leftSeconds = seconds; + do { + idx--; + leftSeconds--; + if (idx >= HistoryBufferSize) { + idx = HistoryBufferSize - 1; + } + if (WithTail || leftSeconds) { + sum += History[idx]; + } else { + ui64 tsInSecond = Us2Ts(1'000'000.0); + sum += History[idx] * (tsInSecond - AccumulatedTs) / tsInSecond; + } + } while (leftSeconds); + double duration = 1'000'000.0 * seconds + (WithTail ? Ts2Us(AccumulatedTs): 0.0); double avg = sum / duration; return avg; } + double GetAvgPart() { + return GetAvgPartForLastSeconds<true>(HistoryBufferSize); + } + void Register(ui64 ts, double valueUs) { if (ts < LastTs) { LastTs = ts; @@ -49,7 +73,7 @@ struct TValueHistory { if (dTs > Us2Ts(8'000'000.0)) { dUs = dUs * 1'000'000.0 / Ts2Us(dTs); - for (size_t idx = 0; idx < (sizeof(History) / sizeof(History[0])); ++idx) { + for (size_t idx = 0; idx < HistoryBufferSize; ++idx) { History[idx] = dUs; } AccumulatedUs = 0.0; @@ -68,7 +92,7 @@ struct TValueHistory { dTs -= addTs; dUs -= addUs; History[HistoryIdx] = AccumulatedUs + addUs; - HistoryIdx = (HistoryIdx + 1) % (sizeof(History) / sizeof(History[0])); + HistoryIdx = (HistoryIdx + 1) % HistoryBufferSize; AccumulatedUs = 0.0; AccumulatedTs = 0; } @@ -87,13 +111,21 @@ struct TPoolInfo { i16 DefaultThreadCount = 0; i16 MinThreadCount = 0; i16 MaxThreadCount = 0; + i16 Priority = 0; + NMonitoring::TDynamicCounters::TCounterPtr AvgPingCounter; + NMonitoring::TDynamicCounters::TCounterPtr AvgPingCounterWithSmallWindow; + ui32 MaxAvgPingUs = 0; + ui64 LastUpdateTs = 0; bool IsBeingStopped(i16 threadIdx); double GetBooked(i16 threadIdx); + double GetlastSecondPoolBooked(i16 threadIdx); double GetConsumed(i16 threadIdx); + double GetlastSecondPoolConsumed(i16 threadIdx); void PullStats(ui64 ts); i16 GetThreadCount(); void SetThreadCount(i16 threadCount); + bool IsAvgPingGood(); }; bool TPoolInfo::IsBeingStopped(i16 threadIdx) { @@ -105,7 +137,13 @@ double TPoolInfo::GetBooked(i16 threadIdx) { return ThreadInfo[threadIdx].Booked.GetAvgPart(); } return 0.0; - //return Pool->GetThreadBooked(threadIdx); +} + +double TPoolInfo::GetlastSecondPoolBooked(i16 threadIdx) { + if ((size_t)threadIdx < ThreadInfo.size()) { + return ThreadInfo[threadIdx].Booked.GetAvgPartForLastSeconds(1); + } + return 0.0; } double TPoolInfo::GetConsumed(i16 threadIdx) { @@ -113,7 +151,13 @@ double TPoolInfo::GetConsumed(i16 threadIdx) { return ThreadInfo[threadIdx].Consumed.GetAvgPart(); } return 0.0; - //return Pool->GetThreadConsumed(threadIdx); +} + +double TPoolInfo::GetlastSecondPoolConsumed(i16 threadIdx) { + if ((size_t)threadIdx < ThreadInfo.size()) { + return ThreadInfo[threadIdx].Consumed.GetAvgPartForLastSeconds(1); + } + return 0.0; } #define UNROLL_HISTORY(history) (history)[0], (history)[1], (history)[2], (history)[3], (history)[4], (history)[5], (history)[6], (history)[7] @@ -136,22 +180,35 @@ void TPoolInfo::SetThreadCount(i16 threadCount) { Pool->SetThreadCount(threadCount); } +bool TPoolInfo::IsAvgPingGood() { + bool res = true; + if (AvgPingCounter) { + res &= *AvgPingCounter > MaxAvgPingUs; + } + if (AvgPingCounterWithSmallWindow) { + res &= *AvgPingCounterWithSmallWindow > MaxAvgPingUs; + } + return res; +} + class THarmonizer: public IHarmonizer { private: std::atomic<bool> IsDisabled = false; TSpinLock Lock; std::atomic<ui64> NextHarmonizeTs = 0; std::vector<TPoolInfo> Pools; + std::vector<ui16> PriorityOrder; void PullStats(ui64 ts); - void HarmonizeImpl(); + void HarmonizeImpl(ui64 ts); + void CalculatePriorityOrder(); public: THarmonizer(ui64 ts); virtual ~THarmonizer(); double Rescale(double value) const; void Harmonize(ui64 ts) override; void DeclareEmergency(ui64 ts) override; - void AddPool(IExecutorPool* pool) override; + void AddPool(IExecutorPool* pool, TSelfPingInfo *pingInfo) override; void Enable(bool enable) override; }; @@ -172,10 +229,19 @@ void THarmonizer::PullStats(ui64 ts) { } } -void THarmonizer::HarmonizeImpl() { +Y_FORCE_INLINE bool IsStarved(double consumed, double booked) { + return Max(consumed, booked) > 0.1 && consumed < booked * 0.7; +} + +Y_FORCE_INLINE bool IsHoggish(double booked, ui16 currentThreadCount) { + return booked < currentThreadCount - 1; +} + +void THarmonizer::HarmonizeImpl(ui64 ts) { bool isStarvedPresent = false; double booked = 0.0; double consumed = 0.0; + double lastSecondBooked = 0.0; i64 beingStopped = 0; i64 total = 0; TStackVec<size_t, 8> needyPools; @@ -185,32 +251,44 @@ void THarmonizer::HarmonizeImpl() { total += pool.DefaultThreadCount; double poolBooked = 0.0; double poolConsumed = 0.0; + double lastSecondPoolBooked = 0.0; + double lastSecondPoolConsumed = 0.0; beingStopped += pool.Pool->GetBlockingThreadCount(); for (i16 threadIdx = 0; threadIdx < pool.MaxThreadCount; ++threadIdx) { poolBooked += Rescale(pool.GetBooked(threadIdx)); + lastSecondPoolBooked += Rescale(pool.GetlastSecondPoolBooked(threadIdx)); poolConsumed += Rescale(pool.GetConsumed(threadIdx)); + lastSecondPoolConsumed += Rescale(pool.GetlastSecondPoolConsumed(threadIdx)); } - bool isStarved = false; - if (Max(consumed, booked) > 0.1 && consumed < booked * 0.7) { + bool isStarved = IsStarved(consumed, booked) || IsStarved(lastSecondPoolConsumed, lastSecondPoolBooked); + if (isStarved) { isStarvedPresent = true; - isStarved = true; } ui32 currentThreadCount = pool.GetThreadCount(); - bool isNeedy = false; - if (poolBooked >= currentThreadCount) { + bool isNeedy = pool.IsAvgPingGood() && poolBooked >= currentThreadCount; + if (pool.AvgPingCounter) { + if (pool.LastUpdateTs + Us2Ts(3'000'000ull) > ts) { + isNeedy = false; + } else { + pool.LastUpdateTs = ts; + } + } + if (isNeedy) { needyPools.push_back(poolIdx); - isNeedy = true; } - bool isHoggish = false; - if (poolBooked < currentThreadCount - 1) { + bool isHoggish = IsHoggish(poolBooked, currentThreadCount) + || IsHoggish(lastSecondPoolBooked, currentThreadCount); + if (isHoggish) { hoggishPools.push_back(poolIdx); - isHoggish = true; } booked += poolBooked; consumed += poolConsumed; - LWPROBE(HarmonizeCheckPool, poolIdx, pool.Pool->GetName(), poolBooked, poolConsumed, pool.GetThreadCount(), pool.MaxThreadCount, isStarved, isNeedy, isHoggish); + LWPROBE(HarmonizeCheckPool, poolIdx, pool.Pool->GetName(), poolBooked, poolConsumed, lastSecondPoolBooked, lastSecondPoolConsumed, pool.GetThreadCount(), pool.MaxThreadCount, isStarved, isNeedy, isHoggish); + } + double budget = total - Max(booked, lastSecondBooked); + if (budget < -0.1) { + isStarvedPresent = true; } - double budget = total - booked; double overbooked = consumed - booked; if (isStarvedPresent) { // last_starved_at_consumed_value = сумма по всем пулам consumed; @@ -223,12 +301,7 @@ void THarmonizer::HarmonizeImpl() { for (size_t i = 0; i < Pools.size(); ++i) { reorder.push_back(i); } - while (!reorder.empty()) { - size_t rndIdx = rand() % reorder.size(); - size_t poolIdx = reorder[rndIdx]; - reorder[rndIdx] = reorder.back(); - reorder.pop_back(); - + for (ui16 poolIdx : PriorityOrder) { TPoolInfo &pool = Pools[poolIdx]; i64 threadCount = pool.GetThreadCount(); if (threadCount > pool.DefaultThreadCount) { @@ -264,6 +337,17 @@ void THarmonizer::HarmonizeImpl() { } } +void THarmonizer::CalculatePriorityOrder() { + PriorityOrder.resize(Pools.size()); + Iota(PriorityOrder.begin(), PriorityOrder.end(), 0); + Sort(PriorityOrder.begin(), PriorityOrder.end(), [&] (i16 lhs, i16 rhs) { + if (Pools[lhs].Priority != Pools[rhs].Priority) { + return Pools[lhs].Priority < Pools[rhs].Priority; + } + return Pools[lhs].Pool->PoolId > Pools[rhs].Pool->PoolId; + }); +} + void THarmonizer::Harmonize(ui64 ts) { if (IsDisabled || NextHarmonizeTs > ts || !Lock.TryAcquire()) { LWPROBE(TryToHarmonizeFailed, ts, NextHarmonizeTs, IsDisabled, false); @@ -279,8 +363,12 @@ void THarmonizer::Harmonize(ui64 ts) { ui64 previousNextHarmonizeTs = NextHarmonizeTs.exchange(ts + Us2Ts(1'000'000ull)); LWPROBE(TryToHarmonizeSuccess, ts, NextHarmonizeTs, previousNextHarmonizeTs); + if (PriorityOrder.empty()) { + CalculatePriorityOrder(); + } + PullStats(ts); - HarmonizeImpl(); + HarmonizeImpl(ts); Lock.Release(); } @@ -289,7 +377,7 @@ void THarmonizer::DeclareEmergency(ui64 ts) { NextHarmonizeTs = ts; } -void THarmonizer::AddPool(IExecutorPool* pool) { +void THarmonizer::AddPool(IExecutorPool* pool, TSelfPingInfo *pingInfo) { TGuard<TSpinLock> guard(Lock); TPoolInfo poolInfo; poolInfo.Pool = pool; @@ -297,8 +385,15 @@ void THarmonizer::AddPool(IExecutorPool* pool) { poolInfo.MinThreadCount = pool->GetMinThreadCount(); poolInfo.MaxThreadCount = pool->GetMaxThreadCount(); poolInfo.ThreadInfo.resize(poolInfo.MaxThreadCount); + poolInfo.Priority = pool->GetPriority(); pool->SetThreadCount(poolInfo.DefaultThreadCount); + if (pingInfo) { + poolInfo.AvgPingCounter = pingInfo->AvgPingCounter; + poolInfo.AvgPingCounterWithSmallWindow = pingInfo->AvgPingCounterWithSmallWindow; + poolInfo.MaxAvgPingUs = pingInfo->MaxAvgPingUs; + } Pools.push_back(poolInfo); + PriorityOrder.clear(); }; void THarmonizer::Enable(bool enable) { diff --git a/library/cpp/actors/core/harmonizer.h b/library/cpp/actors/core/harmonizer.h index dae8d7de0c..a171930d9a 100644 --- a/library/cpp/actors/core/harmonizer.h +++ b/library/cpp/actors/core/harmonizer.h @@ -12,7 +12,7 @@ namespace NActors { virtual ~IHarmonizer() {} virtual void Harmonize(ui64 ts) = 0; virtual void DeclareEmergency(ui64 ts) = 0; - virtual void AddPool(IExecutorPool* pool) = 0; + virtual void AddPool(IExecutorPool* pool, TSelfPingInfo *pingInfo = nullptr) = 0; virtual void Enable(bool enable) = 0; }; diff --git a/library/cpp/actors/core/probes.h b/library/cpp/actors/core/probes.h index 33ac7b0f5e..11bbf81287 100644 --- a/library/cpp/actors/core/probes.h +++ b/library/cpp/actors/core/probes.h @@ -169,27 +169,27 @@ PROBE(ThreadCount, GROUPS("BasicThreadPool"), \ TYPES(ui32, TString, ui32, ui32, ui32, ui32), \ NAMES("poolId", "pool", "threacCount", "minThreadCount", "maxThreadCount", "defaultThreadCount")) \ - PROBE(HarmonizeCheckPool, GROUPS("Harmonizer"), \ - TYPES(ui32, TString, double, double, ui32, ui32, bool, bool, bool), \ - NAMES("poolId", "pool", "booked", "consumed", "threadCount", "maxThreadCount", "isStarved", "isNeedy", "isHoggish")) \ - PROBE(HarmonizeOperation, GROUPS("Harmonizer"), \ + PROBE(HarmonizeCheckPool, GROUPS("Harmonizer"), \ + TYPES(ui32, TString, double, double, double, double, ui32, ui32, bool, bool, bool), \ + NAMES("poolId", "pool", "booked", "consumed", "lastSecondBooked", "lastSecondConsumed", "threadCount", "maxThreadCount", "isStarved", "isNeedy", "isHoggish")) \ + PROBE(HarmonizeOperation, GROUPS("Harmonizer"), \ TYPES(ui32, TString, TString, ui32, ui32, ui32), \ NAMES("poolId", "pool", "operation", "newCount", "minCount", "maxCount")) \ - PROBE(TryToHarmonize, GROUPS("Harmonizer"), \ + PROBE(TryToHarmonize, GROUPS("Harmonizer"), \ TYPES(ui32, TString), \ NAMES("poolId", "pool")) \ - PROBE(SavedValues, GROUPS("Harmonizer"), \ + PROBE(SavedValues, GROUPS("Harmonizer"), \ TYPES(ui32, TString, TString, double, double, double, double, double, double, double, double), \ NAMES("poolId", "pool", "valueName", "[0]", "[1]", "[2]", "[3]", "[4]", "[5]", "[6]", "[7]")) \ - PROBE(RegisterValue, GROUPS("Harmonizer"), \ + PROBE(RegisterValue, GROUPS("Harmonizer"), \ TYPES(ui64, ui64, ui64, ui64, double, double, double), \ NAMES("ts", "lastTs", "dTs", "8sTs", "us", "lastUs", "dUs")) \ - PROBE(TryToHarmonizeFailed, GROUPS("Harmonizer"), \ - TYPES(ui64, ui64, bool, bool), \ - NAMES("ts", "nextHarmonizeTs", "isDisabled", "withLock")) \ + PROBE(TryToHarmonizeFailed, GROUPS("Harmonizer"), \ + TYPES(ui64, ui64, bool, bool), \ + NAMES("ts", "nextHarmonizeTs", "isDisabled", "withLock")) \ PROBE(TryToHarmonizeSuccess, GROUPS("Harmonizer"), \ - TYPES(ui64, ui64, ui64), \ - NAMES("ts", "nextHarmonizeTs", "previousNextHarmonizeTs")) \ + TYPES(ui64, ui64, ui64), \ + NAMES("ts", "nextHarmonizeTs", "previousNextHarmonizeTs")) \ /**/ LWTRACE_DECLARE_PROVIDER(ACTORLIB_PROVIDER) diff --git a/library/cpp/actors/helpers/selfping_actor.cpp b/library/cpp/actors/helpers/selfping_actor.cpp index f9bfaf8dc0..dc383f8c4c 100644 --- a/library/cpp/actors/helpers/selfping_actor.cpp +++ b/library/cpp/actors/helpers/selfping_actor.cpp @@ -61,10 +61,14 @@ struct TAvgOperation { class TSelfPingActor : public TActorBootstrapped<TSelfPingActor> { private: const TDuration SendInterval; - const NMonitoring::TDynamicCounters::TCounterPtr Counter; + const NMonitoring::TDynamicCounters::TCounterPtr MaxPingCounter; + const NMonitoring::TDynamicCounters::TCounterPtr AvgPingCounter; + const NMonitoring::TDynamicCounters::TCounterPtr AvgPingCounterWithSmallWindow; const NMonitoring::TDynamicCounters::TCounterPtr CalculationTimeCounter; - NSlidingWindow::TSlidingWindow<NSlidingWindow::TMaxOperation<ui64>> SlidingWindow; + NSlidingWindow::TSlidingWindow<NSlidingWindow::TMaxOperation<ui64>> MaxPingSlidingWindow; + NSlidingWindow::TSlidingWindow<TAvgOperation<ui64>> AvgPingSlidingWindow; + NSlidingWindow::TSlidingWindow<TAvgOperation<ui64>> AvgPingSmallSlidingWindow; NSlidingWindow::TSlidingWindow<TAvgOperation<ui64>> CalculationSlidingWindow; THPTimer Timer; @@ -74,12 +78,19 @@ public: return SELF_PING_ACTOR; } - TSelfPingActor(TDuration sendInterval, const NMonitoring::TDynamicCounters::TCounterPtr& counter, + TSelfPingActor(TDuration sendInterval, + const NMonitoring::TDynamicCounters::TCounterPtr& maxPingCounter, + const NMonitoring::TDynamicCounters::TCounterPtr& avgPingCounter, + const NMonitoring::TDynamicCounters::TCounterPtr& avgPingSmallWindowCounter, const NMonitoring::TDynamicCounters::TCounterPtr& calculationTimeCounter) : SendInterval(sendInterval) - , Counter(counter) + , MaxPingCounter(maxPingCounter) + , AvgPingCounter(avgPingCounter) + , AvgPingCounterWithSmallWindow(avgPingSmallWindowCounter) , CalculationTimeCounter(calculationTimeCounter) - , SlidingWindow(TDuration::Seconds(15), 100) + , MaxPingSlidingWindow(TDuration::Seconds(15), 100) + , AvgPingSlidingWindow(TDuration::Seconds(15), 100) + , AvgPingSmallSlidingWindow(TDuration::Seconds(1), 100) , CalculationSlidingWindow(TDuration::Seconds(15), 100) { } @@ -154,11 +165,23 @@ public: const double passedTime = hpNow - e.TimeStart; const ui64 delayUs = passedTime > 0.0 ? static_cast<ui64>(passedTime * 1e6) : 0; - *Counter = SlidingWindow.Update(delayUs, now); + if (MaxPingCounter) { + *MaxPingCounter = MaxPingSlidingWindow.Update(delayUs, now); + } + if (AvgPingCounter) { + auto res = AvgPingSlidingWindow.Update({1, delayUs}, now); + *AvgPingCounter = double(res.Sum) / double(res.Count + 1); + } + if (AvgPingCounterWithSmallWindow) { + auto res = AvgPingSmallSlidingWindow.Update({1, delayUs}, now); + *AvgPingCounterWithSmallWindow = double(res.Sum) / double(res.Count + 1); + } - ui64 d = MeasureTaskDurationNs(); - auto res = CalculationSlidingWindow.Update({1, d}, now); - *CalculationTimeCounter = double(res.Sum) / double(res.Count + 1); + if (CalculationTimeCounter) { + ui64 d = MeasureTaskDurationNs(); + auto res = CalculationSlidingWindow.Update({1, d}, now); + *CalculationTimeCounter = double(res.Sum) / double(res.Count + 1); + } SchedulePing(ctx, hpNow); } @@ -174,10 +197,12 @@ private: IActor* CreateSelfPingActor( TDuration sendInterval, - const NMonitoring::TDynamicCounters::TCounterPtr& counter, + const NMonitoring::TDynamicCounters::TCounterPtr& maxPingCounter, + const NMonitoring::TDynamicCounters::TCounterPtr& avgPingCounter, + const NMonitoring::TDynamicCounters::TCounterPtr& avgPingSmallWindowCounter, const NMonitoring::TDynamicCounters::TCounterPtr& calculationTimeCounter) { - return new TSelfPingActor(sendInterval, counter, calculationTimeCounter); + return new TSelfPingActor(sendInterval, maxPingCounter, avgPingCounter, avgPingSmallWindowCounter, calculationTimeCounter); } } // NActors diff --git a/library/cpp/actors/helpers/selfping_actor.h b/library/cpp/actors/helpers/selfping_actor.h index d7d07f9fa8..a976a4f425 100644 --- a/library/cpp/actors/helpers/selfping_actor.h +++ b/library/cpp/actors/helpers/selfping_actor.h @@ -7,7 +7,9 @@ namespace NActors { NActors::IActor* CreateSelfPingActor( TDuration sendInterval, - const NMonitoring::TDynamicCounters::TCounterPtr& counter, + const NMonitoring::TDynamicCounters::TCounterPtr& maxPingCounter, + const NMonitoring::TDynamicCounters::TCounterPtr& avgPingCounter, + const NMonitoring::TDynamicCounters::TCounterPtr& avgPingSmallWindowCounter, const NMonitoring::TDynamicCounters::TCounterPtr& calculationTimeCounter); } // NActors diff --git a/library/cpp/actors/helpers/selfping_actor_ut.cpp b/library/cpp/actors/helpers/selfping_actor_ut.cpp index 459635fa24..542f817755 100644 --- a/library/cpp/actors/helpers/selfping_actor_ut.cpp +++ b/library/cpp/actors/helpers/selfping_actor_ut.cpp @@ -22,13 +22,17 @@ Y_UNIT_TEST_SUITE(TSelfPingTest) { NMonitoring::TDynamicCounters::TCounterPtr counter(new NMonitoring::TCounterForPtr()); NMonitoring::TDynamicCounters::TCounterPtr counter2(new NMonitoring::TCounterForPtr()); + NMonitoring::TDynamicCounters::TCounterPtr counter3(new NMonitoring::TCounterForPtr()); + NMonitoring::TDynamicCounters::TCounterPtr counter4(new NMonitoring::TCounterForPtr()); auto actor = CreateSelfPingActor( TDuration::MilliSeconds(100), // sendInterval (unused in test) - counter, counter2); + counter, counter2, counter3, counter4); UNIT_ASSERT_VALUES_EQUAL(counter->Val(), 0); UNIT_ASSERT_VALUES_EQUAL(counter2->Val(), 0); + UNIT_ASSERT_VALUES_EQUAL(counter3->Val(), 0); + UNIT_ASSERT_VALUES_EQUAL(counter4->Val(), 0); const TActorId actorId = runtime->Register(actor); Y_UNUSED(actorId); diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp index e037c7a673..e4c1025bf5 100644 --- a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp +++ b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp @@ -236,19 +236,35 @@ static TCpuMask ParseAffinity(const TConfig& cfg) { return result; } +TDuration GetSelfPingInterval(const NKikimrConfig::TActorSystemConfig& systemConfig) { + return systemConfig.HasSelfPingInterval() + ? TDuration::MicroSeconds(systemConfig.GetSelfPingInterval()) + : TDuration::MilliSeconds(10); +} + void AddExecutorPool( TCpuManagerConfig& cpuManager, const NKikimrConfig::TActorSystemConfig::TExecutor& poolConfig, const NKikimrConfig::TActorSystemConfig& systemConfig, ui32 poolId, ui32 maxActivityType, - ui32& unitedThreads) + ui32& unitedThreads, + const NKikimr::TAppData* appData) { + const auto counters = GetServiceCounters(appData->Counters, "utils"); switch (poolConfig.GetType()) { case NKikimrConfig::TActorSystemConfig::TExecutor::BASIC: { TBasicExecutorPoolConfig basic; basic.PoolId = poolId; basic.PoolName = poolConfig.GetName(); + if (poolConfig.HasMaxAvgPingDeviation()) { + auto poolGroup = counters->GetSubgroup("execpool", basic.PoolName); + auto &poolInfo = cpuManager.PingInfoByPool[poolId]; + poolInfo.AvgPingCounter = poolGroup->GetCounter("SelfPingAvgUs", false); + poolInfo.AvgPingCounterWithSmallWindow = poolGroup->GetCounter("SelfPingAvgUsIn1s", false); + TDuration maxAvgPing = GetSelfPingInterval(systemConfig) + TDuration::MicroSeconds(poolConfig.GetMaxAvgPingDeviation()); + poolInfo.MaxAvgPingUs = maxAvgPing.MicroSeconds(); + } basic.Threads = Max(poolConfig.GetThreads(), poolConfig.GetMaxThreads()); basic.SpinThreshold = poolConfig.GetSpinThreshold(); basic.Affinity = ParseAffinity(poolConfig.GetAffinity()); @@ -268,6 +284,7 @@ void AddExecutorPool( basic.MinThreadCount = poolConfig.GetMinThreads(); basic.MaxThreadCount = poolConfig.GetMaxThreads(); basic.DefaultThreadCount = poolConfig.GetThreads(); + basic.Priority = poolConfig.GetPriority(); cpuManager.Basic.emplace_back(std::move(basic)); break; } @@ -353,11 +370,14 @@ static TUnitedWorkersConfig CreateUnitedWorkersConfig(const NKikimrConfig::TActo return result; } -static TCpuManagerConfig CreateCpuManagerConfig(const NKikimrConfig::TActorSystemConfig& config, ui32 maxActivityType) { +static TCpuManagerConfig CreateCpuManagerConfig(const NKikimrConfig::TActorSystemConfig& config, ui32 maxActivityType, + const NKikimr::TAppData* appData) +{ TCpuManagerConfig cpuManager; ui32 unitedThreads = 0; + cpuManager.PingInfoByPool.resize(config.GetExecutor().size()); for (int poolId = 0; poolId < config.GetExecutor().size(); poolId++) { - AddExecutorPool(cpuManager, config.GetExecutor(poolId), config, poolId, maxActivityType, unitedThreads); + AddExecutorPool(cpuManager, config.GetExecutor(poolId), config, poolId, maxActivityType, unitedThreads, appData); } cpuManager.UnitedWorkers = CreateUnitedWorkersConfig(config.GetUnitedWorkers(), unitedThreads); return cpuManager; @@ -553,7 +573,7 @@ void TBasicServicesInitializer::InitializeServices(NActors::TActorSystemSetup* s setup->NodeId = NodeId; setup->MaxActivityType = GetActivityTypeCount(); - setup->CpuManager = CreateCpuManagerConfig(systemConfig, setup->MaxActivityType); + setup->CpuManager = CreateCpuManagerConfig(systemConfig, setup->MaxActivityType, appData); for (ui32 poolId = 0; poolId != setup->GetExecutorsCount(); ++poolId) { const auto &execConfig = systemConfig.GetExecutor(poolId); if (execConfig.HasInjectMadSquirrels()) { @@ -1743,9 +1763,11 @@ void TSelfPingInitializer::InitializeServices( for (size_t poolId = 0; poolId < setup->GetExecutorsCount(); ++poolId) { const auto& poolName = setup->GetPoolName(poolId); auto poolGroup = counters->GetSubgroup("execpool", poolName); - auto counter = poolGroup->GetCounter("SelfPingMaxUs", false); + auto maxPingCounter = poolGroup->GetCounter("SelfPingMaxUs", false); + auto avgPingCounter = poolGroup->GetCounter("SelfPingAvgUs", false); + auto avgPingCounterWithSmallWindow = poolGroup->GetCounter("SelfPingAvgUsIn1s", false); auto cpuTimeCounter = poolGroup->GetCounter("CpuMatBenchNs", false); - IActor* selfPingActor = CreateSelfPingActor(selfPingInterval, counter, cpuTimeCounter); + IActor* selfPingActor = CreateSelfPingActor(selfPingInterval, maxPingCounter, avgPingCounter, avgPingCounterWithSmallWindow, cpuTimeCounter); setup->LocalServices.push_back(std::make_pair(TActorId(), TActorSetupCmd(selfPingActor, TMailboxType::HTSwap, diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index 3bef73182b..98c8f61b6b 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -66,6 +66,10 @@ message TActorSystemConfig { optional uint32 MaxThreads = 13; // Higher balancing bound, should be not lower than `Threads` optional uint32 BalancingPriority = 14; // Priority of pool to obtain cpu due to balancing (higher is better) optional uint64 ToleratedLatencyUs = 15; // p100-latency threshold indicating that more cpus are required by pool + + // Actorsystem 1.4 + optional int32 Priority = 16; + optional int32 MaxAvgPingDeviation = 17; } message TScheduler { |