diff options
author | Daniil Cherednik <dan.cherednik@gmail.com> | 2023-02-09 11:44:35 +0300 |
---|---|---|
committer | Daniil Cherednik <dan.cherednik@gmail.com> | 2023-02-09 11:46:17 +0300 |
commit | b0967c30d3706b650b679fe119b6bd7b0924d328 (patch) | |
tree | 25579dfda238c2cc5b00324878303b3a05d09f45 /library | |
parent | 9b78acb9998e4a817a21fe60443c7c5d6a06b947 (diff) | |
download | ydb-22.5.10.tar.gz |
Ydb stable 22-5-1022.5.10stable-22-5
x-stable-origin-commit: f696baac1a4b8d48eb52b52b35930eef6d0eab42
Diffstat (limited to 'library')
34 files changed, 3288 insertions, 376 deletions
diff --git a/library/cpp/actors/core/CMakeLists.txt b/library/cpp/actors/core/CMakeLists.txt index 64c617307c..51379561db 100644 --- a/library/cpp/actors/core/CMakeLists.txt +++ b/library/cpp/actors/core/CMakeLists.txt @@ -42,6 +42,7 @@ target_sources(cpp-actors-core PRIVATE ${CMAKE_SOURCE_DIR}/library/cpp/actors/core/executor_pool_io.cpp ${CMAKE_SOURCE_DIR}/library/cpp/actors/core/executor_pool_united.cpp ${CMAKE_SOURCE_DIR}/library/cpp/actors/core/executor_thread.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/actors/core/harmonizer.cpp ${CMAKE_SOURCE_DIR}/library/cpp/actors/core/interconnect.cpp ${CMAKE_SOURCE_DIR}/library/cpp/actors/core/io_dispatcher.cpp ${CMAKE_SOURCE_DIR}/library/cpp/actors/core/log.cpp diff --git a/library/cpp/actors/core/actor_ut.cpp b/library/cpp/actors/core/actor_ut.cpp index a6752f7d4f..ab53e3ec3e 100644 --- a/library/cpp/actors/core/actor_ut.cpp +++ b/library/cpp/actors/core/actor_ut.cpp @@ -543,8 +543,12 @@ Y_UNIT_TEST_SUITE(TestDecorator) { setup->NodeId = 0; setup->ExecutorsCount = 1; setup->Executors.Reset(new TAutoPtr<IExecutorPool>[setup->ExecutorsCount]); + + ui64 ts = GetCycleCountFast(); + THolder<IHarmonizer> harmonizer(MakeHarmonizer(ts)); for (ui32 i = 0; i < setup->ExecutorsCount; ++i) { - setup->Executors[i] = new TBasicExecutorPool(i, 1, 10, "basic"); + setup->Executors[i] = new TBasicExecutorPool(i, 1, 10, "basic", harmonizer.Get()); + harmonizer->AddPool(setup->Executors[i].Get()); } setup->Scheduler = new TBasicSchedulerThread; diff --git a/library/cpp/actors/core/actorsystem.h b/library/cpp/actors/core/actorsystem.h index 40499d7586..4801350067 100644 --- a/library/cpp/actors/core/actorsystem.h +++ b/library/cpp/actors/core/actorsystem.h @@ -124,10 +124,55 @@ namespace NActors { return 1; } + virtual i16 GetPriority() const { + return 0; + } + // generic virtual TAffinity* Affinity() const = 0; virtual void SetRealTimeMode() const {} + + virtual ui32 GetThreadCount() const { + return 1; + }; + + virtual void SetThreadCount(ui32 threads) { + Y_UNUSED(threads); + } + + virtual i16 GetBlockingThreadCount() const { + return 0; + } + + virtual i16 GetDefaultThreadCount() const { + return 1; + } + + virtual i16 GetMinThreadCount() const { + return 1; + } + + virtual i16 GetMaxThreadCount() const { + return 1; + + } + + virtual bool IsThreadBeingStopped(i16 threadIdx) const { + Y_UNUSED(threadIdx); + return false; + } + + virtual double GetThreadConsumedUs(i16 threadIdx) { + Y_UNUSED(threadIdx); + return 0.0; + } + + virtual double GetThreadBookedUs(i16 threadIdx) { + Y_UNUSED(threadIdx); + return 0.0; + } + }; // could be proxy to in-pool schedulers (for NUMA-aware executors) diff --git a/library/cpp/actors/core/balancer.cpp b/library/cpp/actors/core/balancer.cpp index 3dcc45c56b..d82701bbfb 100644 --- a/library/cpp/actors/core/balancer.cpp +++ b/library/cpp/actors/core/balancer.cpp @@ -2,8 +2,9 @@ #include "probes.h" -#include <library/cpp/actors/util/intrinsics.h> +#include <library/cpp/actors/util/cpu_load_log.h> #include <library/cpp/actors/util/datetime.h> +#include <library/cpp/actors/util/intrinsics.h> #include <util/system/spinlock.h> @@ -27,11 +28,11 @@ namespace NActors { TLevel() {} - TLevel(const TBalancingConfig& cfg, TPoolId poolId, ui64 currentCpus, double cpuIdle) { + TLevel(const TBalancingConfig& cfg, TPoolId poolId, ui64 currentCpus, double cpuIdle, ui64 addLatencyUs, ui64 worstLatencyUs) { ScaleFactor = double(currentCpus) / cfg.Cpus; - if (cpuIdle > 1.3) { // TODO: add a better underload criterion, based on estimated latency w/o 1 cpu + if ((worstLatencyUs + addLatencyUs) < 2000 && cpuIdle > 1.0) { // Uderload criterion, based on estimated latency w/o 1 cpu LoadClass = Underloaded; - } else if (cpuIdle < 0.2) { // TODO: add a better overload criterion, based on latency + } else if (worstLatencyUs > 2000 || cpuIdle < 0.2) { // Overload criterion, based on latency LoadClass = Overloaded; } else { LoadClass = Moderate; @@ -82,6 +83,8 @@ namespace NActors { TBalancerConfig Config; public: + + ui64 GetPeriodUs() override; // Setup TBalancer(const TBalancerConfig& config, const TVector<TUnitedExecutorPoolConfig>& unitedPools, ui64 ts); bool AddCpu(const TCpuAllocation& cpuAlloc, TCpuState* cpu) override; @@ -238,9 +241,12 @@ namespace NActors { } // Compute levels - pool.CurLevel = TLevel(pool.Config, pool.PoolId, pool.CurrentCpus, pool.CpuIdle); - pool.AddLevel = TLevel(pool.Config, pool.PoolId, pool.CurrentCpus + 1, pool.CpuIdle); // we expect taken cpu to became utilized - pool.SubLevel = TLevel(pool.Config, pool.PoolId, pool.CurrentCpus - 1, pool.CpuIdle - 1); + pool.CurLevel = TLevel(pool.Config, pool.PoolId, pool.CurrentCpus, pool.CpuIdle, + pool.Next.ExpectedLatencyIncreaseUs, pool.Next.WorstActivationTimeUs); + pool.AddLevel = TLevel(pool.Config, pool.PoolId, pool.CurrentCpus + 1, pool.CpuIdle, + 0, pool.Next.WorstActivationTimeUs); // we expect taken cpu to became utilized + pool.SubLevel = TLevel(pool.Config, pool.PoolId, pool.CurrentCpus - 1, pool.CpuIdle - 1, + pool.Next.ExpectedLatencyIncreaseUs, pool.Next.WorstActivationTimeUs); // Prepare for balancing pool.PrevCpus = pool.CurrentCpus; @@ -263,7 +269,7 @@ namespace NActors { TPool& from = **fromIter; if (from.CurrentCpus == from.PrevCpus && // if not balanced yet from.CurrentCpus > from.Config.MinCpus && // and constraints would not be violated - from.SubLevel.Importance < to.AddLevel.Importance) // and which of two pools is more important would not change after cpu movement + from.SubLevel.Importance <= to.AddLevel.Importance) // and which of two pools is more important would not change after cpu movement { MoveCpu(from, to); from.CurrentCpus--; @@ -295,6 +301,10 @@ namespace NActors { Lock.Release(); } + ui64 TBalancer::GetPeriodUs() { + return Config.PeriodUs; + } + IBalancer* MakeBalancer(const TBalancerConfig& config, const TVector<TUnitedExecutorPoolConfig>& unitedPools, ui64 ts) { return new TBalancer(config, unitedPools, ts); } diff --git a/library/cpp/actors/core/balancer.h b/library/cpp/actors/core/balancer.h index 9763ec79e1..e1f6f33bf3 100644 --- a/library/cpp/actors/core/balancer.h +++ b/library/cpp/actors/core/balancer.h @@ -10,6 +10,8 @@ namespace NActors { ui64 Ts = 0; // Measurement timestamp ui64 CpuUs = 0; // Total cpu microseconds consumed by pool on all cpus since start ui64 IdleUs = ui64(-1); // Total cpu microseconds in spinning or waiting on futex + ui64 WorstActivationTimeUs = 0; + ui64 ExpectedLatencyIncreaseUs = 0; }; // Pool cpu balancer @@ -20,6 +22,7 @@ namespace NActors { virtual void SetPoolStats(TPoolId pool, const TBalancerStats& stats) = 0; virtual void Balance() = 0; virtual void Unlock() = 0; + virtual ui64 GetPeriodUs() = 0; // TODO: add method for reconfiguration on fly }; diff --git a/library/cpp/actors/core/config.h b/library/cpp/actors/core/config.h index 0d65815fd9..0bf4b871d7 100644 --- a/library/cpp/actors/core/config.h +++ b/library/cpp/actors/core/config.h @@ -41,6 +41,10 @@ namespace NActors { ui32 EventsPerMailbox = DEFAULT_EVENTS_PER_MAILBOX; int RealtimePriority = 0; ui32 MaxActivityType = 5; + i16 MinThreadCount = 0; + i16 MaxThreadCount = 0; + i16 DefaultThreadCount = 0; + i16 Priority = 0; }; struct TIOExecutorPoolConfig { @@ -88,11 +92,18 @@ namespace NActors { TBalancerConfig Balancer; }; + struct TSelfPingInfo { + NMonitoring::TDynamicCounters::TCounterPtr AvgPingCounter; + NMonitoring::TDynamicCounters::TCounterPtr AvgPingCounterWithSmallWindow; + ui32 MaxAvgPingUs; + }; + struct TCpuManagerConfig { TUnitedWorkersConfig UnitedWorkers; TVector<TBasicExecutorPoolConfig> Basic; TVector<TIOExecutorPoolConfig> IO; TVector<TUnitedExecutorPoolConfig> United; + TVector<TSelfPingInfo> PingInfoByPool; ui32 GetExecutorsCount() const { return Basic.size() + IO.size() + United.size(); diff --git a/library/cpp/actors/core/cpu_manager.cpp b/library/cpp/actors/core/cpu_manager.cpp index 39089b5d83..0736caa539 100644 --- a/library/cpp/actors/core/cpu_manager.cpp +++ b/library/cpp/actors/core/cpu_manager.cpp @@ -16,10 +16,18 @@ namespace NActors { UnitedWorkers.Reset(new TUnitedWorkers(Config.UnitedWorkers, Config.United, allocation, Balancer.Get())); } + ui64 ts = GetCycleCountFast(); + Harmonizer.Reset(MakeHarmonizer(ts)); + Executors.Reset(new TAutoPtr<IExecutorPool>[ExecutorPoolCount]); for (ui32 excIdx = 0; excIdx != ExecutorPoolCount; ++excIdx) { Executors[excIdx].Reset(CreateExecutorPool(excIdx)); + if (excIdx < Config.PingInfoByPool.size()) { + Harmonizer->AddPool(Executors[excIdx].Get(), &Config.PingInfoByPool[excIdx]); + } else { + Harmonizer->AddPool(Executors[excIdx].Get()); + } } } @@ -89,7 +97,7 @@ namespace NActors { IExecutorPool* TCpuManager::CreateExecutorPool(ui32 poolId) { for (TBasicExecutorPoolConfig& cfg : Config.Basic) { if (cfg.PoolId == poolId) { - return new TBasicExecutorPool(cfg); + return new TBasicExecutorPool(cfg, Harmonizer.Get()); } } for (TIOExecutorPoolConfig& cfg : Config.IO) { diff --git a/library/cpp/actors/core/cpu_manager.h b/library/cpp/actors/core/cpu_manager.h index 454035477b..42bede91b8 100644 --- a/library/cpp/actors/core/cpu_manager.h +++ b/library/cpp/actors/core/cpu_manager.h @@ -1,6 +1,7 @@ #pragma once #include "actorsystem.h" +#include "harmonizer.h" #include "executor_pool_basic.h" #include "executor_pool_io.h" #include "executor_pool_united.h" @@ -11,6 +12,7 @@ namespace NActors { TArrayHolder<TAutoPtr<IExecutorPool>> Executors; THolder<TUnitedWorkers> UnitedWorkers; THolder<IBalancer> Balancer; + THolder<IHarmonizer> Harmonizer; TCpuManagerConfig Config; public: explicit TCpuManager(THolder<TActorSystemSetup>& setup) diff --git a/library/cpp/actors/core/executor_pool_basic.cpp b/library/cpp/actors/core/executor_pool_basic.cpp index 4dce16939a..00e557fcb4 100644 --- a/library/cpp/actors/core/executor_pool_basic.cpp +++ b/library/cpp/actors/core/executor_pool_basic.cpp @@ -18,11 +18,16 @@ namespace NActors { ui32 threads, ui64 spinThreshold, const TString& poolName, + IHarmonizer *harmonizer, TAffinity* affinity, TDuration timePerMailbox, ui32 eventsPerMailbox, int realtimePriority, - ui32 maxActivityType) + ui32 maxActivityType, + i16 minThreadCount, + i16 maxThreadCount, + i16 defaultThreadCount, + i16 priority) : TExecutorPoolBase(poolId, threads, affinity, maxActivityType) , SpinThreshold(spinThreshold) , SpinThresholdCycles(spinThreshold * NHPTimer::GetCyclesPerSecond() * 0.000001) // convert microseconds to cycles @@ -34,21 +39,50 @@ namespace NActors { , ThreadUtilization(0) , MaxUtilizationCounter(0) , MaxUtilizationAccumulator(0) + , WrongWakenedThreadCount(0) , ThreadCount(threads) + , MinThreadCount(minThreadCount) + , MaxThreadCount(maxThreadCount) + , DefaultThreadCount(defaultThreadCount) + , Harmonizer(harmonizer) + , Priority(priority) { + i16 limit = Min(threads, (ui32)Max<i16>()); + if (DefaultThreadCount) { + DefaultThreadCount = Min(DefaultThreadCount, limit); + } else { + DefaultThreadCount = limit; + } + + MaxThreadCount = Min(Max(MaxThreadCount, DefaultThreadCount), limit); + + if (MinThreadCount) { + MinThreadCount = Max((i16)1, Min(MinThreadCount, DefaultThreadCount)); + } else { + MinThreadCount = DefaultThreadCount; + } + ThreadCount = MaxThreadCount; + auto semaphore = TSemaphore(); + semaphore.CurrentThreadCount = ThreadCount; + Semaphore = semaphore.ConverToI64(); } - TBasicExecutorPool::TBasicExecutorPool(const TBasicExecutorPoolConfig& cfg) + TBasicExecutorPool::TBasicExecutorPool(const TBasicExecutorPoolConfig& cfg, IHarmonizer *harmonizer) : TBasicExecutorPool( cfg.PoolId, cfg.Threads, cfg.SpinThreshold, cfg.PoolName, + harmonizer, new TAffinity(cfg.Affinity), cfg.TimePerMailbox, cfg.EventsPerMailbox, cfg.RealtimePriority, - cfg.MaxActivityType + cfg.MaxActivityType, + cfg.MinThreadCount, + cfg.MaxThreadCount, + cfg.DefaultThreadCount, + cfg.Priority ) {} @@ -56,126 +90,166 @@ namespace NActors { Threads.Destroy(); } + bool TBasicExecutorPool::GoToBeBlocked(TThreadCtx& threadCtx, TTimers &timers) { + do { + if (AtomicCas(&threadCtx.BlockedFlag, TThreadCtx::BS_BLOCKED, TThreadCtx::BS_BLOCKING)) { + timers.HPNow = GetCycleCountFast(); + timers.Elapsed += timers.HPNow - timers.HPStart; + if (threadCtx.BlockedPad.Park()) // interrupted + return true; + timers.HPStart = GetCycleCountFast(); + timers.Blocked += timers.HPStart - timers.HPNow; + } + } while (AtomicGet(threadCtx.BlockedFlag) != TThreadCtx::BS_NONE && !RelaxedLoad(&StopFlag)); + return false; + } + + bool TBasicExecutorPool::GoToSleep(TThreadCtx& threadCtx, TTimers &timers) { + do { + timers.HPNow = GetCycleCountFast(); + timers.Elapsed += timers.HPNow - timers.HPStart; + if (threadCtx.Pad.Park()) // interrupted + return true; + timers.HPStart = GetCycleCountFast(); + timers.Parked += timers.HPStart - timers.HPNow; + } while (AtomicLoad(&threadCtx.WaitingFlag) == TThreadCtx::WS_BLOCKED && !RelaxedLoad(&StopFlag)); + return false; + } + + void TBasicExecutorPool::GoToSpin(TThreadCtx& threadCtx) { + ui64 start = GetCycleCountFast(); + bool doSpin = true; + while (true) { + for (ui32 j = 0; doSpin && j < 12; ++j) { + if (GetCycleCountFast() >= (start + SpinThresholdCycles)) { + doSpin = false; + break; + } + for (ui32 i = 0; i < 12; ++i) { + if (AtomicLoad(&threadCtx.WaitingFlag) == TThreadCtx::WS_ACTIVE) { + SpinLockPause(); + } else { + doSpin = false; + break; + } + } + } + if (!doSpin) { + break; + } + if (RelaxedLoad(&StopFlag)) { + break; + } + } + } + + bool TBasicExecutorPool::GoToWaiting(TThreadCtx& threadCtx, TTimers &timers, bool needToBlock) { +#if defined ACTORSLIB_COLLECT_EXEC_STATS + if (AtomicGetAndIncrement(ThreadUtilization) == 0) { + // Initially counter contains -t0, the pool start timestamp + // When the first thread goes to sleep we add t1, so the counter + // becomes t1-t0 >= 0, or the duration of max utilization so far. + // If the counter was negative and becomes positive, that means + // counter just turned into a duration and we should store that + // duration. Otherwise another thread raced with us and + // subtracted some other timestamp t2. + const i64 t = GetCycleCountFast(); + const i64 x = AtomicGetAndAdd(MaxUtilizationCounter, t); + if (x < 0 && x + t > 0) + AtomicStore(&MaxUtilizationAccumulator, x + t); + } +#endif + + Y_VERIFY(AtomicLoad(&threadCtx.WaitingFlag) == TThreadCtx::WS_NONE); + + if (SpinThreshold > 0 && !needToBlock) { + // spin configured period + AtomicSet(threadCtx.WaitingFlag, TThreadCtx::WS_ACTIVE); + GoToSpin(threadCtx); + // then - sleep + if (AtomicLoad(&threadCtx.WaitingFlag) == TThreadCtx::WS_ACTIVE) { + if (AtomicCas(&threadCtx.WaitingFlag, TThreadCtx::WS_BLOCKED, TThreadCtx::WS_ACTIVE)) { + if (GoToSleep(threadCtx, timers)) { // interrupted + return true; + } + } + } + } else { + AtomicSet(threadCtx.WaitingFlag, TThreadCtx::WS_BLOCKED); + if (GoToSleep(threadCtx, timers)) { // interrupted + return true; + } + } + + Y_VERIFY_DEBUG(AtomicLoad(&StopFlag) || AtomicLoad(&threadCtx.WaitingFlag) == TThreadCtx::WS_RUNNING); + +#if defined ACTORSLIB_COLLECT_EXEC_STATS + if (AtomicDecrement(ThreadUtilization) == 0) { + // When we started sleeping counter contained t1-t0, or the + // last duration of max utilization. Now we subtract t2 >= t1, + // which turns counter negative again, and the next sleep cycle + // at timestamp t3 would be adding some new duration t3-t2. + // If the counter was positive and becomes negative that means + // there are no current races with other threads and we should + // store the last positive duration we observed. Multiple + // threads may be adding and subtracting values in potentially + // arbitrary order, which would cause counter to oscillate + // around zero. When it crosses zero is a good indication of a + // correct value. + const i64 t = GetCycleCountFast(); + const i64 x = AtomicGetAndAdd(MaxUtilizationCounter, -t); + if (x > 0 && x - t < 0) + AtomicStore(&MaxUtilizationAccumulator, x); + } +#endif + return false; + } + ui32 TBasicExecutorPool::GetReadyActivation(TWorkerContext& wctx, ui64 revolvingCounter) { ui32 workerId = wctx.WorkerId; Y_VERIFY_DEBUG(workerId < PoolThreads); - NHPTimer::STime elapsed = 0; - NHPTimer::STime parked = 0; - NHPTimer::STime blocked = 0; - NHPTimer::STime hpstart = GetCycleCountFast(); - NHPTimer::STime hpnow; + TTimers timers; + + if (Harmonizer) { + LWPROBE(TryToHarmonize, PoolId, PoolName); + Harmonizer->Harmonize(timers.HPStart); + } TThreadCtx& threadCtx = Threads[workerId]; AtomicSet(threadCtx.WaitingFlag, TThreadCtx::WS_NONE); if (Y_UNLIKELY(AtomicGet(threadCtx.BlockedFlag) != TThreadCtx::BS_NONE)) { - do { - if (AtomicCas(&threadCtx.BlockedFlag, TThreadCtx::BS_BLOCKED, TThreadCtx::BS_BLOCKING)) { - hpnow = GetCycleCountFast(); - elapsed += hpnow - hpstart; - if (threadCtx.BlockedPad.Park()) // interrupted - return 0; - hpstart = GetCycleCountFast(); - blocked += hpstart - hpnow; - } - } while (AtomicGet(threadCtx.BlockedFlag) != TThreadCtx::BS_NONE && !AtomicLoad(&StopFlag)); + if (GoToBeBlocked(threadCtx, timers)) { // interrupted + return 0; + } } - const TAtomic x = AtomicDecrement(Semaphore); + bool needToWait = false; + bool needToBlock = false; - if (x < 0) { -#if defined ACTORSLIB_COLLECT_EXEC_STATS - if (AtomicGetAndIncrement(ThreadUtilization) == 0) { - // Initially counter contains -t0, the pool start timestamp - // When the first thread goes to sleep we add t1, so the counter - // becomes t1-t0 >= 0, or the duration of max utilization so far. - // If the counter was negative and becomes positive, that means - // counter just turned into a duration and we should store that - // duration. Otherwise another thread raced with us and - // subtracted some other timestamp t2. - const i64 t = GetCycleCountFast(); - const i64 x = AtomicGetAndAdd(MaxUtilizationCounter, t); - if (x < 0 && x + t > 0) - AtomicStore(&MaxUtilizationAccumulator, x + t); - } -#endif + TAtomic x = AtomicGet(Semaphore); + do { + i64 oldX = x; + TSemaphore semaphore = TSemaphore::GetSemaphore(x); + needToBlock = semaphore.CurrentSleepThreadCount < 0; + needToWait = needToBlock || semaphore.OldSemaphore <= -semaphore.CurrentSleepThreadCount; - Y_VERIFY(AtomicLoad(&threadCtx.WaitingFlag) == TThreadCtx::WS_NONE); - - if (SpinThreshold > 0) { - // spin configured period - AtomicSet(threadCtx.WaitingFlag, TThreadCtx::WS_ACTIVE); - ui64 start = GetCycleCountFast(); - bool doSpin = true; - while (true) { - for (ui32 j = 0; doSpin && j < 12; ++j) { - if (GetCycleCountFast() >= (start + SpinThresholdCycles)) { - doSpin = false; - break; - } - for (ui32 i = 0; i < 12; ++i) { - if (AtomicLoad(&threadCtx.WaitingFlag) == TThreadCtx::WS_ACTIVE) { - SpinLockPause(); - } else { - doSpin = false; - break; - } - } - } - if (!doSpin) { - break; - } - if (RelaxedLoad(&StopFlag)) { - break; - } - } - // then - sleep - if (AtomicLoad(&threadCtx.WaitingFlag) == TThreadCtx::WS_ACTIVE) { - if (AtomicCas(&threadCtx.WaitingFlag, TThreadCtx::WS_BLOCKED, TThreadCtx::WS_ACTIVE)) { - do { - hpnow = GetCycleCountFast(); - elapsed += hpnow - hpstart; - if (threadCtx.Pad.Park()) // interrupted - return 0; - hpstart = GetCycleCountFast(); - parked += hpstart - hpnow; - } while (AtomicLoad(&threadCtx.WaitingFlag) == TThreadCtx::WS_BLOCKED); - } - } - } else { - AtomicSet(threadCtx.WaitingFlag, TThreadCtx::WS_BLOCKED); - do { - hpnow = GetCycleCountFast(); - elapsed += hpnow - hpstart; - if (threadCtx.Pad.Park()) // interrupted - return 0; - hpstart = GetCycleCountFast(); - parked += hpstart - hpnow; - } while (AtomicLoad(&threadCtx.WaitingFlag) == TThreadCtx::WS_BLOCKED); + semaphore.OldSemaphore--; + if (needToWait) { + semaphore.CurrentSleepThreadCount++; } - Y_VERIFY_DEBUG(AtomicLoad(&StopFlag) || AtomicLoad(&threadCtx.WaitingFlag) == TThreadCtx::WS_RUNNING); + x = AtomicGetAndCas(&Semaphore, semaphore.ConverToI64(), x); + if (x == oldX) { + break; + } + } while (!StopFlag); -#if defined ACTORSLIB_COLLECT_EXEC_STATS - if (AtomicDecrement(ThreadUtilization) == 0) { - // When we started sleeping counter contained t1-t0, or the - // last duration of max utilization. Now we subtract t2 >= t1, - // which turns counter negative again, and the next sleep cycle - // at timestamp t3 would be adding some new duration t3-t2. - // If the counter was positive and becomes negative that means - // there are no current races with other threads and we should - // store the last positive duration we observed. Multiple - // threads may be adding and subtracting values in potentially - // arbitrary order, which would cause counter to oscillate - // around zero. When it crosses zero is a good indication of a - // correct value. - const i64 t = GetCycleCountFast(); - const i64 x = AtomicGetAndAdd(MaxUtilizationCounter, -t); - if (x > 0 && x - t < 0) - AtomicStore(&MaxUtilizationAccumulator, x); + if (needToWait) { + if (GoToWaiting(threadCtx, timers, needToBlock)) { // interrupted + return 0; } -#endif } else { AtomicSet(threadCtx.WaitingFlag, TThreadCtx::WS_RUNNING); } @@ -183,14 +257,14 @@ namespace NActors { // ok, has work suggested, must dequeue while (!RelaxedLoad(&StopFlag)) { if (const ui32 activation = Activations.Pop(++revolvingCounter)) { - hpnow = GetCycleCountFast(); - elapsed += hpnow - hpstart; - wctx.AddElapsedCycles(IActor::ACTOR_SYSTEM, elapsed); - if (parked > 0) { - wctx.AddParkedCycles(parked); + timers.HPNow = GetCycleCountFast(); + timers.Elapsed += timers.HPNow - timers.HPStart; + wctx.AddElapsedCycles(IActor::ACTOR_SYSTEM, timers.Elapsed); + if (timers.Parked > 0) { + wctx.AddParkedCycles(timers.Parked); } - if (blocked > 0) { - wctx.AddBlockedCycles(blocked); + if (timers.Blocked > 0) { + wctx.AddBlockedCycles(timers.Blocked); } return activation; } @@ -201,22 +275,26 @@ namespace NActors { return 0; } - inline void TBasicExecutorPool::WakeUpLoop() { - for (ui32 i = 0;;) { - TThreadCtx& threadCtx = Threads[i % PoolThreads]; - switch (AtomicLoad(&threadCtx.WaitingFlag)) { + inline void TBasicExecutorPool::WakeUpLoop(i16 currentThreadCount) { + for (i16 i = 0;;) { + TThreadCtx& threadCtx = Threads[i]; + TThreadCtx::EWaitState state = static_cast<TThreadCtx::EWaitState>(AtomicLoad(&threadCtx.WaitingFlag)); + switch (state) { case TThreadCtx::WS_NONE: case TThreadCtx::WS_RUNNING: - ++i; - break; - case TThreadCtx::WS_ACTIVE: // in active spin-lock, just set flag - if (AtomicCas(&threadCtx.WaitingFlag, TThreadCtx::WS_RUNNING, TThreadCtx::WS_ACTIVE)) { - return; + if (++i >= MaxThreadCount) { + i = 0; } break; + case TThreadCtx::WS_ACTIVE: case TThreadCtx::WS_BLOCKED: - if (AtomicCas(&threadCtx.WaitingFlag, TThreadCtx::WS_RUNNING, TThreadCtx::WS_BLOCKED)) { - threadCtx.Pad.Unpark(); + if (AtomicCas(&threadCtx.WaitingFlag, TThreadCtx::WS_RUNNING, state)) { + if (state == TThreadCtx::WS_BLOCKED) { + threadCtx.Pad.Unpark(); + } + if (i >= currentThreadCount) { + AtomicIncrement(WrongWakenedThreadCount); + } return; } break; @@ -228,14 +306,42 @@ namespace NActors { void TBasicExecutorPool::ScheduleActivationEx(ui32 activation, ui64 revolvingCounter) { Activations.Push(activation, revolvingCounter); - const TAtomic x = AtomicIncrement(Semaphore); - if (x <= 0) { // we must find someone to wake-up - WakeUpLoop(); + bool needToWakeUp = false; + + TAtomic x = AtomicGet(Semaphore); + TSemaphore semaphore = TSemaphore::GetSemaphore(x); + do { + needToWakeUp = semaphore.CurrentSleepThreadCount > 0; + i64 oldX = semaphore.ConverToI64(); + semaphore.OldSemaphore++; + if (needToWakeUp) { + semaphore.CurrentSleepThreadCount--; + } + x = AtomicGetAndCas(&Semaphore, semaphore.ConverToI64(), oldX); + if (x == oldX) { + break; + } + semaphore = TSemaphore::GetSemaphore(x); + } while (true); + + if (needToWakeUp) { // we must find someone to wake-up + WakeUpLoop(semaphore.CurrentThreadCount); } } void TBasicExecutorPool::GetCurrentStats(TExecutorPoolStats& poolStats, TVector<TExecutorThreadStats>& statsCopy) const { poolStats.MaxUtilizationTime = RelaxedLoad(&MaxUtilizationAccumulator) / (i64)(NHPTimer::GetCyclesPerSecond() / 1000); + poolStats.WrongWakenedThreadCount = RelaxedLoad(&WrongWakenedThreadCount); + poolStats.CurrentThreadCount = RelaxedLoad(&ThreadCount); + if (Harmonizer) { + TPoolHarmonizedStats stats = Harmonizer->GetPoolStats(PoolId); + poolStats.IsNeedy = stats.IsNeedy; + poolStats.IsStarved = stats.IsStarved; + poolStats.IsHoggish = stats.IsHoggish; + poolStats.IncreasingThreadsByNeedyState = stats.IncreasingThreadsByNeedyState; + poolStats.DecreasingThreadsByStarvedState = stats.DecreasingThreadsByStarvedState; + poolStats.DecreasingThreadsByHoggishState = stats.DecreasingThreadsByHoggishState; + } statsCopy.resize(PoolThreads + 1); // Save counters from the pool object @@ -345,87 +451,71 @@ namespace NActors { with_lock (ChangeThreadsLock) { size_t prevCount = GetThreadCount(); AtomicSet(ThreadCount, threads); - if (prevCount < threads) { - for (size_t i = prevCount; i < threads; ++i) { - bool repeat = true; - while (repeat) { - switch (AtomicGet(Threads[i].BlockedFlag)) { - case TThreadCtx::BS_BLOCKING: - if (AtomicCas(&Threads[i].BlockedFlag, TThreadCtx::BS_NONE, TThreadCtx::BS_BLOCKING)) { - // thread not entry to blocked loop - repeat = false; - } - break; - case TThreadCtx::BS_BLOCKED: - // thread entry to blocked loop and we wake it - AtomicSet(Threads[i].BlockedFlag, TThreadCtx::BS_NONE); - Threads[i].BlockedPad.Unpark(); - repeat = false; - break; - default: - // thread mustn't has TThreadCtx::BS_NONE because last time it was started to block - Y_FAIL("BlockedFlag is not TThreadCtx::BS_BLOCKING and TThreadCtx::BS_BLOCKED when thread was waked up"); - } - } - } - } else if (prevCount > threads) { - // at first, start to block - for (size_t i = threads; i < prevCount; ++i) { - Y_VERIFY(AtomicGet(Threads[i].BlockedFlag) == TThreadCtx::BS_NONE); - AtomicSet(Threads[i].BlockedFlag, TThreadCtx::BS_BLOCKING); - } - // after check need to wake up threads - for (size_t idx = threads; idx < prevCount; ++idx) { - TThreadCtx& threadCtx = Threads[idx]; - auto waitingFlag = AtomicGet(threadCtx.WaitingFlag); - auto blockedFlag = AtomicGet(threadCtx.BlockedFlag); - // while thread has this states (WS_NONE and BS_BLOCKING) we can't guess which way thread will go. - // Either go to sleep and it will have to wake up, - // or go to execute task and after completion will be blocked. - while (waitingFlag == TThreadCtx::WS_NONE && blockedFlag == TThreadCtx::BS_BLOCKING) { - waitingFlag = AtomicGet(threadCtx.WaitingFlag); - blockedFlag = AtomicGet(threadCtx.BlockedFlag); - } - // next states: - // 1) WS_ACTIVE BS_BLOCKING - waiting and start spinig | need wake up to block - // 2) WS_BLOCKED BS_BLOCKING - waiting and start sleep | need wake up to block - // 3) WS_RUNNING BS_BLOCKING - start execute | not need wake up, will block after executing - // 4) WS_NONE BS_BLOCKED - blocked | not need wake up, already blocked - - if (waitingFlag == TThreadCtx::WS_ACTIVE || waitingFlag == TThreadCtx::WS_BLOCKED) { - // need wake up - Y_VERIFY(blockedFlag == TThreadCtx::BS_BLOCKING); - - // creaty empty mailBoxHint, where LineIndex == 1 and LineHint == 0, and activations will be ignored - constexpr auto emptyMailBoxHint = TMailboxTable::LineIndexMask & -TMailboxTable::LineIndexMask; - ui64 revolvingCounter = AtomicGet(ActivationsRevolvingCounter); - - Activations.Push(emptyMailBoxHint, revolvingCounter); - - auto x = AtomicIncrement(Semaphore); - if (x <= 0) { - // try wake up. if success then go to next thread - switch (waitingFlag){ - case TThreadCtx::WS_ACTIVE: // in active spin-lock, just set flag - if (AtomicCas(&threadCtx.WaitingFlag, TThreadCtx::WS_RUNNING, TThreadCtx::WS_ACTIVE)) { - continue; - } - break; - case TThreadCtx::WS_BLOCKED: - if (AtomicCas(&threadCtx.WaitingFlag, TThreadCtx::WS_RUNNING, TThreadCtx::WS_BLOCKED)) { - threadCtx.Pad.Unpark(); - continue; - } - break; - default: - ; // other thread woke this sleeping thread - } - // if thread has already been awakened then we must awaken the other - WakeUpLoop(); - } - } - } + TSemaphore semaphore = TSemaphore::GetSemaphore(AtomicGet(Semaphore)); + i64 oldX = semaphore.ConverToI64(); + semaphore.CurrentThreadCount = threads; + if (threads > prevCount) { + semaphore.CurrentSleepThreadCount += (i64)threads - prevCount; + semaphore.OldSemaphore -= (i64)threads - prevCount; + } else { + semaphore.CurrentSleepThreadCount -= (i64)prevCount - threads; + semaphore.OldSemaphore += prevCount - threads; } + AtomicAdd(Semaphore, semaphore.ConverToI64() - oldX); + LWPROBE(ThreadCount, PoolId, PoolName, threads, MinThreadCount, MaxThreadCount, DefaultThreadCount); + } + } + + i16 TBasicExecutorPool::GetDefaultThreadCount() const { + return DefaultThreadCount; + } + + i16 TBasicExecutorPool::GetMinThreadCount() const { + return MinThreadCount; + } + + i16 TBasicExecutorPool::GetMaxThreadCount() const { + return MaxThreadCount; + } + + bool TBasicExecutorPool::IsThreadBeingStopped(i16 threadIdx) const { + if ((ui32)threadIdx >= PoolThreads) { + return false; + } + auto blockedFlag = AtomicGet(Threads[threadIdx].BlockedFlag); + if (blockedFlag == TThreadCtx::BS_BLOCKING) { + return true; + } + return false; + } + + double TBasicExecutorPool::GetThreadConsumedUs(i16 threadIdx) { + if ((ui32)threadIdx >= PoolThreads) { + return 0; + } + TThreadCtx& threadCtx = Threads[threadIdx]; + TExecutorThreadStats stats; + threadCtx.Thread->GetCurrentStats(stats); + return Ts2Us(stats.ElapsedTicks); + } + + double TBasicExecutorPool::GetThreadBookedUs(i16 threadIdx) { + if ((ui32)threadIdx >= PoolThreads) { + return 0; } + TThreadCtx& threadCtx = Threads[threadIdx]; + TExecutorThreadStats stats; + threadCtx.Thread->GetCurrentStats(stats); + return stats.CpuNs / 1000.0; + } + + i16 TBasicExecutorPool::GetBlockingThreadCount() const { + TAtomic x = AtomicGet(Semaphore); + TSemaphore semaphore = TSemaphore::GetSemaphore(x); + return -Min<i16>(semaphore.CurrentSleepThreadCount, 0); + } + + i16 TBasicExecutorPool::GetPriority() const { + return Priority; } } diff --git a/library/cpp/actors/core/executor_pool_basic.h b/library/cpp/actors/core/executor_pool_basic.h index 023190f7fe..cd94a998f1 100644 --- a/library/cpp/actors/core/executor_pool_basic.h +++ b/library/cpp/actors/core/executor_pool_basic.h @@ -4,6 +4,7 @@ #include "executor_thread.h" #include "scheduler_queue.h" #include "executor_pool_base.h" +#include "harmonizer.h" #include <library/cpp/actors/util/unordered_cache.h> #include <library/cpp/actors/util/threadparkpad.h> #include <library/cpp/monlib/dynamic_counters/counters.h> @@ -45,6 +46,14 @@ namespace NActors { } }; + struct TTimers { + NHPTimer::STime Elapsed = 0; + NHPTimer::STime Parked = 0; + NHPTimer::STime Blocked = 0; + NHPTimer::STime HPStart = GetCycleCountFast(); + NHPTimer::STime HPNow; + }; + const ui64 SpinThreshold; const ui64 SpinThresholdCycles; @@ -62,11 +71,42 @@ namespace NActors { TAtomic ThreadUtilization; TAtomic MaxUtilizationCounter; TAtomic MaxUtilizationAccumulator; + TAtomic WrongWakenedThreadCount; TAtomic ThreadCount; TMutex ChangeThreadsLock; + i16 MinThreadCount; + i16 MaxThreadCount; + i16 DefaultThreadCount; + IHarmonizer *Harmonizer; + + const i16 Priority = 0; + public: + struct TSemaphore { + i64 OldSemaphore = 0; // 34 bits + // Sign bit + i16 CurrentSleepThreadCount = 0; // 14 bits + // Sign bit + i16 CurrentThreadCount = 0; // 14 bits + + inline i64 ConverToI64() { + i64 value = (1ll << 34) + OldSemaphore; + return value + | (((i64)CurrentSleepThreadCount + (1 << 14)) << 35) + | ((i64)CurrentThreadCount << 50); + } + + static inline TSemaphore GetSemaphore(i64 value) { + TSemaphore semaphore; + semaphore.OldSemaphore = (value & 0x7ffffffffll) - (1ll << 34); + semaphore.CurrentSleepThreadCount = ((value >> 35) & 0x7fff) - (1 << 14); + semaphore.CurrentThreadCount = (value >> 50) & 0x3fff; + return semaphore; + } + }; + static constexpr TDuration DEFAULT_TIME_PER_MAILBOX = TBasicExecutorPoolConfig::DEFAULT_TIME_PER_MAILBOX; static constexpr ui32 DEFAULT_EVENTS_PER_MAILBOX = TBasicExecutorPoolConfig::DEFAULT_EVENTS_PER_MAILBOX; @@ -74,12 +114,17 @@ namespace NActors { ui32 threads, ui64 spinThreshold, const TString& poolName = "", + IHarmonizer *harmonizer = nullptr, TAffinity* affinity = nullptr, TDuration timePerMailbox = DEFAULT_TIME_PER_MAILBOX, ui32 eventsPerMailbox = DEFAULT_EVENTS_PER_MAILBOX, int realtimePriority = 0, - ui32 maxActivityType = 1); - explicit TBasicExecutorPool(const TBasicExecutorPoolConfig& cfg); + ui32 maxActivityType = 1, + i16 minThreadCount = 0, + i16 maxThreadCount = 0, + i16 defaultThreadCount = 0, + i16 priority = 0); + explicit TBasicExecutorPool(const TBasicExecutorPoolConfig& cfg, IHarmonizer *harmonizer); ~TBasicExecutorPool(); ui32 GetReadyActivation(TWorkerContext& wctx, ui64 revolvingReadCounter) override; @@ -102,10 +147,22 @@ namespace NActors { void SetRealTimeMode() const override; - ui32 GetThreadCount() const; - void SetThreadCount(ui32 threads); + ui32 GetThreadCount() const override; + void SetThreadCount(ui32 threads) override; + i16 GetDefaultThreadCount() const override; + i16 GetMinThreadCount() const override; + i16 GetMaxThreadCount() const override; + bool IsThreadBeingStopped(i16 threadIdx) const override; + double GetThreadConsumedUs(i16 threadIdx) override; + double GetThreadBookedUs(i16 threadIdx) override; + i16 GetBlockingThreadCount() const override; + i16 GetPriority() const override; private: - void WakeUpLoop(); + void WakeUpLoop(i16 currentThreadCount); + bool GoToWaiting(TThreadCtx& threadCtx, TTimers &timers, bool needToBlock); + void GoToSpin(TThreadCtx& threadCtx); + bool GoToSleep(TThreadCtx& threadCtx, TTimers &timers); + bool GoToBeBlocked(TThreadCtx& threadCtx, TTimers &timers); }; } diff --git a/library/cpp/actors/core/executor_pool_basic_ut.cpp b/library/cpp/actors/core/executor_pool_basic_ut.cpp index 76dff693af..067fb30a1c 100644 --- a/library/cpp/actors/core/executor_pool_basic_ut.cpp +++ b/library/cpp/actors/core/executor_pool_basic_ut.cpp @@ -6,10 +6,14 @@ #include <library/cpp/actors/util/should_continue.h> #include <library/cpp/testing/unittest/registar.h> -#include <library/cpp/actors/protos/unittests.pb.h> using namespace NActors; +#define VALUES_EQUAL(a, b, ...) \ + UNIT_ASSERT_VALUES_EQUAL_C((a), (b), (i64)semaphore.OldSemaphore \ + << ' ' << (i64)semaphore.CurrentSleepThreadCount \ + << ' ' << (i64)semaphore.CurrentThreadCount __VA_ARGS__); + //////////////////////////////////////////////////////////////////////////////// struct TEvMsg : public NActors::TEventBase<TEvMsg, 10347> { @@ -90,138 +94,59 @@ THolder<TActorSystemSetup> GetActorSystemSetup(TBasicExecutorPool* pool) Y_UNIT_TEST_SUITE(BasicExecutorPool) { - Y_UNIT_TEST(DecreaseIncreaseThreadsCount) { - const size_t msgCount = 1e4; - const size_t size = 4; - const size_t halfSize = size / 2; - TBasicExecutorPool* executorPool = new TBasicExecutorPool(0, size, 50); + Y_UNIT_TEST(Semaphore) { + TBasicExecutorPool::TSemaphore semaphore; + semaphore = TBasicExecutorPool::TSemaphore::GetSemaphore(0); - auto setup = GetActorSystemSetup(executorPool); - TActorSystem actorSystem(setup); - actorSystem.Start(); + VALUES_EQUAL(0, semaphore.ConverToI64()); + semaphore = TBasicExecutorPool::TSemaphore::GetSemaphore(-1); + VALUES_EQUAL(-1, semaphore.ConverToI64()); + semaphore = TBasicExecutorPool::TSemaphore::GetSemaphore(1); + VALUES_EQUAL(1, semaphore.ConverToI64()); - executorPool->SetThreadCount(halfSize); - TTestSenderActor* actors[size]; - TActorId actorIds[size]; - for (size_t i = 0; i < size; ++i) { - actors[i] = new TTestSenderActor(); - actorIds[i] = actorSystem.Register(actors[i]); + for (i64 value = -1'000'000; value <= 1'000'000; ++value) { + VALUES_EQUAL(TBasicExecutorPool::TSemaphore::GetSemaphore(value).ConverToI64(), value); } - const int testCount = 2; - - TExecutorPoolStats poolStats[testCount]; - TVector<TExecutorThreadStats> statsCopy[testCount]; - - for (size_t testIdx = 0; testIdx < testCount; ++testIdx) { - for (size_t i = 0; i < size; ++i) { - actors[i]->Start(actors[i]->SelfId(), msgCount); + for (i8 sleepThreads = -10; sleepThreads <= 10; ++sleepThreads) { + + semaphore = TBasicExecutorPool::TSemaphore(); + semaphore.CurrentSleepThreadCount = sleepThreads; + i64 initialValue = semaphore.ConverToI64(); + + semaphore = TBasicExecutorPool::TSemaphore::GetSemaphore(initialValue - 1); + VALUES_EQUAL(-1, semaphore.OldSemaphore); + + i64 value = initialValue; + value -= 100; + for (i32 expected = -100; expected <= 100; ++expected) { + semaphore = TBasicExecutorPool::TSemaphore::GetSemaphore(value); + UNIT_ASSERT_VALUES_EQUAL_C(expected, semaphore.OldSemaphore, (i64)semaphore.OldSemaphore + << ' ' << (i64)semaphore.CurrentSleepThreadCount + << ' ' << (i64)semaphore.CurrentThreadCount); + UNIT_ASSERT_VALUES_EQUAL_C(sleepThreads, semaphore.CurrentSleepThreadCount, (i64)semaphore.OldSemaphore + << ' ' << (i64)semaphore.CurrentSleepThreadCount + << ' ' << (i64)semaphore.CurrentThreadCount); + semaphore = TBasicExecutorPool::TSemaphore(); + semaphore.OldSemaphore = expected; + semaphore.CurrentSleepThreadCount = sleepThreads; + UNIT_ASSERT_VALUES_EQUAL(semaphore.ConverToI64(), value); + value++; } - for (size_t i = 0; i < size; ++i) { - actorSystem.Send(actorIds[i], new TEvMsg()); - } - - Sleep(TDuration::MilliSeconds(100)); - for (size_t i = 0; i < size; ++i) { - actors[i]->Stop(); + for (i32 expected = 101; expected >= -101; --expected) { + semaphore = TBasicExecutorPool::TSemaphore::GetSemaphore(value); + UNIT_ASSERT_VALUES_EQUAL_C(expected, semaphore.OldSemaphore, (i64)semaphore.OldSemaphore + << ' ' << (i64)semaphore.CurrentSleepThreadCount + << ' ' << (i64)semaphore.CurrentThreadCount); + UNIT_ASSERT_VALUES_EQUAL_C(sleepThreads, semaphore.CurrentSleepThreadCount, (i64)semaphore.OldSemaphore + << ' ' << (i64)semaphore.CurrentSleepThreadCount + << ' ' << (i64)semaphore.CurrentThreadCount); + value--; } - - executorPool->GetCurrentStats(poolStats[testIdx], statsCopy[testIdx]); } - for (size_t i = 1; i <= halfSize; ++i) { - UNIT_ASSERT_UNEQUAL(statsCopy[0][i].ReceivedEvents, statsCopy[1][i].ReceivedEvents); - } - - for (size_t i = halfSize + 1; i <= size; ++i) { - UNIT_ASSERT_EQUAL(statsCopy[0][i].ReceivedEvents, statsCopy[1][i].ReceivedEvents); - } - - executorPool->SetThreadCount(size); - - for (size_t testIdx = 0; testIdx < testCount; ++testIdx) { - for (size_t i = 0; i < size; ++i) { - actors[i]->Start(actors[i]->SelfId(), msgCount); - } - for (size_t i = 0; i < size; ++i) { - actorSystem.Send(actorIds[i], new TEvMsg()); - } - - Sleep(TDuration::MilliSeconds(100)); - - for (size_t i = 0; i < size; ++i) { - actors[i]->Stop(); - } - - executorPool->GetCurrentStats(poolStats[testIdx], statsCopy[testIdx]); - } - - for (size_t i = 1; i <= size; ++i) { - UNIT_ASSERT_UNEQUAL(statsCopy[0][i].ReceivedEvents, statsCopy[1][i].ReceivedEvents); - } - } - - Y_UNIT_TEST(ChangeCount) { - const size_t msgCount = 1e3; - const size_t size = 4; - const size_t halfSize = size / 2; - TBasicExecutorPool* executorPool = new TBasicExecutorPool(0, size, 50); - - auto begin = TInstant::Now(); - - auto setup = GetActorSystemSetup(executorPool); - TActorSystem actorSystem(setup); - actorSystem.Start(); - executorPool->SetThreadCount(halfSize); - - TTestSenderActor* actors[size]; - TActorId actorIds[size]; - for (size_t i = 0; i < size; ++i) { - actors[i] = new TTestSenderActor(); - actorIds[i] = actorSystem.Register(actors[i]); - } - - for (size_t i = 0; i < size; ++i) { - actors[i]->Start(actorIds[i], msgCount); - } - for (size_t i = 0; i < size; ++i) { - actorSystem.Send(actorIds[i], new TEvMsg()); - } - - const i32 N = 6; - const i32 threadsCouns[N] = { 1, 3, 2, 3, 1, 4 }; - - ui64 counter = 0; - - TTestSenderActor* changerActor = new TTestSenderActor([&]{ - executorPool->SetThreadCount(threadsCouns[counter]); - counter++; - if (counter == N) { - counter = 0; - } - }); - TActorId changerActorId = actorSystem.Register(changerActor); - changerActor->Start(changerActorId, msgCount); - actorSystem.Send(changerActorId, new TEvMsg()); - - while (true) { - size_t maxCounter = 0; - for (size_t i = 0; i < size; ++i) { - maxCounter = Max(maxCounter, actors[i]->GetCounter()); - } - - if (maxCounter == 0) { - break; - } - - auto now = TInstant::Now(); - UNIT_ASSERT_C(now - begin < TDuration::Seconds(5), "Max counter is " << maxCounter); - - Sleep(TDuration::MilliSeconds(1)); - } - - changerActor->Stop(); + //UNIT_ASSERT_VALUES_EQUAL_C(-1, TBasicExecutorPool::TSemaphore::GetSemaphore(value-1).OldSemaphore); } Y_UNIT_TEST(CheckCompleteOne) { @@ -433,3 +358,182 @@ Y_UNIT_TEST_SUITE(BasicExecutorPool) { UNIT_ASSERT_VALUES_EQUAL(stats[0].MailboxPushedOutBySoftPreemption, 0); } } + +Y_UNIT_TEST_SUITE(ChangingThreadsCountInBasicExecutorPool) { + + struct TMockState { + void ActorDo() {} + }; + + struct TTestActors { + const size_t Count; + TArrayHolder<TTestSenderActor*> Actors; + TArrayHolder<TActorId> ActorIds; + + TTestActors(size_t count) + : Count(count) + , Actors(new TTestSenderActor*[count]) + , ActorIds(new TActorId[count]) + { } + + void Start(TActorSystem &actorSystem, size_t msgCount) { + for (size_t i = 0; i < Count; ++i) { + Actors[i]->Start(Actors[i]->SelfId(), msgCount); + } + for (size_t i = 0; i < Count; ++i) { + actorSystem.Send(ActorIds[i], new TEvMsg()); + } + } + + void Stop() { + for (size_t i = 0; i < Count; ++i) { + Actors[i]->Stop(); + } + } + }; + + template <typename TState = TMockState> + struct TTestCtx { + const size_t MaxThreadCount; + const size_t SendingMessageCount; + std::unique_ptr<TBasicExecutorPool> ExecutorPool; + THolder<TActorSystemSetup> Setup; + TActorSystem ActorSystem; + + TState State; + + TTestCtx(size_t maxThreadCount, size_t sendingMessageCount) + : MaxThreadCount(maxThreadCount) + , SendingMessageCount(sendingMessageCount) + , ExecutorPool(new TBasicExecutorPool(0, MaxThreadCount, 50)) + , Setup(GetActorSystemSetup(ExecutorPool.get())) + , ActorSystem(Setup) + { + } + + TTestCtx(size_t maxThreadCount, size_t sendingMessageCount, const TState &state) + : MaxThreadCount(maxThreadCount) + , SendingMessageCount(sendingMessageCount) + , ExecutorPool(new TBasicExecutorPool(0, MaxThreadCount, 50)) + , Setup(GetActorSystemSetup(ExecutorPool.get())) + , ActorSystem(Setup) + , State(state) + { + } + + ~TTestCtx() { + ExecutorPool.release(); + } + + TTestActors RegisterCheckActors(size_t actorCount) { + TTestActors res(actorCount); + for (size_t i = 0; i < actorCount; ++i) { + res.Actors[i] = new TTestSenderActor([&] { + State.ActorDo(); + }); + res.ActorIds[i] = ActorSystem.Register(res.Actors[i]); + } + return res; + } + }; + + struct TCheckingInFlightState { + TAtomic ExpectedMaximum = 0; + TAtomic CurrentInFlight = 0; + + void ActorStartProcessing() { + ui32 inFlight = AtomicIncrement(CurrentInFlight); + ui32 maximum = AtomicGet(ExpectedMaximum); + if (maximum) { + UNIT_ASSERT_C(inFlight <= maximum, "inFlight# " << inFlight << " maximum# " << maximum); + } + } + + void ActorStopProcessing() { + AtomicDecrement(CurrentInFlight); + } + + void ActorDo() { + ActorStartProcessing(); + NanoSleep(1'000'000); + ActorStopProcessing(); + } + }; + + Y_UNIT_TEST(DecreaseIncreaseThreadCount) { + const size_t msgCount = 1e2; + const size_t size = 4; + const size_t testCount = 2; + TTestCtx<TCheckingInFlightState> ctx(size, msgCount); + ctx.ActorSystem.Start(); + + TVector<TExecutorThreadStats> statsCopy[testCount]; + + TTestActors testActors = ctx.RegisterCheckActors(size); + + const size_t N = 6; + const size_t threadsCounts[N] = { 1, 3, 2, 3, 1, 4 }; + for (ui32 idx = 0; idx < 4 * N; ++idx) { + size_t currentThreadCount = threadsCounts[idx]; + ctx.ExecutorPool->SetThreadCount(currentThreadCount); + AtomicSet(ctx.State.ExpectedMaximum, currentThreadCount); + + for (size_t testIdx = 0; testIdx < testCount; ++testIdx) { + testActors.Start(ctx.ActorSystem, msgCount); + Sleep(TDuration::MilliSeconds(100)); + testActors.Stop(); + } + Sleep(TDuration::MilliSeconds(10)); + } + ctx.ActorSystem.Stop(); + } + + Y_UNIT_TEST(ContiniousChangingThreadCount) { + const size_t msgCount = 1e2; + const size_t size = 4; + + auto begin = TInstant::Now(); + TTestCtx<TCheckingInFlightState> ctx(size, msgCount, TCheckingInFlightState{msgCount}); + ctx.ActorSystem.Start(); + TTestActors testActors = ctx.RegisterCheckActors(size); + + testActors.Start(ctx.ActorSystem, msgCount); + + const size_t N = 6; + const size_t threadsCouns[N] = { 1, 3, 2, 3, 1, 4 }; + + ui64 counter = 0; + + TTestSenderActor* changerActor = new TTestSenderActor([&]{ + ctx.State.ActorStartProcessing(); + AtomicSet(ctx.State.ExpectedMaximum, 0); + ctx.ExecutorPool->SetThreadCount(threadsCouns[counter]); + NanoSleep(10'000'000); + AtomicSet(ctx.State.ExpectedMaximum, threadsCouns[counter]); + counter++; + if (counter == N) { + counter = 0; + } + ctx.State.ActorStopProcessing(); + }); + TActorId changerActorId = ctx.ActorSystem.Register(changerActor); + changerActor->Start(changerActorId, msgCount); + ctx.ActorSystem.Send(changerActorId, new TEvMsg()); + + while (true) { + size_t maxCounter = 0; + for (size_t i = 0; i < size; ++i) { + maxCounter = Max(maxCounter, testActors.Actors[i]->GetCounter()); + } + if (maxCounter == 0) { + break; + } + auto now = TInstant::Now(); + UNIT_ASSERT_C(now - begin < TDuration::Seconds(5), "Max counter is " << maxCounter); + Sleep(TDuration::MilliSeconds(1)); + } + + changerActor->Stop(); + ctx.ActorSystem.Stop(); + } +} diff --git a/library/cpp/actors/core/executor_pool_united.cpp b/library/cpp/actors/core/executor_pool_united.cpp index dac6245635..a2cb269280 100644 --- a/library/cpp/actors/core/executor_pool_united.cpp +++ b/library/cpp/actors/core/executor_pool_united.cpp @@ -7,6 +7,7 @@ #include "mailbox.h" #include "scheduler_queue.h" #include <library/cpp/actors/util/affinity.h> +#include <library/cpp/actors/util/cpu_load_log.h> #include <library/cpp/actors/util/datetime.h> #include <library/cpp/actors/util/futex.h> #include <library/cpp/actors/util/intrinsics.h> @@ -953,6 +954,8 @@ namespace NActors { // Thread-safe per pool stats // NOTE: It's guaranteed that cpu never executes two instance of the same pool TVector<TExecutorThreadStats> PoolStats; + TCpuLoadLog<1024> LoadLog; + // Configuration TCpuId CpuId; @@ -1000,7 +1003,9 @@ namespace NActors { } bool ActiveWait(ui64 spinThresholdTs, TPoolId& result) { - ui64 deadline = GetCycleCountFast() + spinThresholdTs; + ui64 ts = GetCycleCountFast(); + LoadLog.RegisterBusyPeriod(ts); + ui64 deadline = ts + spinThresholdTs; while (GetCycleCountFast() < deadline) { for (ui32 i = 0; i < 12; ++i) { TPoolId current = State.CurrentPool(); @@ -1008,6 +1013,7 @@ namespace NActors { SpinLockPause(); } else { result = current; + LoadLog.RegisterIdlePeriod(GetCycleCountFast()); return true; // wakeup } } @@ -1269,15 +1275,25 @@ namespace NActors { if (Pools[pool].IsUnited()) { ui64 ElapsedTs = 0; ui64 ParkedTs = 0; + TStackVec<TCpuLoadLog<1024>*, 128> logs; + ui64 worstActivationTimeUs = 0; for (TCpu* cpu : Pools[pool].WakeOrderCpus) { - const TExecutorThreadStats& cpuStats = cpu->PoolStats[pool]; + TExecutorThreadStats& cpuStats = cpu->PoolStats[pool]; ElapsedTs += cpuStats.ElapsedTicks; ParkedTs += cpuStats.ParkedTicks; + worstActivationTimeUs = Max(worstActivationTimeUs, cpuStats.WorstActivationTimeUs); + cpuStats.WorstActivationTimeUs = 0; + logs.push_back(&cpu->LoadLog); } + ui64 minPeriodTs = Min(ui64(Us2Ts(Balancer->GetPeriodUs())), ui64((1024ull-2ull)*64ull*128ull*1024ull)); + ui64 estimatedTs = MinusOneCpuEstimator.MaxLatencyIncreaseWithOneLessCpu( + &logs[0], logs.size(), ts, minPeriodTs); TBalancerStats stats; stats.Ts = ts; stats.CpuUs = Ts2Us(ElapsedTs); stats.IdleUs = Ts2Us(ParkedTs); + stats.ExpectedLatencyIncreaseUs = Ts2Us(estimatedTs); + stats.WorstActivationTimeUs = worstActivationTimeUs; Balancer->SetPoolStats(pool, stats); } } @@ -1332,11 +1348,13 @@ namespace NActors { return result; } wctx.AddElapsedCycles(IActor::ACTOR_SYSTEM, timeTracker.Elapsed()); + cpu.LoadLog.RegisterBusyPeriod(GetCycleCountFast()); bool wakeup; do { wakeup = cpu.BlockedWait(result, Config.Balancer.PeriodUs * 1000); wctx.AddParkedCycles(timeTracker.Elapsed()); } while (!wakeup); + cpu.LoadLog.RegisterIdlePeriod(GetCycleCountFast()); return result; } diff --git a/library/cpp/actors/core/executor_pool_united.h b/library/cpp/actors/core/executor_pool_united.h index a090ba2466..0895b06462 100644 --- a/library/cpp/actors/core/executor_pool_united.h +++ b/library/cpp/actors/core/executor_pool_united.h @@ -8,6 +8,7 @@ #include <library/cpp/actors/util/unordered_cache.h> #include <library/cpp/monlib/dynamic_counters/counters.h> +#include <library/cpp/actors/util/cpu_load_log.h> #include <library/cpp/actors/util/unordered_cache.h> #include <library/cpp/containers/stack_vector/stack_vec.h> @@ -34,6 +35,7 @@ namespace NActors { TCpuAllocationConfig Allocation; volatile bool StopFlag = false; + TMinusOneCpuEstimator<1024> MinusOneCpuEstimator; public: TUnitedWorkers( diff --git a/library/cpp/actors/core/executor_pool_united_ut.cpp b/library/cpp/actors/core/executor_pool_united_ut.cpp index a1595d8588..88b04e6472 100644 --- a/library/cpp/actors/core/executor_pool_united_ut.cpp +++ b/library/cpp/actors/core/executor_pool_united_ut.cpp @@ -111,6 +111,10 @@ struct TRoundRobinBalancer: public IBalancer { State->Load(assigned, current); State->AssignPool(NextPool[assigned]); } + + ui64 GetPeriodUs() override { + return 1000; + } }; void AddUnitedPool(THolder<TActorSystemSetup>& setup, ui32 concurrency = 0) { diff --git a/library/cpp/actors/core/executor_thread.cpp b/library/cpp/actors/core/executor_thread.cpp index 446b651efd..4271dadab2 100644 --- a/library/cpp/actors/core/executor_thread.cpp +++ b/library/cpp/actors/core/executor_thread.cpp @@ -87,7 +87,7 @@ namespace NActors { } template <class T> - inline TString SafeTypeName(T* t) { + inline TString SafeTypeName(const T* t) { if (t == nullptr) { return "nullptr"; } @@ -98,11 +98,7 @@ namespace NActors { } } - inline TString ActorTypeName(const IActor* actor, ui32 activityType) { - return actor ? SafeTypeName(actor) : ("activityType_" + ToString(activityType) + " (destroyed)"); - } - - inline void LwTraceSlowDelivery(IEventHandle* ev, const IActor* actor, ui32 poolId, const TActorId& currentRecipient, + inline void LwTraceSlowDelivery(IEventHandle* ev, const std::type_info* actorType, ui32 poolId, const TActorId& currentRecipient, double delivMs, double sinceActivationMs, ui32 eventsExecutedBefore) { const auto baseEv = (ev && ev->HasEvent()) ? ev->GetBase() : nullptr; LWPROBE(EventSlowDelivery, @@ -112,10 +108,10 @@ namespace NActors { eventsExecutedBefore, baseEv ? SafeTypeName(baseEv) : (ev ? ToString(ev->Type) : TString("nullptr")), currentRecipient.ToString(), - SafeTypeName(actor)); + SafeTypeName(actorType)); } - inline void LwTraceSlowEvent(IEventHandle* ev, ui32 evTypeForTracing, const IActor* actor, ui32 poolId, ui32 activityType, + inline void LwTraceSlowEvent(IEventHandle* ev, ui32 evTypeForTracing, const std::type_info* actorType, ui32 poolId, const TActorId& currentRecipient, double eventMs) { // Event could have been destroyed by actor->Receive(); const auto baseEv = (ev && ev->HasEvent()) ? ev->GetBase() : nullptr; @@ -124,7 +120,7 @@ namespace NActors { eventMs, baseEv ? SafeTypeName(baseEv) : ToString(evTypeForTracing), currentRecipient.ToString(), - ActorTypeName(actor, activityType)); + SafeTypeName(actorType)); } template <typename TMailbox> @@ -137,6 +133,7 @@ namespace NActors { NHPTimer::STime hpprev = hpstart; IActor* actor = nullptr; + const std::type_info* actorType = nullptr; ui32 prevActivityType = std::numeric_limits<ui32>::max(); TActorId recipient; for (ui32 executed = 0; executed < Ctx.EventsPerMailbox; ++executed) { @@ -148,6 +145,9 @@ namespace NActors { TActorContext ctx(*mailbox, *this, hpprev, recipient); TlsActivationContext = &ctx; + // Since actor is not null there should be no exceptions + actorType = &typeid(*actor); + #ifdef USE_ACTOR_CALLSTACK TCallstack::GetTlsCallstack() = ev->Callstack; TCallstack::GetTlsCallstack().SetLinesToSkip(); @@ -165,7 +165,7 @@ namespace NActors { i64 usecDeliv = Ctx.AddEventDeliveryStats(ev->SendTime, hpprev); if (usecDeliv > 5000) { double sinceActivationMs = NHPTimer::GetSeconds(hpprev - hpstart) * 1000.0; - LwTraceSlowDelivery(ev.Get(), actor, Ctx.PoolId, CurrentRecipient, NHPTimer::GetSeconds(hpprev - ev->SendTime) * 1000.0, sinceActivationMs, executed); + LwTraceSlowDelivery(ev.Get(), actorType, Ctx.PoolId, CurrentRecipient, NHPTimer::GetSeconds(hpprev - ev->SendTime) * 1000.0, sinceActivationMs, executed); } ui32 evTypeForTracing = ev->Type; @@ -191,7 +191,7 @@ namespace NActors { hpnow = GetCycleCountFast(); NHPTimer::STime elapsed = Ctx.AddEventProcessingStats(hpprev, hpnow, activityType, CurrentActorScheduledEventsCounter); if (elapsed > 1000000) { - LwTraceSlowEvent(ev.Get(), evTypeForTracing, actor, Ctx.PoolId, activityType, CurrentRecipient, NHPTimer::GetSeconds(elapsed) * 1000.0); + LwTraceSlowEvent(ev.Get(), evTypeForTracing, actorType, Ctx.PoolId, CurrentRecipient, NHPTimer::GetSeconds(elapsed) * 1000.0); } // The actor might have been destroyed @@ -200,6 +200,8 @@ namespace NActors { CurrentRecipient = TActorId(); } else { + actorType = nullptr; + TAutoPtr<IEventHandle> nonDelivered = ev->ForwardOnNondelivery(TEvents::TEvUndelivered::ReasonActorUnknown); if (nonDelivered.Get()) { ActorSystem->Send(nonDelivered); @@ -223,7 +225,7 @@ namespace NActors { CyclesToDuration(hpnow - hpstart), Ctx.WorkerId, recipient.ToString(), - SafeTypeName(actor)); + SafeTypeName(actorType)); break; } @@ -239,7 +241,7 @@ namespace NActors { CyclesToDuration(hpnow - hpstart), Ctx.WorkerId, recipient.ToString(), - SafeTypeName(actor)); + SafeTypeName(actorType)); break; } @@ -254,7 +256,7 @@ namespace NActors { CyclesToDuration(hpnow - hpstart), Ctx.WorkerId, recipient.ToString(), - SafeTypeName(actor)); + SafeTypeName(actorType)); break; } } else { diff --git a/library/cpp/actors/core/harmonizer.cpp b/library/cpp/actors/core/harmonizer.cpp new file mode 100644 index 0000000000..f318d8909c --- /dev/null +++ b/library/cpp/actors/core/harmonizer.cpp @@ -0,0 +1,431 @@ +#include "harmonizer.h" + +#include "probes.h" +#include "actorsystem.h" + +#include <library/cpp/actors/util/cpu_load_log.h> +#include <library/cpp/actors/util/datetime.h> +#include <library/cpp/actors/util/intrinsics.h> + +#include <util/system/spinlock.h> + +#include <algorithm> + +namespace NActors { + +LWTRACE_USING(ACTORLIB_PROVIDER); + +constexpr bool CheckBinaryPower(ui64 value) { + return !(value & (value - 1)); +} + +struct TValueHistory { + static constexpr ui64 HistoryBufferSize = 8; + static_assert(CheckBinaryPower(HistoryBufferSize)); + + double History[HistoryBufferSize] = {0.0}; + ui64 HistoryIdx = 0; + ui64 LastTs = Max<ui64>(); + double LastUs = 0.0; + double AccumulatedUs = 0.0; + ui64 AccumulatedTs = 0; + + template <bool WithTail=false> + double GetAvgPartForLastSeconds(ui8 seconds) { + double sum = AccumulatedUs; + size_t idx = HistoryIdx; + ui8 leftSeconds = seconds; + do { + idx--; + leftSeconds--; + if (idx >= HistoryBufferSize) { + idx = HistoryBufferSize - 1; + } + if (WithTail || leftSeconds) { + sum += History[idx]; + } else { + ui64 tsInSecond = Us2Ts(1'000'000.0); + sum += History[idx] * (tsInSecond - AccumulatedTs) / tsInSecond; + } + } while (leftSeconds); + double duration = 1'000'000.0 * seconds + (WithTail ? Ts2Us(AccumulatedTs): 0.0); + double avg = sum / duration; + return avg; + } + + double GetAvgPart() { + return GetAvgPartForLastSeconds<true>(HistoryBufferSize); + } + + void Register(ui64 ts, double valueUs) { + if (ts < LastTs) { + LastTs = ts; + LastUs = valueUs; + AccumulatedUs = 0.0; + AccumulatedTs = 0; + return; + } + ui64 lastTs = std::exchange(LastTs, ts); + ui64 dTs = ts - lastTs; + double lastUs = std::exchange(LastUs, valueUs); + double dUs = valueUs - lastUs; + LWPROBE(RegisterValue, ts, lastTs, dTs, Us2Ts(8'000'000.0), valueUs, lastUs, dUs); + + if (dTs > Us2Ts(8'000'000.0)) { + dUs = dUs * 1'000'000.0 / Ts2Us(dTs); + for (size_t idx = 0; idx < HistoryBufferSize; ++idx) { + History[idx] = dUs; + } + AccumulatedUs = 0.0; + AccumulatedTs = 0; + return; + } + + while (dTs > 0) { + if (AccumulatedTs + dTs < Us2Ts(1'000'000.0)) { + AccumulatedTs += dTs; + AccumulatedUs += dUs; + break; + } else { + ui64 addTs = Us2Ts(1'000'000.0) - AccumulatedTs; + double addUs = dUs * addTs / dTs; + dTs -= addTs; + dUs -= addUs; + History[HistoryIdx] = AccumulatedUs + addUs; + HistoryIdx = (HistoryIdx + 1) % HistoryBufferSize; + AccumulatedUs = 0.0; + AccumulatedTs = 0; + } + } + } +}; + +struct TThreadInfo { + TValueHistory Consumed; + TValueHistory Booked; +}; + +struct TPoolInfo { + std::vector<TThreadInfo> ThreadInfo; + IExecutorPool* Pool = nullptr; + i16 DefaultThreadCount = 0; + i16 MinThreadCount = 0; + i16 MaxThreadCount = 0; + i16 Priority = 0; + NMonitoring::TDynamicCounters::TCounterPtr AvgPingCounter; + NMonitoring::TDynamicCounters::TCounterPtr AvgPingCounterWithSmallWindow; + ui32 MaxAvgPingUs = 0; + ui64 LastUpdateTs = 0; + + TAtomic LastFlags = 0; // 0 - isNeedy; 1 - isStarved; 2 - isHoggish + TAtomic IncreasingThreadsByNeedyState = 0; + TAtomic DecreasingThreadsByStarvedState = 0; + TAtomic DecreasingThreadsByHoggishState = 0; + + bool IsBeingStopped(i16 threadIdx); + double GetBooked(i16 threadIdx); + double GetlastSecondPoolBooked(i16 threadIdx); + double GetConsumed(i16 threadIdx); + double GetlastSecondPoolConsumed(i16 threadIdx); + void PullStats(ui64 ts); + i16 GetThreadCount(); + void SetThreadCount(i16 threadCount); + bool IsAvgPingGood(); +}; + +bool TPoolInfo::IsBeingStopped(i16 threadIdx) { + return Pool->IsThreadBeingStopped(threadIdx); +} + +double TPoolInfo::GetBooked(i16 threadIdx) { + if ((size_t)threadIdx < ThreadInfo.size()) { + return ThreadInfo[threadIdx].Booked.GetAvgPart(); + } + return 0.0; +} + +double TPoolInfo::GetlastSecondPoolBooked(i16 threadIdx) { + if ((size_t)threadIdx < ThreadInfo.size()) { + return ThreadInfo[threadIdx].Booked.GetAvgPartForLastSeconds(1); + } + return 0.0; +} + +double TPoolInfo::GetConsumed(i16 threadIdx) { + if ((size_t)threadIdx < ThreadInfo.size()) { + return ThreadInfo[threadIdx].Consumed.GetAvgPart(); + } + return 0.0; +} + +double TPoolInfo::GetlastSecondPoolConsumed(i16 threadIdx) { + if ((size_t)threadIdx < ThreadInfo.size()) { + return ThreadInfo[threadIdx].Consumed.GetAvgPartForLastSeconds(1); + } + return 0.0; +} + +#define UNROLL_HISTORY(history) (history)[0], (history)[1], (history)[2], (history)[3], (history)[4], (history)[5], (history)[6], (history)[7] +void TPoolInfo::PullStats(ui64 ts) { + for (i16 threadIdx = 0; threadIdx < MaxThreadCount; ++threadIdx) { + TThreadInfo &threadInfo = ThreadInfo[threadIdx]; + threadInfo.Consumed.Register(ts, Pool->GetThreadConsumedUs(threadIdx)); + LWPROBE(SavedValues, Pool->PoolId, Pool->GetName(), "consumed", UNROLL_HISTORY(threadInfo.Consumed.History)); + threadInfo.Booked.Register(ts, Pool->GetThreadBookedUs(threadIdx)); + LWPROBE(SavedValues, Pool->PoolId, Pool->GetName(), "booked", UNROLL_HISTORY(threadInfo.Booked.History)); + } +} +#undef UNROLL_HISTORY + +i16 TPoolInfo::GetThreadCount() { + return Pool->GetThreadCount(); +} + +void TPoolInfo::SetThreadCount(i16 threadCount) { + Pool->SetThreadCount(threadCount); +} + +bool TPoolInfo::IsAvgPingGood() { + bool res = true; + if (AvgPingCounter) { + res &= *AvgPingCounter > MaxAvgPingUs; + } + if (AvgPingCounterWithSmallWindow) { + res &= *AvgPingCounterWithSmallWindow > MaxAvgPingUs; + } + return res; +} + +class THarmonizer: public IHarmonizer { +private: + std::atomic<bool> IsDisabled = false; + TSpinLock Lock; + std::atomic<ui64> NextHarmonizeTs = 0; + std::vector<TPoolInfo> Pools; + std::vector<ui16> PriorityOrder; + + void PullStats(ui64 ts); + void HarmonizeImpl(ui64 ts); + void CalculatePriorityOrder(); +public: + THarmonizer(ui64 ts); + virtual ~THarmonizer(); + double Rescale(double value) const; + void Harmonize(ui64 ts) override; + void DeclareEmergency(ui64 ts) override; + void AddPool(IExecutorPool* pool, TSelfPingInfo *pingInfo) override; + void Enable(bool enable) override; + TPoolHarmonizedStats GetPoolStats(i16 poolId) const override; +}; + +THarmonizer::THarmonizer(ui64 ts) { + NextHarmonizeTs = ts; +} + +THarmonizer::~THarmonizer() { +} + +double THarmonizer::Rescale(double value) const { + return Max(0.0, Min(1.0, value * (1.0/0.9))); +} + +void THarmonizer::PullStats(ui64 ts) { + for (TPoolInfo &pool : Pools) { + pool.PullStats(ts); + } +} + +Y_FORCE_INLINE bool IsStarved(double consumed, double booked) { + return Max(consumed, booked) > 0.1 && consumed < booked * 0.7; +} + +Y_FORCE_INLINE bool IsHoggish(double booked, ui16 currentThreadCount) { + return booked < currentThreadCount - 1; +} + +void THarmonizer::HarmonizeImpl(ui64 ts) { + bool isStarvedPresent = false; + double booked = 0.0; + double consumed = 0.0; + double lastSecondBooked = 0.0; + i64 beingStopped = 0; + i64 total = 0; + TStackVec<size_t, 8> needyPools; + TStackVec<size_t, 8> hoggishPools; + for (size_t poolIdx = 0; poolIdx < Pools.size(); ++poolIdx) { + TPoolInfo& pool = Pools[poolIdx]; + total += pool.DefaultThreadCount; + double poolBooked = 0.0; + double poolConsumed = 0.0; + double lastSecondPoolBooked = 0.0; + double lastSecondPoolConsumed = 0.0; + beingStopped += pool.Pool->GetBlockingThreadCount(); + for (i16 threadIdx = 0; threadIdx < pool.MaxThreadCount; ++threadIdx) { + poolBooked += Rescale(pool.GetBooked(threadIdx)); + lastSecondPoolBooked += Rescale(pool.GetlastSecondPoolBooked(threadIdx)); + poolConsumed += Rescale(pool.GetConsumed(threadIdx)); + lastSecondPoolConsumed += Rescale(pool.GetlastSecondPoolConsumed(threadIdx)); + } + bool isStarved = IsStarved(poolConsumed, poolBooked) || IsStarved(lastSecondPoolConsumed, lastSecondPoolBooked); + if (isStarved) { + isStarvedPresent = true; + } + ui32 currentThreadCount = pool.GetThreadCount(); + bool isNeedy = pool.IsAvgPingGood() && poolBooked >= currentThreadCount; + if (pool.AvgPingCounter) { + if (pool.LastUpdateTs + Us2Ts(3'000'000ull) > ts) { + isNeedy = false; + } else { + pool.LastUpdateTs = ts; + } + } + if (isNeedy) { + needyPools.push_back(poolIdx); + } + bool isHoggish = IsHoggish(poolBooked, currentThreadCount) + || IsHoggish(lastSecondPoolBooked, currentThreadCount); + if (isHoggish) { + hoggishPools.push_back(poolIdx); + } + booked += poolBooked; + consumed += poolConsumed; + AtomicSet(pool.LastFlags, (i64)isNeedy | ((i64)isStarved << 1) | ((i64)isHoggish << 2)); + LWPROBE(HarmonizeCheckPool, poolIdx, pool.Pool->GetName(), poolBooked, poolConsumed, lastSecondPoolBooked, lastSecondPoolConsumed, pool.GetThreadCount(), pool.MaxThreadCount, isStarved, isNeedy, isHoggish); + } + double budget = total - Max(booked, lastSecondBooked); + if (budget < -0.1) { + isStarvedPresent = true; + } + double overbooked = consumed - booked; + if (isStarvedPresent) { + // last_starved_at_consumed_value = сумма по всем пулам consumed; + // TODO(cthulhu): использовать как лимит планвно устремлять этот лимит к total, + // использовать вместо total + if (beingStopped && beingStopped >= overbooked) { + // do nothing + } else { + TStackVec<size_t> reorder; + for (size_t i = 0; i < Pools.size(); ++i) { + reorder.push_back(i); + } + for (ui16 poolIdx : PriorityOrder) { + TPoolInfo &pool = Pools[poolIdx]; + i64 threadCount = pool.GetThreadCount(); + if (threadCount > pool.DefaultThreadCount) { + pool.SetThreadCount(threadCount - 1); + AtomicIncrement(pool.DecreasingThreadsByStarvedState); + overbooked--; + LWPROBE(HarmonizeOperation, poolIdx, pool.Pool->GetName(), "decrease", threadCount - 1, pool.DefaultThreadCount, pool.MaxThreadCount); + if (overbooked < 1) { + break; + } + } + } + } + } else { + for (size_t needyPoolIdx : needyPools) { + TPoolInfo &pool = Pools[needyPoolIdx]; + if (budget >= 1.0) { + i64 threadCount = pool.GetThreadCount(); + if (threadCount + 1 <= pool.MaxThreadCount) { + AtomicIncrement(pool.IncreasingThreadsByNeedyState); + pool.SetThreadCount(threadCount + 1); + budget -= 1.0; + LWPROBE(HarmonizeOperation, needyPoolIdx, pool.Pool->GetName(), "increase", threadCount + 1, pool.DefaultThreadCount, pool.MaxThreadCount); + } + } + } + } + for (size_t hoggishPoolIdx : hoggishPools) { + TPoolInfo &pool = Pools[hoggishPoolIdx]; + i64 threadCount = pool.GetThreadCount(); + if (threadCount > pool.MinThreadCount) { + AtomicIncrement(pool.DecreasingThreadsByHoggishState); + LWPROBE(HarmonizeOperation, hoggishPoolIdx, pool.Pool->GetName(), "decrease", threadCount - 1, pool.DefaultThreadCount, pool.MaxThreadCount); + pool.SetThreadCount(threadCount - 1); + } + } +} + +void THarmonizer::CalculatePriorityOrder() { + PriorityOrder.resize(Pools.size()); + Iota(PriorityOrder.begin(), PriorityOrder.end(), 0); + Sort(PriorityOrder.begin(), PriorityOrder.end(), [&] (i16 lhs, i16 rhs) { + if (Pools[lhs].Priority != Pools[rhs].Priority) { + return Pools[lhs].Priority < Pools[rhs].Priority; + } + return Pools[lhs].Pool->PoolId > Pools[rhs].Pool->PoolId; + }); +} + +void THarmonizer::Harmonize(ui64 ts) { + if (IsDisabled || NextHarmonizeTs > ts || !Lock.TryAcquire()) { + LWPROBE(TryToHarmonizeFailed, ts, NextHarmonizeTs, IsDisabled, false); + return; + } + // Check again under the lock + if (IsDisabled) { + LWPROBE(TryToHarmonizeFailed, ts, NextHarmonizeTs, IsDisabled, true); + Lock.Release(); + return; + } + // Will never reach this line disabled + ui64 previousNextHarmonizeTs = NextHarmonizeTs.exchange(ts + Us2Ts(1'000'000ull)); + LWPROBE(TryToHarmonizeSuccess, ts, NextHarmonizeTs, previousNextHarmonizeTs); + + if (PriorityOrder.empty()) { + CalculatePriorityOrder(); + } + + PullStats(ts); + HarmonizeImpl(ts); + + Lock.Release(); +} + +void THarmonizer::DeclareEmergency(ui64 ts) { + NextHarmonizeTs = ts; +} + +void THarmonizer::AddPool(IExecutorPool* pool, TSelfPingInfo *pingInfo) { + TGuard<TSpinLock> guard(Lock); + TPoolInfo poolInfo; + poolInfo.Pool = pool; + poolInfo.DefaultThreadCount = pool->GetDefaultThreadCount(); + poolInfo.MinThreadCount = pool->GetMinThreadCount(); + poolInfo.MaxThreadCount = pool->GetMaxThreadCount(); + poolInfo.ThreadInfo.resize(poolInfo.MaxThreadCount); + poolInfo.Priority = pool->GetPriority(); + pool->SetThreadCount(poolInfo.DefaultThreadCount); + if (pingInfo) { + poolInfo.AvgPingCounter = pingInfo->AvgPingCounter; + poolInfo.AvgPingCounterWithSmallWindow = pingInfo->AvgPingCounterWithSmallWindow; + poolInfo.MaxAvgPingUs = pingInfo->MaxAvgPingUs; + } + Pools.push_back(poolInfo); + PriorityOrder.clear(); +}; + +void THarmonizer::Enable(bool enable) { + TGuard<TSpinLock> guard(Lock); + IsDisabled = enable; +} + +IHarmonizer* MakeHarmonizer(ui64 ts) { + return new THarmonizer(ts); +} + +TPoolHarmonizedStats THarmonizer::GetPoolStats(i16 poolId) const { + const TPoolInfo &pool = Pools[poolId]; + ui64 flags = RelaxedLoad(&pool.LastFlags); + return TPoolHarmonizedStats { + .IncreasingThreadsByNeedyState = static_cast<ui64>(RelaxedLoad(&pool.IncreasingThreadsByNeedyState)), + .DecreasingThreadsByStarvedState = static_cast<ui64>(RelaxedLoad(&pool.DecreasingThreadsByStarvedState)), + .DecreasingThreadsByHoggishState = static_cast<ui64>(RelaxedLoad(&pool.DecreasingThreadsByHoggishState)), + .IsNeedy = static_cast<bool>(flags & 1), + .IsStarved = static_cast<bool>(flags & 2), + .IsHoggish = static_cast<bool>(flags & 4), + }; +} + +} diff --git a/library/cpp/actors/core/harmonizer.h b/library/cpp/actors/core/harmonizer.h new file mode 100644 index 0000000000..61f13e43ac --- /dev/null +++ b/library/cpp/actors/core/harmonizer.h @@ -0,0 +1,30 @@ +#pragma once + +#include "defs.h" +#include "config.h" + +namespace NActors { + class IExecutorPool; + + struct TPoolHarmonizedStats { + ui64 IncreasingThreadsByNeedyState = 0; + ui64 DecreasingThreadsByStarvedState = 0; + ui64 DecreasingThreadsByHoggishState = 0; + bool IsNeedy = false; + bool IsStarved = false; + bool IsHoggish = false; + }; + + // Pool cpu harmonizer + class IHarmonizer { + public: + virtual ~IHarmonizer() {} + virtual void Harmonize(ui64 ts) = 0; + virtual void DeclareEmergency(ui64 ts) = 0; + virtual void AddPool(IExecutorPool* pool, TSelfPingInfo *pingInfo = nullptr) = 0; + virtual void Enable(bool enable) = 0; + virtual TPoolHarmonizedStats GetPoolStats(i16 poolId) const = 0; + }; + + IHarmonizer* MakeHarmonizer(ui64 ts); +} diff --git a/library/cpp/actors/core/mon_stats.h b/library/cpp/actors/core/mon_stats.h index 6d482926d1..117d2ad41d 100644 --- a/library/cpp/actors/core/mon_stats.h +++ b/library/cpp/actors/core/mon_stats.h @@ -60,6 +60,14 @@ namespace NActors { struct TExecutorPoolStats { ui64 MaxUtilizationTime = 0; + ui64 IncreasingThreadsByNeedyState = 0; + ui64 DecreasingThreadsByStarvedState = 0; + ui64 DecreasingThreadsByHoggishState = 0; + i16 WrongWakenedThreadCount = 0; + i16 CurrentThreadCount = 0; + bool IsNeedy = false; + bool IsStarved = false; + bool IsHoggish = false; }; struct TExecutorThreadStats { @@ -69,6 +77,7 @@ namespace NActors { ui64 NonDeliveredEvents = 0; ui64 EmptyMailboxActivation = 0; ui64 CpuNs = 0; // nanoseconds thread was executing on CPU (accounts for preemtion) + ui64 WorstActivationTimeUs = 0; NHPTimer::STime ElapsedTicks = 0; NHPTimer::STime ParkedTicks = 0; NHPTimer::STime BlockedTicks = 0; @@ -111,6 +120,9 @@ namespace NActors { NonDeliveredEvents += RelaxedLoad(&other.NonDeliveredEvents); EmptyMailboxActivation += RelaxedLoad(&other.EmptyMailboxActivation); CpuNs += RelaxedLoad(&other.CpuNs); + RelaxedStore( + &WorstActivationTimeUs, + std::max(RelaxedLoad(&WorstActivationTimeUs), RelaxedLoad(&other.WorstActivationTimeUs))); ElapsedTicks += RelaxedLoad(&other.ElapsedTicks); ParkedTicks += RelaxedLoad(&other.ParkedTicks); BlockedTicks += RelaxedLoad(&other.BlockedTicks); diff --git a/library/cpp/actors/core/probes.h b/library/cpp/actors/core/probes.h index 4912d6dd26..11bbf81287 100644 --- a/library/cpp/actors/core/probes.h +++ b/library/cpp/actors/core/probes.h @@ -166,6 +166,30 @@ PROBE(MoveCpu, GROUPS("PoolCpuBalancer"), \ TYPES(ui32, ui64, TString, TString, ui32), \ NAMES("fromPoolId", "toPoolId", "fromPool", "toPool", "cpu")) \ + PROBE(ThreadCount, GROUPS("BasicThreadPool"), \ + TYPES(ui32, TString, ui32, ui32, ui32, ui32), \ + NAMES("poolId", "pool", "threacCount", "minThreadCount", "maxThreadCount", "defaultThreadCount")) \ + PROBE(HarmonizeCheckPool, GROUPS("Harmonizer"), \ + TYPES(ui32, TString, double, double, double, double, ui32, ui32, bool, bool, bool), \ + NAMES("poolId", "pool", "booked", "consumed", "lastSecondBooked", "lastSecondConsumed", "threadCount", "maxThreadCount", "isStarved", "isNeedy", "isHoggish")) \ + PROBE(HarmonizeOperation, GROUPS("Harmonizer"), \ + TYPES(ui32, TString, TString, ui32, ui32, ui32), \ + NAMES("poolId", "pool", "operation", "newCount", "minCount", "maxCount")) \ + PROBE(TryToHarmonize, GROUPS("Harmonizer"), \ + TYPES(ui32, TString), \ + NAMES("poolId", "pool")) \ + PROBE(SavedValues, GROUPS("Harmonizer"), \ + TYPES(ui32, TString, TString, double, double, double, double, double, double, double, double), \ + NAMES("poolId", "pool", "valueName", "[0]", "[1]", "[2]", "[3]", "[4]", "[5]", "[6]", "[7]")) \ + PROBE(RegisterValue, GROUPS("Harmonizer"), \ + TYPES(ui64, ui64, ui64, ui64, double, double, double), \ + NAMES("ts", "lastTs", "dTs", "8sTs", "us", "lastUs", "dUs")) \ + PROBE(TryToHarmonizeFailed, GROUPS("Harmonizer"), \ + TYPES(ui64, ui64, bool, bool), \ + NAMES("ts", "nextHarmonizeTs", "isDisabled", "withLock")) \ + PROBE(TryToHarmonizeSuccess, GROUPS("Harmonizer"), \ + TYPES(ui64, ui64, ui64), \ + NAMES("ts", "nextHarmonizeTs", "previousNextHarmonizeTs")) \ /**/ LWTRACE_DECLARE_PROVIDER(ACTORLIB_PROVIDER) diff --git a/library/cpp/actors/core/worker_context.h b/library/cpp/actors/core/worker_context.h index b4c37a7629..384a13c5ee 100644 --- a/library/cpp/actors/core/worker_context.h +++ b/library/cpp/actors/core/worker_context.h @@ -95,6 +95,7 @@ namespace NActors { i64 ts = deliveredTs > scheduleTs ? deliveredTs - scheduleTs : 0; double usec = NHPTimer::GetSeconds(ts) * 1000000.0; Stats->ActivationTimeHistogram.Add(usec); + Stats->WorstActivationTimeUs = Max(Stats->WorstActivationTimeUs, (ui64)usec); return usec; } diff --git a/library/cpp/actors/helpers/pool_stats_collector.h b/library/cpp/actors/helpers/pool_stats_collector.h index 61d0b45780..b1217b1d63 100644 --- a/library/cpp/actors/helpers/pool_stats_collector.h +++ b/library/cpp/actors/helpers/pool_stats_collector.h @@ -124,6 +124,15 @@ private: NMonitoring::TDynamicCounters::TCounterPtr MailboxPushedOutBySoftPreemption; NMonitoring::TDynamicCounters::TCounterPtr MailboxPushedOutByTime; NMonitoring::TDynamicCounters::TCounterPtr MailboxPushedOutByEventCount; + NMonitoring::TDynamicCounters::TCounterPtr WrongWakenedThreadCount; + NMonitoring::TDynamicCounters::TCounterPtr CurrentThreadCount; + NMonitoring::TDynamicCounters::TCounterPtr IsNeedy; + NMonitoring::TDynamicCounters::TCounterPtr IsStarved; + NMonitoring::TDynamicCounters::TCounterPtr IsHoggish; + NMonitoring::TDynamicCounters::TCounterPtr IncreasingThreadsByNeedyState; + NMonitoring::TDynamicCounters::TCounterPtr DecreasingThreadsByStarvedState; + NMonitoring::TDynamicCounters::TCounterPtr DecreasingThreadsByHoggishState; + THistogramCounters LegacyActivationTimeHistogram; NMonitoring::THistogramPtr ActivationTimeHistogram; @@ -167,6 +176,14 @@ private: MailboxPushedOutBySoftPreemption = PoolGroup->GetCounter("MailboxPushedOutBySoftPreemption", true); MailboxPushedOutByTime = PoolGroup->GetCounter("MailboxPushedOutByTime", true); MailboxPushedOutByEventCount = PoolGroup->GetCounter("MailboxPushedOutByEventCount", true); + WrongWakenedThreadCount = PoolGroup->GetCounter("WrongWakenedThreadCount", true); + CurrentThreadCount = PoolGroup->GetCounter("CurrentThreadCount", false); + IsNeedy = PoolGroup->GetCounter("IsNeedy", false); + IsStarved = PoolGroup->GetCounter("IsStarved", false); + IsHoggish = PoolGroup->GetCounter("IsHoggish", false); + IncreasingThreadsByNeedyState = PoolGroup->GetCounter("IncreasingThreadsByNeedyState", true); + DecreasingThreadsByStarvedState = PoolGroup->GetCounter("DecreasingThreadsByStarvedState", true); + DecreasingThreadsByHoggishState = PoolGroup->GetCounter("DecreasingThreadsByHoggishState", true); LegacyActivationTimeHistogram.Init(PoolGroup.Get(), "ActivationTime", "usec", 5*1000*1000); ActivationTimeHistogram = PoolGroup->GetHistogram( @@ -203,6 +220,14 @@ private: *MailboxPushedOutBySoftPreemption = stats.MailboxPushedOutBySoftPreemption; *MailboxPushedOutByTime = stats.MailboxPushedOutByTime; *MailboxPushedOutByEventCount = stats.MailboxPushedOutByEventCount; + *WrongWakenedThreadCount = poolStats.WrongWakenedThreadCount; + *CurrentThreadCount = poolStats.CurrentThreadCount; + *IsNeedy = poolStats.IsNeedy; + *IsStarved = poolStats.IsStarved; + *IsHoggish = poolStats.IsHoggish; + *IncreasingThreadsByNeedyState = poolStats.IncreasingThreadsByNeedyState; + *DecreasingThreadsByStarvedState = poolStats.DecreasingThreadsByStarvedState; + *DecreasingThreadsByHoggishState = poolStats.DecreasingThreadsByHoggishState; LegacyActivationTimeHistogram.Set(stats.ActivationTimeHistogram); ActivationTimeHistogram->Reset(); diff --git a/library/cpp/actors/helpers/selfping_actor.cpp b/library/cpp/actors/helpers/selfping_actor.cpp index f9bfaf8dc0..dc383f8c4c 100644 --- a/library/cpp/actors/helpers/selfping_actor.cpp +++ b/library/cpp/actors/helpers/selfping_actor.cpp @@ -61,10 +61,14 @@ struct TAvgOperation { class TSelfPingActor : public TActorBootstrapped<TSelfPingActor> { private: const TDuration SendInterval; - const NMonitoring::TDynamicCounters::TCounterPtr Counter; + const NMonitoring::TDynamicCounters::TCounterPtr MaxPingCounter; + const NMonitoring::TDynamicCounters::TCounterPtr AvgPingCounter; + const NMonitoring::TDynamicCounters::TCounterPtr AvgPingCounterWithSmallWindow; const NMonitoring::TDynamicCounters::TCounterPtr CalculationTimeCounter; - NSlidingWindow::TSlidingWindow<NSlidingWindow::TMaxOperation<ui64>> SlidingWindow; + NSlidingWindow::TSlidingWindow<NSlidingWindow::TMaxOperation<ui64>> MaxPingSlidingWindow; + NSlidingWindow::TSlidingWindow<TAvgOperation<ui64>> AvgPingSlidingWindow; + NSlidingWindow::TSlidingWindow<TAvgOperation<ui64>> AvgPingSmallSlidingWindow; NSlidingWindow::TSlidingWindow<TAvgOperation<ui64>> CalculationSlidingWindow; THPTimer Timer; @@ -74,12 +78,19 @@ public: return SELF_PING_ACTOR; } - TSelfPingActor(TDuration sendInterval, const NMonitoring::TDynamicCounters::TCounterPtr& counter, + TSelfPingActor(TDuration sendInterval, + const NMonitoring::TDynamicCounters::TCounterPtr& maxPingCounter, + const NMonitoring::TDynamicCounters::TCounterPtr& avgPingCounter, + const NMonitoring::TDynamicCounters::TCounterPtr& avgPingSmallWindowCounter, const NMonitoring::TDynamicCounters::TCounterPtr& calculationTimeCounter) : SendInterval(sendInterval) - , Counter(counter) + , MaxPingCounter(maxPingCounter) + , AvgPingCounter(avgPingCounter) + , AvgPingCounterWithSmallWindow(avgPingSmallWindowCounter) , CalculationTimeCounter(calculationTimeCounter) - , SlidingWindow(TDuration::Seconds(15), 100) + , MaxPingSlidingWindow(TDuration::Seconds(15), 100) + , AvgPingSlidingWindow(TDuration::Seconds(15), 100) + , AvgPingSmallSlidingWindow(TDuration::Seconds(1), 100) , CalculationSlidingWindow(TDuration::Seconds(15), 100) { } @@ -154,11 +165,23 @@ public: const double passedTime = hpNow - e.TimeStart; const ui64 delayUs = passedTime > 0.0 ? static_cast<ui64>(passedTime * 1e6) : 0; - *Counter = SlidingWindow.Update(delayUs, now); + if (MaxPingCounter) { + *MaxPingCounter = MaxPingSlidingWindow.Update(delayUs, now); + } + if (AvgPingCounter) { + auto res = AvgPingSlidingWindow.Update({1, delayUs}, now); + *AvgPingCounter = double(res.Sum) / double(res.Count + 1); + } + if (AvgPingCounterWithSmallWindow) { + auto res = AvgPingSmallSlidingWindow.Update({1, delayUs}, now); + *AvgPingCounterWithSmallWindow = double(res.Sum) / double(res.Count + 1); + } - ui64 d = MeasureTaskDurationNs(); - auto res = CalculationSlidingWindow.Update({1, d}, now); - *CalculationTimeCounter = double(res.Sum) / double(res.Count + 1); + if (CalculationTimeCounter) { + ui64 d = MeasureTaskDurationNs(); + auto res = CalculationSlidingWindow.Update({1, d}, now); + *CalculationTimeCounter = double(res.Sum) / double(res.Count + 1); + } SchedulePing(ctx, hpNow); } @@ -174,10 +197,12 @@ private: IActor* CreateSelfPingActor( TDuration sendInterval, - const NMonitoring::TDynamicCounters::TCounterPtr& counter, + const NMonitoring::TDynamicCounters::TCounterPtr& maxPingCounter, + const NMonitoring::TDynamicCounters::TCounterPtr& avgPingCounter, + const NMonitoring::TDynamicCounters::TCounterPtr& avgPingSmallWindowCounter, const NMonitoring::TDynamicCounters::TCounterPtr& calculationTimeCounter) { - return new TSelfPingActor(sendInterval, counter, calculationTimeCounter); + return new TSelfPingActor(sendInterval, maxPingCounter, avgPingCounter, avgPingSmallWindowCounter, calculationTimeCounter); } } // NActors diff --git a/library/cpp/actors/helpers/selfping_actor.h b/library/cpp/actors/helpers/selfping_actor.h index d7d07f9fa8..a976a4f425 100644 --- a/library/cpp/actors/helpers/selfping_actor.h +++ b/library/cpp/actors/helpers/selfping_actor.h @@ -7,7 +7,9 @@ namespace NActors { NActors::IActor* CreateSelfPingActor( TDuration sendInterval, - const NMonitoring::TDynamicCounters::TCounterPtr& counter, + const NMonitoring::TDynamicCounters::TCounterPtr& maxPingCounter, + const NMonitoring::TDynamicCounters::TCounterPtr& avgPingCounter, + const NMonitoring::TDynamicCounters::TCounterPtr& avgPingSmallWindowCounter, const NMonitoring::TDynamicCounters::TCounterPtr& calculationTimeCounter); } // NActors diff --git a/library/cpp/actors/helpers/selfping_actor_ut.cpp b/library/cpp/actors/helpers/selfping_actor_ut.cpp index 459635fa24..542f817755 100644 --- a/library/cpp/actors/helpers/selfping_actor_ut.cpp +++ b/library/cpp/actors/helpers/selfping_actor_ut.cpp @@ -22,13 +22,17 @@ Y_UNIT_TEST_SUITE(TSelfPingTest) { NMonitoring::TDynamicCounters::TCounterPtr counter(new NMonitoring::TCounterForPtr()); NMonitoring::TDynamicCounters::TCounterPtr counter2(new NMonitoring::TCounterForPtr()); + NMonitoring::TDynamicCounters::TCounterPtr counter3(new NMonitoring::TCounterForPtr()); + NMonitoring::TDynamicCounters::TCounterPtr counter4(new NMonitoring::TCounterForPtr()); auto actor = CreateSelfPingActor( TDuration::MilliSeconds(100), // sendInterval (unused in test) - counter, counter2); + counter, counter2, counter3, counter4); UNIT_ASSERT_VALUES_EQUAL(counter->Val(), 0); UNIT_ASSERT_VALUES_EQUAL(counter2->Val(), 0); + UNIT_ASSERT_VALUES_EQUAL(counter3->Val(), 0); + UNIT_ASSERT_VALUES_EQUAL(counter4->Val(), 0); const TActorId actorId = runtime->Register(actor); Y_UNUSED(actorId); diff --git a/library/cpp/actors/util/CMakeLists.txt b/library/cpp/actors/util/CMakeLists.txt index 40d958d75e..233e1fe0fc 100644 --- a/library/cpp/actors/util/CMakeLists.txt +++ b/library/cpp/actors/util/CMakeLists.txt @@ -12,6 +12,7 @@ target_link_libraries(cpp-actors-util PUBLIC contrib-libs-cxxsupp yutil cpp-deprecated-atomic + library-cpp-pop_count ) target_sources(cpp-actors-util PRIVATE ${CMAKE_SOURCE_DIR}/library/cpp/actors/util/affinity.cpp diff --git a/library/cpp/actors/util/cpu_load_log.h b/library/cpp/actors/util/cpu_load_log.h new file mode 100644 index 0000000000..e4ae612246 --- /dev/null +++ b/library/cpp/actors/util/cpu_load_log.h @@ -0,0 +1,227 @@ +#pragma once + +#include "defs.h" +#include <library/cpp/deprecated/atomic/atomic.h> +#include <library/cpp/pop_count/popcount.h> + +static constexpr ui64 BitDurationNs = 131'072; // A power of 2 + +template <ui64 DataSize> +struct TCpuLoadLog { + static constexpr ui64 BitsSize = DataSize * 64; + TAtomic LastTimeNs = 0; + ui64 Data[DataSize]; + + TCpuLoadLog() { + LastTimeNs = 0; + for (size_t i = 0; i < DataSize; ++i) { + Data[i] = 0; + } + } + + TCpuLoadLog(ui64 timeNs) { + LastTimeNs = timeNs; + for (size_t i = 0; i < DataSize; ++i) { + Data[i] = 0; + } + } + + void RegisterBusyPeriod(ui64 timeNs) { + RegisterBusyPeriod<true>(timeNs, AtomicGet(LastTimeNs)); + } + + template <bool ModifyLastTime> + void RegisterBusyPeriod(ui64 timeNs, ui64 lastTimeNs) { + timeNs |= 1ull; + if (timeNs < lastTimeNs) { + for (ui64 i = 0; i < DataSize; ++i) { + AtomicSet(Data[i], ~0ull); + } + if (ModifyLastTime) { + AtomicSet(LastTimeNs, timeNs); + } + return; + } + const ui64 lastIdx = timeNs / BitDurationNs; + const ui64 curIdx = lastTimeNs / BitDurationNs; + ui64 firstElementIdx = curIdx / 64; + const ui64 firstBitIdx = curIdx % 64; + const ui64 lastElementIdx = lastIdx / 64; + const ui64 lastBitIdx = lastIdx % 64; + if (firstElementIdx == lastElementIdx) { + ui64 prevValue = 0; + if (firstBitIdx != 0) { + prevValue = AtomicGet(Data[firstElementIdx % DataSize]); + } + const ui64 bits = (((~0ull) << (firstBitIdx + (63-lastBitIdx))) >> (63-lastBitIdx)); + const ui64 newValue = prevValue | bits; + AtomicSet(Data[firstElementIdx % DataSize], newValue); + if (ModifyLastTime) { + AtomicSet(LastTimeNs, timeNs); + } + return; + } + // process the first element + ui64 prevValue = 0; + if (firstBitIdx != 0) { + prevValue = AtomicGet(Data[firstElementIdx % DataSize]); + } + const ui64 bits = ((~0ull) << firstBitIdx); + const ui64 newValue = (prevValue | bits); + AtomicSet(Data[firstElementIdx % DataSize], newValue); + ++firstElementIdx; + // process the fully filled elements + const ui64 firstLoop = firstElementIdx / DataSize; + const ui64 lastLoop = lastElementIdx / DataSize; + const ui64 lastOffset = lastElementIdx % DataSize; + if (firstLoop < lastLoop) { + for (ui64 i = firstElementIdx % DataSize; i < DataSize; ++i) { + AtomicSet(Data[i], ~0ull); + } + for (ui64 i = 0; i < lastOffset; ++i) { + AtomicSet(Data[i], ~0ull); + } + } else { + for (ui64 i = firstElementIdx % DataSize; i < lastOffset; ++i) { + AtomicSet(Data[i], ~0ull); + } + } + // process the last element + const ui64 newValue2 = ((~0ull) >> (63-lastBitIdx)); + AtomicSet(Data[lastOffset], newValue2); + if (ModifyLastTime) { + AtomicSet(LastTimeNs, timeNs); + } + } + + void RegisterIdlePeriod(ui64 timeNs) { + timeNs &= ~1ull; + ui64 lastTimeNs = AtomicGet(LastTimeNs); + if (timeNs < lastTimeNs) { + // Fast check first, slower chec later + if ((timeNs | 1ull) < lastTimeNs) { + // Time goes back, dont panic, just mark the whole array 'busy' + for (ui64 i = 0; i < DataSize; ++i) { + AtomicSet(Data[i], ~0ull); + } + AtomicSet(LastTimeNs, timeNs); + return; + } + } + const ui64 curIdx = lastTimeNs / BitDurationNs; + const ui64 lastIdx = timeNs / BitDurationNs; + ui64 firstElementIdx = curIdx / 64; + const ui64 lastElementIdx = lastIdx / 64; + if (firstElementIdx >= lastElementIdx) { + AtomicSet(LastTimeNs, timeNs); + return; + } + // process the first partially filled element + ++firstElementIdx; + // process all other elements + const ui64 firstLoop = firstElementIdx / DataSize; + const ui64 lastLoop = lastElementIdx / DataSize; + const ui64 lastOffset = lastElementIdx % DataSize; + if (firstLoop < lastLoop) { + for (ui64 i = firstElementIdx % DataSize; i < DataSize; ++i) { + AtomicSet(Data[i], 0); + } + for (ui64 i = 0; i <= lastOffset; ++i) { + AtomicSet(Data[i], 0); + } + } else { + for (ui64 i = firstElementIdx % DataSize; i <= lastOffset; ++i) { + AtomicSet(Data[i], 0); + } + } + AtomicSet(LastTimeNs, timeNs); + } +}; + +template <ui64 DataSize> +struct TMinusOneCpuEstimator { + static constexpr ui64 BitsSize = DataSize * 64; + ui64 BeginDelayIdx; + ui64 EndDelayIdx; + ui64 Idle; + ui64 Delay[BitsSize]; + + ui64 MaxLatencyIncreaseWithOneLessCpu(TCpuLoadLog<DataSize>** logs, i64 logCount, ui64 timeNs, ui64 periodNs) { + Y_VERIFY(logCount > 0); + ui64 endTimeNs = timeNs; + + ui64 lastTimeNs = timeNs; + for (i64 log_idx = 0; log_idx < logCount; ++log_idx) { + ui64 x = AtomicGet(logs[log_idx]->LastTimeNs); + if ((x & 1) == 1) { + lastTimeNs = Min(lastTimeNs, x); + } else { + logs[log_idx]->template RegisterBusyPeriod<false>(endTimeNs, x); + } + } + const ui64 beginTimeNs = periodNs < timeNs ? timeNs - periodNs : 0; + + ui64 beginIdx = beginTimeNs / BitDurationNs; + ui64 lastIdx = lastTimeNs / BitDurationNs; + ui64 beginElementIdx = beginIdx / 64; + ui64 lastElementIdx = lastIdx / 64; + + BeginDelayIdx = 0; + EndDelayIdx = 0; + Idle = 0; + ui64 maxDelay = 0; + ui64 bucket = 0; + for (ui64 idx = beginElementIdx; idx <= lastElementIdx; ++idx) { + ui64 i = idx % DataSize; + ui64 input = AtomicGet(logs[0]->Data[i]); + ui64 all_busy = ~0ull; + for (i64 log_idx = 1; log_idx < logCount; ++log_idx) { + ui64 x = AtomicGet(logs[log_idx]->Data[i]); + all_busy &= x; + } + if (!input) { + if (!bucket) { + Idle += 64 - PopCount(all_busy); + continue; + } + } + for (i64 bit_idx = 0; bit_idx < 64; ++bit_idx) { + ui64 x = (1ull << bit_idx); + if (all_busy & x) { + if (input & x) { + // Push into the queue + bucket++; + Delay[EndDelayIdx] = EndDelayIdx; + ++EndDelayIdx; + } else { + // All busy + } + } else { + if (input & x) { + // Move success + } else { + if (bucket) { + // Remove from the queue + bucket--; + ui64 stored = Delay[BeginDelayIdx]; + ++BeginDelayIdx; + ui64 delay = EndDelayIdx - stored; + maxDelay = Max(maxDelay, delay); + //Cerr << "bit_idx: " << bit_idx << " stored: " << stored << " delay: " << delay << Endl; + } else { + Idle++; + } + } + } + } + } + if (bucket) { + ui64 stored = Delay[BeginDelayIdx]; + ui64 delay = EndDelayIdx - stored; + maxDelay = Max(maxDelay, delay); + //Cerr << "last stored: " << stored << " delay: " << delay << Endl; + } + return maxDelay * BitDurationNs; + } +}; + diff --git a/library/cpp/actors/util/cpu_load_log_ut.cpp b/library/cpp/actors/util/cpu_load_log_ut.cpp new file mode 100644 index 0000000000..7109123c6e --- /dev/null +++ b/library/cpp/actors/util/cpu_load_log_ut.cpp @@ -0,0 +1,275 @@ +#include "cpu_load_log.h" + +#include <library/cpp/testing/unittest/registar.h> +#include <util/random/random.h> +#include <util/system/hp_timer.h> +#include <util/system/sanitizers.h> +#include <util/system/thread.h> + +Y_UNIT_TEST_SUITE(CpuLoadLog) { + + TString PrintBits(ui64 x) { + TStringStream str; + for (ui64 i = 0; i < 64; ++i) { + if (x & (1ull << i)) { + str << "1"; + } else { + str << "0"; + } + } + return str.Str(); + } + + Y_UNIT_TEST(FillAll) { + TCpuLoadLog<5> log(100*BitDurationNs); + log.RegisterBusyPeriod(101*BitDurationNs); + log.RegisterBusyPeriod(163*BitDurationNs); + log.RegisterBusyPeriod(164*BitDurationNs); + log.RegisterBusyPeriod(165*BitDurationNs); + log.RegisterBusyPeriod(331*BitDurationNs); + log.RegisterBusyPeriod(340*BitDurationNs); + log.RegisterBusyPeriod(420*BitDurationNs); + log.RegisterBusyPeriod(511*BitDurationNs); + //for (ui64 i = 0; i < 5; ++i) { + // Cerr << "i: " << i << " bits: " << PrintBits(log.Data[i]) << Endl; + //} + for (ui64 i = 0; i < 5; ++i) { + UNIT_ASSERT_C((ui64(log.Data[i]) == ~ui64(0)), "Unequal at " << i << "\n got: " << PrintBits(log.Data[i]) + << "\n expected: " << PrintBits(~ui64(0))); + } + } + + Y_UNIT_TEST(PartialFill) { + TCpuLoadLog<5> log(0*BitDurationNs); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[0]), PrintBits(0b0ull)); + log.RegisterBusyPeriod(0*BitDurationNs); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[0]), PrintBits(0b1ull)); + log.RegisterBusyPeriod(0*BitDurationNs); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[0]), PrintBits(0b1ull)); + log.RegisterBusyPeriod(1*BitDurationNs/2); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[0]), PrintBits(0b1ull)); + log.RegisterBusyPeriod(1*BitDurationNs); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[0]), PrintBits(0b11ull)); + log.RegisterIdlePeriod(3*BitDurationNs); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[0]), PrintBits(0b11ull)); + log.RegisterBusyPeriod(3*BitDurationNs); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[0]), PrintBits(0b1011ull)); + log.RegisterBusyPeriod(63*BitDurationNs); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[0]), PrintBits((~0ull)^0b0100ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[1]), PrintBits(0b0ull)); + log.RegisterBusyPeriod(128*BitDurationNs); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[0]), PrintBits((~0ull)^0b0100ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[1]), PrintBits(~0ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[2]), PrintBits(0b1ull)); + log.RegisterBusyPeriod(1*BitDurationNs); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[0]), PrintBits(~0ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[1]), PrintBits(~0ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[2]), PrintBits(~0ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[3]), PrintBits(~0ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[4]), PrintBits(~0ull)); + log.RegisterBusyPeriod(2*BitDurationNs); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[0]), PrintBits(~0ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[1]), PrintBits(~0ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[2]), PrintBits(~0ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[3]), PrintBits(~0ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[4]), PrintBits(~0ull)); + log.RegisterBusyPeriod(64*BitDurationNs); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[0]), PrintBits(~0ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[1]), PrintBits(0b1ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[2]), PrintBits(~0ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[3]), PrintBits(~0ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[4]), PrintBits(~0ull)); + log.RegisterIdlePeriod(128*BitDurationNs); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[0]), PrintBits(~0ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[1]), PrintBits(0b1ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[2]), PrintBits(0ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[3]), PrintBits(~0ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[4]), PrintBits(~0ull)); + log.RegisterIdlePeriod(192*BitDurationNs); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[0]), PrintBits(~0ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[1]), PrintBits(0b1ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[2]), PrintBits(0ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[3]), PrintBits(0ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[4]), PrintBits(~0ull)); + log.RegisterBusyPeriod(192*BitDurationNs); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[0]), PrintBits(~0ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[1]), PrintBits(0b1ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[2]), PrintBits(0ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[3]), PrintBits(0b1ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[4]), PrintBits(~0ull)); + log.RegisterIdlePeriod((192+5*64-1)*BitDurationNs); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[0]), PrintBits(0ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[1]), PrintBits(0ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[2]), PrintBits(0ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[3]), PrintBits(0b1ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[4]), PrintBits(0ull)); + log.RegisterIdlePeriod((192+15*64)*BitDurationNs); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[0]), PrintBits(0ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[1]), PrintBits(0ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[2]), PrintBits(0ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[3]), PrintBits(0ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[4]), PrintBits(0ull)); + } + + Y_UNIT_TEST(Estimator) { + TCpuLoadLog<5> *log[10]; + log[0] = new TCpuLoadLog<5>(0*BitDurationNs); + log[1] = new TCpuLoadLog<5>(0*BitDurationNs); + TMinusOneCpuEstimator<5> estimator; + + + for (ui64 i = 0; i < 5*64; i+=2) { + log[0]->RegisterIdlePeriod(i*BitDurationNs); + log[0]->RegisterBusyPeriod(i*BitDurationNs); + } + log[0]->RegisterIdlePeriod((5*64-2)*BitDurationNs); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log[0]->Data[0]), + PrintBits(0b0101010101010101010101010101010101010101010101010101010101010101ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log[0]->Data[4]), + PrintBits(0b0101010101010101010101010101010101010101010101010101010101010101ull)); + for (ui64 i = 0; i < 5*64-1; i+=2) { + log[1]->RegisterIdlePeriod((i+1)*BitDurationNs); + log[1]->RegisterBusyPeriod((i+1)*BitDurationNs); + } + log[1]->RegisterIdlePeriod((5*64-2+1)*BitDurationNs); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log[1]->Data[0]), + PrintBits(0b1010101010101010101010101010101010101010101010101010101010101010ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log[1]->Data[4]), + PrintBits(0b1010101010101010101010101010101010101010101010101010101010101010ull)); + + ui64 value = estimator.MaxLatencyIncreaseWithOneLessCpu(log, 2, (5*64)*BitDurationNs-1, 3*64*BitDurationNs); + UNIT_ASSERT_VALUES_EQUAL(value/BitDurationNs, 1); + + value = estimator.MaxLatencyIncreaseWithOneLessCpu(log, 2, (5*64+10)*BitDurationNs, 3*64*BitDurationNs); + UNIT_ASSERT_VALUES_EQUAL(value/BitDurationNs, 12); + + delete log[0]; + delete log[1]; + } + + Y_UNIT_TEST(Estimator2) { + TCpuLoadLog<5> *log[2]; + log[0] = new TCpuLoadLog<5>(0*BitDurationNs); + log[1] = new TCpuLoadLog<5>(0*BitDurationNs); + TMinusOneCpuEstimator<5> estimator; + + for (ui64 i = 0; i < 5*64; i+=2) { + log[0]->RegisterIdlePeriod(i*BitDurationNs); + log[0]->RegisterBusyPeriod(i*BitDurationNs); + } + for (ui64 i = 0; i < 5; ++i) { + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log[0]->Data[i]), + PrintBits(0b0101010101010101010101010101010101010101010101010101010101010101ull)); + } + for (ui64 i = 0; i < 5*64-1; i+=2) { + log[1]->RegisterIdlePeriod((i+1)*BitDurationNs); + log[1]->RegisterBusyPeriod((i+1)*BitDurationNs); + } + for (ui64 i = 0; i < 5; ++i) { + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log[1]->Data[i]), + PrintBits(0b1010101010101010101010101010101010101010101010101010101010101010ull)); + } + + log[0]->Data[2] = ~0ull; + ui64 value = estimator.MaxLatencyIncreaseWithOneLessCpu(log, 2, (5*64-1)*BitDurationNs, 3*64*BitDurationNs); + UNIT_ASSERT_VALUES_EQUAL(value/BitDurationNs, 32); + + delete log[0]; + delete log[1]; + } + + Y_UNIT_TEST(Estimator3) { + TCpuLoadLog<5> *log[3]; + log[0] = new TCpuLoadLog<5>(0*BitDurationNs); + log[1] = new TCpuLoadLog<5>(0*BitDurationNs); + log[2] = new TCpuLoadLog<5>(0*BitDurationNs); + TMinusOneCpuEstimator<5> estimator; + + for (ui64 i = 0; i < 5*64; i+=8) { + log[0]->RegisterIdlePeriod(i*BitDurationNs); + log[0]->RegisterBusyPeriod((i+3)*BitDurationNs); + log[1]->RegisterIdlePeriod(i*BitDurationNs); + log[1]->RegisterBusyPeriod((i+3)*BitDurationNs); + log[2]->RegisterIdlePeriod(i*BitDurationNs); + log[2]->RegisterBusyPeriod((i+3)*BitDurationNs); + } + for (ui64 i = 0; i < 5; ++i) { + for (ui64 n = 0; n < 3; ++n) { + UNIT_ASSERT_VALUES_EQUAL_C(PrintBits(log[n]->Data[i]), + PrintBits(0b0000111100001111000011110000111100001111000011110000111100001111ull), + " i: " << i << " n: " << n); + } + } + + ui64 value = estimator.MaxLatencyIncreaseWithOneLessCpu(log, 3, (5*64-5)*BitDurationNs, 3*64*BitDurationNs); + UNIT_ASSERT_VALUES_EQUAL(value/BitDurationNs, 4); + + delete log[0]; + delete log[1]; + delete log[2]; + } + /* + class TWorkerThread : public ISimpleThread { + private: + std::function<void()> Func; + double Time = 0.0; + + public: + TWorkerThread(std::function<void()> func) + : Func(std::move(func)) + { } + + double GetTime() const { + return Time; + } + + static THolder<TWorkerThread> Spawn(std::function<void()> func) { + THolder<TWorkerThread> thread = MakeHolder<TWorkerThread>(std::move(func)); + thread->Start(); + return thread; + } + + private: + void* ThreadProc() noexcept override { + THPTimer timer; + Func(); + Time = timer.Passed(); + return nullptr; + } + }; + + void DoConcurrentPushPop(size_t threads, ui64 perThreadCount) { + // Concurrency factor 4 is up to 16 threads + + auto workerFunc = [&](size_t threadIndex) { + }; + + TVector<THolder<TWorkerThread>> workers(threads); + for (size_t i = 0; i < threads; ++i) { + workers[i] = TWorkerThread::Spawn([workerFunc, i]() { + workerFunc(i); + }); + } + + double maxTime = 0; + for (size_t i = 0; i < threads; ++i) { + workers[i]->Join(); + maxTime = Max(maxTime, workers[i]->GetTime()); + } + + UNIT_ASSERT_VALUES_EQUAL(popped, 0u); + + Cerr << "Concurrent with " << threads << " threads: " << maxTime << " seconds" << Endl; + } + + void DoConcurrentPushPop_3times(size_t threads, ui64 perThreadCount) { + for (size_t i = 0; i < 3; ++i) { + DoConcurrentPushPop(threads, perThreadCount); + } + } + + static constexpr ui64 PER_THREAD_COUNT = NSan::PlainOrUnderSanitizer(1000000, 100000); + + Y_UNIT_TEST(ConcurrentPushPop_1thread) { DoConcurrentPushPop_3times(1, PER_THREAD_COUNT); } + */ +} diff --git a/library/cpp/actors/util/thread_load_log.h b/library/cpp/actors/util/thread_load_log.h new file mode 100644 index 0000000000..b4b34d47bb --- /dev/null +++ b/library/cpp/actors/util/thread_load_log.h @@ -0,0 +1,363 @@ +#pragma once + +#include "defs.h" + +#include <util/system/types.h> + +#include <type_traits> +#include <algorithm> +#include <atomic> +#include <limits> +#include <queue> + +template <ui64 TIME_SLOT_COUNT, ui64 TIME_SLOT_LENGTH_NS = 131'072, typename Type = std::uint8_t> +class TThreadLoad { +public: + using TimeSlotType = Type; + +private: + static constexpr auto TIME_SLOT_MAX_VALUE = std::numeric_limits<TimeSlotType>::max(); + static constexpr ui64 TIME_SLOT_PART_COUNT = TIME_SLOT_MAX_VALUE + 1; + static constexpr auto TIME_SLOT_PART_LENGTH_NS = TIME_SLOT_LENGTH_NS / TIME_SLOT_PART_COUNT; + + template <typename T> + static void AtomicAddBound(std::atomic<T>& val, i64 inc) { + if (inc == 0) { + return; + } + + auto newVal = val.load(); + auto oldVal = newVal; + + do { + static constexpr auto MAX_VALUE = std::numeric_limits<T>::max(); + + if (oldVal >= MAX_VALUE) { + return; + } + newVal = std::min<i64>(MAX_VALUE, static_cast<i64>(oldVal) + inc); + } while (!val.compare_exchange_weak(oldVal, newVal)); + } + + template <typename T> + static void AtomicSubBound(std::atomic<T>& val, i64 sub) { + if (sub == 0) { + return; + } + + auto newVal = val.load(); + auto oldVal = newVal; + + do { + if (oldVal == 0) { + return; + } + newVal = std::max<i64>(0, static_cast<i64>(oldVal) - sub); + } while (!val.compare_exchange_weak(oldVal, newVal)); + } + + void UpdateCompleteTimeSlots(ui64 firstSlotNumber, ui64 lastSlotNumber, TimeSlotType timeSlotValue) { + ui32 firstSlotIndex = firstSlotNumber % TIME_SLOT_COUNT; + ui32 lastSlotIndex = lastSlotNumber % TIME_SLOT_COUNT; + + const ui64 firstTimeSlotsPass = firstSlotNumber / TIME_SLOT_COUNT; + const ui64 lastTimeSlotsPass = lastSlotNumber / TIME_SLOT_COUNT; + + if (firstTimeSlotsPass == lastTimeSlotsPass) { + // first and last time slots are in the same pass + for (auto slotNumber = firstSlotNumber + 1; slotNumber < lastSlotNumber; ++slotNumber) { + auto slotIndex = slotNumber % TIME_SLOT_COUNT; + TimeSlots[slotIndex] = timeSlotValue; + } + } else if (firstTimeSlotsPass + 1 == lastTimeSlotsPass) { + for (auto slotIndex = (firstSlotNumber + 1) % TIME_SLOT_COUNT; firstSlotIndex < slotIndex && slotIndex < TIME_SLOT_COUNT; ++slotIndex) { + TimeSlots[slotIndex] = timeSlotValue; + } + for (auto slotIndex = 0u; slotIndex < lastSlotIndex; ++slotIndex) { + TimeSlots[slotIndex] = timeSlotValue; + } + } else { + for (auto slotIndex = 0u; slotIndex < TIME_SLOT_COUNT; ++slotIndex) { + TimeSlots[slotIndex] = timeSlotValue; + } + } + } + +public: + std::atomic<ui64> LastTimeNs; + std::atomic<TimeSlotType> TimeSlots[TIME_SLOT_COUNT]; + std::atomic<bool> LastRegisteredPeriodIsBusy = false; + + explicit TThreadLoad(ui64 timeNs = 0) { + static_assert(std::is_unsigned<TimeSlotType>::value); + + LastTimeNs = timeNs; + for (size_t i = 0; i < TIME_SLOT_COUNT; ++i) { + TimeSlots[i] = 0; + } + } + + static constexpr auto GetTimeSlotCount() { + return TIME_SLOT_COUNT; + } + + static constexpr auto GetTimeSlotLengthNs() { + return TIME_SLOT_LENGTH_NS; + } + + static constexpr auto GetTimeSlotPartLengthNs() { + return TIME_SLOT_PART_LENGTH_NS; + } + + static constexpr auto GetTimeSlotPartCount() { + return TIME_SLOT_PART_COUNT; + } + + static constexpr auto GetTimeSlotMaxValue() { + return TIME_SLOT_MAX_VALUE; + } + + static constexpr auto GetTimeWindowLengthNs() { + return TIME_SLOT_COUNT * TIME_SLOT_LENGTH_NS; + } + + void RegisterBusyPeriod(ui64 timeNs) { + RegisterBusyPeriod<true>(timeNs, LastTimeNs.load()); + } + + template <bool ModifyLastTime> + void RegisterBusyPeriod(ui64 timeNs, ui64 lastTimeNs) { + LastRegisteredPeriodIsBusy = true; + + if (timeNs < lastTimeNs) { + // when time goes back, mark all time slots as 'free' + for (size_t i = 0u; i < TIME_SLOT_COUNT; ++i) { + TimeSlots[i] = 0; + } + + if (ModifyLastTime) { + LastTimeNs = timeNs; + } + + return; + } + + // lastTimeNs <= timeNs + ui64 firstSlotNumber = lastTimeNs / TIME_SLOT_LENGTH_NS; + ui32 firstSlotIndex = firstSlotNumber % TIME_SLOT_COUNT; + ui64 lastSlotNumber = timeNs / TIME_SLOT_LENGTH_NS; + ui32 lastSlotIndex = lastSlotNumber % TIME_SLOT_COUNT; + + if (firstSlotNumber == lastSlotNumber) { + ui32 slotLengthNs = timeNs - lastTimeNs; + ui32 slotPartsCount = (slotLengthNs + TIME_SLOT_PART_LENGTH_NS - 1) / TIME_SLOT_PART_LENGTH_NS; + AtomicAddBound(TimeSlots[firstSlotIndex], slotPartsCount); + + if (ModifyLastTime) { + LastTimeNs = timeNs; + } + return; + } + + ui32 firstSlotLengthNs = TIME_SLOT_LENGTH_NS - (lastTimeNs % TIME_SLOT_LENGTH_NS); + ui32 firstSlotPartsCount = (firstSlotLengthNs + TIME_SLOT_PART_LENGTH_NS - 1) / TIME_SLOT_PART_LENGTH_NS; + ui32 lastSlotLengthNs = timeNs % TIME_SLOT_LENGTH_NS; + ui32 lastSlotPartsCount = (lastSlotLengthNs + TIME_SLOT_PART_LENGTH_NS - 1) / TIME_SLOT_PART_LENGTH_NS; + + // process first time slot + AtomicAddBound(TimeSlots[firstSlotIndex], firstSlotPartsCount); + + // process complete time slots + UpdateCompleteTimeSlots(firstSlotNumber, lastSlotNumber, TIME_SLOT_MAX_VALUE); + + // process last time slot + AtomicAddBound(TimeSlots[lastSlotIndex], lastSlotPartsCount); + + if (ModifyLastTime) { + LastTimeNs = timeNs; + } + } + + void RegisterIdlePeriod(ui64 timeNs) { + LastRegisteredPeriodIsBusy = false; + + ui64 lastTimeNs = LastTimeNs.load(); + if (timeNs < lastTimeNs) { + // when time goes back, mark all time slots as 'busy' + for (size_t i = 0u; i < TIME_SLOT_COUNT; ++i) { + TimeSlots[i] = TIME_SLOT_MAX_VALUE; + } + LastTimeNs = timeNs; + return; + } + + // lastTimeNs <= timeNs + ui64 firstSlotNumber = lastTimeNs / TIME_SLOT_LENGTH_NS; + ui32 firstSlotIndex = firstSlotNumber % TIME_SLOT_COUNT; + ui64 lastSlotNumber = timeNs / TIME_SLOT_LENGTH_NS; + ui32 lastSlotIndex = lastSlotNumber % TIME_SLOT_COUNT; + + if (firstSlotNumber == lastSlotNumber) { + ui32 slotLengthNs = timeNs - lastTimeNs; + ui32 slotPartsCount = slotLengthNs / TIME_SLOT_PART_LENGTH_NS; + + AtomicSubBound(TimeSlots[firstSlotIndex], slotPartsCount); + + LastTimeNs = timeNs; + return; + } + + ui32 firstSlotLengthNs = TIME_SLOT_LENGTH_NS - (lastTimeNs % TIME_SLOT_LENGTH_NS); + ui32 firstSlotPartsCount = (firstSlotLengthNs + TIME_SLOT_PART_LENGTH_NS - 1) / TIME_SLOT_PART_LENGTH_NS; + ui32 lastSlotLengthNs = timeNs % TIME_SLOT_LENGTH_NS; + ui32 lastSlotPartsCount = (lastSlotLengthNs + TIME_SLOT_PART_LENGTH_NS - 1) / TIME_SLOT_PART_LENGTH_NS; + + // process first time slot + AtomicSubBound(TimeSlots[firstSlotIndex], firstSlotPartsCount); + + // process complete time slots + UpdateCompleteTimeSlots(firstSlotNumber, lastSlotNumber, 0); + + // process last time slot + AtomicSubBound(TimeSlots[lastSlotIndex], lastSlotPartsCount); + + LastTimeNs = timeNs; + } +}; + +class TMinusOneThreadEstimator { +private: + template <typename T, int MaxSize> + class TArrayQueue { + public: + bool empty() const { + return FrontIndex == -1; + } + + bool full() const { + return (RearIndex + 1) % MaxSize == FrontIndex; + } + + T& front() { + return Data[FrontIndex]; + } + + bool push(T &&t) { + if (full()) { + return false; + } + + if (FrontIndex == -1) { + FrontIndex = 0; + } + + RearIndex = (RearIndex + 1) % MaxSize; + Data[RearIndex] = std::move(t); + return true; + } + + bool pop() { + if (empty()) { + return false; + } + + if (FrontIndex == RearIndex) { + FrontIndex = RearIndex = -1; + } else { + FrontIndex = (FrontIndex + 1) % MaxSize; + } + + return true; + } + + private: + int FrontIndex = -1; + int RearIndex = -1; + T Data[MaxSize]; + }; + +public: + template <typename T> + ui64 MaxLatencyIncreaseWithOneLessCpu(T **threadLoads, ui32 threadCount, ui64 timeNs, ui64 periodNs) { + Y_VERIFY(threadCount > 0); + + struct TTimeSlotData { + typename T::TimeSlotType Load; + ui64 Index; + }; + + ui64 lastTimeNs = timeNs; + for (auto threadIndex = 0u; threadIndex < threadCount; ++threadIndex) { + if (threadLoads[threadIndex]->LastRegisteredPeriodIsBusy.load()) { + lastTimeNs = std::min(lastTimeNs, threadLoads[threadIndex]->LastTimeNs.load()); + } else { + // make interval [lastTimeNs, timeNs] 'busy' + threadLoads[threadIndex]->template RegisterBusyPeriod<false>(timeNs, threadLoads[threadIndex]->LastTimeNs.load()); + } + } + + periodNs = std::min(T::GetTimeWindowLengthNs(), periodNs); + + ui64 beginTimeNs = periodNs < timeNs ? timeNs - periodNs : 0; + + ui64 firstSlotNumber = beginTimeNs / T::GetTimeSlotLengthNs(); + ui64 lastSlotNumber = (lastTimeNs + T::GetTimeSlotLengthNs() - 1) / T::GetTimeSlotLengthNs(); + + ui64 maxTimeSlotShiftCount = 0u; + TArrayQueue<TTimeSlotData, T::GetTimeSlotCount()> firstThreadLoadDataQueue; + + for (auto slotNumber = firstSlotNumber; slotNumber < lastSlotNumber; ++slotNumber) { + ui64 slotIndex = slotNumber % T::GetTimeSlotCount(); + + typename T::TimeSlotType firstThreadTimeSlotValue = threadLoads[0]->TimeSlots[slotIndex].load(); + + // distribute previous load of the first thread by other threads + auto foundIdleThread = false; + + for (auto threadIndex = 1u; threadIndex < threadCount; ++threadIndex) { + typename T::TimeSlotType thisThreadAvailableTimeSlotLoad = threadLoads[threadIndex]->GetTimeSlotMaxValue() - threadLoads[threadIndex]->TimeSlots[slotIndex].load(); + + while (!firstThreadLoadDataQueue.empty() && thisThreadAvailableTimeSlotLoad > 0) { + auto& firstThreadLoadData = firstThreadLoadDataQueue.front(); + + auto distributedLoad = std::min(thisThreadAvailableTimeSlotLoad, firstThreadLoadData.Load); + + thisThreadAvailableTimeSlotLoad -= distributedLoad; + firstThreadLoadData.Load -= distributedLoad; + + if (firstThreadLoadData.Load == 0) { + auto timeSlotShiftCount = slotIndex - firstThreadLoadData.Index; + maxTimeSlotShiftCount = std::max(maxTimeSlotShiftCount, timeSlotShiftCount); + auto res = firstThreadLoadDataQueue.pop(); + Y_VERIFY(res); + } + } + + if (thisThreadAvailableTimeSlotLoad == threadLoads[threadIndex]->GetTimeSlotMaxValue()) { + foundIdleThread = true; + } + } + + // distribute current load of the first thread by other threads + if (firstThreadTimeSlotValue > 0) { + if (foundIdleThread) { + // The current load of the first thead can be + // moved to the idle thread so there is nothing to do + } else { + // The current load of the first thread can be later + // processed by the following time slots of other threads + auto res = firstThreadLoadDataQueue.push({firstThreadTimeSlotValue, slotIndex}); + Y_VERIFY(res); + } + } + } + + if (!firstThreadLoadDataQueue.empty()) { + const auto& timeSlotData = firstThreadLoadDataQueue.front(); + auto timeSlotShiftCount = T::GetTimeSlotCount() - timeSlotData.Index; + maxTimeSlotShiftCount = std::max(maxTimeSlotShiftCount, timeSlotShiftCount); + } + + return maxTimeSlotShiftCount * T::GetTimeSlotLengthNs(); + } +}; diff --git a/library/cpp/actors/util/thread_load_log_ut.cpp b/library/cpp/actors/util/thread_load_log_ut.cpp new file mode 100644 index 0000000000..20e776cff6 --- /dev/null +++ b/library/cpp/actors/util/thread_load_log_ut.cpp @@ -0,0 +1,966 @@ +#include "thread_load_log.h" + +#include <library/cpp/testing/unittest/registar.h> + +#include <util/random/random.h> +#include <util/system/hp_timer.h> +#include <util/system/thread.h> +#include <util/system/types.h> +#include <util/system/sanitizers.h> + +#include <limits> + +Y_UNIT_TEST_SUITE(ThreadLoadLog) { + + Y_UNIT_TEST(TThreadLoad8BitSlotType) { + constexpr auto timeWindowLengthNs = 5368709120ull; // 5 * 2 ^ 30 ~5 sec + constexpr auto timeSlotLengthNs = 524288ull; // 2 ^ 19 ns ~ 512 usec + constexpr auto timeSlotCount = timeWindowLengthNs / timeSlotLengthNs; + + using TSlotType = std::uint8_t; + using T = TThreadLoad<timeSlotCount, timeSlotLengthNs, TSlotType>; + + UNIT_ASSERT_VALUES_EQUAL(T::GetTimeWindowLengthNs(), timeWindowLengthNs); + UNIT_ASSERT_VALUES_EQUAL(T::GetTimeSlotLengthNs(), timeSlotLengthNs); + UNIT_ASSERT_VALUES_EQUAL(T::GetTimeSlotCount(), timeSlotCount); + UNIT_ASSERT_VALUES_EQUAL(T::GetTimeSlotMaxValue(), std::numeric_limits<TSlotType>::max()); + UNIT_ASSERT_VALUES_EQUAL(T::GetTimeSlotPartCount(), (ui64)std::numeric_limits<TSlotType>::max() + 1); + UNIT_ASSERT_VALUES_EQUAL(T::GetTimeSlotPartLengthNs(), T::GetTimeSlotLengthNs() / T::GetTimeSlotPartCount()); + } + + Y_UNIT_TEST(TThreadLoad16BitSlotType) { + constexpr auto timeWindowLengthNs = 5368709120ull; // 5 * 2 ^ 30 ~5 sec + constexpr auto timeSlotLengthNs = 524288ull; // 2 ^ 19 ns ~ 512 usec + constexpr auto timeSlotCount = timeWindowLengthNs / timeSlotLengthNs; + + using TSlotType = std::uint16_t; + using T = TThreadLoad<timeSlotCount, timeSlotLengthNs, TSlotType>; + + UNIT_ASSERT_VALUES_EQUAL(T::GetTimeWindowLengthNs(), timeWindowLengthNs); + UNIT_ASSERT_VALUES_EQUAL(T::GetTimeSlotLengthNs(), timeSlotLengthNs); + UNIT_ASSERT_VALUES_EQUAL(T::GetTimeSlotCount(), timeSlotCount); + UNIT_ASSERT_VALUES_EQUAL(T::GetTimeSlotMaxValue(), std::numeric_limits<TSlotType>::max()); + UNIT_ASSERT_VALUES_EQUAL(T::GetTimeSlotPartCount(), (ui64)std::numeric_limits<TSlotType>::max() + 1); + UNIT_ASSERT_VALUES_EQUAL(T::GetTimeSlotPartLengthNs(), T::GetTimeSlotLengthNs() / T::GetTimeSlotPartCount()); + } + + Y_UNIT_TEST(TThreadLoad8BitSlotTypeWindowBusy) { + constexpr auto timeWindowLengthNs = 5368709120ull; // 5 * 2 ^ 30 ~5 sec + constexpr auto timeSlotLengthNs = 524288ull; // 2 ^ 19 ns ~ 512 usec + constexpr auto timeSlotCount = timeWindowLengthNs / timeSlotLengthNs; + + using TSlotType = std::uint8_t; + using T = TThreadLoad<timeSlotCount, timeSlotLengthNs, TSlotType>; + + T threadLoad; + threadLoad.RegisterBusyPeriod(T::GetTimeWindowLengthNs()); + + UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), T::GetTimeWindowLengthNs()); + for (auto slotIndex = 0u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) { + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), T::GetTimeSlotMaxValue()); + } + } + + Y_UNIT_TEST(TThreadLoad16BitSlotTypeWindowBusy) { + constexpr auto timeWindowLengthNs = 5368709120ull; // 5 * 2 ^ 30 ~5 sec + constexpr auto timeSlotLengthNs = 524288ull; // 2 ^ 19 ns ~ 512 usec + constexpr auto timeSlotCount = timeWindowLengthNs / timeSlotLengthNs; + + using TSlotType = std::uint16_t; + using T = TThreadLoad<timeSlotCount, timeSlotLengthNs, TSlotType>; + + T threadLoad; + threadLoad.RegisterBusyPeriod(T::GetTimeWindowLengthNs()); + + UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), T::GetTimeWindowLengthNs()); + for (auto slotIndex = 0u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) { + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), T::GetTimeSlotMaxValue()); + } + } + + Y_UNIT_TEST(TThreadLoadRegisterBusyPeriodFirstTimeSlot1) { + TThreadLoad<38400> threadLoad; + + for (auto slotIndex = 0u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) { + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0); + } + + ui64 timeNs = threadLoad.GetTimeSlotLengthNs() - 1; + threadLoad.RegisterBusyPeriod(timeNs); + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[0].load(), threadLoad.GetTimeSlotMaxValue()); + + for (auto slotIndex = 1u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) { + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0); + } + + UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), timeNs); + } + + Y_UNIT_TEST(TThreadLoadRegisterBusyPeriodFirstTimeSlot2) { + using T = TThreadLoad<38400>; + + ui32 startNs = 2 * T::GetTimeSlotPartLengthNs(); + T threadLoad(startNs); + + for (auto slotIndex = 0u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) { + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0); + } + + ui64 timeNs = 3 * T::GetTimeSlotPartLengthNs() - 1; + threadLoad.RegisterBusyPeriod(timeNs); + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[0].load(), 1); + + for (auto slotIndex = 1u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) { + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0); + } + + UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), timeNs); + } + + Y_UNIT_TEST(TThreadLoadRegisterBusyPeriodFirstTimeSlot3) { + TThreadLoad<38400> threadLoad; + + for (auto slotIndex = 0u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) { + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0); + } + + ui64 timeNs = threadLoad.GetTimeSlotLengthNs(); + threadLoad.RegisterBusyPeriod(timeNs); + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[0].load(), threadLoad.GetTimeSlotMaxValue()); + + for (auto slotIndex = 1u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) { + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0); + } + + UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), timeNs); + } + + Y_UNIT_TEST(TThreadLoadRegisterBusyPeriodFirstTimeSlot4) { + using T = TThreadLoad<38400>; + + ui32 startNs = 2 * T::GetTimeSlotPartLengthNs(); + T threadLoad(startNs); + + for (auto slotIndex = 0u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) { + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0); + } + + ui64 timeNs = 3 * T::GetTimeSlotPartLengthNs(); + threadLoad.RegisterBusyPeriod(timeNs); + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[0].load(), (timeNs - startNs) / T::GetTimeSlotPartLengthNs()); + + for (auto slotIndex = 1u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) { + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0); + } + + UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), timeNs); + } + + Y_UNIT_TEST(TThreadLoadRegisterBusyPeriodFirstTwoTimeSlots1) { + TThreadLoad<38400> threadLoad; + + for (auto slotIndex = 0u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) { + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0); + } + + ui64 timeNs = 2 * threadLoad.GetTimeSlotLengthNs() - 1; + threadLoad.RegisterBusyPeriod(timeNs); + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[0].load(), threadLoad.GetTimeSlotMaxValue()); + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[1].load(), threadLoad.GetTimeSlotMaxValue()); + + for (auto slotIndex = 2u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) { + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0); + } + + UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), timeNs); + } + + Y_UNIT_TEST(TThreadLoadRegisterBusyPeriodFirstTwoTimeSlots2) { + TThreadLoad<38400> threadLoad; + + for (auto slotIndex = 0u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) { + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0); + } + + ui64 timeNs = 2 * threadLoad.GetTimeSlotLengthNs(); + threadLoad.RegisterBusyPeriod(timeNs); + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[0].load(), threadLoad.GetTimeSlotMaxValue()); + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[0].load(), threadLoad.GetTimeSlotMaxValue()); + + for (auto slotIndex = 2u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) { + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0); + } + + UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), timeNs); + } + + Y_UNIT_TEST(TThreadLoadRegisterBusyPeriodFirstThreeTimeSlots1) { + TThreadLoad<38400> threadLoad; + + for (auto slotIndex = 0u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) { + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0); + } + + ui64 timeNs = 3 * threadLoad.GetTimeSlotLengthNs() - 1; + threadLoad.RegisterBusyPeriod(timeNs); + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[0].load(), threadLoad.GetTimeSlotMaxValue()); + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[1].load(), threadLoad.GetTimeSlotMaxValue()); + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[2].load(), threadLoad.GetTimeSlotMaxValue()); + + for (auto slotIndex = 3u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) { + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0); + } + + UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), timeNs); + } + + Y_UNIT_TEST(TThreadLoadRegisterBusyPeriodFirstThreeTimeSlots2) { + TThreadLoad<38400> threadLoad; + + for (auto slotIndex = 0u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) { + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0); + } + + ui64 timeNs = 3 * threadLoad.GetTimeSlotLengthNs(); + threadLoad.RegisterBusyPeriod(timeNs); + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[0].load(), threadLoad.GetTimeSlotMaxValue()); + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[0].load(), threadLoad.GetTimeSlotMaxValue()); + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[2].load(), threadLoad.GetTimeSlotMaxValue()); + + for (auto slotIndex = 3u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) { + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0); + } + + UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), timeNs); + } + + Y_UNIT_TEST(TThreadLoadRegisterBusyPeriodFirstThreeTimeSlots3) { + using T = TThreadLoad<38400>; + + ui32 startNs = 3 * T::GetTimeSlotPartLengthNs(); + T threadLoad(startNs); + + for (auto slotIndex = 0u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) { + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0); + } + + ui64 timeNs = 0; + threadLoad.RegisterBusyPeriod(timeNs); + for (auto slotIndex = 0u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) { + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0); + } + + UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), timeNs); + } + + Y_UNIT_TEST(TThreadLoadRegisterIdlePeriodFirstTimeSlot1) { + using T = TThreadLoad<38400>; + + ui64 timeNs = T::GetTimeSlotPartLengthNs(); + T threadLoad(timeNs); + + UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), timeNs); + for (auto slotIndex = 0u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) { + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0); + } + + timeNs = 2 * T::GetTimeSlotPartLengthNs(); + threadLoad.RegisterBusyPeriod(timeNs); + + UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), timeNs); + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[0].load(), 1); + for (auto slotIndex = 1u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) { + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0); + } + + timeNs = 3 * T::GetTimeSlotPartLengthNs(); + threadLoad.RegisterIdlePeriod(timeNs); + + UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), timeNs); + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[0].load(), 0); + for (auto slotIndex = 1u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) { + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0); + } + + timeNs = 4 * T::GetTimeSlotPartLengthNs(); + threadLoad.RegisterBusyPeriod(timeNs); + + UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), timeNs); + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[0].load(), 1); + for (auto slotIndex = 1u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) { + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0); + } + } + + Y_UNIT_TEST(TThreadLoadRegisterIdlePeriodFirstTimeSlot2) { + using T = TThreadLoad<38400>; + + ui64 timeNs = T::GetTimeSlotPartLengthNs(); + T threadLoad(timeNs); + + UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), timeNs); + for (auto slotIndex = 0u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) { + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0); + } + + timeNs = 2 * T::GetTimeSlotPartLengthNs(); + threadLoad.RegisterBusyPeriod(timeNs); + + UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), timeNs); + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[0].load(), 1); + for (auto slotIndex = 1u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) { + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0); + } + + timeNs = 3 * T::GetTimeSlotPartLengthNs() - 1; + threadLoad.RegisterIdlePeriod(timeNs); + + UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), timeNs); + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[0].load(), 1); + for (auto slotIndex = 1u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) { + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0); + } + + timeNs = 4 * T::GetTimeSlotPartLengthNs(); + threadLoad.RegisterBusyPeriod(timeNs); + + UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), timeNs); + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[0].load(), 3); + for (auto slotIndex = 1u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) { + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0); + } + } + + Y_UNIT_TEST(TThreadLoadRegisterIdlePeriodFirstTimeSlot3) { + using T = TThreadLoad<38400>; + + ui64 timeNs = T::GetTimeSlotPartLengthNs(); + T threadLoad(timeNs); + + UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), timeNs); + for (auto slotIndex = 0u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) { + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0); + } + + timeNs = 2 * T::GetTimeSlotPartLengthNs(); + threadLoad.RegisterBusyPeriod(timeNs); + + UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), timeNs); + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[0].load(), 1); + for (auto slotIndex = 1u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) { + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0); + } + + timeNs = 3 * T::GetTimeSlotPartLengthNs() - 1; + threadLoad.RegisterIdlePeriod(timeNs); + + UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), timeNs); + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[0].load(), 1); + for (auto slotIndex = 1u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) { + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0); + } + + timeNs = 4 * T::GetTimeSlotPartLengthNs() - 2; + threadLoad.RegisterIdlePeriod(timeNs); + + UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), timeNs); + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[0].load(), 1); + for (auto slotIndex = 1u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) { + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0); + } + + timeNs = 5 * T::GetTimeSlotPartLengthNs(); + threadLoad.RegisterBusyPeriod(timeNs); + + UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), timeNs); + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[0].load(), 3); + for (auto slotIndex = 1u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) { + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0); + } + } + + Y_UNIT_TEST(TThreadLoadRegisterIdlePeriodFirstTwoTimeSlots1) { + using T = TThreadLoad<38400>; + + T threadLoad; + + UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), 0); + for (auto slotIndex = 0u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) { + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0); + } + + ui64 timeNs = threadLoad.GetTimeSlotLengthNs(); + threadLoad.RegisterIdlePeriod(timeNs); + + UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), timeNs); + for (auto slotIndex = 0u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) { + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0); + } + + timeNs = 2 * threadLoad.GetTimeSlotLengthNs(); + threadLoad.RegisterBusyPeriod(timeNs); + + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[0].load(), 0); + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[1].load(), threadLoad.GetTimeSlotMaxValue()); + for (auto slotIndex = 2u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) { + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0); + } + } + + Y_UNIT_TEST(TThreadLoadRegisterIdlePeriodFirstTwoTimeSlots2) { + using T = TThreadLoad<38400>; + + T threadLoad; + + UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), 0); + for (auto slotIndex = 0u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) { + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0); + } + + ui64 timeNs = threadLoad.GetTimeSlotLengthNs() - 1; + threadLoad.RegisterIdlePeriod(timeNs); + + UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), timeNs); + for (auto slotIndex = 0u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) { + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0); + } + + timeNs = 2 * threadLoad.GetTimeSlotLengthNs(); + threadLoad.RegisterBusyPeriod(timeNs); + + UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), timeNs); + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[0].load(), 1); + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[1].load(), threadLoad.GetTimeSlotMaxValue()); + for (auto slotIndex = 2u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) { + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0); + } + } + + Y_UNIT_TEST(TThreadLoadRegisterIdlePeriodFirstTwoTimeSlots3) { + using T = TThreadLoad<38400>; + + T threadLoad; + + UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), 0); + for (auto slotIndex = 0u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) { + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0); + } + + ui64 timeNs = threadLoad.GetTimeSlotLengthNs() - 1; + threadLoad.RegisterIdlePeriod(timeNs); + + UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), timeNs); + for (auto slotIndex = 0u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) { + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0); + } + + timeNs = 2 * threadLoad.GetTimeSlotLengthNs() - 1; + threadLoad.RegisterBusyPeriod(timeNs); + + UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), timeNs); + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[0].load(), 1); + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[1].load(), threadLoad.GetTimeSlotMaxValue()); + for (auto slotIndex = 2u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) { + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0); + } + } + + Y_UNIT_TEST(TThreadLoadRegisterIdlePeriodFirstThreeTimeSlots1) { + using T = TThreadLoad<38400>; + + T threadLoad; + + UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), 0); + for (auto slotIndex = 0u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) { + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0); + } + + ui64 timeNs = threadLoad.GetTimeSlotLengthNs(); + threadLoad.RegisterBusyPeriod(timeNs); + + timeNs = 2 * threadLoad.GetTimeSlotLengthNs(); + threadLoad.RegisterIdlePeriod(timeNs); + + timeNs = 3 * threadLoad.GetTimeSlotLengthNs(); + threadLoad.RegisterBusyPeriod(timeNs); + + UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), timeNs); + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[0].load(), threadLoad.GetTimeSlotMaxValue()); + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[1].load(), 0); + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[2].load(), threadLoad.GetTimeSlotMaxValue()); + for (auto slotIndex = 3u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) { + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0); + } + } + + Y_UNIT_TEST(TThreadLoadRegisterIdlePeriodFirstThreeTimeSlots2) { + using T = TThreadLoad<38400>; + + T threadLoad; + + UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), 0); + for (auto slotIndex = 0u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) { + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0); + } + + ui64 timeNs = threadLoad.GetTimeSlotLengthNs(); + threadLoad.RegisterBusyPeriod(timeNs); + + timeNs = 3 * threadLoad.GetTimeSlotLengthNs(); + threadLoad.RegisterIdlePeriod(timeNs); + + UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), timeNs); + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[0].load(), threadLoad.GetTimeSlotMaxValue()); + for (auto slotIndex = 1u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) { + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0); + } + } + + Y_UNIT_TEST(TThreadLoadRegisterIdlePeriodFirstThreeTimeSlots3) { + using T = TThreadLoad<38400>; + + T threadLoad; + + UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), 0); + for (auto slotIndex = 0u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) { + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0); + } + + ui64 timeNs = threadLoad.GetTimeSlotLengthNs(); + threadLoad.RegisterIdlePeriod(timeNs); + + timeNs = 3 * threadLoad.GetTimeSlotLengthNs(); + threadLoad.RegisterBusyPeriod(timeNs); + + UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), timeNs); + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[0].load(), 0); + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[1].load(), threadLoad.GetTimeSlotMaxValue()); + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[2].load(), threadLoad.GetTimeSlotMaxValue()); + for (auto slotIndex = 3u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) { + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0); + } + } + + Y_UNIT_TEST(TThreadLoadRegisterIdlePeriodFirstThreeTimeSlots4) { + using T = TThreadLoad<38400>; + + T threadLoad; + + UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), 0); + for (auto slotIndex = 0u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) { + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0); + } + + ui64 timeNs = threadLoad.GetTimeSlotLengthNs() + 2 * threadLoad.GetTimeSlotPartLengthNs(); + threadLoad.RegisterIdlePeriod(timeNs); + + timeNs = 3 * threadLoad.GetTimeSlotLengthNs(); + threadLoad.RegisterBusyPeriod(timeNs); + + UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), timeNs); + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[0].load(), 0); + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[1].load(), threadLoad.GetTimeSlotPartCount() - 2); + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[2].load(), threadLoad.GetTimeSlotMaxValue()); + for (auto slotIndex = 3u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) { + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0); + } + } + + Y_UNIT_TEST(TThreadLoadRegisterIdlePeriodFirstThreeTimeSlots5) { + using T = TThreadLoad<38400>; + + T threadLoad; + + UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), 0); + for (auto slotIndex = 0u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) { + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0); + } + + ui64 timeNs = 2 * threadLoad.GetTimeSlotLengthNs(); + threadLoad.RegisterBusyPeriod(timeNs); + + UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), timeNs); + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[0].load(), threadLoad.GetTimeSlotMaxValue()); + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[1].load(), threadLoad.GetTimeSlotMaxValue()); + for (auto slotIndex = 2u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) { + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0); + } + + timeNs = timeNs + threadLoad.GetTimeWindowLengthNs() + threadLoad.GetTimeSlotLengthNs(); + threadLoad.RegisterIdlePeriod(timeNs); + + UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), timeNs); + for (auto slotIndex = 0u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) { + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0); + } + } + + Y_UNIT_TEST(TThreadLoadRegisterIdlePeriodOverTimeWindow) { + constexpr auto timeWindowLengthNs = 5368709120ull; // 5 * 2 ^ 30 ~5 sec + constexpr auto timeSlotLengthNs = 524288ull; // 2 ^ 19 ns ~ 512 usec + constexpr auto timeSlotCount = timeWindowLengthNs / timeSlotLengthNs; + + using T = TThreadLoad<timeSlotCount, timeSlotLengthNs, std::uint8_t>; + + T threadLoad; + + UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), 0); + for (auto slotIndex = 0u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) { + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0); + } + + ui64 timeNs = 5 * threadLoad.GetTimeSlotLengthNs(); + threadLoad.RegisterBusyPeriod(timeNs); + + UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), timeNs); + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[0].load(), threadLoad.GetTimeSlotMaxValue()); + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[1].load(), threadLoad.GetTimeSlotMaxValue()); + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[2].load(), threadLoad.GetTimeSlotMaxValue()); + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[3].load(), threadLoad.GetTimeSlotMaxValue()); + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[4].load(), threadLoad.GetTimeSlotMaxValue()); + for (auto slotIndex = 5u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) { + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0); + } + + timeNs = timeNs + threadLoad.GetTimeWindowLengthNs() - 3 * threadLoad.GetTimeSlotLengthNs(); + threadLoad.RegisterIdlePeriod(timeNs); + + UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), timeNs); + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[0].load(), 0); + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[1].load(), 0); + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[2].load(), threadLoad.GetTimeSlotMaxValue()); + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[3].load(), threadLoad.GetTimeSlotMaxValue()); + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[4].load(), threadLoad.GetTimeSlotMaxValue()); + for (auto slotIndex = 5u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) { + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0); + } + } + + Y_UNIT_TEST(MinusOneThreadEstimatorTwoThreadLoadsZeroShiftNs) { + constexpr auto timeWindowLengthNs = 5368709120ull; // 5 * 2 ^ 30 ~5 sec + constexpr auto timeSlotLengthNs = 524288ull; // 2 ^ 19 ns ~ 512 usec + constexpr auto timeSlotCount = timeWindowLengthNs / timeSlotLengthNs; + + using T = TThreadLoad<timeSlotCount, timeSlotLengthNs, std::uint16_t>; + + UNIT_ASSERT_VALUES_EQUAL(T::GetTimeSlotPartCount(), (ui64)std::numeric_limits<std::uint16_t>::max() + 1); + + T *threadLoads[2]; + threadLoads[0] = new T; + threadLoads[1] = new T; + + for (ui64 i = 1; i < timeSlotCount; i += 2) { + threadLoads[0]->RegisterIdlePeriod(i * T::GetTimeSlotLengthNs()); + threadLoads[0]->RegisterBusyPeriod((i + 1) * T::GetTimeSlotLengthNs()); + } + + for (ui64 i = 1; i < timeSlotCount; i += 2) { + threadLoads[1]->RegisterBusyPeriod(i * T::GetTimeSlotLengthNs()); + threadLoads[1]->RegisterIdlePeriod((i + 1) * T::GetTimeSlotLengthNs()); + } + + TMinusOneThreadEstimator estimator; + ui64 value = estimator.MaxLatencyIncreaseWithOneLessCpu(threadLoads, 2, T::GetTimeWindowLengthNs(), T::GetTimeWindowLengthNs()); + UNIT_ASSERT_VALUES_EQUAL(value, 0); + + delete threadLoads[0]; + delete threadLoads[1]; + } + + Y_UNIT_TEST(MinusOneThreadEstimatorTwoThreadLoadsOneTimeSlotShift1) { + constexpr auto timeWindowLengthNs = 5368709120ull; // 5 * 2 ^ 30 ~5 sec + constexpr auto timeSlotLengthNs = 524288ull; // 2 ^ 19 ns ~ 512 usec + constexpr auto timeSlotCount = timeWindowLengthNs / timeSlotLengthNs; + constexpr auto threadCount = 2; + + using T = TThreadLoad<timeSlotCount, timeSlotLengthNs, std::uint16_t>; + + T *threadLoads[threadCount]; + + for (auto t = 0u; t < threadCount; ++t) { + threadLoads[t] = new T; + + for (ui64 i = 2; i < threadLoads[t]->GetTimeSlotCount(); i += 2) { + threadLoads[t]->RegisterIdlePeriod((i - 1) * T::GetTimeSlotLengthNs()); + threadLoads[t]->RegisterBusyPeriod(i * T::GetTimeSlotLengthNs()); + } + + threadLoads[t]->RegisterIdlePeriod((threadLoads[t]->GetTimeSlotCount() - 1) * T::GetTimeSlotLengthNs()); + threadLoads[t]->RegisterBusyPeriod(threadLoads[t]->GetTimeSlotCount() * T::GetTimeSlotLengthNs()); + + for (ui64 s = 0; s < threadLoads[t]->GetTimeSlotCount(); ++s) { + if (s % 2 == 1) { + UNIT_ASSERT_VALUES_EQUAL_C(threadLoads[t]->TimeSlots[s].load(), T::GetTimeSlotMaxValue(), ToString(s).c_str()); + } else { + UNIT_ASSERT_VALUES_EQUAL_C(threadLoads[t]->TimeSlots[s].load(), 0, ToString(s).c_str()); + } + } + } + + TMinusOneThreadEstimator estimator; + auto result = estimator.MaxLatencyIncreaseWithOneLessCpu(threadLoads, threadCount, T::GetTimeWindowLengthNs(), T::GetTimeWindowLengthNs()); + + for (ui64 t = 0; t < threadCount; ++t) { + for (ui64 s = 0; s < threadLoads[t]->GetTimeSlotCount(); ++s) { + if (s % 2 == 1) { + UNIT_ASSERT_VALUES_EQUAL_C(threadLoads[t]->TimeSlots[s].load(), T::GetTimeSlotMaxValue(), ToString(s).c_str()); + } else { + UNIT_ASSERT_VALUES_EQUAL_C(threadLoads[t]->TimeSlots[s].load(), 0, ToString(s).c_str()); + } + } + } + + UNIT_ASSERT_VALUES_EQUAL(result, T::GetTimeSlotLengthNs()); + + for (auto t = 0u; t < threadCount; ++t) { + delete threadLoads[t]; + } + } + + Y_UNIT_TEST(MinusOneThreadEstimatorTwoThreadLoadsOneTimeSlotShift2) { + constexpr auto timeWindowLengthNs = 5368709120ull; // 5 * 2 ^ 30 ~5 sec + constexpr auto timeSlotLengthNs = 524288ull; // 2 ^ 19 ns ~ 512 usec + constexpr auto timeSlotCount = timeWindowLengthNs / timeSlotLengthNs; + constexpr auto threadCount = 2; + + using T = TThreadLoad<timeSlotCount, timeSlotLengthNs, std::uint16_t>; + + T *threadLoads[threadCount]; + + for (auto t = 0u; t < threadCount; ++t) { + threadLoads[t] = new T; + + for (ui64 i = 2; i < threadLoads[t]->GetTimeSlotCount(); i += 2) { + threadLoads[t]->RegisterBusyPeriod((i - 1) * T::GetTimeSlotLengthNs()); + threadLoads[t]->RegisterIdlePeriod(i * T::GetTimeSlotLengthNs()); + } + + threadLoads[t]->RegisterBusyPeriod((threadLoads[t]->GetTimeSlotCount() - 1) * T::GetTimeSlotLengthNs()); + threadLoads[t]->RegisterIdlePeriod(threadLoads[t]->GetTimeSlotCount() * T::GetTimeSlotLengthNs()); + + for (ui64 s = 0; s < threadLoads[t]->GetTimeSlotCount(); ++s) { + if (s % 2 == 0) { + UNIT_ASSERT_VALUES_EQUAL_C(threadLoads[t]->TimeSlots[s].load(), T::GetTimeSlotMaxValue(), ToString(s).c_str()); + } else { + UNIT_ASSERT_VALUES_EQUAL_C(threadLoads[t]->TimeSlots[s].load(), 0, ToString(s).c_str()); + } + } + } + + TMinusOneThreadEstimator estimator; + auto result = estimator.MaxLatencyIncreaseWithOneLessCpu(threadLoads, threadCount, T::GetTimeWindowLengthNs(), T::GetTimeWindowLengthNs()); + + for (ui64 t = 0; t < threadCount; ++t) { + for (ui64 s = 0; s < threadLoads[t]->GetTimeSlotCount(); ++s) { + if (s % 2 == 0) { + UNIT_ASSERT_VALUES_EQUAL_C(threadLoads[t]->TimeSlots[s].load(), T::GetTimeSlotMaxValue(), ToString(s).c_str()); + } else { + UNIT_ASSERT_VALUES_EQUAL_C(threadLoads[t]->TimeSlots[s].load(), 0, ToString(s).c_str()); + } + } + } + + UNIT_ASSERT_VALUES_EQUAL(result, T::GetTimeSlotLengthNs()); + + for (auto t = 0u; t < threadCount; ++t) { + delete threadLoads[t]; + } + } + + Y_UNIT_TEST(MinusOneThreadEstimatorTwoThreadLoadsTwoTimeSlotsShift1) { + constexpr auto timeWindowLengthNs = 5368709120ull; // 5 * 2 ^ 30 ~5 sec + constexpr auto timeSlotLengthNs = 524288ull; // 2 ^ 19 ns ~ 512 usec + constexpr auto timeSlotCount = timeWindowLengthNs / timeSlotLengthNs; + constexpr auto threadCount = 2; + + using T = TThreadLoad<timeSlotCount, timeSlotLengthNs, std::uint16_t>; + + T *threadLoads[threadCount]; + + for (auto t = 0u; t < threadCount; ++t) { + threadLoads[t] = new T; + + for (ui64 i = 4; i < threadLoads[t]->GetTimeSlotCount(); i += 4) { + threadLoads[t]->RegisterIdlePeriod((i - 2) * T::GetTimeSlotLengthNs()); + threadLoads[t]->RegisterBusyPeriod(i * T::GetTimeSlotLengthNs()); + } + + threadLoads[t]->RegisterIdlePeriod((threadLoads[t]->GetTimeSlotCount() - 2) * T::GetTimeSlotLengthNs()); + threadLoads[t]->RegisterBusyPeriod(threadLoads[t]->GetTimeSlotCount() * T::GetTimeSlotLengthNs()); + + for (ui64 s = 0; s < threadLoads[t]->GetTimeSlotCount(); ++s) { + if (s % 4 == 2 || s % 4 == 3) { + UNIT_ASSERT_VALUES_EQUAL_C(threadLoads[t]->TimeSlots[s].load(), T::GetTimeSlotMaxValue(), ToString(s).c_str()); + } else { + UNIT_ASSERT_VALUES_EQUAL_C(threadLoads[t]->TimeSlots[s].load(), 0, ToString(s).c_str()); + } + } + } + + TMinusOneThreadEstimator estimator; + auto result = estimator.MaxLatencyIncreaseWithOneLessCpu(threadLoads, threadCount, T::GetTimeWindowLengthNs(), T::GetTimeWindowLengthNs()); + + for (ui64 t = 0; t < threadCount; ++t) { + for (ui64 s = 0; s < threadLoads[t]->GetTimeSlotCount(); ++s) { + if (s % 4 == 2 || s % 4 == 3) { + UNIT_ASSERT_VALUES_EQUAL_C(threadLoads[t]->TimeSlots[s].load(), T::GetTimeSlotMaxValue(), ToString(s).c_str()); + } else { + UNIT_ASSERT_VALUES_EQUAL(threadLoads[t]->TimeSlots[s].load(), 0); + } + } + } + + UNIT_ASSERT_VALUES_EQUAL(result, 2 * T::GetTimeSlotLengthNs()); + + for (auto t = 0u; t < threadCount; ++t) { + delete threadLoads[t]; + } + } + + Y_UNIT_TEST(MinusOneThreadEstimatorTwoThreadLoadsTwoTimeSlotsShift2) { + constexpr auto timeWindowLengthNs = 5368709120ull; // 5 * 2 ^ 30 ~5 sec + constexpr auto timeSlotLengthNs = 524288ull; // 2 ^ 19 ns ~ 512 usec + constexpr auto timeSlotCount = timeWindowLengthNs / timeSlotLengthNs; + constexpr auto threadCount = 2; + + using T = TThreadLoad<timeSlotCount, timeSlotLengthNs, std::uint16_t>; + + T *threadLoads[threadCount]; + + for (auto t = 0u; t < threadCount; ++t) { + threadLoads[t] = new T; + + for (ui64 i = 4; i < threadLoads[t]->GetTimeSlotCount(); i += 4) { + threadLoads[t]->RegisterBusyPeriod((i - 2) * T::GetTimeSlotLengthNs()); + threadLoads[t]->RegisterIdlePeriod(i * T::GetTimeSlotLengthNs()); + } + + threadLoads[t]->RegisterBusyPeriod((threadLoads[t]->GetTimeSlotCount() - 2) * T::GetTimeSlotLengthNs()); + threadLoads[t]->RegisterIdlePeriod(threadLoads[t]->GetTimeSlotCount() * T::GetTimeSlotLengthNs()); + + for (ui64 s = 0; s < threadLoads[t]->GetTimeSlotCount(); ++s) { + if (s % 4 == 0 || s % 4 == 1) { + UNIT_ASSERT_VALUES_EQUAL_C(threadLoads[t]->TimeSlots[s].load(), T::GetTimeSlotMaxValue(), ToString(s).c_str()); + } else { + UNIT_ASSERT_VALUES_EQUAL_C(threadLoads[t]->TimeSlots[s].load(), 0, ToString(s).c_str()); + } + } + } + + TMinusOneThreadEstimator estimator; + auto result = estimator.MaxLatencyIncreaseWithOneLessCpu(threadLoads, threadCount, T::GetTimeWindowLengthNs(), T::GetTimeWindowLengthNs()); + + for (ui64 t = 0; t < threadCount; ++t) { + for (ui64 s = 0; s < threadLoads[t]->GetTimeSlotCount(); ++s) { + if (s % 4 == 0 || s % 4 == 1) { + UNIT_ASSERT_VALUES_EQUAL_C(threadLoads[t]->TimeSlots[s].load(), T::GetTimeSlotMaxValue(), ToString(s).c_str()); + } else { + UNIT_ASSERT_VALUES_EQUAL_C(threadLoads[t]->TimeSlots[s].load(), 0, ToString(s).c_str()); + } + } + } + + UNIT_ASSERT_VALUES_EQUAL(result, 2 * T::GetTimeSlotLengthNs()); + + for (auto t = 0u; t < threadCount; ++t) { + delete threadLoads[t]; + } + } + + Y_UNIT_TEST(MinusOneThreadEstimatorTwoThreadLoadsTwoTimeSlotsShift3) { + constexpr auto timeWindowLengthNs = 5368709120ull; // 5 * 2 ^ 30 ~5 sec + constexpr auto timeSlotLengthNs = 524288ull; // 2 ^ 19 ns ~ 512 usec + constexpr auto timeSlotCount = timeWindowLengthNs / timeSlotLengthNs; + constexpr auto threadCount = 2; + + using T = TThreadLoad<timeSlotCount, timeSlotLengthNs, std::uint16_t>; + + T *threadLoads[threadCount]; + + for (auto t = 0u; t < threadCount; ++t) { + threadLoads[t] = new T; + + auto timeNs = T::GetTimeWindowLengthNs() - 1.5 * T::GetTimeSlotLengthNs(); + threadLoads[t]->RegisterIdlePeriod(timeNs); + UNIT_ASSERT_VALUES_EQUAL(threadLoads[t]->LastTimeNs.load(), timeNs); + + timeNs = T::GetTimeWindowLengthNs(); + threadLoads[t]->RegisterBusyPeriod(timeNs); + UNIT_ASSERT_VALUES_EQUAL(threadLoads[t]->LastTimeNs.load(), timeNs); + + for (ui64 s = 0; s + 2 < threadLoads[t]->GetTimeSlotCount(); ++s) { + UNIT_ASSERT_VALUES_EQUAL_C(threadLoads[t]->TimeSlots[s].load(), 0, ToString(s).c_str()); + } + + UNIT_ASSERT_VALUES_EQUAL(threadLoads[t]->TimeSlots[timeSlotCount - 2].load(), T::GetTimeSlotPartCount() / 2); + UNIT_ASSERT_VALUES_EQUAL(threadLoads[t]->TimeSlots[timeSlotCount - 1].load(), T::GetTimeSlotMaxValue()); + } + + TMinusOneThreadEstimator estimator; + auto result = estimator.MaxLatencyIncreaseWithOneLessCpu(threadLoads, threadCount, T::GetTimeWindowLengthNs(), T::GetTimeWindowLengthNs()); + + for (auto t = 0u; t < threadCount; ++t) { + for (ui64 s = 0; s + 2 < threadLoads[t]->GetTimeSlotCount(); ++s) { + UNIT_ASSERT_VALUES_EQUAL_C(threadLoads[t]->TimeSlots[s].load(), 0, ToString(s).c_str()); + } + + UNIT_ASSERT_VALUES_EQUAL(threadLoads[t]->TimeSlots[timeSlotCount - 2].load(), T::GetTimeSlotPartCount() / 2); + UNIT_ASSERT_VALUES_EQUAL(threadLoads[t]->TimeSlots[timeSlotCount - 1].load(), T::GetTimeSlotMaxValue()); + } + + UNIT_ASSERT_VALUES_EQUAL(result, 2 * T::GetTimeSlotLengthNs()); + + for (auto t = 0u; t < threadCount; ++t) { + delete threadLoads[t]; + } + } + + Y_UNIT_TEST(MinusOneThreadEstimator16ThreadLoadsAllTimeSlots) { + constexpr auto timeWindowLengthNs = 5368709120ull; // 5 * 2 ^ 30 ~5 sec + constexpr auto timeSlotLengthNs = 524288ull; // 2 ^ 19 ns ~ 512 usec + constexpr auto timeSlotCount = timeWindowLengthNs / timeSlotLengthNs; + constexpr auto threadCount = 16; + constexpr auto estimatesCount = 16; + + using T = TThreadLoad<timeSlotCount, timeSlotLengthNs, std::uint16_t>; + + for (auto e = 0u; e < estimatesCount; ++e) { + T *threadLoads[threadCount]; + + for (auto t = 0u; t < threadCount; ++t) { + threadLoads[t] = new T; + auto timeNs = threadLoads[t]->GetTimeWindowLengthNs(); + threadLoads[t]->RegisterBusyPeriod(timeNs); + + UNIT_ASSERT_VALUES_EQUAL(threadLoads[t]->LastTimeNs.load(), timeNs); + for (ui64 s = 0; s < threadLoads[t]->GetTimeSlotCount(); ++s) { + UNIT_ASSERT_VALUES_EQUAL_C(threadLoads[t]->TimeSlots[s].load(), T::GetTimeSlotMaxValue(), ToString(s).c_str()); + } + } + + ui64 result = 0; + { + THPTimer timer; + TMinusOneThreadEstimator estimator; + result = estimator.MaxLatencyIncreaseWithOneLessCpu(threadLoads, threadCount, T::GetTimeWindowLengthNs(), T::GetTimeWindowLengthNs()); + // output in microseconds + auto passed = timer.Passed() * 1000000; + Y_UNUSED(passed); + // Cerr << "timer : " << passed << " " << __LINE__ << Endl; + } + + for (ui64 t = 0; t < threadCount; ++t) { + UNIT_ASSERT_VALUES_EQUAL(threadLoads[t]->LastTimeNs.load(), T::GetTimeWindowLengthNs()); + for (ui64 s = 0; s < threadLoads[t]->GetTimeSlotCount(); ++s) { + UNIT_ASSERT_VALUES_EQUAL_C(threadLoads[t]->TimeSlots[s].load(), T::GetTimeSlotMaxValue(), ToString(s).c_str()); + } + } + + UNIT_ASSERT_VALUES_EQUAL(result, T::GetTimeWindowLengthNs()); + + for (auto t = 0u; t < threadCount; ++t) { + delete threadLoads[t]; + } + } + } +} diff --git a/library/cpp/mime/types/mime.cpp b/library/cpp/mime/types/mime.cpp index e4cbcc86eb..74eeabea48 100644 --- a/library/cpp/mime/types/mime.cpp +++ b/library/cpp/mime/types/mime.cpp @@ -250,5 +250,6 @@ const char* MimeNames[MIME_MAX] = { "woff", // MIME_WOFF // 43 "woff2", // MIME_WOFF2 // 44 "ttf", // MIME_TTF // 45 - "webmanifest" // MIME_WEBMANIFEST // 46 + "webmanifest", // MIME_WEBMANIFEST // 46 + "cbor", // MIME_CBOR // 47 }; diff --git a/library/cpp/string_utils/CMakeLists.txt b/library/cpp/string_utils/CMakeLists.txt index d256782733..bbdcba85d9 100644 --- a/library/cpp/string_utils/CMakeLists.txt +++ b/library/cpp/string_utils/CMakeLists.txt @@ -7,6 +7,7 @@ add_subdirectory(base64) +add_subdirectory(csv) add_subdirectory(indent_text) add_subdirectory(levenshtein_diff) add_subdirectory(parse_size) diff --git a/library/cpp/string_utils/csv/CMakeLists.txt b/library/cpp/string_utils/csv/CMakeLists.txt new file mode 100644 index 0000000000..7dffad3566 --- /dev/null +++ b/library/cpp/string_utils/csv/CMakeLists.txt @@ -0,0 +1,17 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(cpp-string_utils-csv) +target_link_libraries(cpp-string_utils-csv PUBLIC + contrib-libs-cxxsupp + yutil +) +target_sources(cpp-string_utils-csv PRIVATE + ${CMAKE_SOURCE_DIR}/library/cpp/string_utils/csv/csv.cpp +) diff --git a/library/cpp/string_utils/csv/csv.cpp b/library/cpp/string_utils/csv/csv.cpp new file mode 100644 index 0000000000..218473c62c --- /dev/null +++ b/library/cpp/string_utils/csv/csv.cpp @@ -0,0 +1,82 @@ +#include "csv.h" + +TStringBuf NCsvFormat::CsvSplitter::Consume() { + if (Begin == End) { + return nullptr; + } + TString::iterator TokenStart = Begin; + TString::iterator TokenEnd = Begin; + if (Quote == '\0') { + while (1) { + if (TokenEnd == End || *TokenEnd == Delimeter) { + Begin = TokenEnd; + return TStringBuf(TokenStart, TokenEnd); + } + ++TokenEnd; + } + } else { + bool Escape = false; + if (*Begin == Quote) { + Escape = true; + ++TokenStart; + ++TokenEnd; + Y_ENSURE(TokenStart != End, TStringBuf("RFC4180 violation: quotation mark must be followed by something")); + } + while (1) { + if (TokenEnd == End || (!Escape && *TokenEnd == Delimeter)) { + Begin = TokenEnd; + return TStringBuf(TokenStart, TokenEnd); + } else if (*TokenEnd == Quote) { + Y_ENSURE(Escape, TStringBuf("RFC4180 violation: quotation mark must be in the escaped string only")); + if (TokenEnd + 1 == End) { + Begin = TokenEnd + 1; + } else if (*(TokenEnd + 1) == Delimeter) { + Begin = TokenEnd + 1; + } else if (*(TokenEnd + 1) == Quote) { + CustomStringBufs.push_back(TStringBuf(TokenStart, (TokenEnd + 1))); + TokenEnd += 2; + TokenStart = TokenEnd; + continue; + } else { + Y_ENSURE(false, TStringBuf("RFC4180 violation: in escaped string quotation mark must be followed by a delimiter, EOL or another quotation mark")); + } + if (CustomStringBufs.size()) { + CustomString.clear(); + for (auto CustomStringBuf : CustomStringBufs) { + CustomString += TString{ CustomStringBuf }; + } + CustomString += TString{ TStringBuf(TokenStart, TokenEnd) }; + CustomStringBufs.clear(); + return TStringBuf(CustomString); + } else { + return TStringBuf(TokenStart, TokenEnd); + } + } + ++TokenEnd; + } + } +}; + +TString NCsvFormat::TLinesSplitter::ConsumeLine() { + bool Escape = false; + TString result; + TString line; + while (Input.ReadLine(line)) { + for (auto it = line.begin(); it != line.end(); ++it) { + if (*it == Quote) { + Escape = !Escape; + } + } + if (!result) { + result = line; + } else { + result += line; + } + if (!Escape) { + break; + } else { + result += "\n"; + } + } + return result; +}; diff --git a/library/cpp/string_utils/csv/csv.h b/library/cpp/string_utils/csv/csv.h new file mode 100644 index 0000000000..8cb96e6bb9 --- /dev/null +++ b/library/cpp/string_utils/csv/csv.h @@ -0,0 +1,64 @@ +#pragma once + +#include <util/generic/yexception.h> +#include <util/generic/strbuf.h> +#include <util/generic/vector.h> +#include <util/stream/input.h> + +/* + Split string by rfc4180 +*/ + +namespace NCsvFormat { + class TLinesSplitter { + private: + IInputStream& Input; + const char Quote; + public: + TLinesSplitter(IInputStream& input, const char quote = '"') + : Input(input) + , Quote(quote) { + } + TString ConsumeLine(); + }; + + class CsvSplitter { + public: + CsvSplitter(TString& data, const char delimeter = ',', const char quote = '"') + // quote = '\0' ignores quoting in values and words like simple split + : Delimeter(delimeter) + , Quote(quote) + , Begin(data.begin()) + , End(data.end()) + { + } + + bool Step() { + if (Begin == End) { + return false; + } + ++Begin; + return true; + } + + TStringBuf Consume(); + explicit operator TVector<TString>() { + TVector<TString> ret; + + do { + TStringBuf buf = Consume(); + ret.push_back(TString{buf}); + } while (Step()); + + return ret; + } + + private: + const char Delimeter; + const char Quote; + TString::iterator Begin; + const TString::const_iterator End; + TString CustomString; + TVector<TStringBuf> CustomStringBufs; + }; +} |