author     Daniil Cherednik <dan.cherednik@gmail.com>  2023-05-05 11:09:01 +0300
committer  Daniil Cherednik <dan.cherednik@gmail.com>  2023-05-05 11:09:01 +0300
commit     b5a989b16cafa8a3b3bc076f1097a0eda6f48c06 (patch)
tree       4da744117a5aab37758921fa43b95a3068e5aec1 /library/cpp/actors
parent     fc1cffcfa7f0497a1f97b384a24bcbf23362f3be (diff)
download   ydb-b5a989b16cafa8a3b3bc076f1097a0eda6f48c06.tar.gz
Ydb stable 23-1-2623.1.26
x-stable-origin-commit: 22184a7e157553d447f17a2dffc4ea2d32dfd74d
Diffstat (limited to 'library/cpp/actors')
19 files changed, 307 insertions, 66 deletions
diff --git a/library/cpp/actors/core/actor.h b/library/cpp/actors/core/actor.h index 9ed3608223..b67c04b09b 100644 --- a/library/cpp/actors/core/actor.h +++ b/library/cpp/actors/core/actor.h @@ -29,6 +29,7 @@ namespace NActors { ui32 CapturedActivation = 0; ESendingType CapturedType = ESendingType::Lazy; ESendingType SendingType = ESendingType::Common; + bool IsEnoughCpu = true; }; extern Y_POD_THREAD(TThreadContext*) TlsThreadContext; @@ -436,20 +437,24 @@ namespace NActors { // must be called to wrap any call trasitions from one actor to another template<typename TActor, typename TMethod, typename... TArgs> - static decltype((std::declval<TActor>().*std::declval<TMethod>())(std::declval<TArgs>()...)) - InvokeOtherActor(TActor& actor, TMethod&& method, TArgs&&... args) { - struct TRecurseContext: TActorContext { - TActivationContext* Prev; + static std::invoke_result_t<TMethod, TActor, TArgs...> InvokeOtherActor(TActor& actor, TMethod&& method, TArgs&&... args) { + struct TRecurseContext : TActorContext { + TActivationContext* const Prev; + TRecurseContext(const TActorId& actorId) : TActorContext(TActivationContext::ActorContextFor(actorId)) - , Prev(TlsActivationContext) { + , Prev(TlsActivationContext) + { TlsActivationContext = this; } + ~TRecurseContext() { + Y_VERIFY(TlsActivationContext == this, "TlsActivationContext mismatch; probably InvokeOtherActor was invoked from a coroutine"); TlsActivationContext = Prev; } } context(actor.SelfId()); - return (actor.*method)(std::forward<TArgs>(args)...); + + return std::invoke(std::forward<TMethod>(method), actor, std::forward<TArgs>(args)...); } virtual void Registered(TActorSystem* sys, const TActorId& owner); @@ -486,6 +491,12 @@ namespace NActors { } protected: + void SetEnoughCpu(bool isEnough) { + if (TlsThreadContext) { + TlsThreadContext->IsEnoughCpu = isEnough; + } + } + void Describe(IOutputStream&) const noexcept override; bool Send(const TActorId& recipient, IEventBase* ev, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const noexcept final; bool Send(const TActorId& recipient, THolder<IEventBase> ev, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const { diff --git a/library/cpp/actors/core/actorsystem.cpp b/library/cpp/actors/core/actorsystem.cpp index b8896acb34..21cca94d40 100644 --- a/library/cpp/actors/core/actorsystem.cpp +++ b/library/cpp/actors/core/actorsystem.cpp @@ -217,6 +217,11 @@ namespace NActors { CpuManager->GetPoolStats(poolId, poolStats, statsCopy); } + THarmonizerStats TActorSystem::GetHarmonizerStats() const { + return CpuManager->GetHarmonizerStats(); + + } + void TActorSystem::Start() { Y_VERIFY(StartExecuted == false); StartExecuted = true; diff --git a/library/cpp/actors/core/actorsystem.h b/library/cpp/actors/core/actorsystem.h index cd2cfda1bb..9f2483245a 100644 --- a/library/cpp/actors/core/actorsystem.h +++ b/library/cpp/actors/core/actorsystem.h @@ -299,6 +299,8 @@ namespace NActors { void GetPoolStats(ui32 poolId, TExecutorPoolStats& poolStats, TVector<TExecutorThreadStats>& statsCopy) const; + THarmonizerStats GetHarmonizerStats() const; + void DeferPreStop(std::function<void()> fn) { DeferredPreStop.push_back(std::move(fn)); } diff --git a/library/cpp/actors/core/cpu_manager.h b/library/cpp/actors/core/cpu_manager.h index e2e3861a3b..c3a7588639 100644 --- a/library/cpp/actors/core/cpu_manager.h +++ b/library/cpp/actors/core/cpu_manager.h @@ -40,6 +40,13 @@ namespace NActors { } } + THarmonizerStats GetHarmonizerStats() const { + if (Harmonizer) { + return 
Harmonizer->GetStats(); + } + return {}; + } + private: IExecutorPool* CreateExecutorPool(ui32 poolId); }; diff --git a/library/cpp/actors/core/event.h b/library/cpp/actors/core/event.h index 6b92edaf41..90ae16ac26 100644 --- a/library/cpp/actors/core/event.h +++ b/library/cpp/actors/core/event.h @@ -80,7 +80,8 @@ namespace NActors { Y_FAIL("Event type %" PRIu32 " doesn't match the expected type %" PRIu32, Type, TEventType::EventType); if (!Event) { - Event.Reset(TEventType::Load(Buffer.Get())); + static TEventSerializedData empty; + Event.Reset(TEventType::Load(Buffer ? Buffer.Get() : &empty)); } if (Event) { diff --git a/library/cpp/actors/core/event_pb.h b/library/cpp/actors/core/event_pb.h index 2d388fceeb..38058df749 100644 --- a/library/cpp/actors/core/event_pb.h +++ b/library/cpp/actors/core/event_pb.h @@ -228,7 +228,7 @@ namespace NActors { return result; } - static IEventBase* Load(TIntrusivePtr<TEventSerializedData> input) { + static IEventBase* Load(TEventSerializedData *input) { THolder<TEventPBBase> ev(new TEv()); if (!input->GetSize()) { Y_PROTOBUF_SUPPRESS_NODISCARD ev->Record.ParseFromString(TString()); diff --git a/library/cpp/actors/core/event_pb_payload_ut.cpp b/library/cpp/actors/core/event_pb_payload_ut.cpp index eab007bc15..7024d338d5 100644 --- a/library/cpp/actors/core/event_pb_payload_ut.cpp +++ b/library/cpp/actors/core/event_pb_payload_ut.cpp @@ -66,7 +66,7 @@ Y_UNIT_TEST_SUITE(TEventProtoWithPayload) { } UNIT_ASSERT_VALUES_EQUAL(chunkerRes, ser); - THolder<IEventBase> ev2 = THolder(TEventTo::Load(buffers)); + THolder<IEventBase> ev2 = THolder(TEventTo::Load(buffers.Get())); TEventTo& msg2 = static_cast<TEventTo&>(*ev2); UNIT_ASSERT_VALUES_EQUAL(msg2.Record.GetMeta(), msg.Record.GetMeta()); UNIT_ASSERT_EQUAL(msg2.GetPayload(msg2.Record.GetPayloadId(0)), msg.GetPayload(msg.Record.GetPayloadId(0))); @@ -142,7 +142,7 @@ Y_UNIT_TEST_SUITE(TEventProtoWithPayload) { // deserialize auto data = MakeIntrusive<TEventSerializedData>(ser1, false); - THolder<TEvMessageWithPayloadPreSerialized> parsedEvent(static_cast<TEvMessageWithPayloadPreSerialized*>(TEvMessageWithPayloadPreSerialized::Load(data))); + THolder<TEvMessageWithPayloadPreSerialized> parsedEvent(static_cast<TEvMessageWithPayloadPreSerialized*>(TEvMessageWithPayloadPreSerialized::Load(data.Get()))); UNIT_ASSERT_VALUES_EQUAL(parsedEvent->PreSerializedData, ""); // this field is empty after deserialization auto& record = parsedEvent->GetRecord(); UNIT_ASSERT_VALUES_EQUAL(record.GetMeta(), msg.GetMeta()); diff --git a/library/cpp/actors/core/executor_pool.h b/library/cpp/actors/core/executor_pool.h index c7c85e61fd..f4def74077 100644 --- a/library/cpp/actors/core/executor_pool.h +++ b/library/cpp/actors/core/executor_pool.h @@ -13,6 +13,13 @@ namespace NActors { struct TCpuConsumption { double ConsumedUs = 0; double BookedUs = 0; + ui64 NotEnoughCpuExecutions = 0; + + void Add(const TCpuConsumption& other) { + ConsumedUs += other.ConsumedUs; + BookedUs += other.BookedUs; + NotEnoughCpuExecutions += other.NotEnoughCpuExecutions; + } }; class IExecutorPool : TNonCopyable { diff --git a/library/cpp/actors/core/executor_pool_basic.cpp b/library/cpp/actors/core/executor_pool_basic.cpp index de04105991..fc87daed77 100644 --- a/library/cpp/actors/core/executor_pool_basic.cpp +++ b/library/cpp/actors/core/executor_pool_basic.cpp @@ -337,7 +337,7 @@ namespace NActors { poolStats.DefaultThreadCount = DefaultThreadCount; poolStats.MaxThreadCount = MaxThreadCount; if (Harmonizer) { - TPoolHarmonizedStats stats = 
Harmonizer->GetPoolStats(PoolId); + TPoolHarmonizerStats stats = Harmonizer->GetPoolStats(PoolId); poolStats.IsNeedy = stats.IsNeedy; poolStats.IsStarved = stats.IsStarved; poolStats.IsHoggish = stats.IsHoggish; @@ -345,6 +345,10 @@ namespace NActors { poolStats.DecreasingThreadsByStarvedState = stats.DecreasingThreadsByStarvedState; poolStats.DecreasingThreadsByHoggishState = stats.DecreasingThreadsByHoggishState; poolStats.PotentialMaxThreadCount = stats.PotentialMaxThreadCount; + poolStats.MaxConsumedCpuUs = stats.MaxConsumedCpu; + poolStats.MinConsumedCpuUs = stats.MinConsumedCpu; + poolStats.MaxBookedCpuUs = stats.MaxBookedCpu; + poolStats.MinBookedCpuUs = stats.MinBookedCpu; } statsCopy.resize(PoolThreads + 1); @@ -500,7 +504,7 @@ namespace NActors { TThreadCtx& threadCtx = Threads[threadIdx]; TExecutorThreadStats stats; threadCtx.Thread->GetCurrentStats(stats); - return {Ts2Us(stats.SafeElapsedTicks), static_cast<double>(stats.CpuUs)}; + return {Ts2Us(stats.SafeElapsedTicks), static_cast<double>(stats.CpuUs), stats.NotEnoughCpuExecutions}; } i16 TBasicExecutorPool::GetBlockingThreadCount() const { diff --git a/library/cpp/actors/core/executor_thread.cpp b/library/cpp/actors/core/executor_thread.cpp index 3c5dc2c2b4..f05b1d4479 100644 --- a/library/cpp/actors/core/executor_thread.cpp +++ b/library/cpp/actors/core/executor_thread.cpp @@ -364,7 +364,8 @@ namespace NActors { } #undef EXECUTE_MAILBOX hpnow = GetCycleCountFast(); - execCycles += hpnow - hpprev; + i64 currentExecCycles = hpnow - hpprev; + execCycles += currentExecCycles; hpprev = hpnow; execCount++; if (execCycles + nonExecCycles > 39000000) { // every 15 ms at 2.6GHz, so 1000 items is 15 sec (solomon interval) @@ -377,6 +378,11 @@ namespace NActors { nonExecCycles = 0; Ctx.UpdateThreadTime(); } + + if (!TlsThreadContext->IsEnoughCpu) { + Ctx.IncreaseNotEnoughCpuExecutions(); + TlsThreadContext->IsEnoughCpu = true; + } } } LWTRACK(ActivationEnd, Ctx.Orbit, Ctx.CpuId, Ctx.PoolId, Ctx.WorkerId); diff --git a/library/cpp/actors/core/harmonizer.cpp b/library/cpp/actors/core/harmonizer.cpp index e2fd0c5f24..a1838ee2b0 100644 --- a/library/cpp/actors/core/harmonizer.cpp +++ b/library/cpp/actors/core/harmonizer.cpp @@ -19,8 +19,8 @@ constexpr bool CheckBinaryPower(ui64 value) { return !(value & (value - 1)); } +template <ui8 HistoryBufferSize = 8> struct TValueHistory { - static constexpr ui64 HistoryBufferSize = 8; static_assert(CheckBinaryPower(HistoryBufferSize)); double History[HistoryBufferSize] = {0.0}; @@ -31,32 +31,87 @@ struct TValueHistory { ui64 AccumulatedTs = 0; template <bool WithTail=false> - double GetAvgPartForLastSeconds(ui8 seconds) { - double sum = AccumulatedUs; + double Accumulate(auto op, auto comb, ui8 seconds) { + double acc = AccumulatedUs; size_t idx = HistoryIdx; ui8 leftSeconds = seconds; + if constexpr (!WithTail) { + idx--; + leftSeconds--; + if (idx >= HistoryBufferSize) { + idx = HistoryBufferSize - 1; + } + acc = History[idx]; + } do { idx--; leftSeconds--; if (idx >= HistoryBufferSize) { idx = HistoryBufferSize - 1; } - if (WithTail || leftSeconds) { - sum += History[idx]; + if constexpr (WithTail) { + acc = op(acc, History[idx]); + } else if (leftSeconds) { + acc = op(acc, History[idx]); } else { ui64 tsInSecond = Us2Ts(1'000'000.0); - sum += History[idx] * (tsInSecond - AccumulatedTs) / tsInSecond; + acc = op(acc, History[idx] * (tsInSecond - AccumulatedTs) / tsInSecond); } } while (leftSeconds); - double duration = 1'000'000.0 * seconds + (WithTail ? 
Ts2Us(AccumulatedTs): 0.0); - double avg = sum / duration; - return avg; + double duration = 1'000'000.0 * seconds; + if constexpr (WithTail) { + duration += Ts2Us(AccumulatedTs); + } + return comb(acc, duration); + } + + template <bool WithTail=false> + double GetAvgPartForLastSeconds(ui8 seconds) { + auto sum = [](double acc, double value) { + return acc + value; + }; + auto avg = [](double sum, double duration) { + return sum / duration; + }; + return Accumulate<WithTail>(sum, avg, seconds); } double GetAvgPart() { return GetAvgPartForLastSeconds<true>(HistoryBufferSize); } + double GetMaxForLastSeconds(ui8 seconds) { + auto max = [](const double& acc, const double& value) { + return Max(acc, value); + }; + auto fst = [](const double& value, const double&) { return value; }; + return Accumulate<false>(max, fst, seconds); + } + + double GetMax() { + return GetMaxForLastSeconds(HistoryBufferSize); + } + + i64 GetMaxInt() { + return static_cast<i64>(GetMax()); + } + + double GetMinForLastSeconds(ui8 seconds) { + auto min = [](const double& acc, const double& value) { + return Min(acc, value); + }; + auto fst = [](const double& value, const double&) { return value; }; + return Accumulate<false>(min, fst, seconds); + } + + double GetMin() { + return GetMinForLastSeconds(HistoryBufferSize); + } + + i64 GetMinInt() { + return static_cast<i64>(GetMin()); + } + void Register(ui64 ts, double valueUs) { if (ts < LastTs) { LastTs = ts; @@ -101,8 +156,8 @@ struct TValueHistory { }; struct TThreadInfo { - TValueHistory Consumed; - TValueHistory Booked; + TValueHistory<8> Consumed; + TValueHistory<8> Booked; }; struct TPoolInfo { @@ -116,6 +171,8 @@ struct TPoolInfo { NMonitoring::TDynamicCounters::TCounterPtr AvgPingCounterWithSmallWindow; ui32 MaxAvgPingUs = 0; ui64 LastUpdateTs = 0; + ui64 NotEnoughCpuExecutions = 0; + ui64 NewNotEnoughCpuExecutions = 0; TAtomic LastFlags = 0; // 0 - isNeedy; 1 - isStarved; 2 - isHoggish TAtomic IncreasingThreadsByNeedyState = 0; @@ -123,12 +180,20 @@ struct TPoolInfo { TAtomic DecreasingThreadsByHoggishState = 0; TAtomic PotentialMaxThreadCount = 0; + TValueHistory<16> Consumed; + TValueHistory<16> Booked; + + TAtomic MaxConsumedCpu = 0; + TAtomic MinConsumedCpu = 0; + TAtomic MaxBookedCpu = 0; + TAtomic MinBookedCpu = 0; + bool IsBeingStopped(i16 threadIdx); double GetBooked(i16 threadIdx); double GetlastSecondPoolBooked(i16 threadIdx); double GetConsumed(i16 threadIdx); double GetlastSecondPoolConsumed(i16 threadIdx); - void PullStats(ui64 ts); + TCpuConsumption PullStats(ui64 ts); i16 GetThreadCount(); void SetThreadCount(i16 threadCount); bool IsAvgPingGood(); @@ -167,15 +232,26 @@ double TPoolInfo::GetlastSecondPoolConsumed(i16 threadIdx) { } #define UNROLL_HISTORY(history) (history)[0], (history)[1], (history)[2], (history)[3], (history)[4], (history)[5], (history)[6], (history)[7] -void TPoolInfo::PullStats(ui64 ts) { +TCpuConsumption TPoolInfo::PullStats(ui64 ts) { + TCpuConsumption acc; for (i16 threadIdx = 0; threadIdx < MaxThreadCount; ++threadIdx) { TThreadInfo &threadInfo = ThreadInfo[threadIdx]; TCpuConsumption cpuConsumption = Pool->GetThreadCpuConsumption(threadIdx); + acc.Add(cpuConsumption); threadInfo.Consumed.Register(ts, cpuConsumption.ConsumedUs); LWPROBE(SavedValues, Pool->PoolId, Pool->GetName(), "consumed", UNROLL_HISTORY(threadInfo.Consumed.History)); threadInfo.Booked.Register(ts, cpuConsumption.BookedUs); LWPROBE(SavedValues, Pool->PoolId, Pool->GetName(), "booked", UNROLL_HISTORY(threadInfo.Booked.History)); } + Consumed.Register(ts, 
acc.ConsumedUs); + RelaxedStore(&MaxConsumedCpu, Consumed.GetMaxInt()); + RelaxedStore(&MinConsumedCpu, Consumed.GetMinInt()); + Booked.Register(ts, acc.BookedUs); + RelaxedStore(&MaxBookedCpu, Booked.GetMaxInt()); + RelaxedStore(&MinBookedCpu, Booked.GetMinInt()); + NewNotEnoughCpuExecutions = acc.NotEnoughCpuExecutions - NotEnoughCpuExecutions; + NotEnoughCpuExecutions = acc.NotEnoughCpuExecutions; + return acc; } #undef UNROLL_HISTORY @@ -206,6 +282,14 @@ private: std::vector<TPoolInfo> Pools; std::vector<ui16> PriorityOrder; + TValueHistory<16> Consumed; + TValueHistory<16> Booked; + + TAtomic MaxConsumedCpu = 0; + TAtomic MinConsumedCpu = 0; + TAtomic MaxBookedCpu = 0; + TAtomic MinBookedCpu = 0; + void PullStats(ui64 ts); void HarmonizeImpl(ui64 ts); void CalculatePriorityOrder(); @@ -217,7 +301,8 @@ public: void DeclareEmergency(ui64 ts) override; void AddPool(IExecutorPool* pool, TSelfPingInfo *pingInfo) override; void Enable(bool enable) override; - TPoolHarmonizedStats GetPoolStats(i16 poolId) const override; + TPoolHarmonizerStats GetPoolStats(i16 poolId) const override; + THarmonizerStats GetStats() const override; }; THarmonizer::THarmonizer(ui64 ts) { @@ -232,13 +317,21 @@ double THarmonizer::Rescale(double value) const { } void THarmonizer::PullStats(ui64 ts) { + TCpuConsumption acc; for (TPoolInfo &pool : Pools) { - pool.PullStats(ts); + TCpuConsumption consumption = pool.PullStats(ts); + acc.Add(consumption); } + Consumed.Register(ts, acc.ConsumedUs); + RelaxedStore(&MaxConsumedCpu, Consumed.GetMaxInt()); + RelaxedStore(&MinConsumedCpu, Consumed.GetMinInt()); + Booked.Register(ts, acc.BookedUs); + RelaxedStore(&MaxBookedCpu, Booked.GetMaxInt()); + RelaxedStore(&MinBookedCpu, Booked.GetMinInt()); } Y_FORCE_INLINE bool IsStarved(double consumed, double booked) { - return consumed < booked * 0.7; + return Max(consumed, booked) > 0.1 && consumed < booked * 0.7; } Y_FORCE_INLINE bool IsHoggish(double booked, ui16 currentThreadCount) { @@ -273,7 +366,7 @@ void THarmonizer::HarmonizeImpl(ui64 ts) { isStarvedPresent = true; } ui32 currentThreadCount = pool.GetThreadCount(); - bool isNeedy = pool.IsAvgPingGood() && poolBooked >= currentThreadCount; + bool isNeedy = (pool.IsAvgPingGood() || pool.NewNotEnoughCpuExecutions) && poolBooked >= currentThreadCount; if (pool.AvgPingCounter) { if (pool.LastUpdateTs + Us2Ts(3'000'000ull) > ts) { isNeedy = false; @@ -304,6 +397,9 @@ void THarmonizer::HarmonizeImpl(ui64 ts) { AtomicSet(pool.PotentialMaxThreadCount, Min(pool.MaxThreadCount, budgetInt)); } double overbooked = consumed - booked; + if (overbooked < 0) { + isStarvedPresent = false; + } if (isStarvedPresent) { // last_starved_at_consumed_value = сумма по всем пулам consumed; // TODO(cthulhu): использовать как лимит планвно устремлять этот лимит к total, @@ -319,7 +415,7 @@ void THarmonizer::HarmonizeImpl(ui64 ts) { TPoolInfo &pool = Pools[poolIdx]; i64 threadCount = pool.GetThreadCount(); while (threadCount > pool.DefaultThreadCount) { - pool.SetThreadCount(threadCount - 1); + pool.SetThreadCount(--threadCount); AtomicIncrement(pool.DecreasingThreadsByStarvedState); overbooked--; LWPROBE(HarmonizeOperation, poolIdx, pool.Pool->GetName(), "decrease", threadCount - 1, pool.DefaultThreadCount, pool.MaxThreadCount); @@ -425,13 +521,17 @@ IHarmonizer* MakeHarmonizer(ui64 ts) { return new THarmonizer(ts); } -TPoolHarmonizedStats THarmonizer::GetPoolStats(i16 poolId) const { +TPoolHarmonizerStats THarmonizer::GetPoolStats(i16 poolId) const { const TPoolInfo &pool = Pools[poolId]; ui64 flags 
= RelaxedLoad(&pool.LastFlags); - return TPoolHarmonizedStats { + return TPoolHarmonizerStats{ .IncreasingThreadsByNeedyState = static_cast<ui64>(RelaxedLoad(&pool.IncreasingThreadsByNeedyState)), .DecreasingThreadsByStarvedState = static_cast<ui64>(RelaxedLoad(&pool.DecreasingThreadsByStarvedState)), .DecreasingThreadsByHoggishState = static_cast<ui64>(RelaxedLoad(&pool.DecreasingThreadsByHoggishState)), + .MaxConsumedCpu = static_cast<i64>(RelaxedLoad(&pool.MaxConsumedCpu)), + .MinConsumedCpu = static_cast<i64>(RelaxedLoad(&pool.MinConsumedCpu)), + .MaxBookedCpu = static_cast<i64>(RelaxedLoad(&pool.MaxBookedCpu)), + .MinBookedCpu = static_cast<i64>(RelaxedLoad(&pool.MinBookedCpu)), .PotentialMaxThreadCount = static_cast<i16>(RelaxedLoad(&pool.PotentialMaxThreadCount)), .IsNeedy = static_cast<bool>(flags & 1), .IsStarved = static_cast<bool>(flags & 2), @@ -439,4 +539,13 @@ TPoolHarmonizedStats THarmonizer::GetPoolStats(i16 poolId) const { }; } +THarmonizerStats THarmonizer::GetStats() const { + return THarmonizerStats{ + .MaxConsumedCpu = static_cast<i64>(RelaxedLoad(&MaxConsumedCpu)), + .MinConsumedCpu = static_cast<i64>(RelaxedLoad(&MinConsumedCpu)), + .MaxBookedCpu = static_cast<i64>(RelaxedLoad(&MaxBookedCpu)), + .MinBookedCpu = static_cast<i64>(RelaxedLoad(&MinBookedCpu)), + }; +} + } diff --git a/library/cpp/actors/core/harmonizer.h b/library/cpp/actors/core/harmonizer.h index bc6b938fe8..7c66ff54c6 100644 --- a/library/cpp/actors/core/harmonizer.h +++ b/library/cpp/actors/core/harmonizer.h @@ -6,16 +6,27 @@ namespace NActors { class IExecutorPool; - struct TPoolHarmonizedStats { + struct TPoolHarmonizerStats { ui64 IncreasingThreadsByNeedyState = 0; ui64 DecreasingThreadsByStarvedState = 0; ui64 DecreasingThreadsByHoggishState = 0; + i64 MaxConsumedCpu = 0.0; + i64 MinConsumedCpu = 0.0; + i64 MaxBookedCpu = 0.0; + i64 MinBookedCpu = 0.0; i16 PotentialMaxThreadCount = 0; bool IsNeedy = false; bool IsStarved = false; bool IsHoggish = false; }; + struct THarmonizerStats { + i64 MaxConsumedCpu = 0.0; + i64 MinConsumedCpu = 0.0; + i64 MaxBookedCpu = 0.0; + i64 MinBookedCpu = 0.0; + }; + // Pool cpu harmonizer class IHarmonizer { public: @@ -24,7 +35,8 @@ namespace NActors { virtual void DeclareEmergency(ui64 ts) = 0; virtual void AddPool(IExecutorPool* pool, TSelfPingInfo *pingInfo = nullptr) = 0; virtual void Enable(bool enable) = 0; - virtual TPoolHarmonizedStats GetPoolStats(i16 poolId) const = 0; + virtual TPoolHarmonizerStats GetPoolStats(i16 poolId) const = 0; + virtual THarmonizerStats GetStats() const = 0; }; IHarmonizer* MakeHarmonizer(ui64 ts); diff --git a/library/cpp/actors/core/mon_stats.h b/library/cpp/actors/core/mon_stats.h index 4c664a964a..5d61c9f87c 100644 --- a/library/cpp/actors/core/mon_stats.h +++ b/library/cpp/actors/core/mon_stats.h @@ -63,6 +63,10 @@ namespace NActors { ui64 IncreasingThreadsByNeedyState = 0; ui64 DecreasingThreadsByStarvedState = 0; ui64 DecreasingThreadsByHoggishState = 0; + i64 MaxConsumedCpuUs = 0; + i64 MinConsumedCpuUs = 0; + i64 MaxBookedCpuUs = 0; + i64 MinBookedCpuUs = 0; i16 WrongWakenedThreadCount = 0; i16 CurrentThreadCount = 0; i16 PotentialMaxThreadCount = 0; @@ -100,6 +104,7 @@ namespace NActors { ui64 MailboxPushedOutBySoftPreemption = 0; ui64 MailboxPushedOutByTime = 0; ui64 MailboxPushedOutByEventCount = 0; + ui64 NotEnoughCpuExecutions = 0; TExecutorThreadStats(size_t activityVecSize = 5) // must be not empty as 0 used as default : ElapsedTicksByActivity(activityVecSize) @@ -136,6 +141,7 @@ namespace NActors { 
MailboxPushedOutBySoftPreemption += RelaxedLoad(&other.MailboxPushedOutBySoftPreemption); MailboxPushedOutByTime += RelaxedLoad(&other.MailboxPushedOutByTime); MailboxPushedOutByEventCount += RelaxedLoad(&other.MailboxPushedOutByEventCount); + NotEnoughCpuExecutions += RelaxedLoad(&other.NotEnoughCpuExecutions); ActivationTimeHistogram.Aggregate(other.ActivationTimeHistogram); EventDeliveryTimeHistogram.Aggregate(other.EventDeliveryTimeHistogram); diff --git a/library/cpp/actors/core/worker_context.h b/library/cpp/actors/core/worker_context.h index c3a2947df1..cc8da2ff77 100644 --- a/library/cpp/actors/core/worker_context.h +++ b/library/cpp/actors/core/worker_context.h @@ -104,7 +104,7 @@ namespace NActors { i64 ts = deliveredTs > scheduleTs ? deliveredTs - scheduleTs : 0; double usec = NHPTimer::GetSeconds(ts) * 1000000.0; Stats->ActivationTimeHistogram.Add(usec); - Stats->WorstActivationTimeUs = Max(Stats->WorstActivationTimeUs, (ui64)usec); + RelaxedStore(&Stats->WorstActivationTimeUs, Max(Stats->WorstActivationTimeUs, (ui64)usec)); return usec; } @@ -140,6 +140,11 @@ namespace NActors { RelaxedStore(&WorkerStats.SafeElapsedTicks, (ui64)RelaxedLoad(&WorkerStats.ElapsedTicks)); RelaxedStore(&WorkerStats.CpuUs, ThreadCPUTime()); } + + void IncreaseNotEnoughCpuExecutions() { + RelaxedStore(&WorkerStats.NotEnoughCpuExecutions, + (ui64)RelaxedLoad(&WorkerStats.NotEnoughCpuExecutions) + 1); + } #else void GetCurrentStats(TExecutorThreadStats&) const {} inline void AddElapsedCycles(ui32, i64) {} @@ -159,6 +164,7 @@ namespace NActors { i64 AddEventProcessingStats(i64, i64, ui32, ui64) { return 0; } void UpdateActorsStats(size_t, IExecutorPool*) {} void UpdateThreadTime() {} + void IncreaseNotEnoughCpuExecutions() {} #endif void Switch(IExecutorPool* executor, diff --git a/library/cpp/actors/helpers/pool_stats_collector.h b/library/cpp/actors/helpers/pool_stats_collector.h index d80951827d..51ace0e3cc 100644 --- a/library/cpp/actors/helpers/pool_stats_collector.h +++ b/library/cpp/actors/helpers/pool_stats_collector.h @@ -135,6 +135,11 @@ private: NMonitoring::TDynamicCounters::TCounterPtr IncreasingThreadsByNeedyState; NMonitoring::TDynamicCounters::TCounterPtr DecreasingThreadsByStarvedState; NMonitoring::TDynamicCounters::TCounterPtr DecreasingThreadsByHoggishState; + NMonitoring::TDynamicCounters::TCounterPtr NotEnoughCpuExecutions; + NMonitoring::TDynamicCounters::TCounterPtr MaxConsumedCpu; + NMonitoring::TDynamicCounters::TCounterPtr MinConsumedCpu; + NMonitoring::TDynamicCounters::TCounterPtr MaxBookedCpu; + NMonitoring::TDynamicCounters::TCounterPtr MinBookedCpu; THistogramCounters LegacyActivationTimeHistogram; @@ -190,6 +195,11 @@ private: IncreasingThreadsByNeedyState = PoolGroup->GetCounter("IncreasingThreadsByNeedyState", true); DecreasingThreadsByStarvedState = PoolGroup->GetCounter("DecreasingThreadsByStarvedState", true); DecreasingThreadsByHoggishState = PoolGroup->GetCounter("DecreasingThreadsByHoggishState", true); + NotEnoughCpuExecutions = PoolGroup->GetCounter("NotEnoughCpuExecutions", true); + MaxConsumedCpu = PoolGroup->GetCounter("MaxConsumedCpuByPool", false); + MinConsumedCpu = PoolGroup->GetCounter("MinConsumedCpuByPool", false); + MaxBookedCpu = PoolGroup->GetCounter("MaxBookedCpuByPool", false); + MinBookedCpu = PoolGroup->GetCounter("MinBookedCpuByPool", false); LegacyActivationTimeHistogram.Init(PoolGroup.Get(), "ActivationTime", "usec", 5*1000*1000); ActivationTimeHistogram = PoolGroup->GetHistogram( @@ -237,6 +247,7 @@ private: *IncreasingThreadsByNeedyState = 
poolStats.IncreasingThreadsByNeedyState; *DecreasingThreadsByStarvedState = poolStats.DecreasingThreadsByStarvedState; *DecreasingThreadsByHoggishState = poolStats.DecreasingThreadsByHoggishState; + *NotEnoughCpuExecutions = stats.NotEnoughCpuExecutions; LegacyActivationTimeHistogram.Set(stats.ActivationTimeHistogram); ActivationTimeHistogram->Reset(); @@ -281,6 +292,38 @@ private: } }; + struct TActorSystemCounters { + TIntrusivePtr<NMonitoring::TDynamicCounters> Group; + + NMonitoring::TDynamicCounters::TCounterPtr MaxConsumedCpu; + NMonitoring::TDynamicCounters::TCounterPtr MinConsumedCpu; + NMonitoring::TDynamicCounters::TCounterPtr MaxBookedCpu; + NMonitoring::TDynamicCounters::TCounterPtr MinBookedCpu; + + void Init(NMonitoring::TDynamicCounters* group) { + Group = group; + + MaxConsumedCpu = Group->GetCounter("MaxConsumedCpu", false); + MinConsumedCpu = Group->GetCounter("MinConsumedCpu", false); + MaxBookedCpu = Group->GetCounter("MaxBookedCpu", false); + MinBookedCpu = Group->GetCounter("MinBookedCpu", false); + } + + void Set(const THarmonizerStats& harmonizerStats) { +#ifdef ACTORSLIB_COLLECT_EXEC_STATS + *MaxConsumedCpu = harmonizerStats.MaxConsumedCpu; + *MinConsumedCpu = harmonizerStats.MinConsumedCpu; + *MaxBookedCpu = harmonizerStats.MaxBookedCpu; + *MinBookedCpu = harmonizerStats.MinBookedCpu; +#else + Y_UNUSED(poolStats); + Y_UNUSED(stats); + Y_UNUSED(numThreads); +#endif + } + + }; + public: static constexpr IActor::EActivityType ActorActivityType() { return IActor::ACTORLIB_STATS; @@ -297,6 +340,7 @@ public: for (size_t poolId = 0; poolId < PoolCounters.size(); ++poolId) { PoolCounters[poolId].Init(Counters.Get(), setup.GetPoolName(poolId), setup.GetThreads(poolId)); } + ActorSystemCounters.Init(Counters.Get()); } void Bootstrap(const TActorContext& ctx) { @@ -322,6 +366,8 @@ private: ctx.ExecutorThread.ActorSystem->GetPoolStats(poolId, poolStats, stats); SetAggregatedCounters(PoolCounters[poolId], poolStats, stats); } + THarmonizerStats harmonizerStats = ctx.ExecutorThread.ActorSystem->GetHarmonizerStats(); + ActorSystemCounters.Set(harmonizerStats); OnWakeup(ctx); @@ -343,6 +389,7 @@ protected: NMonitoring::TDynamicCounterPtr Counters; TVector<TExecutorPoolCounters> PoolCounters; + TActorSystemCounters ActorSystemCounters; }; } // NActors diff --git a/library/cpp/actors/interconnect/poller_actor.cpp b/library/cpp/actors/interconnect/poller_actor.cpp index e75cbcaef4..04e17c24ab 100644 --- a/library/cpp/actors/interconnect/poller_actor.cpp +++ b/library/cpp/actors/interconnect/poller_actor.cpp @@ -146,8 +146,7 @@ namespace NActors { wrapper.Wait(); } - bool DrainReadEnd() { - size_t totalRead = 0; + void DrainReadEnd() { char buffer[4096]; for (;;) { ssize_t n = ReadEnd.Read(buffer, sizeof(buffer)); @@ -162,37 +161,38 @@ namespace NActors { } } else { Y_VERIFY(n); - totalRead += n; } } - return totalRead; } bool ProcessSyncOpQueue() { - if (DrainReadEnd()) { - Y_VERIFY(!SyncOperationsQ.IsEmpty()); - do { - TPollerSyncOperationWrapper *op = SyncOperationsQ.Top(); - if (auto *unregister = std::get_if<TPollerUnregisterSocket>(&op->Operation)) { - static_cast<TDerived&>(*this).UnregisterSocketInLoop(unregister->Socket); - op->SignalDone(); - } else if (std::get_if<TPollerExitThread>(&op->Operation)) { - op->SignalDone(); - return false; // terminate the thread - } else if (std::get_if<TPollerWakeup>(&op->Operation)) { - op->SignalDone(); - } else { - Y_FAIL(); - } - } while (SyncOperationsQ.Pop()); - } + Y_VERIFY(!SyncOperationsQ.IsEmpty()); + do { + 
TPollerSyncOperationWrapper *op = SyncOperationsQ.Top(); + if (auto *unregister = std::get_if<TPollerUnregisterSocket>(&op->Operation)) { + static_cast<TDerived&>(*this).UnregisterSocketInLoop(unregister->Socket); + op->SignalDone(); + } else if (std::get_if<TPollerExitThread>(&op->Operation)) { + op->SignalDone(); + return false; // terminate the thread + } else if (std::get_if<TPollerWakeup>(&op->Operation)) { + op->SignalDone(); + } else { + Y_FAIL(); + } + } while (SyncOperationsQ.Pop()); return true; } void *ThreadProc() override { SetCurrentThreadName("network poller"); - while (ProcessSyncOpQueue()) { - static_cast<TDerived&>(*this).ProcessEventsInLoop(); + for (;;) { + if (static_cast<TDerived&>(*this).ProcessEventsInLoop()) { // need to process the queue + DrainReadEnd(); + if (!ProcessSyncOpQueue()) { + break; + } + } } return nullptr; } diff --git a/library/cpp/actors/interconnect/poller_actor_darwin.h b/library/cpp/actors/interconnect/poller_actor_darwin.h index 4cb0a58f8d..31c1144794 100644 --- a/library/cpp/actors/interconnect/poller_actor_darwin.h +++ b/library/cpp/actors/interconnect/poller_actor_darwin.h @@ -45,18 +45,20 @@ namespace NActors { close(KqDescriptor); } - void ProcessEventsInLoop() { + bool ProcessEventsInLoop() { std::array<struct kevent, 256> events; int numReady = kevent(KqDescriptor, nullptr, 0, events.data(), events.size(), nullptr); if (numReady == -1) { if (errno == EINTR) { - return; + return false; } else { Y_FAIL("kevent() failed with %s", strerror(errno)); } } + bool res = false; + for (int i = 0; i < numReady; ++i) { const struct kevent& ev = events[i]; if (ev.udata) { @@ -65,8 +67,12 @@ namespace NActors { const bool read = error || ev.filter == EVFILT_READ; const bool write = error || ev.filter == EVFILT_WRITE; Notify(it, read, write); + } else { + res = true; } } + + return res; } void UnregisterSocketInLoop(const TIntrusivePtr<TSharedDescriptor>& socket) { diff --git a/library/cpp/actors/interconnect/poller_actor_linux.h b/library/cpp/actors/interconnect/poller_actor_linux.h index dd4f7c0124..6bd2cc258f 100644 --- a/library/cpp/actors/interconnect/poller_actor_linux.h +++ b/library/cpp/actors/interconnect/poller_actor_linux.h @@ -30,7 +30,7 @@ namespace NActors { close(EpollDescriptor); } - void ProcessEventsInLoop() { + bool ProcessEventsInLoop() { // preallocated array for events std::array<epoll_event, 256> events; @@ -42,12 +42,14 @@ namespace NActors { // check return status for any errors if (numReady == -1) { if (errno == EINTR) { - return; // restart the call a bit later + return false; // restart the call a bit later } else { Y_FAIL("epoll_wait() failed with %s", strerror(errno)); } } + bool res = false; + for (int i = 0; i < numReady; ++i) { const epoll_event& ev = events[i]; if (auto *record = static_cast<TSocketRecord*>(ev.data.ptr)) { @@ -73,8 +75,12 @@ namespace NActors { // issue notifications Notify(record, read, write); + } else { + res = true; } } + + return res; } void UnregisterSocketInLoop(const TIntrusivePtr<TSharedDescriptor>& socket) { @@ -110,5 +116,5 @@ namespace NActors { }; using TPollerThread = TEpollThread; - + } // namespace NActors diff --git a/library/cpp/actors/interconnect/poller_actor_win.h b/library/cpp/actors/interconnect/poller_actor_win.h index 4b4caa0ebd..e593cbafd1 100644 --- a/library/cpp/actors/interconnect/poller_actor_win.h +++ b/library/cpp/actors/interconnect/poller_actor_win.h @@ -23,7 +23,7 @@ namespace NActors { Stop(); } - void ProcessEventsInLoop() { + bool ProcessEventsInLoop() { fd_set 
readfds, writefds, exceptfds; FD_ZERO(&readfds); @@ -51,12 +51,14 @@ namespace NActors { if (res == -1) { const int err = LastSocketError(); if (err == EINTR) { - return; // try a bit later + return false; // try a bit later } else { Y_FAIL("select() failed with %s", strerror(err)); } } + bool flag = false; + with_lock (Mutex) { for (const auto& [fd, record] : Descriptors) { if (record) { @@ -70,9 +72,13 @@ namespace NActors { record->Flags &= ~WRITE; } Notify(record.Get(), read, write); + } else { + flag = true; } } } + + return flag; } void UnregisterSocketInLoop(const TIntrusivePtr<TSharedDescriptor>& socket) { |