author     Daniil Cherednik <dan.cherednik@gmail.com>   2023-05-05 11:09:01 +0300
committer  Daniil Cherednik <dan.cherednik@gmail.com>   2023-05-05 11:09:01 +0300
commit     b5a989b16cafa8a3b3bc076f1097a0eda6f48c06 (patch)
tree       4da744117a5aab37758921fa43b95a3068e5aec1 /library/cpp/actors
parent     fc1cffcfa7f0497a1f97b384a24bcbf23362f3be (diff)
download   ydb-b5a989b16cafa8a3b3bc076f1097a0eda6f48c06.tar.gz
Ydb stable 23-1-26 (tag: 23.1.26)
x-stable-origin-commit: 22184a7e157553d447f17a2dffc4ea2d32dfd74d
Diffstat (limited to 'library/cpp/actors')
-rw-r--r--  library/cpp/actors/core/actor.h                        |  23
-rw-r--r--  library/cpp/actors/core/actorsystem.cpp                |   5
-rw-r--r--  library/cpp/actors/core/actorsystem.h                  |   2
-rw-r--r--  library/cpp/actors/core/cpu_manager.h                  |   7
-rw-r--r--  library/cpp/actors/core/event.h                        |   3
-rw-r--r--  library/cpp/actors/core/event_pb.h                     |   2
-rw-r--r--  library/cpp/actors/core/event_pb_payload_ut.cpp        |   4
-rw-r--r--  library/cpp/actors/core/executor_pool.h                |   7
-rw-r--r--  library/cpp/actors/core/executor_pool_basic.cpp        |   8
-rw-r--r--  library/cpp/actors/core/executor_thread.cpp            |   8
-rw-r--r--  library/cpp/actors/core/harmonizer.cpp                 | 149
-rw-r--r--  library/cpp/actors/core/harmonizer.h                   |  16
-rw-r--r--  library/cpp/actors/core/mon_stats.h                    |   6
-rw-r--r--  library/cpp/actors/core/worker_context.h               |   8
-rw-r--r--  library/cpp/actors/helpers/pool_stats_collector.h      |  47
-rw-r--r--  library/cpp/actors/interconnect/poller_actor.cpp       |  46
-rw-r--r--  library/cpp/actors/interconnect/poller_actor_darwin.h  |  10
-rw-r--r--  library/cpp/actors/interconnect/poller_actor_linux.h   |  12
-rw-r--r--  library/cpp/actors/interconnect/poller_actor_win.h     |  10
19 files changed, 307 insertions(+), 66 deletions(-)
diff --git a/library/cpp/actors/core/actor.h b/library/cpp/actors/core/actor.h
index 9ed3608223..b67c04b09b 100644
--- a/library/cpp/actors/core/actor.h
+++ b/library/cpp/actors/core/actor.h
@@ -29,6 +29,7 @@ namespace NActors {
ui32 CapturedActivation = 0;
ESendingType CapturedType = ESendingType::Lazy;
ESendingType SendingType = ESendingType::Common;
+ bool IsEnoughCpu = true;
};
extern Y_POD_THREAD(TThreadContext*) TlsThreadContext;
@@ -436,20 +437,24 @@ namespace NActors {
// must be called to wrap any call transitions from one actor to another
template<typename TActor, typename TMethod, typename... TArgs>
- static decltype((std::declval<TActor>().*std::declval<TMethod>())(std::declval<TArgs>()...))
- InvokeOtherActor(TActor& actor, TMethod&& method, TArgs&&... args) {
- struct TRecurseContext: TActorContext {
- TActivationContext* Prev;
+ static std::invoke_result_t<TMethod, TActor, TArgs...> InvokeOtherActor(TActor& actor, TMethod&& method, TArgs&&... args) {
+ struct TRecurseContext : TActorContext {
+ TActivationContext* const Prev;
+
TRecurseContext(const TActorId& actorId)
: TActorContext(TActivationContext::ActorContextFor(actorId))
- , Prev(TlsActivationContext) {
+ , Prev(TlsActivationContext)
+ {
TlsActivationContext = this;
}
+
~TRecurseContext() {
+ Y_VERIFY(TlsActivationContext == this, "TlsActivationContext mismatch; probably InvokeOtherActor was invoked from a coroutine");
TlsActivationContext = Prev;
}
} context(actor.SelfId());
- return (actor.*method)(std::forward<TArgs>(args)...);
+
+ return std::invoke(std::forward<TMethod>(method), actor, std::forward<TArgs>(args)...);
}
virtual void Registered(TActorSystem* sys, const TActorId& owner);
@@ -486,6 +491,12 @@ namespace NActors {
}
protected:
+ void SetEnoughCpu(bool isEnough) {
+ if (TlsThreadContext) {
+ TlsThreadContext->IsEnoughCpu = isEnough;
+ }
+ }
+
void Describe(IOutputStream&) const noexcept override;
bool Send(const TActorId& recipient, IEventBase* ev, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const noexcept final;
bool Send(const TActorId& recipient, THolder<IEventBase> ev, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const {
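The actor.h hunk above swaps the hand-rolled member-pointer call for std::invoke, names the return type with std::invoke_result_t, and makes the destructor verify that TlsActivationContext still points at the temporary context before restoring it. A minimal standalone sketch of the same save/verify/restore pattern; TContext, CurrentContext and InvokeOther are illustrative names, not the actors library API:

    #include <cassert>
    #include <functional>
    #include <type_traits>
    #include <utility>

    struct TContext {};                              // stand-in for the activation context
    thread_local TContext* CurrentContext = nullptr; // stand-in for TlsActivationContext

    // Install a fresh context for the duration of the nested call and check on exit
    // that nobody (e.g. a coroutine switching stacks) left a different one installed.
    struct TScopedContext {
        TContext* const Prev;
        TContext Ctx;
        TScopedContext() : Prev(CurrentContext) { CurrentContext = &Ctx; }
        ~TScopedContext() {
            assert(CurrentContext == &Ctx);          // mirrors the added Y_VERIFY
            CurrentContext = Prev;
        }
    };

    // std::invoke handles member-function pointers and plain callables uniformly, and
    // std::invoke_result_t spells the return type without the declval expression the
    // old signature wrote out by hand.
    template <typename TActor, typename TMethod, typename... TArgs>
    std::invoke_result_t<TMethod, TActor, TArgs...> InvokeOther(TActor& actor, TMethod&& method, TArgs&&... args) {
        TScopedContext scope;
        return std::invoke(std::forward<TMethod>(method), actor, std::forward<TArgs>(args)...);
    }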
diff --git a/library/cpp/actors/core/actorsystem.cpp b/library/cpp/actors/core/actorsystem.cpp
index b8896acb34..21cca94d40 100644
--- a/library/cpp/actors/core/actorsystem.cpp
+++ b/library/cpp/actors/core/actorsystem.cpp
@@ -217,6 +217,11 @@ namespace NActors {
CpuManager->GetPoolStats(poolId, poolStats, statsCopy);
}
+ THarmonizerStats TActorSystem::GetHarmonizerStats() const {
+ return CpuManager->GetHarmonizerStats();
+
+ }
+
void TActorSystem::Start() {
Y_VERIFY(StartExecuted == false);
StartExecuted = true;
diff --git a/library/cpp/actors/core/actorsystem.h b/library/cpp/actors/core/actorsystem.h
index cd2cfda1bb..9f2483245a 100644
--- a/library/cpp/actors/core/actorsystem.h
+++ b/library/cpp/actors/core/actorsystem.h
@@ -299,6 +299,8 @@ namespace NActors {
void GetPoolStats(ui32 poolId, TExecutorPoolStats& poolStats, TVector<TExecutorThreadStats>& statsCopy) const;
+ THarmonizerStats GetHarmonizerStats() const;
+
void DeferPreStop(std::function<void()> fn) {
DeferredPreStop.push_back(std::move(fn));
}
diff --git a/library/cpp/actors/core/cpu_manager.h b/library/cpp/actors/core/cpu_manager.h
index e2e3861a3b..c3a7588639 100644
--- a/library/cpp/actors/core/cpu_manager.h
+++ b/library/cpp/actors/core/cpu_manager.h
@@ -40,6 +40,13 @@ namespace NActors {
}
}
+ THarmonizerStats GetHarmonizerStats() const {
+ if (Harmonizer) {
+ return Harmonizer->GetStats();
+ }
+ return {};
+ }
+
private:
IExecutorPool* CreateExecutorPool(ui32 poolId);
};
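The three hunks above (actorsystem.cpp, actorsystem.h, cpu_manager.h) expose the harmonizer's new aggregate snapshot through TActorSystem::GetHarmonizerStats(), with TCpuManager returning a default-constructed snapshot when no harmonizer is configured. A hedged usage sketch; DumpHarmonizerStats and the iostream output are illustrative, only the GetHarmonizerStats() call and the THarmonizerStats fields come from this commit:

    #include <library/cpp/actors/core/actorsystem.h>
    #include <iostream>

    // Hypothetical monitoring helper: pull the new harmonizer snapshot from a
    // running actor system and print the min/max window aggregates.
    void DumpHarmonizerStats(const NActors::TActorSystem& actorSystem) {
        const NActors::THarmonizerStats stats = actorSystem.GetHarmonizerStats();
        std::cout << "consumed cpu: min=" << stats.MinConsumedCpu
                  << " max=" << stats.MaxConsumedCpu << '\n'
                  << "booked cpu:   min=" << stats.MinBookedCpu
                  << " max=" << stats.MaxBookedCpu << '\n';
    }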
diff --git a/library/cpp/actors/core/event.h b/library/cpp/actors/core/event.h
index 6b92edaf41..90ae16ac26 100644
--- a/library/cpp/actors/core/event.h
+++ b/library/cpp/actors/core/event.h
@@ -80,7 +80,8 @@ namespace NActors {
Y_FAIL("Event type %" PRIu32 " doesn't match the expected type %" PRIu32, Type, TEventType::EventType);
if (!Event) {
- Event.Reset(TEventType::Load(Buffer.Get()));
+ static TEventSerializedData empty;
+ Event.Reset(TEventType::Load(Buffer ? Buffer.Get() : &empty));
}
if (Event) {
diff --git a/library/cpp/actors/core/event_pb.h b/library/cpp/actors/core/event_pb.h
index 2d388fceeb..38058df749 100644
--- a/library/cpp/actors/core/event_pb.h
+++ b/library/cpp/actors/core/event_pb.h
@@ -228,7 +228,7 @@ namespace NActors {
return result;
}
- static IEventBase* Load(TIntrusivePtr<TEventSerializedData> input) {
+ static IEventBase* Load(TEventSerializedData *input) {
THolder<TEventPBBase> ev(new TEv());
if (!input->GetSize()) {
Y_PROTOBUF_SUPPRESS_NODISCARD ev->Record.ParseFromString(TString());
diff --git a/library/cpp/actors/core/event_pb_payload_ut.cpp b/library/cpp/actors/core/event_pb_payload_ut.cpp
index eab007bc15..7024d338d5 100644
--- a/library/cpp/actors/core/event_pb_payload_ut.cpp
+++ b/library/cpp/actors/core/event_pb_payload_ut.cpp
@@ -66,7 +66,7 @@ Y_UNIT_TEST_SUITE(TEventProtoWithPayload) {
}
UNIT_ASSERT_VALUES_EQUAL(chunkerRes, ser);
- THolder<IEventBase> ev2 = THolder(TEventTo::Load(buffers));
+ THolder<IEventBase> ev2 = THolder(TEventTo::Load(buffers.Get()));
TEventTo& msg2 = static_cast<TEventTo&>(*ev2);
UNIT_ASSERT_VALUES_EQUAL(msg2.Record.GetMeta(), msg.Record.GetMeta());
UNIT_ASSERT_EQUAL(msg2.GetPayload(msg2.Record.GetPayloadId(0)), msg.GetPayload(msg.Record.GetPayloadId(0)));
@@ -142,7 +142,7 @@ Y_UNIT_TEST_SUITE(TEventProtoWithPayload) {
// deserialize
auto data = MakeIntrusive<TEventSerializedData>(ser1, false);
- THolder<TEvMessageWithPayloadPreSerialized> parsedEvent(static_cast<TEvMessageWithPayloadPreSerialized*>(TEvMessageWithPayloadPreSerialized::Load(data)));
+ THolder<TEvMessageWithPayloadPreSerialized> parsedEvent(static_cast<TEvMessageWithPayloadPreSerialized*>(TEvMessageWithPayloadPreSerialized::Load(data.Get())));
UNIT_ASSERT_VALUES_EQUAL(parsedEvent->PreSerializedData, ""); // this field is empty after deserialization
auto& record = parsedEvent->GetRecord();
UNIT_ASSERT_VALUES_EQUAL(record.GetMeta(), msg.GetMeta());
diff --git a/library/cpp/actors/core/executor_pool.h b/library/cpp/actors/core/executor_pool.h
index c7c85e61fd..f4def74077 100644
--- a/library/cpp/actors/core/executor_pool.h
+++ b/library/cpp/actors/core/executor_pool.h
@@ -13,6 +13,13 @@ namespace NActors {
struct TCpuConsumption {
double ConsumedUs = 0;
double BookedUs = 0;
+ ui64 NotEnoughCpuExecutions = 0;
+
+ void Add(const TCpuConsumption& other) {
+ ConsumedUs += other.ConsumedUs;
+ BookedUs += other.BookedUs;
+ NotEnoughCpuExecutions += other.NotEnoughCpuExecutions;
+ }
};
class IExecutorPool : TNonCopyable {
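TCpuConsumption grows a NotEnoughCpuExecutions counter and an Add() helper; the harmonizer.cpp hunk further down folds per-thread samples into a per-pool total with it. A trivial sketch of that folding, with made-up sample values:

    #include <library/cpp/actors/core/executor_pool.h>

    // Fold per-thread CPU consumption samples into a pool-wide total, the same way
    // TPoolInfo::PullStats does later in this commit. The numbers are invented.
    NActors::TCpuConsumption AggregateSamples() {
        NActors::TCpuConsumption total;
        const NActors::TCpuConsumption perThread[] = {
            {500.0, 450.0, 0},   // ConsumedUs, BookedUs, NotEnoughCpuExecutions
            {700.0, 650.0, 2},
        };
        for (const NActors::TCpuConsumption& sample : perThread) {
            total.Add(sample);
        }
        return total;            // {1200.0, 1100.0, 2}
    }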
diff --git a/library/cpp/actors/core/executor_pool_basic.cpp b/library/cpp/actors/core/executor_pool_basic.cpp
index de04105991..fc87daed77 100644
--- a/library/cpp/actors/core/executor_pool_basic.cpp
+++ b/library/cpp/actors/core/executor_pool_basic.cpp
@@ -337,7 +337,7 @@ namespace NActors {
poolStats.DefaultThreadCount = DefaultThreadCount;
poolStats.MaxThreadCount = MaxThreadCount;
if (Harmonizer) {
- TPoolHarmonizedStats stats = Harmonizer->GetPoolStats(PoolId);
+ TPoolHarmonizerStats stats = Harmonizer->GetPoolStats(PoolId);
poolStats.IsNeedy = stats.IsNeedy;
poolStats.IsStarved = stats.IsStarved;
poolStats.IsHoggish = stats.IsHoggish;
@@ -345,6 +345,10 @@ namespace NActors {
poolStats.DecreasingThreadsByStarvedState = stats.DecreasingThreadsByStarvedState;
poolStats.DecreasingThreadsByHoggishState = stats.DecreasingThreadsByHoggishState;
poolStats.PotentialMaxThreadCount = stats.PotentialMaxThreadCount;
+ poolStats.MaxConsumedCpuUs = stats.MaxConsumedCpu;
+ poolStats.MinConsumedCpuUs = stats.MinConsumedCpu;
+ poolStats.MaxBookedCpuUs = stats.MaxBookedCpu;
+ poolStats.MinBookedCpuUs = stats.MinBookedCpu;
}
statsCopy.resize(PoolThreads + 1);
@@ -500,7 +504,7 @@ namespace NActors {
TThreadCtx& threadCtx = Threads[threadIdx];
TExecutorThreadStats stats;
threadCtx.Thread->GetCurrentStats(stats);
- return {Ts2Us(stats.SafeElapsedTicks), static_cast<double>(stats.CpuUs)};
+ return {Ts2Us(stats.SafeElapsedTicks), static_cast<double>(stats.CpuUs), stats.NotEnoughCpuExecutions};
}
i16 TBasicExecutorPool::GetBlockingThreadCount() const {
diff --git a/library/cpp/actors/core/executor_thread.cpp b/library/cpp/actors/core/executor_thread.cpp
index 3c5dc2c2b4..f05b1d4479 100644
--- a/library/cpp/actors/core/executor_thread.cpp
+++ b/library/cpp/actors/core/executor_thread.cpp
@@ -364,7 +364,8 @@ namespace NActors {
}
#undef EXECUTE_MAILBOX
hpnow = GetCycleCountFast();
- execCycles += hpnow - hpprev;
+ i64 currentExecCycles = hpnow - hpprev;
+ execCycles += currentExecCycles;
hpprev = hpnow;
execCount++;
if (execCycles + nonExecCycles > 39000000) { // every 15 ms at 2.6GHz, so 1000 items is 15 sec (solomon interval)
@@ -377,6 +378,11 @@ namespace NActors {
nonExecCycles = 0;
Ctx.UpdateThreadTime();
}
+
+ if (!TlsThreadContext->IsEnoughCpu) {
+ Ctx.IncreaseNotEnoughCpuExecutions();
+ TlsThreadContext->IsEnoughCpu = true;
+ }
}
}
LWTRACK(ActivationEnd, Ctx.Orbit, Ctx.CpuId, Ctx.PoolId, Ctx.WorkerId);
diff --git a/library/cpp/actors/core/harmonizer.cpp b/library/cpp/actors/core/harmonizer.cpp
index e2fd0c5f24..a1838ee2b0 100644
--- a/library/cpp/actors/core/harmonizer.cpp
+++ b/library/cpp/actors/core/harmonizer.cpp
@@ -19,8 +19,8 @@ constexpr bool CheckBinaryPower(ui64 value) {
return !(value & (value - 1));
}
+template <ui8 HistoryBufferSize = 8>
struct TValueHistory {
- static constexpr ui64 HistoryBufferSize = 8;
static_assert(CheckBinaryPower(HistoryBufferSize));
double History[HistoryBufferSize] = {0.0};
@@ -31,32 +31,87 @@ struct TValueHistory {
ui64 AccumulatedTs = 0;
template <bool WithTail=false>
- double GetAvgPartForLastSeconds(ui8 seconds) {
- double sum = AccumulatedUs;
+ double Accumulate(auto op, auto comb, ui8 seconds) {
+ double acc = AccumulatedUs;
size_t idx = HistoryIdx;
ui8 leftSeconds = seconds;
+ if constexpr (!WithTail) {
+ idx--;
+ leftSeconds--;
+ if (idx >= HistoryBufferSize) {
+ idx = HistoryBufferSize - 1;
+ }
+ acc = History[idx];
+ }
do {
idx--;
leftSeconds--;
if (idx >= HistoryBufferSize) {
idx = HistoryBufferSize - 1;
}
- if (WithTail || leftSeconds) {
- sum += History[idx];
+ if constexpr (WithTail) {
+ acc = op(acc, History[idx]);
+ } else if (leftSeconds) {
+ acc = op(acc, History[idx]);
} else {
ui64 tsInSecond = Us2Ts(1'000'000.0);
- sum += History[idx] * (tsInSecond - AccumulatedTs) / tsInSecond;
+ acc = op(acc, History[idx] * (tsInSecond - AccumulatedTs) / tsInSecond);
}
} while (leftSeconds);
- double duration = 1'000'000.0 * seconds + (WithTail ? Ts2Us(AccumulatedTs): 0.0);
- double avg = sum / duration;
- return avg;
+ double duration = 1'000'000.0 * seconds;
+ if constexpr (WithTail) {
+ duration += Ts2Us(AccumulatedTs);
+ }
+ return comb(acc, duration);
+ }
+
+ template <bool WithTail=false>
+ double GetAvgPartForLastSeconds(ui8 seconds) {
+ auto sum = [](double acc, double value) {
+ return acc + value;
+ };
+ auto avg = [](double sum, double duration) {
+ return sum / duration;
+ };
+ return Accumulate<WithTail>(sum, avg, seconds);
}
double GetAvgPart() {
return GetAvgPartForLastSeconds<true>(HistoryBufferSize);
}
+ double GetMaxForLastSeconds(ui8 seconds) {
+ auto max = [](const double& acc, const double& value) {
+ return Max(acc, value);
+ };
+ auto fst = [](const double& value, const double&) { return value; };
+ return Accumulate<false>(max, fst, seconds);
+ }
+
+ double GetMax() {
+ return GetMaxForLastSeconds(HistoryBufferSize);
+ }
+
+ i64 GetMaxInt() {
+ return static_cast<i64>(GetMax());
+ }
+
+ double GetMinForLastSeconds(ui8 seconds) {
+ auto min = [](const double& acc, const double& value) {
+ return Min(acc, value);
+ };
+ auto fst = [](const double& value, const double&) { return value; };
+ return Accumulate<false>(min, fst, seconds);
+ }
+
+ double GetMin() {
+ return GetMinForLastSeconds(HistoryBufferSize);
+ }
+
+ i64 GetMinInt() {
+ return static_cast<i64>(GetMin());
+ }
+
void Register(ui64 ts, double valueUs) {
if (ts < LastTs) {
LastTs = ts;
@@ -101,8 +156,8 @@ struct TValueHistory {
};
struct TThreadInfo {
- TValueHistory Consumed;
- TValueHistory Booked;
+ TValueHistory<8> Consumed;
+ TValueHistory<8> Booked;
};
struct TPoolInfo {
@@ -116,6 +171,8 @@ struct TPoolInfo {
NMonitoring::TDynamicCounters::TCounterPtr AvgPingCounterWithSmallWindow;
ui32 MaxAvgPingUs = 0;
ui64 LastUpdateTs = 0;
+ ui64 NotEnoughCpuExecutions = 0;
+ ui64 NewNotEnoughCpuExecutions = 0;
TAtomic LastFlags = 0; // 0 - isNeedy; 1 - isStarved; 2 - isHoggish
TAtomic IncreasingThreadsByNeedyState = 0;
@@ -123,12 +180,20 @@ struct TPoolInfo {
TAtomic DecreasingThreadsByHoggishState = 0;
TAtomic PotentialMaxThreadCount = 0;
+ TValueHistory<16> Consumed;
+ TValueHistory<16> Booked;
+
+ TAtomic MaxConsumedCpu = 0;
+ TAtomic MinConsumedCpu = 0;
+ TAtomic MaxBookedCpu = 0;
+ TAtomic MinBookedCpu = 0;
+
bool IsBeingStopped(i16 threadIdx);
double GetBooked(i16 threadIdx);
double GetlastSecondPoolBooked(i16 threadIdx);
double GetConsumed(i16 threadIdx);
double GetlastSecondPoolConsumed(i16 threadIdx);
- void PullStats(ui64 ts);
+ TCpuConsumption PullStats(ui64 ts);
i16 GetThreadCount();
void SetThreadCount(i16 threadCount);
bool IsAvgPingGood();
@@ -167,15 +232,26 @@ double TPoolInfo::GetlastSecondPoolConsumed(i16 threadIdx) {
}
#define UNROLL_HISTORY(history) (history)[0], (history)[1], (history)[2], (history)[3], (history)[4], (history)[5], (history)[6], (history)[7]
-void TPoolInfo::PullStats(ui64 ts) {
+TCpuConsumption TPoolInfo::PullStats(ui64 ts) {
+ TCpuConsumption acc;
for (i16 threadIdx = 0; threadIdx < MaxThreadCount; ++threadIdx) {
TThreadInfo &threadInfo = ThreadInfo[threadIdx];
TCpuConsumption cpuConsumption = Pool->GetThreadCpuConsumption(threadIdx);
+ acc.Add(cpuConsumption);
threadInfo.Consumed.Register(ts, cpuConsumption.ConsumedUs);
LWPROBE(SavedValues, Pool->PoolId, Pool->GetName(), "consumed", UNROLL_HISTORY(threadInfo.Consumed.History));
threadInfo.Booked.Register(ts, cpuConsumption.BookedUs);
LWPROBE(SavedValues, Pool->PoolId, Pool->GetName(), "booked", UNROLL_HISTORY(threadInfo.Booked.History));
}
+ Consumed.Register(ts, acc.ConsumedUs);
+ RelaxedStore(&MaxConsumedCpu, Consumed.GetMaxInt());
+ RelaxedStore(&MinConsumedCpu, Consumed.GetMinInt());
+ Booked.Register(ts, acc.BookedUs);
+ RelaxedStore(&MaxBookedCpu, Booked.GetMaxInt());
+ RelaxedStore(&MinBookedCpu, Booked.GetMinInt());
+ NewNotEnoughCpuExecutions = acc.NotEnoughCpuExecutions - NotEnoughCpuExecutions;
+ NotEnoughCpuExecutions = acc.NotEnoughCpuExecutions;
+ return acc;
}
#undef UNROLL_HISTORY
@@ -206,6 +282,14 @@ private:
std::vector<TPoolInfo> Pools;
std::vector<ui16> PriorityOrder;
+ TValueHistory<16> Consumed;
+ TValueHistory<16> Booked;
+
+ TAtomic MaxConsumedCpu = 0;
+ TAtomic MinConsumedCpu = 0;
+ TAtomic MaxBookedCpu = 0;
+ TAtomic MinBookedCpu = 0;
+
void PullStats(ui64 ts);
void HarmonizeImpl(ui64 ts);
void CalculatePriorityOrder();
@@ -217,7 +301,8 @@ public:
void DeclareEmergency(ui64 ts) override;
void AddPool(IExecutorPool* pool, TSelfPingInfo *pingInfo) override;
void Enable(bool enable) override;
- TPoolHarmonizedStats GetPoolStats(i16 poolId) const override;
+ TPoolHarmonizerStats GetPoolStats(i16 poolId) const override;
+ THarmonizerStats GetStats() const override;
};
THarmonizer::THarmonizer(ui64 ts) {
@@ -232,13 +317,21 @@ double THarmonizer::Rescale(double value) const {
}
void THarmonizer::PullStats(ui64 ts) {
+ TCpuConsumption acc;
for (TPoolInfo &pool : Pools) {
- pool.PullStats(ts);
+ TCpuConsumption consumption = pool.PullStats(ts);
+ acc.Add(consumption);
}
+ Consumed.Register(ts, acc.ConsumedUs);
+ RelaxedStore(&MaxConsumedCpu, Consumed.GetMaxInt());
+ RelaxedStore(&MinConsumedCpu, Consumed.GetMinInt());
+ Booked.Register(ts, acc.BookedUs);
+ RelaxedStore(&MaxBookedCpu, Booked.GetMaxInt());
+ RelaxedStore(&MinBookedCpu, Booked.GetMinInt());
}
Y_FORCE_INLINE bool IsStarved(double consumed, double booked) {
- return consumed < booked * 0.7;
+ return Max(consumed, booked) > 0.1 && consumed < booked * 0.7;
}
Y_FORCE_INLINE bool IsHoggish(double booked, ui16 currentThreadCount) {
@@ -273,7 +366,7 @@ void THarmonizer::HarmonizeImpl(ui64 ts) {
isStarvedPresent = true;
}
ui32 currentThreadCount = pool.GetThreadCount();
- bool isNeedy = pool.IsAvgPingGood() && poolBooked >= currentThreadCount;
+ bool isNeedy = (pool.IsAvgPingGood() || pool.NewNotEnoughCpuExecutions) && poolBooked >= currentThreadCount;
if (pool.AvgPingCounter) {
if (pool.LastUpdateTs + Us2Ts(3'000'000ull) > ts) {
isNeedy = false;
@@ -304,6 +397,9 @@ void THarmonizer::HarmonizeImpl(ui64 ts) {
AtomicSet(pool.PotentialMaxThreadCount, Min(pool.MaxThreadCount, budgetInt));
}
double overbooked = consumed - booked;
+ if (overbooked < 0) {
+ isStarvedPresent = false;
+ }
if (isStarvedPresent) {
// last_starved_at_consumed_value = sum of consumed over all pools;
// TODO(cthulhu): use this as a limit and smoothly drive the limit towards total,
@@ -319,7 +415,7 @@ void THarmonizer::HarmonizeImpl(ui64 ts) {
TPoolInfo &pool = Pools[poolIdx];
i64 threadCount = pool.GetThreadCount();
while (threadCount > pool.DefaultThreadCount) {
- pool.SetThreadCount(threadCount - 1);
+ pool.SetThreadCount(--threadCount);
AtomicIncrement(pool.DecreasingThreadsByStarvedState);
overbooked--;
LWPROBE(HarmonizeOperation, poolIdx, pool.Pool->GetName(), "decrease", threadCount - 1, pool.DefaultThreadCount, pool.MaxThreadCount);
@@ -425,13 +521,17 @@ IHarmonizer* MakeHarmonizer(ui64 ts) {
return new THarmonizer(ts);
}
-TPoolHarmonizedStats THarmonizer::GetPoolStats(i16 poolId) const {
+TPoolHarmonizerStats THarmonizer::GetPoolStats(i16 poolId) const {
const TPoolInfo &pool = Pools[poolId];
ui64 flags = RelaxedLoad(&pool.LastFlags);
- return TPoolHarmonizedStats {
+ return TPoolHarmonizerStats{
.IncreasingThreadsByNeedyState = static_cast<ui64>(RelaxedLoad(&pool.IncreasingThreadsByNeedyState)),
.DecreasingThreadsByStarvedState = static_cast<ui64>(RelaxedLoad(&pool.DecreasingThreadsByStarvedState)),
.DecreasingThreadsByHoggishState = static_cast<ui64>(RelaxedLoad(&pool.DecreasingThreadsByHoggishState)),
+ .MaxConsumedCpu = static_cast<i64>(RelaxedLoad(&pool.MaxConsumedCpu)),
+ .MinConsumedCpu = static_cast<i64>(RelaxedLoad(&pool.MinConsumedCpu)),
+ .MaxBookedCpu = static_cast<i64>(RelaxedLoad(&pool.MaxBookedCpu)),
+ .MinBookedCpu = static_cast<i64>(RelaxedLoad(&pool.MinBookedCpu)),
.PotentialMaxThreadCount = static_cast<i16>(RelaxedLoad(&pool.PotentialMaxThreadCount)),
.IsNeedy = static_cast<bool>(flags & 1),
.IsStarved = static_cast<bool>(flags & 2),
@@ -439,4 +539,13 @@ TPoolHarmonizedStats THarmonizer::GetPoolStats(i16 poolId) const {
};
}
+THarmonizerStats THarmonizer::GetStats() const {
+ return THarmonizerStats{
+ .MaxConsumedCpu = static_cast<i64>(RelaxedLoad(&MaxConsumedCpu)),
+ .MinConsumedCpu = static_cast<i64>(RelaxedLoad(&MinConsumedCpu)),
+ .MaxBookedCpu = static_cast<i64>(RelaxedLoad(&MaxBookedCpu)),
+ .MinBookedCpu = static_cast<i64>(RelaxedLoad(&MinBookedCpu)),
+ };
+}
+
}
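The TValueHistory rewrite above folds the old GetAvgPartForLastSeconds and the new GetMax/GetMin readers into a single Accumulate(op, comb, seconds) walk over a power-of-two ring buffer. A self-contained sketch of that fold under simplified assumptions: it keeps only whole-second slots and skips the partially accumulated current second that the real code also handles.

    #include <algorithm>
    #include <array>
    #include <cstddef>

    // One generic walk over the ring buffer, specialised by an accumulation op and a
    // final combiner, as in the harmonizer's TValueHistory::Accumulate.
    template <size_t N = 8>
    struct TRingHistory {
        static_assert((N & (N - 1)) == 0, "size must be a power of two");
        std::array<double, N> History{};
        size_t HistoryIdx = 0;   // next slot to overwrite

        void Push(double valueUs) {
            History[HistoryIdx] = valueUs;
            HistoryIdx = (HistoryIdx + 1) & (N - 1);
        }

        template <typename TOp, typename TComb>
        double Accumulate(TOp op, TComb comb, size_t seconds) const {   // seconds >= 1
            size_t idx = (HistoryIdx + N - 1) & (N - 1);                // most recent completed slot
            double acc = History[idx];
            for (size_t left = seconds - 1; left; --left) {
                idx = (idx + N - 1) & (N - 1);
                acc = op(acc, History[idx]);
            }
            return comb(acc, 1'000'000.0 * seconds);                    // duration in microseconds
        }

        double Avg(size_t seconds) const {
            return Accumulate([](double a, double v) { return a + v; },
                              [](double sum, double duration) { return sum / duration; }, seconds);
        }
        double Max(size_t seconds) const {
            return Accumulate([](double a, double v) { return std::max(a, v); },
                              [](double v, double) { return v; }, seconds);
        }
    };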
diff --git a/library/cpp/actors/core/harmonizer.h b/library/cpp/actors/core/harmonizer.h
index bc6b938fe8..7c66ff54c6 100644
--- a/library/cpp/actors/core/harmonizer.h
+++ b/library/cpp/actors/core/harmonizer.h
@@ -6,16 +6,27 @@
namespace NActors {
class IExecutorPool;
- struct TPoolHarmonizedStats {
+ struct TPoolHarmonizerStats {
ui64 IncreasingThreadsByNeedyState = 0;
ui64 DecreasingThreadsByStarvedState = 0;
ui64 DecreasingThreadsByHoggishState = 0;
+ i64 MaxConsumedCpu = 0.0;
+ i64 MinConsumedCpu = 0.0;
+ i64 MaxBookedCpu = 0.0;
+ i64 MinBookedCpu = 0.0;
i16 PotentialMaxThreadCount = 0;
bool IsNeedy = false;
bool IsStarved = false;
bool IsHoggish = false;
};
+ struct THarmonizerStats {
+ i64 MaxConsumedCpu = 0.0;
+ i64 MinConsumedCpu = 0.0;
+ i64 MaxBookedCpu = 0.0;
+ i64 MinBookedCpu = 0.0;
+ };
+
// Pool cpu harmonizer
class IHarmonizer {
public:
@@ -24,7 +35,8 @@ namespace NActors {
virtual void DeclareEmergency(ui64 ts) = 0;
virtual void AddPool(IExecutorPool* pool, TSelfPingInfo *pingInfo = nullptr) = 0;
virtual void Enable(bool enable) = 0;
- virtual TPoolHarmonizedStats GetPoolStats(i16 poolId) const = 0;
+ virtual TPoolHarmonizerStats GetPoolStats(i16 poolId) const = 0;
+ virtual THarmonizerStats GetStats() const = 0;
};
IHarmonizer* MakeHarmonizer(ui64 ts);
diff --git a/library/cpp/actors/core/mon_stats.h b/library/cpp/actors/core/mon_stats.h
index 4c664a964a..5d61c9f87c 100644
--- a/library/cpp/actors/core/mon_stats.h
+++ b/library/cpp/actors/core/mon_stats.h
@@ -63,6 +63,10 @@ namespace NActors {
ui64 IncreasingThreadsByNeedyState = 0;
ui64 DecreasingThreadsByStarvedState = 0;
ui64 DecreasingThreadsByHoggishState = 0;
+ i64 MaxConsumedCpuUs = 0;
+ i64 MinConsumedCpuUs = 0;
+ i64 MaxBookedCpuUs = 0;
+ i64 MinBookedCpuUs = 0;
i16 WrongWakenedThreadCount = 0;
i16 CurrentThreadCount = 0;
i16 PotentialMaxThreadCount = 0;
@@ -100,6 +104,7 @@ namespace NActors {
ui64 MailboxPushedOutBySoftPreemption = 0;
ui64 MailboxPushedOutByTime = 0;
ui64 MailboxPushedOutByEventCount = 0;
+ ui64 NotEnoughCpuExecutions = 0;
TExecutorThreadStats(size_t activityVecSize = 5) // must be not empty as 0 used as default
: ElapsedTicksByActivity(activityVecSize)
@@ -136,6 +141,7 @@ namespace NActors {
MailboxPushedOutBySoftPreemption += RelaxedLoad(&other.MailboxPushedOutBySoftPreemption);
MailboxPushedOutByTime += RelaxedLoad(&other.MailboxPushedOutByTime);
MailboxPushedOutByEventCount += RelaxedLoad(&other.MailboxPushedOutByEventCount);
+ NotEnoughCpuExecutions += RelaxedLoad(&other.NotEnoughCpuExecutions);
ActivationTimeHistogram.Aggregate(other.ActivationTimeHistogram);
EventDeliveryTimeHistogram.Aggregate(other.EventDeliveryTimeHistogram);
diff --git a/library/cpp/actors/core/worker_context.h b/library/cpp/actors/core/worker_context.h
index c3a2947df1..cc8da2ff77 100644
--- a/library/cpp/actors/core/worker_context.h
+++ b/library/cpp/actors/core/worker_context.h
@@ -104,7 +104,7 @@ namespace NActors {
i64 ts = deliveredTs > scheduleTs ? deliveredTs - scheduleTs : 0;
double usec = NHPTimer::GetSeconds(ts) * 1000000.0;
Stats->ActivationTimeHistogram.Add(usec);
- Stats->WorstActivationTimeUs = Max(Stats->WorstActivationTimeUs, (ui64)usec);
+ RelaxedStore(&Stats->WorstActivationTimeUs, Max(Stats->WorstActivationTimeUs, (ui64)usec));
return usec;
}
@@ -140,6 +140,11 @@ namespace NActors {
RelaxedStore(&WorkerStats.SafeElapsedTicks, (ui64)RelaxedLoad(&WorkerStats.ElapsedTicks));
RelaxedStore(&WorkerStats.CpuUs, ThreadCPUTime());
}
+
+ void IncreaseNotEnoughCpuExecutions() {
+ RelaxedStore(&WorkerStats.NotEnoughCpuExecutions,
+ (ui64)RelaxedLoad(&WorkerStats.NotEnoughCpuExecutions) + 1);
+ }
#else
void GetCurrentStats(TExecutorThreadStats&) const {}
inline void AddElapsedCycles(ui32, i64) {}
@@ -159,6 +164,7 @@ namespace NActors {
i64 AddEventProcessingStats(i64, i64, ui32, ui64) { return 0; }
void UpdateActorsStats(size_t, IExecutorPool*) {}
void UpdateThreadTime() {}
+ void IncreaseNotEnoughCpuExecutions() {}
#endif
void Switch(IExecutorPool* executor,
diff --git a/library/cpp/actors/helpers/pool_stats_collector.h b/library/cpp/actors/helpers/pool_stats_collector.h
index d80951827d..51ace0e3cc 100644
--- a/library/cpp/actors/helpers/pool_stats_collector.h
+++ b/library/cpp/actors/helpers/pool_stats_collector.h
@@ -135,6 +135,11 @@ private:
NMonitoring::TDynamicCounters::TCounterPtr IncreasingThreadsByNeedyState;
NMonitoring::TDynamicCounters::TCounterPtr DecreasingThreadsByStarvedState;
NMonitoring::TDynamicCounters::TCounterPtr DecreasingThreadsByHoggishState;
+ NMonitoring::TDynamicCounters::TCounterPtr NotEnoughCpuExecutions;
+ NMonitoring::TDynamicCounters::TCounterPtr MaxConsumedCpu;
+ NMonitoring::TDynamicCounters::TCounterPtr MinConsumedCpu;
+ NMonitoring::TDynamicCounters::TCounterPtr MaxBookedCpu;
+ NMonitoring::TDynamicCounters::TCounterPtr MinBookedCpu;
THistogramCounters LegacyActivationTimeHistogram;
@@ -190,6 +195,11 @@ private:
IncreasingThreadsByNeedyState = PoolGroup->GetCounter("IncreasingThreadsByNeedyState", true);
DecreasingThreadsByStarvedState = PoolGroup->GetCounter("DecreasingThreadsByStarvedState", true);
DecreasingThreadsByHoggishState = PoolGroup->GetCounter("DecreasingThreadsByHoggishState", true);
+ NotEnoughCpuExecutions = PoolGroup->GetCounter("NotEnoughCpuExecutions", true);
+ MaxConsumedCpu = PoolGroup->GetCounter("MaxConsumedCpuByPool", false);
+ MinConsumedCpu = PoolGroup->GetCounter("MinConsumedCpuByPool", false);
+ MaxBookedCpu = PoolGroup->GetCounter("MaxBookedCpuByPool", false);
+ MinBookedCpu = PoolGroup->GetCounter("MinBookedCpuByPool", false);
LegacyActivationTimeHistogram.Init(PoolGroup.Get(), "ActivationTime", "usec", 5*1000*1000);
ActivationTimeHistogram = PoolGroup->GetHistogram(
@@ -237,6 +247,7 @@ private:
*IncreasingThreadsByNeedyState = poolStats.IncreasingThreadsByNeedyState;
*DecreasingThreadsByStarvedState = poolStats.DecreasingThreadsByStarvedState;
*DecreasingThreadsByHoggishState = poolStats.DecreasingThreadsByHoggishState;
+ *NotEnoughCpuExecutions = stats.NotEnoughCpuExecutions;
LegacyActivationTimeHistogram.Set(stats.ActivationTimeHistogram);
ActivationTimeHistogram->Reset();
@@ -281,6 +292,38 @@ private:
}
};
+ struct TActorSystemCounters {
+ TIntrusivePtr<NMonitoring::TDynamicCounters> Group;
+
+ NMonitoring::TDynamicCounters::TCounterPtr MaxConsumedCpu;
+ NMonitoring::TDynamicCounters::TCounterPtr MinConsumedCpu;
+ NMonitoring::TDynamicCounters::TCounterPtr MaxBookedCpu;
+ NMonitoring::TDynamicCounters::TCounterPtr MinBookedCpu;
+
+ void Init(NMonitoring::TDynamicCounters* group) {
+ Group = group;
+
+ MaxConsumedCpu = Group->GetCounter("MaxConsumedCpu", false);
+ MinConsumedCpu = Group->GetCounter("MinConsumedCpu", false);
+ MaxBookedCpu = Group->GetCounter("MaxBookedCpu", false);
+ MinBookedCpu = Group->GetCounter("MinBookedCpu", false);
+ }
+
+ void Set(const THarmonizerStats& harmonizerStats) {
+#ifdef ACTORSLIB_COLLECT_EXEC_STATS
+ *MaxConsumedCpu = harmonizerStats.MaxConsumedCpu;
+ *MinConsumedCpu = harmonizerStats.MinConsumedCpu;
+ *MaxBookedCpu = harmonizerStats.MaxBookedCpu;
+ *MinBookedCpu = harmonizerStats.MinBookedCpu;
+#else
+ Y_UNUSED(harmonizerStats);
+#endif
+ }
+
+ };
+
public:
static constexpr IActor::EActivityType ActorActivityType() {
return IActor::ACTORLIB_STATS;
@@ -297,6 +340,7 @@ public:
for (size_t poolId = 0; poolId < PoolCounters.size(); ++poolId) {
PoolCounters[poolId].Init(Counters.Get(), setup.GetPoolName(poolId), setup.GetThreads(poolId));
}
+ ActorSystemCounters.Init(Counters.Get());
}
void Bootstrap(const TActorContext& ctx) {
@@ -322,6 +366,8 @@ private:
ctx.ExecutorThread.ActorSystem->GetPoolStats(poolId, poolStats, stats);
SetAggregatedCounters(PoolCounters[poolId], poolStats, stats);
}
+ THarmonizerStats harmonizerStats = ctx.ExecutorThread.ActorSystem->GetHarmonizerStats();
+ ActorSystemCounters.Set(harmonizerStats);
OnWakeup(ctx);
@@ -343,6 +389,7 @@ protected:
NMonitoring::TDynamicCounterPtr Counters;
TVector<TExecutorPoolCounters> PoolCounters;
+ TActorSystemCounters ActorSystemCounters;
};
} // NActors
diff --git a/library/cpp/actors/interconnect/poller_actor.cpp b/library/cpp/actors/interconnect/poller_actor.cpp
index e75cbcaef4..04e17c24ab 100644
--- a/library/cpp/actors/interconnect/poller_actor.cpp
+++ b/library/cpp/actors/interconnect/poller_actor.cpp
@@ -146,8 +146,7 @@ namespace NActors {
wrapper.Wait();
}
- bool DrainReadEnd() {
- size_t totalRead = 0;
+ void DrainReadEnd() {
char buffer[4096];
for (;;) {
ssize_t n = ReadEnd.Read(buffer, sizeof(buffer));
@@ -162,37 +161,38 @@ namespace NActors {
}
} else {
Y_VERIFY(n);
- totalRead += n;
}
}
- return totalRead;
}
bool ProcessSyncOpQueue() {
- if (DrainReadEnd()) {
- Y_VERIFY(!SyncOperationsQ.IsEmpty());
- do {
- TPollerSyncOperationWrapper *op = SyncOperationsQ.Top();
- if (auto *unregister = std::get_if<TPollerUnregisterSocket>(&op->Operation)) {
- static_cast<TDerived&>(*this).UnregisterSocketInLoop(unregister->Socket);
- op->SignalDone();
- } else if (std::get_if<TPollerExitThread>(&op->Operation)) {
- op->SignalDone();
- return false; // terminate the thread
- } else if (std::get_if<TPollerWakeup>(&op->Operation)) {
- op->SignalDone();
- } else {
- Y_FAIL();
- }
- } while (SyncOperationsQ.Pop());
- }
+ Y_VERIFY(!SyncOperationsQ.IsEmpty());
+ do {
+ TPollerSyncOperationWrapper *op = SyncOperationsQ.Top();
+ if (auto *unregister = std::get_if<TPollerUnregisterSocket>(&op->Operation)) {
+ static_cast<TDerived&>(*this).UnregisterSocketInLoop(unregister->Socket);
+ op->SignalDone();
+ } else if (std::get_if<TPollerExitThread>(&op->Operation)) {
+ op->SignalDone();
+ return false; // terminate the thread
+ } else if (std::get_if<TPollerWakeup>(&op->Operation)) {
+ op->SignalDone();
+ } else {
+ Y_FAIL();
+ }
+ } while (SyncOperationsQ.Pop());
return true;
}
void *ThreadProc() override {
SetCurrentThreadName("network poller");
- while (ProcessSyncOpQueue()) {
- static_cast<TDerived&>(*this).ProcessEventsInLoop();
+ for (;;) {
+ if (static_cast<TDerived&>(*this).ProcessEventsInLoop()) { // need to process the queue
+ DrainReadEnd();
+ if (!ProcessSyncOpQueue()) {
+ break;
+ }
+ }
}
return nullptr;
}
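The poller_actor.cpp hunk above inverts the thread loop: the platform-specific ProcessEventsInLoop now returns true only when the internal wakeup descriptor (the poll entry with no user data) fired, and only then does the thread drain the self-pipe and run the sync-operation queue. A hedged sketch of that control flow; PollerThreadLoop and its parameters are illustrative names, not the library API:

    #include <functional>

    // pollOnce stands in for the platform-specific ProcessEventsInLoop
    // (epoll/kqueue/select); ready sockets are notified inside it, and it returns
    // true when the wakeup descriptor fired, i.e. the sync-op queue needs attention.
    void PollerThreadLoop(const std::function<bool()>& pollOnce,
                          const std::function<void()>& drainWakeupPipe,
                          const std::function<bool()>& runSyncOperationQueue) {
        for (;;) {
            if (pollOnce()) {
                drainWakeupPipe();                 // empty the self-pipe so it can signal again
                if (!runSyncOperationQueue()) {    // returns false on the exit-thread operation
                    return;                        // terminate the poller thread
                }
            }
        }
    }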
diff --git a/library/cpp/actors/interconnect/poller_actor_darwin.h b/library/cpp/actors/interconnect/poller_actor_darwin.h
index 4cb0a58f8d..31c1144794 100644
--- a/library/cpp/actors/interconnect/poller_actor_darwin.h
+++ b/library/cpp/actors/interconnect/poller_actor_darwin.h
@@ -45,18 +45,20 @@ namespace NActors {
close(KqDescriptor);
}
- void ProcessEventsInLoop() {
+ bool ProcessEventsInLoop() {
std::array<struct kevent, 256> events;
int numReady = kevent(KqDescriptor, nullptr, 0, events.data(), events.size(), nullptr);
if (numReady == -1) {
if (errno == EINTR) {
- return;
+ return false;
} else {
Y_FAIL("kevent() failed with %s", strerror(errno));
}
}
+ bool res = false;
+
for (int i = 0; i < numReady; ++i) {
const struct kevent& ev = events[i];
if (ev.udata) {
@@ -65,8 +67,12 @@ namespace NActors {
const bool read = error || ev.filter == EVFILT_READ;
const bool write = error || ev.filter == EVFILT_WRITE;
Notify(it, read, write);
+ } else {
+ res = true;
}
}
+
+ return res;
}
void UnregisterSocketInLoop(const TIntrusivePtr<TSharedDescriptor>& socket) {
diff --git a/library/cpp/actors/interconnect/poller_actor_linux.h b/library/cpp/actors/interconnect/poller_actor_linux.h
index dd4f7c0124..6bd2cc258f 100644
--- a/library/cpp/actors/interconnect/poller_actor_linux.h
+++ b/library/cpp/actors/interconnect/poller_actor_linux.h
@@ -30,7 +30,7 @@ namespace NActors {
close(EpollDescriptor);
}
- void ProcessEventsInLoop() {
+ bool ProcessEventsInLoop() {
// preallocated array for events
std::array<epoll_event, 256> events;
@@ -42,12 +42,14 @@ namespace NActors {
// check return status for any errors
if (numReady == -1) {
if (errno == EINTR) {
- return; // restart the call a bit later
+ return false; // restart the call a bit later
} else {
Y_FAIL("epoll_wait() failed with %s", strerror(errno));
}
}
+ bool res = false;
+
for (int i = 0; i < numReady; ++i) {
const epoll_event& ev = events[i];
if (auto *record = static_cast<TSocketRecord*>(ev.data.ptr)) {
@@ -73,8 +75,12 @@ namespace NActors {
// issue notifications
Notify(record, read, write);
+ } else {
+ res = true;
}
}
+
+ return res;
}
void UnregisterSocketInLoop(const TIntrusivePtr<TSharedDescriptor>& socket) {
@@ -110,5 +116,5 @@ namespace NActors {
};
using TPollerThread = TEpollThread;
-
+
} // namespace NActors
diff --git a/library/cpp/actors/interconnect/poller_actor_win.h b/library/cpp/actors/interconnect/poller_actor_win.h
index 4b4caa0ebd..e593cbafd1 100644
--- a/library/cpp/actors/interconnect/poller_actor_win.h
+++ b/library/cpp/actors/interconnect/poller_actor_win.h
@@ -23,7 +23,7 @@ namespace NActors {
Stop();
}
- void ProcessEventsInLoop() {
+ bool ProcessEventsInLoop() {
fd_set readfds, writefds, exceptfds;
FD_ZERO(&readfds);
@@ -51,12 +51,14 @@ namespace NActors {
if (res == -1) {
const int err = LastSocketError();
if (err == EINTR) {
- return; // try a bit later
+ return false; // try a bit later
} else {
Y_FAIL("select() failed with %s", strerror(err));
}
}
+ bool flag = false;
+
with_lock (Mutex) {
for (const auto& [fd, record] : Descriptors) {
if (record) {
@@ -70,9 +72,13 @@ namespace NActors {
record->Flags &= ~WRITE;
}
Notify(record.Get(), read, write);
+ } else {
+ flag = true;
}
}
}
+
+ return flag;
}
void UnregisterSocketInLoop(const TIntrusivePtr<TSharedDescriptor>& socket) {