aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorkruall <kruall@ydb.tech>2022-12-08 18:14:12 +0300
committerkruall <kruall@ydb.tech>2022-12-08 18:14:12 +0300
commit88bfc0de583a421fc9582fc016b543cad6b30e84 (patch)
treefb0bdca7804cead088934831cc9432262f32c354
parentf82e9035da59f4625a16b5c3c03a9818f5be1398 (diff)
downloadydb-88bfc0de583a421fc9582fc016b543cad6b30e84.tar.gz
Improve AS14,
-rw-r--r--library/cpp/actors/core/actorsystem.h4
-rw-r--r--library/cpp/actors/core/config.h8
-rw-r--r--library/cpp/actors/core/cpu_manager.cpp6
-rw-r--r--library/cpp/actors/core/executor_pool_basic.cpp11
-rw-r--r--library/cpp/actors/core/executor_pool_basic.h6
-rw-r--r--library/cpp/actors/core/harmonizer.cpp159
-rw-r--r--library/cpp/actors/core/harmonizer.h2
-rw-r--r--library/cpp/actors/core/probes.h24
-rw-r--r--library/cpp/actors/helpers/selfping_actor.cpp47
-rw-r--r--library/cpp/actors/helpers/selfping_actor.h4
-rw-r--r--library/cpp/actors/helpers/selfping_actor_ut.cpp6
-rw-r--r--ydb/core/driver_lib/run/kikimr_services_initializers.cpp34
-rw-r--r--ydb/core/protos/config.proto4
13 files changed, 247 insertions, 68 deletions
diff --git a/library/cpp/actors/core/actorsystem.h b/library/cpp/actors/core/actorsystem.h
index 6cb0f93792..fd635e86c6 100644
--- a/library/cpp/actors/core/actorsystem.h
+++ b/library/cpp/actors/core/actorsystem.h
@@ -126,6 +126,10 @@ namespace NActors {
return 1;
}
+ virtual i16 GetPriority() const {
+ return 0;
+ }
+
// generic
virtual TAffinity* Affinity() const = 0;
diff --git a/library/cpp/actors/core/config.h b/library/cpp/actors/core/config.h
index 1d875f4c12..0bf4b871d7 100644
--- a/library/cpp/actors/core/config.h
+++ b/library/cpp/actors/core/config.h
@@ -44,6 +44,7 @@ namespace NActors {
i16 MinThreadCount = 0;
i16 MaxThreadCount = 0;
i16 DefaultThreadCount = 0;
+ i16 Priority = 0;
};
struct TIOExecutorPoolConfig {
@@ -91,11 +92,18 @@ namespace NActors {
TBalancerConfig Balancer;
};
+ struct TSelfPingInfo {
+ NMonitoring::TDynamicCounters::TCounterPtr AvgPingCounter;
+ NMonitoring::TDynamicCounters::TCounterPtr AvgPingCounterWithSmallWindow;
+ ui32 MaxAvgPingUs;
+ };
+
struct TCpuManagerConfig {
TUnitedWorkersConfig UnitedWorkers;
TVector<TBasicExecutorPoolConfig> Basic;
TVector<TIOExecutorPoolConfig> IO;
TVector<TUnitedExecutorPoolConfig> United;
+ TVector<TSelfPingInfo> PingInfoByPool;
ui32 GetExecutorsCount() const {
return Basic.size() + IO.size() + United.size();
diff --git a/library/cpp/actors/core/cpu_manager.cpp b/library/cpp/actors/core/cpu_manager.cpp
index d9672272a0..0736caa539 100644
--- a/library/cpp/actors/core/cpu_manager.cpp
+++ b/library/cpp/actors/core/cpu_manager.cpp
@@ -23,7 +23,11 @@ namespace NActors {
for (ui32 excIdx = 0; excIdx != ExecutorPoolCount; ++excIdx) {
Executors[excIdx].Reset(CreateExecutorPool(excIdx));
- Harmonizer->AddPool(Executors[excIdx].Get());
+ if (excIdx < Config.PingInfoByPool.size()) {
+ Harmonizer->AddPool(Executors[excIdx].Get(), &Config.PingInfoByPool[excIdx]);
+ } else {
+ Harmonizer->AddPool(Executors[excIdx].Get());
+ }
}
}
diff --git a/library/cpp/actors/core/executor_pool_basic.cpp b/library/cpp/actors/core/executor_pool_basic.cpp
index 36e295c231..d4209ef205 100644
--- a/library/cpp/actors/core/executor_pool_basic.cpp
+++ b/library/cpp/actors/core/executor_pool_basic.cpp
@@ -26,7 +26,8 @@ namespace NActors {
ui32 maxActivityType,
i16 minThreadCount,
i16 maxThreadCount,
- i16 defaultThreadCount)
+ i16 defaultThreadCount,
+ i16 priority)
: TExecutorPoolBase(poolId, threads, affinity, maxActivityType)
, SpinThreshold(spinThreshold)
, SpinThresholdCycles(spinThreshold * NHPTimer::GetCyclesPerSecond() * 0.000001) // convert microseconds to cycles
@@ -43,6 +44,7 @@ namespace NActors {
, MaxThreadCount(maxThreadCount)
, DefaultThreadCount(defaultThreadCount)
, Harmonizer(harmonizer)
+ , Priority(priority)
{
i16 limit = Min(threads, (ui32)Max<i16>());
if (DefaultThreadCount) {
@@ -77,7 +79,8 @@ namespace NActors {
cfg.MaxActivityType,
cfg.MinThreadCount,
cfg.MaxThreadCount,
- cfg.DefaultThreadCount
+ cfg.DefaultThreadCount,
+ cfg.Priority
)
{}
@@ -493,4 +496,8 @@ namespace NActors {
TSemaphore semaphore = TSemaphore::GetSemaphore(x);
return -Min<i16>(semaphore.CurrentSleepThreadCount, 0);
}
+
+ i16 TBasicExecutorPool::GetPriority() const {
+ return Priority;
+ }
}
diff --git a/library/cpp/actors/core/executor_pool_basic.h b/library/cpp/actors/core/executor_pool_basic.h
index 9185ed18f1..7bdf2349ac 100644
--- a/library/cpp/actors/core/executor_pool_basic.h
+++ b/library/cpp/actors/core/executor_pool_basic.h
@@ -80,6 +80,8 @@ namespace NActors {
i16 DefaultThreadCount;
IHarmonizer *Harmonizer;
+ const i16 Priority = 0;
+
public:
struct TSemaphore {
i64 OldSemaphore = 0; // 34 bits
@@ -121,7 +123,8 @@ namespace NActors {
ui32 maxActivityType = 1,
i16 minThreadCount = 0,
i16 maxThreadCount = 0,
- i16 defaultThreadCount = 0);
+ i16 defaultThreadCount = 0,
+ i16 priority = 0);
explicit TBasicExecutorPool(const TBasicExecutorPoolConfig& cfg, IHarmonizer *harmonizer);
~TBasicExecutorPool();
@@ -154,6 +157,7 @@ namespace NActors {
double GetThreadConsumedUs(i16 threadIdx) override;
double GetThreadBookedUs(i16 threadIdx) override;
i16 GetBlockingThreadCount() const override;
+ i16 GetPriority() const override;
private:
void WakeUpLoop();
diff --git a/library/cpp/actors/core/harmonizer.cpp b/library/cpp/actors/core/harmonizer.cpp
index 3a0f4109bb..75f3179a27 100644
--- a/library/cpp/actors/core/harmonizer.cpp
+++ b/library/cpp/actors/core/harmonizer.cpp
@@ -15,24 +15,48 @@ namespace NActors {
LWTRACE_USING(ACTORLIB_PROVIDER);
+constexpr bool CheckBinaryPower(ui64 value) {
+ return !(value & (value - 1));
+}
+
struct TValueHistory {
- double History[8] = {0.0};
+ static constexpr ui64 HistoryBufferSize = 8;
+ static_assert(CheckBinaryPower(HistoryBufferSize));
+
+ double History[HistoryBufferSize] = {0.0};
ui64 HistoryIdx = 0;
ui64 LastTs = Max<ui64>();
double LastUs = 0.0;
double AccumulatedUs = 0.0;
ui64 AccumulatedTs = 0;
- double GetAvgPart() {
+ template <bool WithTail=false>
+ double GetAvgPartForLastSeconds(ui8 seconds) {
double sum = AccumulatedUs;
- for (size_t idx = 0; idx < (sizeof(History) / sizeof(History[0])); ++idx) {
- sum += History[idx];
- }
- double duration = 1'000'000.0 * (sizeof(History) / sizeof(History[0])) + Ts2Us(AccumulatedTs);
+ size_t idx = HistoryIdx;
+ ui8 leftSeconds = seconds;
+ do {
+ idx--;
+ leftSeconds--;
+ if (idx >= HistoryBufferSize) {
+ idx = HistoryBufferSize - 1;
+ }
+ if (WithTail || leftSeconds) {
+ sum += History[idx];
+ } else {
+ ui64 tsInSecond = Us2Ts(1'000'000.0);
+ sum += History[idx] * (tsInSecond - AccumulatedTs) / tsInSecond;
+ }
+ } while (leftSeconds);
+ double duration = 1'000'000.0 * seconds + (WithTail ? Ts2Us(AccumulatedTs): 0.0);
double avg = sum / duration;
return avg;
}
+ double GetAvgPart() {
+ return GetAvgPartForLastSeconds<true>(HistoryBufferSize);
+ }
+
void Register(ui64 ts, double valueUs) {
if (ts < LastTs) {
LastTs = ts;
@@ -49,7 +73,7 @@ struct TValueHistory {
if (dTs > Us2Ts(8'000'000.0)) {
dUs = dUs * 1'000'000.0 / Ts2Us(dTs);
- for (size_t idx = 0; idx < (sizeof(History) / sizeof(History[0])); ++idx) {
+ for (size_t idx = 0; idx < HistoryBufferSize; ++idx) {
History[idx] = dUs;
}
AccumulatedUs = 0.0;
@@ -68,7 +92,7 @@ struct TValueHistory {
dTs -= addTs;
dUs -= addUs;
History[HistoryIdx] = AccumulatedUs + addUs;
- HistoryIdx = (HistoryIdx + 1) % (sizeof(History) / sizeof(History[0]));
+ HistoryIdx = (HistoryIdx + 1) % HistoryBufferSize;
AccumulatedUs = 0.0;
AccumulatedTs = 0;
}
@@ -87,13 +111,21 @@ struct TPoolInfo {
i16 DefaultThreadCount = 0;
i16 MinThreadCount = 0;
i16 MaxThreadCount = 0;
+ i16 Priority = 0;
+ NMonitoring::TDynamicCounters::TCounterPtr AvgPingCounter;
+ NMonitoring::TDynamicCounters::TCounterPtr AvgPingCounterWithSmallWindow;
+ ui32 MaxAvgPingUs = 0;
+ ui64 LastUpdateTs = 0;
bool IsBeingStopped(i16 threadIdx);
double GetBooked(i16 threadIdx);
+ double GetlastSecondPoolBooked(i16 threadIdx);
double GetConsumed(i16 threadIdx);
+ double GetlastSecondPoolConsumed(i16 threadIdx);
void PullStats(ui64 ts);
i16 GetThreadCount();
void SetThreadCount(i16 threadCount);
+ bool IsAvgPingGood();
};
bool TPoolInfo::IsBeingStopped(i16 threadIdx) {
@@ -105,7 +137,13 @@ double TPoolInfo::GetBooked(i16 threadIdx) {
return ThreadInfo[threadIdx].Booked.GetAvgPart();
}
return 0.0;
- //return Pool->GetThreadBooked(threadIdx);
+}
+
+double TPoolInfo::GetlastSecondPoolBooked(i16 threadIdx) {
+ if ((size_t)threadIdx < ThreadInfo.size()) {
+ return ThreadInfo[threadIdx].Booked.GetAvgPartForLastSeconds(1);
+ }
+ return 0.0;
}
double TPoolInfo::GetConsumed(i16 threadIdx) {
@@ -113,7 +151,13 @@ double TPoolInfo::GetConsumed(i16 threadIdx) {
return ThreadInfo[threadIdx].Consumed.GetAvgPart();
}
return 0.0;
- //return Pool->GetThreadConsumed(threadIdx);
+}
+
+double TPoolInfo::GetlastSecondPoolConsumed(i16 threadIdx) {
+ if ((size_t)threadIdx < ThreadInfo.size()) {
+ return ThreadInfo[threadIdx].Consumed.GetAvgPartForLastSeconds(1);
+ }
+ return 0.0;
}
#define UNROLL_HISTORY(history) (history)[0], (history)[1], (history)[2], (history)[3], (history)[4], (history)[5], (history)[6], (history)[7]
@@ -136,22 +180,35 @@ void TPoolInfo::SetThreadCount(i16 threadCount) {
Pool->SetThreadCount(threadCount);
}
+bool TPoolInfo::IsAvgPingGood() {
+ bool res = true;
+ if (AvgPingCounter) {
+ res &= *AvgPingCounter > MaxAvgPingUs;
+ }
+ if (AvgPingCounterWithSmallWindow) {
+ res &= *AvgPingCounterWithSmallWindow > MaxAvgPingUs;
+ }
+ return res;
+}
+
class THarmonizer: public IHarmonizer {
private:
std::atomic<bool> IsDisabled = false;
TSpinLock Lock;
std::atomic<ui64> NextHarmonizeTs = 0;
std::vector<TPoolInfo> Pools;
+ std::vector<ui16> PriorityOrder;
void PullStats(ui64 ts);
- void HarmonizeImpl();
+ void HarmonizeImpl(ui64 ts);
+ void CalculatePriorityOrder();
public:
THarmonizer(ui64 ts);
virtual ~THarmonizer();
double Rescale(double value) const;
void Harmonize(ui64 ts) override;
void DeclareEmergency(ui64 ts) override;
- void AddPool(IExecutorPool* pool) override;
+ void AddPool(IExecutorPool* pool, TSelfPingInfo *pingInfo) override;
void Enable(bool enable) override;
};
@@ -172,10 +229,19 @@ void THarmonizer::PullStats(ui64 ts) {
}
}
-void THarmonizer::HarmonizeImpl() {
+Y_FORCE_INLINE bool IsStarved(double consumed, double booked) {
+ return Max(consumed, booked) > 0.1 && consumed < booked * 0.7;
+}
+
+Y_FORCE_INLINE bool IsHoggish(double booked, ui16 currentThreadCount) {
+ return booked < currentThreadCount - 1;
+}
+
+void THarmonizer::HarmonizeImpl(ui64 ts) {
bool isStarvedPresent = false;
double booked = 0.0;
double consumed = 0.0;
+ double lastSecondBooked = 0.0;
i64 beingStopped = 0;
i64 total = 0;
TStackVec<size_t, 8> needyPools;
@@ -185,32 +251,44 @@ void THarmonizer::HarmonizeImpl() {
total += pool.DefaultThreadCount;
double poolBooked = 0.0;
double poolConsumed = 0.0;
+ double lastSecondPoolBooked = 0.0;
+ double lastSecondPoolConsumed = 0.0;
beingStopped += pool.Pool->GetBlockingThreadCount();
for (i16 threadIdx = 0; threadIdx < pool.MaxThreadCount; ++threadIdx) {
poolBooked += Rescale(pool.GetBooked(threadIdx));
+ lastSecondPoolBooked += Rescale(pool.GetlastSecondPoolBooked(threadIdx));
poolConsumed += Rescale(pool.GetConsumed(threadIdx));
+ lastSecondPoolConsumed += Rescale(pool.GetlastSecondPoolConsumed(threadIdx));
}
- bool isStarved = false;
- if (Max(consumed, booked) > 0.1 && consumed < booked * 0.7) {
+ bool isStarved = IsStarved(consumed, booked) || IsStarved(lastSecondPoolConsumed, lastSecondPoolBooked);
+ if (isStarved) {
isStarvedPresent = true;
- isStarved = true;
}
ui32 currentThreadCount = pool.GetThreadCount();
- bool isNeedy = false;
- if (poolBooked >= currentThreadCount) {
+ bool isNeedy = pool.IsAvgPingGood() && poolBooked >= currentThreadCount;
+ if (pool.AvgPingCounter) {
+ if (pool.LastUpdateTs + Us2Ts(3'000'000ull) > ts) {
+ isNeedy = false;
+ } else {
+ pool.LastUpdateTs = ts;
+ }
+ }
+ if (isNeedy) {
needyPools.push_back(poolIdx);
- isNeedy = true;
}
- bool isHoggish = false;
- if (poolBooked < currentThreadCount - 1) {
+ bool isHoggish = IsHoggish(poolBooked, currentThreadCount)
+ || IsHoggish(lastSecondPoolBooked, currentThreadCount);
+ if (isHoggish) {
hoggishPools.push_back(poolIdx);
- isHoggish = true;
}
booked += poolBooked;
consumed += poolConsumed;
- LWPROBE(HarmonizeCheckPool, poolIdx, pool.Pool->GetName(), poolBooked, poolConsumed, pool.GetThreadCount(), pool.MaxThreadCount, isStarved, isNeedy, isHoggish);
+ LWPROBE(HarmonizeCheckPool, poolIdx, pool.Pool->GetName(), poolBooked, poolConsumed, lastSecondPoolBooked, lastSecondPoolConsumed, pool.GetThreadCount(), pool.MaxThreadCount, isStarved, isNeedy, isHoggish);
+ }
+ double budget = total - Max(booked, lastSecondBooked);
+ if (budget < -0.1) {
+ isStarvedPresent = true;
}
- double budget = total - booked;
double overbooked = consumed - booked;
if (isStarvedPresent) {
// last_starved_at_consumed_value = сумма по всем пулам consumed;
@@ -223,12 +301,7 @@ void THarmonizer::HarmonizeImpl() {
for (size_t i = 0; i < Pools.size(); ++i) {
reorder.push_back(i);
}
- while (!reorder.empty()) {
- size_t rndIdx = rand() % reorder.size();
- size_t poolIdx = reorder[rndIdx];
- reorder[rndIdx] = reorder.back();
- reorder.pop_back();
-
+ for (ui16 poolIdx : PriorityOrder) {
TPoolInfo &pool = Pools[poolIdx];
i64 threadCount = pool.GetThreadCount();
if (threadCount > pool.DefaultThreadCount) {
@@ -264,6 +337,17 @@ void THarmonizer::HarmonizeImpl() {
}
}
+void THarmonizer::CalculatePriorityOrder() {
+ PriorityOrder.resize(Pools.size());
+ Iota(PriorityOrder.begin(), PriorityOrder.end(), 0);
+ Sort(PriorityOrder.begin(), PriorityOrder.end(), [&] (i16 lhs, i16 rhs) {
+ if (Pools[lhs].Priority != Pools[rhs].Priority) {
+ return Pools[lhs].Priority < Pools[rhs].Priority;
+ }
+ return Pools[lhs].Pool->PoolId > Pools[rhs].Pool->PoolId;
+ });
+}
+
void THarmonizer::Harmonize(ui64 ts) {
if (IsDisabled || NextHarmonizeTs > ts || !Lock.TryAcquire()) {
LWPROBE(TryToHarmonizeFailed, ts, NextHarmonizeTs, IsDisabled, false);
@@ -279,8 +363,12 @@ void THarmonizer::Harmonize(ui64 ts) {
ui64 previousNextHarmonizeTs = NextHarmonizeTs.exchange(ts + Us2Ts(1'000'000ull));
LWPROBE(TryToHarmonizeSuccess, ts, NextHarmonizeTs, previousNextHarmonizeTs);
+ if (PriorityOrder.empty()) {
+ CalculatePriorityOrder();
+ }
+
PullStats(ts);
- HarmonizeImpl();
+ HarmonizeImpl(ts);
Lock.Release();
}
@@ -289,7 +377,7 @@ void THarmonizer::DeclareEmergency(ui64 ts) {
NextHarmonizeTs = ts;
}
-void THarmonizer::AddPool(IExecutorPool* pool) {
+void THarmonizer::AddPool(IExecutorPool* pool, TSelfPingInfo *pingInfo) {
TGuard<TSpinLock> guard(Lock);
TPoolInfo poolInfo;
poolInfo.Pool = pool;
@@ -297,8 +385,15 @@ void THarmonizer::AddPool(IExecutorPool* pool) {
poolInfo.MinThreadCount = pool->GetMinThreadCount();
poolInfo.MaxThreadCount = pool->GetMaxThreadCount();
poolInfo.ThreadInfo.resize(poolInfo.MaxThreadCount);
+ poolInfo.Priority = pool->GetPriority();
pool->SetThreadCount(poolInfo.DefaultThreadCount);
+ if (pingInfo) {
+ poolInfo.AvgPingCounter = pingInfo->AvgPingCounter;
+ poolInfo.AvgPingCounterWithSmallWindow = pingInfo->AvgPingCounterWithSmallWindow;
+ poolInfo.MaxAvgPingUs = pingInfo->MaxAvgPingUs;
+ }
Pools.push_back(poolInfo);
+ PriorityOrder.clear();
};
void THarmonizer::Enable(bool enable) {
diff --git a/library/cpp/actors/core/harmonizer.h b/library/cpp/actors/core/harmonizer.h
index dae8d7de0c..a171930d9a 100644
--- a/library/cpp/actors/core/harmonizer.h
+++ b/library/cpp/actors/core/harmonizer.h
@@ -12,7 +12,7 @@ namespace NActors {
virtual ~IHarmonizer() {}
virtual void Harmonize(ui64 ts) = 0;
virtual void DeclareEmergency(ui64 ts) = 0;
- virtual void AddPool(IExecutorPool* pool) = 0;
+ virtual void AddPool(IExecutorPool* pool, TSelfPingInfo *pingInfo = nullptr) = 0;
virtual void Enable(bool enable) = 0;
};
diff --git a/library/cpp/actors/core/probes.h b/library/cpp/actors/core/probes.h
index 33ac7b0f5e..11bbf81287 100644
--- a/library/cpp/actors/core/probes.h
+++ b/library/cpp/actors/core/probes.h
@@ -169,27 +169,27 @@
PROBE(ThreadCount, GROUPS("BasicThreadPool"), \
TYPES(ui32, TString, ui32, ui32, ui32, ui32), \
NAMES("poolId", "pool", "threacCount", "minThreadCount", "maxThreadCount", "defaultThreadCount")) \
- PROBE(HarmonizeCheckPool, GROUPS("Harmonizer"), \
- TYPES(ui32, TString, double, double, ui32, ui32, bool, bool, bool), \
- NAMES("poolId", "pool", "booked", "consumed", "threadCount", "maxThreadCount", "isStarved", "isNeedy", "isHoggish")) \
- PROBE(HarmonizeOperation, GROUPS("Harmonizer"), \
+ PROBE(HarmonizeCheckPool, GROUPS("Harmonizer"), \
+ TYPES(ui32, TString, double, double, double, double, ui32, ui32, bool, bool, bool), \
+ NAMES("poolId", "pool", "booked", "consumed", "lastSecondBooked", "lastSecondConsumed", "threadCount", "maxThreadCount", "isStarved", "isNeedy", "isHoggish")) \
+ PROBE(HarmonizeOperation, GROUPS("Harmonizer"), \
TYPES(ui32, TString, TString, ui32, ui32, ui32), \
NAMES("poolId", "pool", "operation", "newCount", "minCount", "maxCount")) \
- PROBE(TryToHarmonize, GROUPS("Harmonizer"), \
+ PROBE(TryToHarmonize, GROUPS("Harmonizer"), \
TYPES(ui32, TString), \
NAMES("poolId", "pool")) \
- PROBE(SavedValues, GROUPS("Harmonizer"), \
+ PROBE(SavedValues, GROUPS("Harmonizer"), \
TYPES(ui32, TString, TString, double, double, double, double, double, double, double, double), \
NAMES("poolId", "pool", "valueName", "[0]", "[1]", "[2]", "[3]", "[4]", "[5]", "[6]", "[7]")) \
- PROBE(RegisterValue, GROUPS("Harmonizer"), \
+ PROBE(RegisterValue, GROUPS("Harmonizer"), \
TYPES(ui64, ui64, ui64, ui64, double, double, double), \
NAMES("ts", "lastTs", "dTs", "8sTs", "us", "lastUs", "dUs")) \
- PROBE(TryToHarmonizeFailed, GROUPS("Harmonizer"), \
- TYPES(ui64, ui64, bool, bool), \
- NAMES("ts", "nextHarmonizeTs", "isDisabled", "withLock")) \
+ PROBE(TryToHarmonizeFailed, GROUPS("Harmonizer"), \
+ TYPES(ui64, ui64, bool, bool), \
+ NAMES("ts", "nextHarmonizeTs", "isDisabled", "withLock")) \
PROBE(TryToHarmonizeSuccess, GROUPS("Harmonizer"), \
- TYPES(ui64, ui64, ui64), \
- NAMES("ts", "nextHarmonizeTs", "previousNextHarmonizeTs")) \
+ TYPES(ui64, ui64, ui64), \
+ NAMES("ts", "nextHarmonizeTs", "previousNextHarmonizeTs")) \
/**/
LWTRACE_DECLARE_PROVIDER(ACTORLIB_PROVIDER)
diff --git a/library/cpp/actors/helpers/selfping_actor.cpp b/library/cpp/actors/helpers/selfping_actor.cpp
index f9bfaf8dc0..dc383f8c4c 100644
--- a/library/cpp/actors/helpers/selfping_actor.cpp
+++ b/library/cpp/actors/helpers/selfping_actor.cpp
@@ -61,10 +61,14 @@ struct TAvgOperation {
class TSelfPingActor : public TActorBootstrapped<TSelfPingActor> {
private:
const TDuration SendInterval;
- const NMonitoring::TDynamicCounters::TCounterPtr Counter;
+ const NMonitoring::TDynamicCounters::TCounterPtr MaxPingCounter;
+ const NMonitoring::TDynamicCounters::TCounterPtr AvgPingCounter;
+ const NMonitoring::TDynamicCounters::TCounterPtr AvgPingCounterWithSmallWindow;
const NMonitoring::TDynamicCounters::TCounterPtr CalculationTimeCounter;
- NSlidingWindow::TSlidingWindow<NSlidingWindow::TMaxOperation<ui64>> SlidingWindow;
+ NSlidingWindow::TSlidingWindow<NSlidingWindow::TMaxOperation<ui64>> MaxPingSlidingWindow;
+ NSlidingWindow::TSlidingWindow<TAvgOperation<ui64>> AvgPingSlidingWindow;
+ NSlidingWindow::TSlidingWindow<TAvgOperation<ui64>> AvgPingSmallSlidingWindow;
NSlidingWindow::TSlidingWindow<TAvgOperation<ui64>> CalculationSlidingWindow;
THPTimer Timer;
@@ -74,12 +78,19 @@ public:
return SELF_PING_ACTOR;
}
- TSelfPingActor(TDuration sendInterval, const NMonitoring::TDynamicCounters::TCounterPtr& counter,
+ TSelfPingActor(TDuration sendInterval,
+ const NMonitoring::TDynamicCounters::TCounterPtr& maxPingCounter,
+ const NMonitoring::TDynamicCounters::TCounterPtr& avgPingCounter,
+ const NMonitoring::TDynamicCounters::TCounterPtr& avgPingSmallWindowCounter,
const NMonitoring::TDynamicCounters::TCounterPtr& calculationTimeCounter)
: SendInterval(sendInterval)
- , Counter(counter)
+ , MaxPingCounter(maxPingCounter)
+ , AvgPingCounter(avgPingCounter)
+ , AvgPingCounterWithSmallWindow(avgPingSmallWindowCounter)
, CalculationTimeCounter(calculationTimeCounter)
- , SlidingWindow(TDuration::Seconds(15), 100)
+ , MaxPingSlidingWindow(TDuration::Seconds(15), 100)
+ , AvgPingSlidingWindow(TDuration::Seconds(15), 100)
+ , AvgPingSmallSlidingWindow(TDuration::Seconds(1), 100)
, CalculationSlidingWindow(TDuration::Seconds(15), 100)
{
}
@@ -154,11 +165,23 @@ public:
const double passedTime = hpNow - e.TimeStart;
const ui64 delayUs = passedTime > 0.0 ? static_cast<ui64>(passedTime * 1e6) : 0;
- *Counter = SlidingWindow.Update(delayUs, now);
+ if (MaxPingCounter) {
+ *MaxPingCounter = MaxPingSlidingWindow.Update(delayUs, now);
+ }
+ if (AvgPingCounter) {
+ auto res = AvgPingSlidingWindow.Update({1, delayUs}, now);
+ *AvgPingCounter = double(res.Sum) / double(res.Count + 1);
+ }
+ if (AvgPingCounterWithSmallWindow) {
+ auto res = AvgPingSmallSlidingWindow.Update({1, delayUs}, now);
+ *AvgPingCounterWithSmallWindow = double(res.Sum) / double(res.Count + 1);
+ }
- ui64 d = MeasureTaskDurationNs();
- auto res = CalculationSlidingWindow.Update({1, d}, now);
- *CalculationTimeCounter = double(res.Sum) / double(res.Count + 1);
+ if (CalculationTimeCounter) {
+ ui64 d = MeasureTaskDurationNs();
+ auto res = CalculationSlidingWindow.Update({1, d}, now);
+ *CalculationTimeCounter = double(res.Sum) / double(res.Count + 1);
+ }
SchedulePing(ctx, hpNow);
}
@@ -174,10 +197,12 @@ private:
IActor* CreateSelfPingActor(
TDuration sendInterval,
- const NMonitoring::TDynamicCounters::TCounterPtr& counter,
+ const NMonitoring::TDynamicCounters::TCounterPtr& maxPingCounter,
+ const NMonitoring::TDynamicCounters::TCounterPtr& avgPingCounter,
+ const NMonitoring::TDynamicCounters::TCounterPtr& avgPingSmallWindowCounter,
const NMonitoring::TDynamicCounters::TCounterPtr& calculationTimeCounter)
{
- return new TSelfPingActor(sendInterval, counter, calculationTimeCounter);
+ return new TSelfPingActor(sendInterval, maxPingCounter, avgPingCounter, avgPingSmallWindowCounter, calculationTimeCounter);
}
} // NActors
diff --git a/library/cpp/actors/helpers/selfping_actor.h b/library/cpp/actors/helpers/selfping_actor.h
index d7d07f9fa8..a976a4f425 100644
--- a/library/cpp/actors/helpers/selfping_actor.h
+++ b/library/cpp/actors/helpers/selfping_actor.h
@@ -7,7 +7,9 @@ namespace NActors {
NActors::IActor* CreateSelfPingActor(
TDuration sendInterval,
- const NMonitoring::TDynamicCounters::TCounterPtr& counter,
+ const NMonitoring::TDynamicCounters::TCounterPtr& maxPingCounter,
+ const NMonitoring::TDynamicCounters::TCounterPtr& avgPingCounter,
+ const NMonitoring::TDynamicCounters::TCounterPtr& avgPingSmallWindowCounter,
const NMonitoring::TDynamicCounters::TCounterPtr& calculationTimeCounter);
} // NActors
diff --git a/library/cpp/actors/helpers/selfping_actor_ut.cpp b/library/cpp/actors/helpers/selfping_actor_ut.cpp
index 459635fa24..542f817755 100644
--- a/library/cpp/actors/helpers/selfping_actor_ut.cpp
+++ b/library/cpp/actors/helpers/selfping_actor_ut.cpp
@@ -22,13 +22,17 @@ Y_UNIT_TEST_SUITE(TSelfPingTest) {
NMonitoring::TDynamicCounters::TCounterPtr counter(new NMonitoring::TCounterForPtr());
NMonitoring::TDynamicCounters::TCounterPtr counter2(new NMonitoring::TCounterForPtr());
+ NMonitoring::TDynamicCounters::TCounterPtr counter3(new NMonitoring::TCounterForPtr());
+ NMonitoring::TDynamicCounters::TCounterPtr counter4(new NMonitoring::TCounterForPtr());
auto actor = CreateSelfPingActor(
TDuration::MilliSeconds(100), // sendInterval (unused in test)
- counter, counter2);
+ counter, counter2, counter3, counter4);
UNIT_ASSERT_VALUES_EQUAL(counter->Val(), 0);
UNIT_ASSERT_VALUES_EQUAL(counter2->Val(), 0);
+ UNIT_ASSERT_VALUES_EQUAL(counter3->Val(), 0);
+ UNIT_ASSERT_VALUES_EQUAL(counter4->Val(), 0);
const TActorId actorId = runtime->Register(actor);
Y_UNUSED(actorId);
diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
index e037c7a673..e4c1025bf5 100644
--- a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
+++ b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
@@ -236,19 +236,35 @@ static TCpuMask ParseAffinity(const TConfig& cfg) {
return result;
}
+TDuration GetSelfPingInterval(const NKikimrConfig::TActorSystemConfig& systemConfig) {
+ return systemConfig.HasSelfPingInterval()
+ ? TDuration::MicroSeconds(systemConfig.GetSelfPingInterval())
+ : TDuration::MilliSeconds(10);
+}
+
void AddExecutorPool(
TCpuManagerConfig& cpuManager,
const NKikimrConfig::TActorSystemConfig::TExecutor& poolConfig,
const NKikimrConfig::TActorSystemConfig& systemConfig,
ui32 poolId,
ui32 maxActivityType,
- ui32& unitedThreads)
+ ui32& unitedThreads,
+ const NKikimr::TAppData* appData)
{
+ const auto counters = GetServiceCounters(appData->Counters, "utils");
switch (poolConfig.GetType()) {
case NKikimrConfig::TActorSystemConfig::TExecutor::BASIC: {
TBasicExecutorPoolConfig basic;
basic.PoolId = poolId;
basic.PoolName = poolConfig.GetName();
+ if (poolConfig.HasMaxAvgPingDeviation()) {
+ auto poolGroup = counters->GetSubgroup("execpool", basic.PoolName);
+ auto &poolInfo = cpuManager.PingInfoByPool[poolId];
+ poolInfo.AvgPingCounter = poolGroup->GetCounter("SelfPingAvgUs", false);
+ poolInfo.AvgPingCounterWithSmallWindow = poolGroup->GetCounter("SelfPingAvgUsIn1s", false);
+ TDuration maxAvgPing = GetSelfPingInterval(systemConfig) + TDuration::MicroSeconds(poolConfig.GetMaxAvgPingDeviation());
+ poolInfo.MaxAvgPingUs = maxAvgPing.MicroSeconds();
+ }
basic.Threads = Max(poolConfig.GetThreads(), poolConfig.GetMaxThreads());
basic.SpinThreshold = poolConfig.GetSpinThreshold();
basic.Affinity = ParseAffinity(poolConfig.GetAffinity());
@@ -268,6 +284,7 @@ void AddExecutorPool(
basic.MinThreadCount = poolConfig.GetMinThreads();
basic.MaxThreadCount = poolConfig.GetMaxThreads();
basic.DefaultThreadCount = poolConfig.GetThreads();
+ basic.Priority = poolConfig.GetPriority();
cpuManager.Basic.emplace_back(std::move(basic));
break;
}
@@ -353,11 +370,14 @@ static TUnitedWorkersConfig CreateUnitedWorkersConfig(const NKikimrConfig::TActo
return result;
}
-static TCpuManagerConfig CreateCpuManagerConfig(const NKikimrConfig::TActorSystemConfig& config, ui32 maxActivityType) {
+static TCpuManagerConfig CreateCpuManagerConfig(const NKikimrConfig::TActorSystemConfig& config, ui32 maxActivityType,
+ const NKikimr::TAppData* appData)
+{
TCpuManagerConfig cpuManager;
ui32 unitedThreads = 0;
+ cpuManager.PingInfoByPool.resize(config.GetExecutor().size());
for (int poolId = 0; poolId < config.GetExecutor().size(); poolId++) {
- AddExecutorPool(cpuManager, config.GetExecutor(poolId), config, poolId, maxActivityType, unitedThreads);
+ AddExecutorPool(cpuManager, config.GetExecutor(poolId), config, poolId, maxActivityType, unitedThreads, appData);
}
cpuManager.UnitedWorkers = CreateUnitedWorkersConfig(config.GetUnitedWorkers(), unitedThreads);
return cpuManager;
@@ -553,7 +573,7 @@ void TBasicServicesInitializer::InitializeServices(NActors::TActorSystemSetup* s
setup->NodeId = NodeId;
setup->MaxActivityType = GetActivityTypeCount();
- setup->CpuManager = CreateCpuManagerConfig(systemConfig, setup->MaxActivityType);
+ setup->CpuManager = CreateCpuManagerConfig(systemConfig, setup->MaxActivityType, appData);
for (ui32 poolId = 0; poolId != setup->GetExecutorsCount(); ++poolId) {
const auto &execConfig = systemConfig.GetExecutor(poolId);
if (execConfig.HasInjectMadSquirrels()) {
@@ -1743,9 +1763,11 @@ void TSelfPingInitializer::InitializeServices(
for (size_t poolId = 0; poolId < setup->GetExecutorsCount(); ++poolId) {
const auto& poolName = setup->GetPoolName(poolId);
auto poolGroup = counters->GetSubgroup("execpool", poolName);
- auto counter = poolGroup->GetCounter("SelfPingMaxUs", false);
+ auto maxPingCounter = poolGroup->GetCounter("SelfPingMaxUs", false);
+ auto avgPingCounter = poolGroup->GetCounter("SelfPingAvgUs", false);
+ auto avgPingCounterWithSmallWindow = poolGroup->GetCounter("SelfPingAvgUsIn1s", false);
auto cpuTimeCounter = poolGroup->GetCounter("CpuMatBenchNs", false);
- IActor* selfPingActor = CreateSelfPingActor(selfPingInterval, counter, cpuTimeCounter);
+ IActor* selfPingActor = CreateSelfPingActor(selfPingInterval, maxPingCounter, avgPingCounter, avgPingCounterWithSmallWindow, cpuTimeCounter);
setup->LocalServices.push_back(std::make_pair(TActorId(),
TActorSetupCmd(selfPingActor,
TMailboxType::HTSwap,
diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto
index 3bef73182b..98c8f61b6b 100644
--- a/ydb/core/protos/config.proto
+++ b/ydb/core/protos/config.proto
@@ -66,6 +66,10 @@ message TActorSystemConfig {
optional uint32 MaxThreads = 13; // Higher balancing bound, should be not lower than `Threads`
optional uint32 BalancingPriority = 14; // Priority of pool to obtain cpu due to balancing (higher is better)
optional uint64 ToleratedLatencyUs = 15; // p100-latency threshold indicating that more cpus are required by pool
+
+ // Actorsystem 1.4
+ optional int32 Priority = 16;
+ optional int32 MaxAvgPingDeviation = 17;
}
message TScheduler {