author     Daniil Cherednik <dan.cherednik@gmail.com>  2023-02-09 11:44:35 +0300
committer  Daniil Cherednik <dan.cherednik@gmail.com>  2023-02-09 11:46:17 +0300
commit     b0967c30d3706b650b679fe119b6bd7b0924d328 (patch)
tree       25579dfda238c2cc5b00324878303b3a05d09f45 /library
parent     9b78acb9998e4a817a21fe60443c7c5d6a06b947 (diff)
download   ydb-22.5.10.tar.gz

Ydb stable 22-5-10 (22.5.10, stable-22-5)
x-stable-origin-commit: f696baac1a4b8d48eb52b52b35930eef6d0eab42
Diffstat (limited to 'library')
-rw-r--r--  library/cpp/actors/core/CMakeLists.txt                |   1
-rw-r--r--  library/cpp/actors/core/actor_ut.cpp                  |   6
-rw-r--r--  library/cpp/actors/core/actorsystem.h                 |  45
-rw-r--r--  library/cpp/actors/core/balancer.cpp                  |  26
-rw-r--r--  library/cpp/actors/core/balancer.h                    |   3
-rw-r--r--  library/cpp/actors/core/config.h                      |  11
-rw-r--r--  library/cpp/actors/core/cpu_manager.cpp               |  10
-rw-r--r--  library/cpp/actors/core/cpu_manager.h                 |   2
-rw-r--r--  library/cpp/actors/core/executor_pool_basic.cpp       | 504
-rw-r--r--  library/cpp/actors/core/executor_pool_basic.h         |  67
-rw-r--r--  library/cpp/actors/core/executor_pool_basic_ut.cpp    | 352
-rw-r--r--  library/cpp/actors/core/executor_pool_united.cpp      |  22
-rw-r--r--  library/cpp/actors/core/executor_pool_united.h        |   2
-rw-r--r--  library/cpp/actors/core/executor_pool_united_ut.cpp   |   4
-rw-r--r--  library/cpp/actors/core/executor_thread.cpp           |  30
-rw-r--r--  library/cpp/actors/core/harmonizer.cpp                | 431
-rw-r--r--  library/cpp/actors/core/harmonizer.h                  |  30
-rw-r--r--  library/cpp/actors/core/mon_stats.h                   |  12
-rw-r--r--  library/cpp/actors/core/probes.h                      |  24
-rw-r--r--  library/cpp/actors/core/worker_context.h              |   1
-rw-r--r--  library/cpp/actors/helpers/pool_stats_collector.h     |  25
-rw-r--r--  library/cpp/actors/helpers/selfping_actor.cpp         |  47
-rw-r--r--  library/cpp/actors/helpers/selfping_actor.h           |   4
-rw-r--r--  library/cpp/actors/helpers/selfping_actor_ut.cpp      |   6
-rw-r--r--  library/cpp/actors/util/CMakeLists.txt                |   1
-rw-r--r--  library/cpp/actors/util/cpu_load_log.h                | 227
-rw-r--r--  library/cpp/actors/util/cpu_load_log_ut.cpp           | 275
-rw-r--r--  library/cpp/actors/util/thread_load_log.h             | 363
-rw-r--r--  library/cpp/actors/util/thread_load_log_ut.cpp        | 966
-rw-r--r--  library/cpp/mime/types/mime.cpp                       |   3
-rw-r--r--  library/cpp/string_utils/CMakeLists.txt               |   1
-rw-r--r--  library/cpp/string_utils/csv/CMakeLists.txt           |  17
-rw-r--r--  library/cpp/string_utils/csv/csv.cpp                  |  82
-rw-r--r--  library/cpp/string_utils/csv/csv.h                    |  64
34 files changed, 3288 insertions(+), 376 deletions(-)
diff --git a/library/cpp/actors/core/CMakeLists.txt b/library/cpp/actors/core/CMakeLists.txt
index 64c617307c..51379561db 100644
--- a/library/cpp/actors/core/CMakeLists.txt
+++ b/library/cpp/actors/core/CMakeLists.txt
@@ -42,6 +42,7 @@ target_sources(cpp-actors-core PRIVATE
${CMAKE_SOURCE_DIR}/library/cpp/actors/core/executor_pool_io.cpp
${CMAKE_SOURCE_DIR}/library/cpp/actors/core/executor_pool_united.cpp
${CMAKE_SOURCE_DIR}/library/cpp/actors/core/executor_thread.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/actors/core/harmonizer.cpp
${CMAKE_SOURCE_DIR}/library/cpp/actors/core/interconnect.cpp
${CMAKE_SOURCE_DIR}/library/cpp/actors/core/io_dispatcher.cpp
${CMAKE_SOURCE_DIR}/library/cpp/actors/core/log.cpp
diff --git a/library/cpp/actors/core/actor_ut.cpp b/library/cpp/actors/core/actor_ut.cpp
index a6752f7d4f..ab53e3ec3e 100644
--- a/library/cpp/actors/core/actor_ut.cpp
+++ b/library/cpp/actors/core/actor_ut.cpp
@@ -543,8 +543,12 @@ Y_UNIT_TEST_SUITE(TestDecorator) {
setup->NodeId = 0;
setup->ExecutorsCount = 1;
setup->Executors.Reset(new TAutoPtr<IExecutorPool>[setup->ExecutorsCount]);
+
+ ui64 ts = GetCycleCountFast();
+ THolder<IHarmonizer> harmonizer(MakeHarmonizer(ts));
for (ui32 i = 0; i < setup->ExecutorsCount; ++i) {
- setup->Executors[i] = new TBasicExecutorPool(i, 1, 10, "basic");
+ setup->Executors[i] = new TBasicExecutorPool(i, 1, 10, "basic", harmonizer.Get());
+ harmonizer->AddPool(setup->Executors[i].Get());
}
setup->Scheduler = new TBasicSchedulerThread;
diff --git a/library/cpp/actors/core/actorsystem.h b/library/cpp/actors/core/actorsystem.h
index 40499d7586..4801350067 100644
--- a/library/cpp/actors/core/actorsystem.h
+++ b/library/cpp/actors/core/actorsystem.h
@@ -124,10 +124,55 @@ namespace NActors {
return 1;
}
+ virtual i16 GetPriority() const {
+ return 0;
+ }
+
// generic
virtual TAffinity* Affinity() const = 0;
virtual void SetRealTimeMode() const {}
+
+ virtual ui32 GetThreadCount() const {
+ return 1;
+ };
+
+ virtual void SetThreadCount(ui32 threads) {
+ Y_UNUSED(threads);
+ }
+
+ virtual i16 GetBlockingThreadCount() const {
+ return 0;
+ }
+
+ virtual i16 GetDefaultThreadCount() const {
+ return 1;
+ }
+
+ virtual i16 GetMinThreadCount() const {
+ return 1;
+ }
+
+ virtual i16 GetMaxThreadCount() const {
+ return 1;
+
+ }
+
+ virtual bool IsThreadBeingStopped(i16 threadIdx) const {
+ Y_UNUSED(threadIdx);
+ return false;
+ }
+
+ virtual double GetThreadConsumedUs(i16 threadIdx) {
+ Y_UNUSED(threadIdx);
+ return 0.0;
+ }
+
+ virtual double GetThreadBookedUs(i16 threadIdx) {
+ Y_UNUSED(threadIdx);
+ return 0.0;
+ }
+
};
// could be proxy to in-pool schedulers (for NUMA-aware executors)
diff --git a/library/cpp/actors/core/balancer.cpp b/library/cpp/actors/core/balancer.cpp
index 3dcc45c56b..d82701bbfb 100644
--- a/library/cpp/actors/core/balancer.cpp
+++ b/library/cpp/actors/core/balancer.cpp
@@ -2,8 +2,9 @@
#include "probes.h"
-#include <library/cpp/actors/util/intrinsics.h>
+#include <library/cpp/actors/util/cpu_load_log.h>
#include <library/cpp/actors/util/datetime.h>
+#include <library/cpp/actors/util/intrinsics.h>
#include <util/system/spinlock.h>
@@ -27,11 +28,11 @@ namespace NActors {
TLevel() {}
- TLevel(const TBalancingConfig& cfg, TPoolId poolId, ui64 currentCpus, double cpuIdle) {
+ TLevel(const TBalancingConfig& cfg, TPoolId poolId, ui64 currentCpus, double cpuIdle, ui64 addLatencyUs, ui64 worstLatencyUs) {
ScaleFactor = double(currentCpus) / cfg.Cpus;
- if (cpuIdle > 1.3) { // TODO: add a better underload criterion, based on estimated latency w/o 1 cpu
+ if ((worstLatencyUs + addLatencyUs) < 2000 && cpuIdle > 1.0) { // Underload criterion, based on estimated latency w/o 1 cpu
LoadClass = Underloaded;
- } else if (cpuIdle < 0.2) { // TODO: add a better overload criterion, based on latency
+ } else if (worstLatencyUs > 2000 || cpuIdle < 0.2) { // Overload criterion, based on latency
LoadClass = Overloaded;
} else {
LoadClass = Moderate;
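
For reference, the decision rule introduced by this hunk can be read in isolation: a pool is underloaded only if its worst activation latency would stay below 2 ms even with one cpu removed, and it is overloaded once the worst activation latency exceeds 2 ms or almost no idle cpu remains. The following standalone sketch restates just that rule with the thresholds copied from the hunk above; the enum and function names are illustrative, not the library's own types.

#include <cassert>
#include <cstdint>

enum ELoadClass { Underloaded, Moderate, Overloaded };

ELoadClass Classify(uint64_t worstLatencyUs, uint64_t addLatencyUs, double cpuIdle) {
    if (worstLatencyUs + addLatencyUs < 2000 && cpuIdle > 1.0) {
        return Underloaded;   // latency stays low even if one cpu is taken away
    }
    if (worstLatencyUs > 2000 || cpuIdle < 0.2) {
        return Overloaded;    // activations already wait too long, or no idle cpu left
    }
    return Moderate;
}

int main() {
    assert(Classify(500, 300, 1.5) == Underloaded);   // low latency, more than one idle cpu
    assert(Classify(500, 300, 0.5) == Moderate);      // low latency but too little idle
    assert(Classify(2500, 0, 1.5) == Overloaded);     // worst activation time above 2 ms
    assert(Classify(100, 0, 0.1) == Overloaded);      // almost no idle cpu
    return 0;
}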
@@ -82,6 +83,8 @@ namespace NActors {
TBalancerConfig Config;
public:
+
+ ui64 GetPeriodUs() override;
// Setup
TBalancer(const TBalancerConfig& config, const TVector<TUnitedExecutorPoolConfig>& unitedPools, ui64 ts);
bool AddCpu(const TCpuAllocation& cpuAlloc, TCpuState* cpu) override;
@@ -238,9 +241,12 @@ namespace NActors {
}
// Compute levels
- pool.CurLevel = TLevel(pool.Config, pool.PoolId, pool.CurrentCpus, pool.CpuIdle);
- pool.AddLevel = TLevel(pool.Config, pool.PoolId, pool.CurrentCpus + 1, pool.CpuIdle); // we expect taken cpu to became utilized
- pool.SubLevel = TLevel(pool.Config, pool.PoolId, pool.CurrentCpus - 1, pool.CpuIdle - 1);
+ pool.CurLevel = TLevel(pool.Config, pool.PoolId, pool.CurrentCpus, pool.CpuIdle,
+ pool.Next.ExpectedLatencyIncreaseUs, pool.Next.WorstActivationTimeUs);
+ pool.AddLevel = TLevel(pool.Config, pool.PoolId, pool.CurrentCpus + 1, pool.CpuIdle,
+ 0, pool.Next.WorstActivationTimeUs); // we expect the taken cpu to become utilized
+ pool.SubLevel = TLevel(pool.Config, pool.PoolId, pool.CurrentCpus - 1, pool.CpuIdle - 1,
+ pool.Next.ExpectedLatencyIncreaseUs, pool.Next.WorstActivationTimeUs);
// Prepare for balancing
pool.PrevCpus = pool.CurrentCpus;
@@ -263,7 +269,7 @@ namespace NActors {
TPool& from = **fromIter;
if (from.CurrentCpus == from.PrevCpus && // if not balanced yet
from.CurrentCpus > from.Config.MinCpus && // and constraints would not be violated
- from.SubLevel.Importance < to.AddLevel.Importance) // and which of two pools is more important would not change after cpu movement
+ from.SubLevel.Importance <= to.AddLevel.Importance) // and which of two pools is more important would not change after cpu movement
{
MoveCpu(from, to);
from.CurrentCpus--;
@@ -295,6 +301,10 @@ namespace NActors {
Lock.Release();
}
+ ui64 TBalancer::GetPeriodUs() {
+ return Config.PeriodUs;
+ }
+
IBalancer* MakeBalancer(const TBalancerConfig& config, const TVector<TUnitedExecutorPoolConfig>& unitedPools, ui64 ts) {
return new TBalancer(config, unitedPools, ts);
}
diff --git a/library/cpp/actors/core/balancer.h b/library/cpp/actors/core/balancer.h
index 9763ec79e1..e1f6f33bf3 100644
--- a/library/cpp/actors/core/balancer.h
+++ b/library/cpp/actors/core/balancer.h
@@ -10,6 +10,8 @@ namespace NActors {
ui64 Ts = 0; // Measurement timestamp
ui64 CpuUs = 0; // Total cpu microseconds consumed by pool on all cpus since start
ui64 IdleUs = ui64(-1); // Total cpu microseconds in spinning or waiting on futex
+ ui64 WorstActivationTimeUs = 0;
+ ui64 ExpectedLatencyIncreaseUs = 0;
};
// Pool cpu balancer
@@ -20,6 +22,7 @@ namespace NActors {
virtual void SetPoolStats(TPoolId pool, const TBalancerStats& stats) = 0;
virtual void Balance() = 0;
virtual void Unlock() = 0;
+ virtual ui64 GetPeriodUs() = 0;
// TODO: add method for reconfiguration on fly
};
diff --git a/library/cpp/actors/core/config.h b/library/cpp/actors/core/config.h
index 0d65815fd9..0bf4b871d7 100644
--- a/library/cpp/actors/core/config.h
+++ b/library/cpp/actors/core/config.h
@@ -41,6 +41,10 @@ namespace NActors {
ui32 EventsPerMailbox = DEFAULT_EVENTS_PER_MAILBOX;
int RealtimePriority = 0;
ui32 MaxActivityType = 5;
+ i16 MinThreadCount = 0;
+ i16 MaxThreadCount = 0;
+ i16 DefaultThreadCount = 0;
+ i16 Priority = 0;
};
struct TIOExecutorPoolConfig {
@@ -88,11 +92,18 @@ namespace NActors {
TBalancerConfig Balancer;
};
+ struct TSelfPingInfo {
+ NMonitoring::TDynamicCounters::TCounterPtr AvgPingCounter;
+ NMonitoring::TDynamicCounters::TCounterPtr AvgPingCounterWithSmallWindow;
+ ui32 MaxAvgPingUs;
+ };
+
struct TCpuManagerConfig {
TUnitedWorkersConfig UnitedWorkers;
TVector<TBasicExecutorPoolConfig> Basic;
TVector<TIOExecutorPoolConfig> IO;
TVector<TUnitedExecutorPoolConfig> United;
+ TVector<TSelfPingInfo> PingInfoByPool;
ui32 GetExecutorsCount() const {
return Basic.size() + IO.size() + United.size();
diff --git a/library/cpp/actors/core/cpu_manager.cpp b/library/cpp/actors/core/cpu_manager.cpp
index 39089b5d83..0736caa539 100644
--- a/library/cpp/actors/core/cpu_manager.cpp
+++ b/library/cpp/actors/core/cpu_manager.cpp
@@ -16,10 +16,18 @@ namespace NActors {
UnitedWorkers.Reset(new TUnitedWorkers(Config.UnitedWorkers, Config.United, allocation, Balancer.Get()));
}
+ ui64 ts = GetCycleCountFast();
+ Harmonizer.Reset(MakeHarmonizer(ts));
+
Executors.Reset(new TAutoPtr<IExecutorPool>[ExecutorPoolCount]);
for (ui32 excIdx = 0; excIdx != ExecutorPoolCount; ++excIdx) {
Executors[excIdx].Reset(CreateExecutorPool(excIdx));
+ if (excIdx < Config.PingInfoByPool.size()) {
+ Harmonizer->AddPool(Executors[excIdx].Get(), &Config.PingInfoByPool[excIdx]);
+ } else {
+ Harmonizer->AddPool(Executors[excIdx].Get());
+ }
}
}
@@ -89,7 +97,7 @@ namespace NActors {
IExecutorPool* TCpuManager::CreateExecutorPool(ui32 poolId) {
for (TBasicExecutorPoolConfig& cfg : Config.Basic) {
if (cfg.PoolId == poolId) {
- return new TBasicExecutorPool(cfg);
+ return new TBasicExecutorPool(cfg, Harmonizer.Get());
}
}
for (TIOExecutorPoolConfig& cfg : Config.IO) {
diff --git a/library/cpp/actors/core/cpu_manager.h b/library/cpp/actors/core/cpu_manager.h
index 454035477b..42bede91b8 100644
--- a/library/cpp/actors/core/cpu_manager.h
+++ b/library/cpp/actors/core/cpu_manager.h
@@ -1,6 +1,7 @@
#pragma once
#include "actorsystem.h"
+#include "harmonizer.h"
#include "executor_pool_basic.h"
#include "executor_pool_io.h"
#include "executor_pool_united.h"
@@ -11,6 +12,7 @@ namespace NActors {
TArrayHolder<TAutoPtr<IExecutorPool>> Executors;
THolder<TUnitedWorkers> UnitedWorkers;
THolder<IBalancer> Balancer;
+ THolder<IHarmonizer> Harmonizer;
TCpuManagerConfig Config;
public:
explicit TCpuManager(THolder<TActorSystemSetup>& setup)
diff --git a/library/cpp/actors/core/executor_pool_basic.cpp b/library/cpp/actors/core/executor_pool_basic.cpp
index 4dce16939a..00e557fcb4 100644
--- a/library/cpp/actors/core/executor_pool_basic.cpp
+++ b/library/cpp/actors/core/executor_pool_basic.cpp
@@ -18,11 +18,16 @@ namespace NActors {
ui32 threads,
ui64 spinThreshold,
const TString& poolName,
+ IHarmonizer *harmonizer,
TAffinity* affinity,
TDuration timePerMailbox,
ui32 eventsPerMailbox,
int realtimePriority,
- ui32 maxActivityType)
+ ui32 maxActivityType,
+ i16 minThreadCount,
+ i16 maxThreadCount,
+ i16 defaultThreadCount,
+ i16 priority)
: TExecutorPoolBase(poolId, threads, affinity, maxActivityType)
, SpinThreshold(spinThreshold)
, SpinThresholdCycles(spinThreshold * NHPTimer::GetCyclesPerSecond() * 0.000001) // convert microseconds to cycles
@@ -34,21 +39,50 @@ namespace NActors {
, ThreadUtilization(0)
, MaxUtilizationCounter(0)
, MaxUtilizationAccumulator(0)
+ , WrongWakenedThreadCount(0)
, ThreadCount(threads)
+ , MinThreadCount(minThreadCount)
+ , MaxThreadCount(maxThreadCount)
+ , DefaultThreadCount(defaultThreadCount)
+ , Harmonizer(harmonizer)
+ , Priority(priority)
{
+ i16 limit = Min(threads, (ui32)Max<i16>());
+ if (DefaultThreadCount) {
+ DefaultThreadCount = Min(DefaultThreadCount, limit);
+ } else {
+ DefaultThreadCount = limit;
+ }
+
+ MaxThreadCount = Min(Max(MaxThreadCount, DefaultThreadCount), limit);
+
+ if (MinThreadCount) {
+ MinThreadCount = Max((i16)1, Min(MinThreadCount, DefaultThreadCount));
+ } else {
+ MinThreadCount = DefaultThreadCount;
+ }
+ ThreadCount = MaxThreadCount;
+ auto semaphore = TSemaphore();
+ semaphore.CurrentThreadCount = ThreadCount;
+ Semaphore = semaphore.ConverToI64();
}
- TBasicExecutorPool::TBasicExecutorPool(const TBasicExecutorPoolConfig& cfg)
+ TBasicExecutorPool::TBasicExecutorPool(const TBasicExecutorPoolConfig& cfg, IHarmonizer *harmonizer)
: TBasicExecutorPool(
cfg.PoolId,
cfg.Threads,
cfg.SpinThreshold,
cfg.PoolName,
+ harmonizer,
new TAffinity(cfg.Affinity),
cfg.TimePerMailbox,
cfg.EventsPerMailbox,
cfg.RealtimePriority,
- cfg.MaxActivityType
+ cfg.MaxActivityType,
+ cfg.MinThreadCount,
+ cfg.MaxThreadCount,
+ cfg.DefaultThreadCount,
+ cfg.Priority
)
{}
@@ -56,126 +90,166 @@ namespace NActors {
Threads.Destroy();
}
+ bool TBasicExecutorPool::GoToBeBlocked(TThreadCtx& threadCtx, TTimers &timers) {
+ do {
+ if (AtomicCas(&threadCtx.BlockedFlag, TThreadCtx::BS_BLOCKED, TThreadCtx::BS_BLOCKING)) {
+ timers.HPNow = GetCycleCountFast();
+ timers.Elapsed += timers.HPNow - timers.HPStart;
+ if (threadCtx.BlockedPad.Park()) // interrupted
+ return true;
+ timers.HPStart = GetCycleCountFast();
+ timers.Blocked += timers.HPStart - timers.HPNow;
+ }
+ } while (AtomicGet(threadCtx.BlockedFlag) != TThreadCtx::BS_NONE && !RelaxedLoad(&StopFlag));
+ return false;
+ }
+
+ bool TBasicExecutorPool::GoToSleep(TThreadCtx& threadCtx, TTimers &timers) {
+ do {
+ timers.HPNow = GetCycleCountFast();
+ timers.Elapsed += timers.HPNow - timers.HPStart;
+ if (threadCtx.Pad.Park()) // interrupted
+ return true;
+ timers.HPStart = GetCycleCountFast();
+ timers.Parked += timers.HPStart - timers.HPNow;
+ } while (AtomicLoad(&threadCtx.WaitingFlag) == TThreadCtx::WS_BLOCKED && !RelaxedLoad(&StopFlag));
+ return false;
+ }
+
+ void TBasicExecutorPool::GoToSpin(TThreadCtx& threadCtx) {
+ ui64 start = GetCycleCountFast();
+ bool doSpin = true;
+ while (true) {
+ for (ui32 j = 0; doSpin && j < 12; ++j) {
+ if (GetCycleCountFast() >= (start + SpinThresholdCycles)) {
+ doSpin = false;
+ break;
+ }
+ for (ui32 i = 0; i < 12; ++i) {
+ if (AtomicLoad(&threadCtx.WaitingFlag) == TThreadCtx::WS_ACTIVE) {
+ SpinLockPause();
+ } else {
+ doSpin = false;
+ break;
+ }
+ }
+ }
+ if (!doSpin) {
+ break;
+ }
+ if (RelaxedLoad(&StopFlag)) {
+ break;
+ }
+ }
+ }
+
+ bool TBasicExecutorPool::GoToWaiting(TThreadCtx& threadCtx, TTimers &timers, bool needToBlock) {
+#if defined ACTORSLIB_COLLECT_EXEC_STATS
+ if (AtomicGetAndIncrement(ThreadUtilization) == 0) {
+ // Initially counter contains -t0, the pool start timestamp
+ // When the first thread goes to sleep we add t1, so the counter
+ // becomes t1-t0 >= 0, or the duration of max utilization so far.
+ // If the counter was negative and becomes positive, that means
+ // counter just turned into a duration and we should store that
+ // duration. Otherwise another thread raced with us and
+ // subtracted some other timestamp t2.
+ const i64 t = GetCycleCountFast();
+ const i64 x = AtomicGetAndAdd(MaxUtilizationCounter, t);
+ if (x < 0 && x + t > 0)
+ AtomicStore(&MaxUtilizationAccumulator, x + t);
+ }
+#endif
+
+ Y_VERIFY(AtomicLoad(&threadCtx.WaitingFlag) == TThreadCtx::WS_NONE);
+
+ if (SpinThreshold > 0 && !needToBlock) {
+ // spin configured period
+ AtomicSet(threadCtx.WaitingFlag, TThreadCtx::WS_ACTIVE);
+ GoToSpin(threadCtx);
+ // then - sleep
+ if (AtomicLoad(&threadCtx.WaitingFlag) == TThreadCtx::WS_ACTIVE) {
+ if (AtomicCas(&threadCtx.WaitingFlag, TThreadCtx::WS_BLOCKED, TThreadCtx::WS_ACTIVE)) {
+ if (GoToSleep(threadCtx, timers)) { // interrupted
+ return true;
+ }
+ }
+ }
+ } else {
+ AtomicSet(threadCtx.WaitingFlag, TThreadCtx::WS_BLOCKED);
+ if (GoToSleep(threadCtx, timers)) { // interrupted
+ return true;
+ }
+ }
+
+ Y_VERIFY_DEBUG(AtomicLoad(&StopFlag) || AtomicLoad(&threadCtx.WaitingFlag) == TThreadCtx::WS_RUNNING);
+
+#if defined ACTORSLIB_COLLECT_EXEC_STATS
+ if (AtomicDecrement(ThreadUtilization) == 0) {
+ // When we started sleeping counter contained t1-t0, or the
+ // last duration of max utilization. Now we subtract t2 >= t1,
+ // which turns counter negative again, and the next sleep cycle
+ // at timestamp t3 would be adding some new duration t3-t2.
+ // If the counter was positive and becomes negative that means
+ // there are no current races with other threads and we should
+ // store the last positive duration we observed. Multiple
+ // threads may be adding and subtracting values in potentially
+ // arbitrary order, which would cause counter to oscillate
+ // around zero. When it crosses zero is a good indication of a
+ // correct value.
+ const i64 t = GetCycleCountFast();
+ const i64 x = AtomicGetAndAdd(MaxUtilizationCounter, -t);
+ if (x > 0 && x - t < 0)
+ AtomicStore(&MaxUtilizationAccumulator, x);
+ }
+#endif
+ return false;
+ }
+
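The two ACTORSLIB_COLLECT_EXEC_STATS blocks above track the pool's max-utilization time with a single signed counter, as their comments describe. The walk-through below replays that accounting with made-up tick values and plain integers in place of the atomics; it is an illustration of the scheme, not part of the patch.

#include <cassert>
#include <cstdint>

int main() {
    int64_t counter = -100;   // pool started at t0 = 100, so the counter holds -t0
    int64_t accumulator = 0;  // stand-in for MaxUtilizationAccumulator

    // First thread enters the waiting path at t1 = 350 (ThreadUtilization 0 -> 1):
    // adding t1 turns the counter into the fully-busy duration t1 - t0.
    int64_t t = 350;
    int64_t x = counter;      // AtomicGetAndAdd returns the old value
    counter += t;
    if (x < 0 && x + t > 0)
        accumulator = x + t;  // 250 ticks of max utilization so far
    assert(accumulator == 250 && counter == 250);

    // Last waiting thread leaves at t2 = 400 (ThreadUtilization 1 -> 0):
    // subtracting t2 makes the counter negative again, so the next fully-busy
    // interval will be measured from t2.
    t = 400;
    x = counter;
    counter -= t;
    if (x > 0 && x - t < 0)
        accumulator = x;      // keeps the last positive duration, still 250
    assert(counter == -150);  // == (t1 - t0) - t2
    return 0;
}
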
ui32 TBasicExecutorPool::GetReadyActivation(TWorkerContext& wctx, ui64 revolvingCounter) {
ui32 workerId = wctx.WorkerId;
Y_VERIFY_DEBUG(workerId < PoolThreads);
- NHPTimer::STime elapsed = 0;
- NHPTimer::STime parked = 0;
- NHPTimer::STime blocked = 0;
- NHPTimer::STime hpstart = GetCycleCountFast();
- NHPTimer::STime hpnow;
+ TTimers timers;
+
+ if (Harmonizer) {
+ LWPROBE(TryToHarmonize, PoolId, PoolName);
+ Harmonizer->Harmonize(timers.HPStart);
+ }
TThreadCtx& threadCtx = Threads[workerId];
AtomicSet(threadCtx.WaitingFlag, TThreadCtx::WS_NONE);
if (Y_UNLIKELY(AtomicGet(threadCtx.BlockedFlag) != TThreadCtx::BS_NONE)) {
- do {
- if (AtomicCas(&threadCtx.BlockedFlag, TThreadCtx::BS_BLOCKED, TThreadCtx::BS_BLOCKING)) {
- hpnow = GetCycleCountFast();
- elapsed += hpnow - hpstart;
- if (threadCtx.BlockedPad.Park()) // interrupted
- return 0;
- hpstart = GetCycleCountFast();
- blocked += hpstart - hpnow;
- }
- } while (AtomicGet(threadCtx.BlockedFlag) != TThreadCtx::BS_NONE && !AtomicLoad(&StopFlag));
+ if (GoToBeBlocked(threadCtx, timers)) { // interrupted
+ return 0;
+ }
}
- const TAtomic x = AtomicDecrement(Semaphore);
+ bool needToWait = false;
+ bool needToBlock = false;
- if (x < 0) {
-#if defined ACTORSLIB_COLLECT_EXEC_STATS
- if (AtomicGetAndIncrement(ThreadUtilization) == 0) {
- // Initially counter contains -t0, the pool start timestamp
- // When the first thread goes to sleep we add t1, so the counter
- // becomes t1-t0 >= 0, or the duration of max utilization so far.
- // If the counter was negative and becomes positive, that means
- // counter just turned into a duration and we should store that
- // duration. Otherwise another thread raced with us and
- // subtracted some other timestamp t2.
- const i64 t = GetCycleCountFast();
- const i64 x = AtomicGetAndAdd(MaxUtilizationCounter, t);
- if (x < 0 && x + t > 0)
- AtomicStore(&MaxUtilizationAccumulator, x + t);
- }
-#endif
+ TAtomic x = AtomicGet(Semaphore);
+ do {
+ i64 oldX = x;
+ TSemaphore semaphore = TSemaphore::GetSemaphore(x);
+ needToBlock = semaphore.CurrentSleepThreadCount < 0;
+ needToWait = needToBlock || semaphore.OldSemaphore <= -semaphore.CurrentSleepThreadCount;
- Y_VERIFY(AtomicLoad(&threadCtx.WaitingFlag) == TThreadCtx::WS_NONE);
-
- if (SpinThreshold > 0) {
- // spin configured period
- AtomicSet(threadCtx.WaitingFlag, TThreadCtx::WS_ACTIVE);
- ui64 start = GetCycleCountFast();
- bool doSpin = true;
- while (true) {
- for (ui32 j = 0; doSpin && j < 12; ++j) {
- if (GetCycleCountFast() >= (start + SpinThresholdCycles)) {
- doSpin = false;
- break;
- }
- for (ui32 i = 0; i < 12; ++i) {
- if (AtomicLoad(&threadCtx.WaitingFlag) == TThreadCtx::WS_ACTIVE) {
- SpinLockPause();
- } else {
- doSpin = false;
- break;
- }
- }
- }
- if (!doSpin) {
- break;
- }
- if (RelaxedLoad(&StopFlag)) {
- break;
- }
- }
- // then - sleep
- if (AtomicLoad(&threadCtx.WaitingFlag) == TThreadCtx::WS_ACTIVE) {
- if (AtomicCas(&threadCtx.WaitingFlag, TThreadCtx::WS_BLOCKED, TThreadCtx::WS_ACTIVE)) {
- do {
- hpnow = GetCycleCountFast();
- elapsed += hpnow - hpstart;
- if (threadCtx.Pad.Park()) // interrupted
- return 0;
- hpstart = GetCycleCountFast();
- parked += hpstart - hpnow;
- } while (AtomicLoad(&threadCtx.WaitingFlag) == TThreadCtx::WS_BLOCKED);
- }
- }
- } else {
- AtomicSet(threadCtx.WaitingFlag, TThreadCtx::WS_BLOCKED);
- do {
- hpnow = GetCycleCountFast();
- elapsed += hpnow - hpstart;
- if (threadCtx.Pad.Park()) // interrupted
- return 0;
- hpstart = GetCycleCountFast();
- parked += hpstart - hpnow;
- } while (AtomicLoad(&threadCtx.WaitingFlag) == TThreadCtx::WS_BLOCKED);
+ semaphore.OldSemaphore--;
+ if (needToWait) {
+ semaphore.CurrentSleepThreadCount++;
}
- Y_VERIFY_DEBUG(AtomicLoad(&StopFlag) || AtomicLoad(&threadCtx.WaitingFlag) == TThreadCtx::WS_RUNNING);
+ x = AtomicGetAndCas(&Semaphore, semaphore.ConverToI64(), x);
+ if (x == oldX) {
+ break;
+ }
+ } while (!StopFlag);
-#if defined ACTORSLIB_COLLECT_EXEC_STATS
- if (AtomicDecrement(ThreadUtilization) == 0) {
- // When we started sleeping counter contained t1-t0, or the
- // last duration of max utilization. Now we subtract t2 >= t1,
- // which turns counter negative again, and the next sleep cycle
- // at timestamp t3 would be adding some new duration t3-t2.
- // If the counter was positive and becomes negative that means
- // there are no current races with other threads and we should
- // store the last positive duration we observed. Multiple
- // threads may be adding and subtracting values in potentially
- // arbitrary order, which would cause counter to oscillate
- // around zero. When it crosses zero is a good indication of a
- // correct value.
- const i64 t = GetCycleCountFast();
- const i64 x = AtomicGetAndAdd(MaxUtilizationCounter, -t);
- if (x > 0 && x - t < 0)
- AtomicStore(&MaxUtilizationAccumulator, x);
+ if (needToWait) {
+ if (GoToWaiting(threadCtx, timers, needToBlock)) { // interrupted
+ return 0;
}
-#endif
} else {
AtomicSet(threadCtx.WaitingFlag, TThreadCtx::WS_RUNNING);
}
@@ -183,14 +257,14 @@ namespace NActors {
// ok, has work suggested, must dequeue
while (!RelaxedLoad(&StopFlag)) {
if (const ui32 activation = Activations.Pop(++revolvingCounter)) {
- hpnow = GetCycleCountFast();
- elapsed += hpnow - hpstart;
- wctx.AddElapsedCycles(IActor::ACTOR_SYSTEM, elapsed);
- if (parked > 0) {
- wctx.AddParkedCycles(parked);
+ timers.HPNow = GetCycleCountFast();
+ timers.Elapsed += timers.HPNow - timers.HPStart;
+ wctx.AddElapsedCycles(IActor::ACTOR_SYSTEM, timers.Elapsed);
+ if (timers.Parked > 0) {
+ wctx.AddParkedCycles(timers.Parked);
}
- if (blocked > 0) {
- wctx.AddBlockedCycles(blocked);
+ if (timers.Blocked > 0) {
+ wctx.AddBlockedCycles(timers.Blocked);
}
return activation;
}
@@ -201,22 +275,26 @@ namespace NActors {
return 0;
}
- inline void TBasicExecutorPool::WakeUpLoop() {
- for (ui32 i = 0;;) {
- TThreadCtx& threadCtx = Threads[i % PoolThreads];
- switch (AtomicLoad(&threadCtx.WaitingFlag)) {
+ inline void TBasicExecutorPool::WakeUpLoop(i16 currentThreadCount) {
+ for (i16 i = 0;;) {
+ TThreadCtx& threadCtx = Threads[i];
+ TThreadCtx::EWaitState state = static_cast<TThreadCtx::EWaitState>(AtomicLoad(&threadCtx.WaitingFlag));
+ switch (state) {
case TThreadCtx::WS_NONE:
case TThreadCtx::WS_RUNNING:
- ++i;
- break;
- case TThreadCtx::WS_ACTIVE: // in active spin-lock, just set flag
- if (AtomicCas(&threadCtx.WaitingFlag, TThreadCtx::WS_RUNNING, TThreadCtx::WS_ACTIVE)) {
- return;
+ if (++i >= MaxThreadCount) {
+ i = 0;
}
break;
+ case TThreadCtx::WS_ACTIVE:
case TThreadCtx::WS_BLOCKED:
- if (AtomicCas(&threadCtx.WaitingFlag, TThreadCtx::WS_RUNNING, TThreadCtx::WS_BLOCKED)) {
- threadCtx.Pad.Unpark();
+ if (AtomicCas(&threadCtx.WaitingFlag, TThreadCtx::WS_RUNNING, state)) {
+ if (state == TThreadCtx::WS_BLOCKED) {
+ threadCtx.Pad.Unpark();
+ }
+ if (i >= currentThreadCount) {
+ AtomicIncrement(WrongWakenedThreadCount);
+ }
return;
}
break;
@@ -228,14 +306,42 @@ namespace NActors {
void TBasicExecutorPool::ScheduleActivationEx(ui32 activation, ui64 revolvingCounter) {
Activations.Push(activation, revolvingCounter);
- const TAtomic x = AtomicIncrement(Semaphore);
- if (x <= 0) { // we must find someone to wake-up
- WakeUpLoop();
+ bool needToWakeUp = false;
+
+ TAtomic x = AtomicGet(Semaphore);
+ TSemaphore semaphore = TSemaphore::GetSemaphore(x);
+ do {
+ needToWakeUp = semaphore.CurrentSleepThreadCount > 0;
+ i64 oldX = semaphore.ConverToI64();
+ semaphore.OldSemaphore++;
+ if (needToWakeUp) {
+ semaphore.CurrentSleepThreadCount--;
+ }
+ x = AtomicGetAndCas(&Semaphore, semaphore.ConverToI64(), oldX);
+ if (x == oldX) {
+ break;
+ }
+ semaphore = TSemaphore::GetSemaphore(x);
+ } while (true);
+
+ if (needToWakeUp) { // we must find someone to wake-up
+ WakeUpLoop(semaphore.CurrentThreadCount);
}
}
void TBasicExecutorPool::GetCurrentStats(TExecutorPoolStats& poolStats, TVector<TExecutorThreadStats>& statsCopy) const {
poolStats.MaxUtilizationTime = RelaxedLoad(&MaxUtilizationAccumulator) / (i64)(NHPTimer::GetCyclesPerSecond() / 1000);
+ poolStats.WrongWakenedThreadCount = RelaxedLoad(&WrongWakenedThreadCount);
+ poolStats.CurrentThreadCount = RelaxedLoad(&ThreadCount);
+ if (Harmonizer) {
+ TPoolHarmonizedStats stats = Harmonizer->GetPoolStats(PoolId);
+ poolStats.IsNeedy = stats.IsNeedy;
+ poolStats.IsStarved = stats.IsStarved;
+ poolStats.IsHoggish = stats.IsHoggish;
+ poolStats.IncreasingThreadsByNeedyState = stats.IncreasingThreadsByNeedyState;
+ poolStats.DecreasingThreadsByStarvedState = stats.DecreasingThreadsByStarvedState;
+ poolStats.DecreasingThreadsByHoggishState = stats.DecreasingThreadsByHoggishState;
+ }
statsCopy.resize(PoolThreads + 1);
// Save counters from the pool object
@@ -345,87 +451,71 @@ namespace NActors {
with_lock (ChangeThreadsLock) {
size_t prevCount = GetThreadCount();
AtomicSet(ThreadCount, threads);
- if (prevCount < threads) {
- for (size_t i = prevCount; i < threads; ++i) {
- bool repeat = true;
- while (repeat) {
- switch (AtomicGet(Threads[i].BlockedFlag)) {
- case TThreadCtx::BS_BLOCKING:
- if (AtomicCas(&Threads[i].BlockedFlag, TThreadCtx::BS_NONE, TThreadCtx::BS_BLOCKING)) {
- // thread not entry to blocked loop
- repeat = false;
- }
- break;
- case TThreadCtx::BS_BLOCKED:
- // thread entry to blocked loop and we wake it
- AtomicSet(Threads[i].BlockedFlag, TThreadCtx::BS_NONE);
- Threads[i].BlockedPad.Unpark();
- repeat = false;
- break;
- default:
- // thread mustn't has TThreadCtx::BS_NONE because last time it was started to block
- Y_FAIL("BlockedFlag is not TThreadCtx::BS_BLOCKING and TThreadCtx::BS_BLOCKED when thread was waked up");
- }
- }
- }
- } else if (prevCount > threads) {
- // at first, start to block
- for (size_t i = threads; i < prevCount; ++i) {
- Y_VERIFY(AtomicGet(Threads[i].BlockedFlag) == TThreadCtx::BS_NONE);
- AtomicSet(Threads[i].BlockedFlag, TThreadCtx::BS_BLOCKING);
- }
- // after check need to wake up threads
- for (size_t idx = threads; idx < prevCount; ++idx) {
- TThreadCtx& threadCtx = Threads[idx];
- auto waitingFlag = AtomicGet(threadCtx.WaitingFlag);
- auto blockedFlag = AtomicGet(threadCtx.BlockedFlag);
- // while thread has this states (WS_NONE and BS_BLOCKING) we can't guess which way thread will go.
- // Either go to sleep and it will have to wake up,
- // or go to execute task and after completion will be blocked.
- while (waitingFlag == TThreadCtx::WS_NONE && blockedFlag == TThreadCtx::BS_BLOCKING) {
- waitingFlag = AtomicGet(threadCtx.WaitingFlag);
- blockedFlag = AtomicGet(threadCtx.BlockedFlag);
- }
- // next states:
- // 1) WS_ACTIVE BS_BLOCKING - waiting and start spinig | need wake up to block
- // 2) WS_BLOCKED BS_BLOCKING - waiting and start sleep | need wake up to block
- // 3) WS_RUNNING BS_BLOCKING - start execute | not need wake up, will block after executing
- // 4) WS_NONE BS_BLOCKED - blocked | not need wake up, already blocked
-
- if (waitingFlag == TThreadCtx::WS_ACTIVE || waitingFlag == TThreadCtx::WS_BLOCKED) {
- // need wake up
- Y_VERIFY(blockedFlag == TThreadCtx::BS_BLOCKING);
-
- // creaty empty mailBoxHint, where LineIndex == 1 and LineHint == 0, and activations will be ignored
- constexpr auto emptyMailBoxHint = TMailboxTable::LineIndexMask & -TMailboxTable::LineIndexMask;
- ui64 revolvingCounter = AtomicGet(ActivationsRevolvingCounter);
-
- Activations.Push(emptyMailBoxHint, revolvingCounter);
-
- auto x = AtomicIncrement(Semaphore);
- if (x <= 0) {
- // try wake up. if success then go to next thread
- switch (waitingFlag){
- case TThreadCtx::WS_ACTIVE: // in active spin-lock, just set flag
- if (AtomicCas(&threadCtx.WaitingFlag, TThreadCtx::WS_RUNNING, TThreadCtx::WS_ACTIVE)) {
- continue;
- }
- break;
- case TThreadCtx::WS_BLOCKED:
- if (AtomicCas(&threadCtx.WaitingFlag, TThreadCtx::WS_RUNNING, TThreadCtx::WS_BLOCKED)) {
- threadCtx.Pad.Unpark();
- continue;
- }
- break;
- default:
- ; // other thread woke this sleeping thread
- }
- // if thread has already been awakened then we must awaken the other
- WakeUpLoop();
- }
- }
- }
+ TSemaphore semaphore = TSemaphore::GetSemaphore(AtomicGet(Semaphore));
+ i64 oldX = semaphore.ConverToI64();
+ semaphore.CurrentThreadCount = threads;
+ if (threads > prevCount) {
+ semaphore.CurrentSleepThreadCount += (i64)threads - prevCount;
+ semaphore.OldSemaphore -= (i64)threads - prevCount;
+ } else {
+ semaphore.CurrentSleepThreadCount -= (i64)prevCount - threads;
+ semaphore.OldSemaphore += prevCount - threads;
}
+ AtomicAdd(Semaphore, semaphore.ConverToI64() - oldX);
+ LWPROBE(ThreadCount, PoolId, PoolName, threads, MinThreadCount, MaxThreadCount, DefaultThreadCount);
+ }
+ }
+
+ i16 TBasicExecutorPool::GetDefaultThreadCount() const {
+ return DefaultThreadCount;
+ }
+
+ i16 TBasicExecutorPool::GetMinThreadCount() const {
+ return MinThreadCount;
+ }
+
+ i16 TBasicExecutorPool::GetMaxThreadCount() const {
+ return MaxThreadCount;
+ }
+
+ bool TBasicExecutorPool::IsThreadBeingStopped(i16 threadIdx) const {
+ if ((ui32)threadIdx >= PoolThreads) {
+ return false;
+ }
+ auto blockedFlag = AtomicGet(Threads[threadIdx].BlockedFlag);
+ if (blockedFlag == TThreadCtx::BS_BLOCKING) {
+ return true;
+ }
+ return false;
+ }
+
+ double TBasicExecutorPool::GetThreadConsumedUs(i16 threadIdx) {
+ if ((ui32)threadIdx >= PoolThreads) {
+ return 0;
+ }
+ TThreadCtx& threadCtx = Threads[threadIdx];
+ TExecutorThreadStats stats;
+ threadCtx.Thread->GetCurrentStats(stats);
+ return Ts2Us(stats.ElapsedTicks);
+ }
+
+ double TBasicExecutorPool::GetThreadBookedUs(i16 threadIdx) {
+ if ((ui32)threadIdx >= PoolThreads) {
+ return 0;
}
+ TThreadCtx& threadCtx = Threads[threadIdx];
+ TExecutorThreadStats stats;
+ threadCtx.Thread->GetCurrentStats(stats);
+ return stats.CpuNs / 1000.0;
+ }
+
+ i16 TBasicExecutorPool::GetBlockingThreadCount() const {
+ TAtomic x = AtomicGet(Semaphore);
+ TSemaphore semaphore = TSemaphore::GetSemaphore(x);
+ return -Min<i16>(semaphore.CurrentSleepThreadCount, 0);
+ }
+
+ i16 TBasicExecutorPool::GetPriority() const {
+ return Priority;
}
}
diff --git a/library/cpp/actors/core/executor_pool_basic.h b/library/cpp/actors/core/executor_pool_basic.h
index 023190f7fe..cd94a998f1 100644
--- a/library/cpp/actors/core/executor_pool_basic.h
+++ b/library/cpp/actors/core/executor_pool_basic.h
@@ -4,6 +4,7 @@
#include "executor_thread.h"
#include "scheduler_queue.h"
#include "executor_pool_base.h"
+#include "harmonizer.h"
#include <library/cpp/actors/util/unordered_cache.h>
#include <library/cpp/actors/util/threadparkpad.h>
#include <library/cpp/monlib/dynamic_counters/counters.h>
@@ -45,6 +46,14 @@ namespace NActors {
}
};
+ struct TTimers {
+ NHPTimer::STime Elapsed = 0;
+ NHPTimer::STime Parked = 0;
+ NHPTimer::STime Blocked = 0;
+ NHPTimer::STime HPStart = GetCycleCountFast();
+ NHPTimer::STime HPNow;
+ };
+
const ui64 SpinThreshold;
const ui64 SpinThresholdCycles;
@@ -62,11 +71,42 @@ namespace NActors {
TAtomic ThreadUtilization;
TAtomic MaxUtilizationCounter;
TAtomic MaxUtilizationAccumulator;
+ TAtomic WrongWakenedThreadCount;
TAtomic ThreadCount;
TMutex ChangeThreadsLock;
+ i16 MinThreadCount;
+ i16 MaxThreadCount;
+ i16 DefaultThreadCount;
+ IHarmonizer *Harmonizer;
+
+ const i16 Priority = 0;
+
public:
+ struct TSemaphore {
+ i64 OldSemaphore = 0; // 34 bits
+ // Sign bit
+ i16 CurrentSleepThreadCount = 0; // 14 bits
+ // Sign bit
+ i16 CurrentThreadCount = 0; // 14 bits
+
+ inline i64 ConverToI64() {
+ i64 value = (1ll << 34) + OldSemaphore;
+ return value
+ | (((i64)CurrentSleepThreadCount + (1 << 14)) << 35)
+ | ((i64)CurrentThreadCount << 50);
+ }
+
+ static inline TSemaphore GetSemaphore(i64 value) {
+ TSemaphore semaphore;
+ semaphore.OldSemaphore = (value & 0x7ffffffffll) - (1ll << 34);
+ semaphore.CurrentSleepThreadCount = ((value >> 35) & 0x7fff) - (1 << 14);
+ semaphore.CurrentThreadCount = (value >> 50) & 0x3fff;
+ return semaphore;
+ }
+ };
+
static constexpr TDuration DEFAULT_TIME_PER_MAILBOX = TBasicExecutorPoolConfig::DEFAULT_TIME_PER_MAILBOX;
static constexpr ui32 DEFAULT_EVENTS_PER_MAILBOX = TBasicExecutorPoolConfig::DEFAULT_EVENTS_PER_MAILBOX;
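
TSemaphore above packs three counters into one atomic i64 so that GetReadyActivation and ScheduleActivationEx can update all of them with a single compare-and-swap. The sketch below re-implements just the bit layout implied by ConverToI64/GetSemaphore with plain int64_t to make the field widths and biases explicit; the helper names are illustrative, not the library's API.

#include <cassert>
#include <cstdint>

// Bits 0..34  : OldSemaphore (the pre-existing activation semaphore), biased by 2^34
// Bits 35..49 : CurrentSleepThreadCount, biased by 2^14
// Bits 50..63 : CurrentThreadCount, unbiased
int64_t Pack(int64_t oldSemaphore, int16_t sleepThreads, int16_t threadCount) {
    return ((1ll << 34) + oldSemaphore)
         | ((int64_t(sleepThreads) + (1 << 14)) << 35)
         | (int64_t(threadCount) << 50);
}

void Unpack(int64_t value, int64_t& oldSemaphore, int16_t& sleepThreads, int16_t& threadCount) {
    oldSemaphore = (value & 0x7ffffffffll) - (1ll << 34);
    sleepThreads = ((value >> 35) & 0x7fff) - (1 << 14);
    threadCount  = (value >> 50) & 0x3fff;
}

int main() {
    // Round trip for a few representative states, including negative OldSemaphore
    // (threads waiting for work) and negative CurrentSleepThreadCount.
    for (int64_t sem : {-5ll, 0ll, 7ll}) {
        for (int16_t sleeping : {int16_t(-2), int16_t(0), int16_t(3)}) {
            int64_t outSem; int16_t outSleep, outThreads;
            Unpack(Pack(sem, sleeping, 8), outSem, outSleep, outThreads);
            assert(outSem == sem && outSleep == sleeping && outThreads == 8);
        }
    }
    // An all-zero semaphore packs to the two bias bits only.
    assert(Pack(0, 0, 0) == ((1ll << 34) | (1ll << 49)));
    return 0;
}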
@@ -74,12 +114,17 @@ namespace NActors {
ui32 threads,
ui64 spinThreshold,
const TString& poolName = "",
+ IHarmonizer *harmonizer = nullptr,
TAffinity* affinity = nullptr,
TDuration timePerMailbox = DEFAULT_TIME_PER_MAILBOX,
ui32 eventsPerMailbox = DEFAULT_EVENTS_PER_MAILBOX,
int realtimePriority = 0,
- ui32 maxActivityType = 1);
- explicit TBasicExecutorPool(const TBasicExecutorPoolConfig& cfg);
+ ui32 maxActivityType = 1,
+ i16 minThreadCount = 0,
+ i16 maxThreadCount = 0,
+ i16 defaultThreadCount = 0,
+ i16 priority = 0);
+ explicit TBasicExecutorPool(const TBasicExecutorPoolConfig& cfg, IHarmonizer *harmonizer);
~TBasicExecutorPool();
ui32 GetReadyActivation(TWorkerContext& wctx, ui64 revolvingReadCounter) override;
@@ -102,10 +147,22 @@ namespace NActors {
void SetRealTimeMode() const override;
- ui32 GetThreadCount() const;
- void SetThreadCount(ui32 threads);
+ ui32 GetThreadCount() const override;
+ void SetThreadCount(ui32 threads) override;
+ i16 GetDefaultThreadCount() const override;
+ i16 GetMinThreadCount() const override;
+ i16 GetMaxThreadCount() const override;
+ bool IsThreadBeingStopped(i16 threadIdx) const override;
+ double GetThreadConsumedUs(i16 threadIdx) override;
+ double GetThreadBookedUs(i16 threadIdx) override;
+ i16 GetBlockingThreadCount() const override;
+ i16 GetPriority() const override;
private:
- void WakeUpLoop();
+ void WakeUpLoop(i16 currentThreadCount);
+ bool GoToWaiting(TThreadCtx& threadCtx, TTimers &timers, bool needToBlock);
+ void GoToSpin(TThreadCtx& threadCtx);
+ bool GoToSleep(TThreadCtx& threadCtx, TTimers &timers);
+ bool GoToBeBlocked(TThreadCtx& threadCtx, TTimers &timers);
};
}
diff --git a/library/cpp/actors/core/executor_pool_basic_ut.cpp b/library/cpp/actors/core/executor_pool_basic_ut.cpp
index 76dff693af..067fb30a1c 100644
--- a/library/cpp/actors/core/executor_pool_basic_ut.cpp
+++ b/library/cpp/actors/core/executor_pool_basic_ut.cpp
@@ -6,10 +6,14 @@
#include <library/cpp/actors/util/should_continue.h>
#include <library/cpp/testing/unittest/registar.h>
-#include <library/cpp/actors/protos/unittests.pb.h>
using namespace NActors;
+#define VALUES_EQUAL(a, b, ...) \
+ UNIT_ASSERT_VALUES_EQUAL_C((a), (b), (i64)semaphore.OldSemaphore \
+ << ' ' << (i64)semaphore.CurrentSleepThreadCount \
+ << ' ' << (i64)semaphore.CurrentThreadCount __VA_ARGS__);
+
////////////////////////////////////////////////////////////////////////////////
struct TEvMsg : public NActors::TEventBase<TEvMsg, 10347> {
@@ -90,138 +94,59 @@ THolder<TActorSystemSetup> GetActorSystemSetup(TBasicExecutorPool* pool)
Y_UNIT_TEST_SUITE(BasicExecutorPool) {
- Y_UNIT_TEST(DecreaseIncreaseThreadsCount) {
- const size_t msgCount = 1e4;
- const size_t size = 4;
- const size_t halfSize = size / 2;
- TBasicExecutorPool* executorPool = new TBasicExecutorPool(0, size, 50);
+ Y_UNIT_TEST(Semaphore) {
+ TBasicExecutorPool::TSemaphore semaphore;
+ semaphore = TBasicExecutorPool::TSemaphore::GetSemaphore(0);
- auto setup = GetActorSystemSetup(executorPool);
- TActorSystem actorSystem(setup);
- actorSystem.Start();
+ VALUES_EQUAL(0, semaphore.ConverToI64());
+ semaphore = TBasicExecutorPool::TSemaphore::GetSemaphore(-1);
+ VALUES_EQUAL(-1, semaphore.ConverToI64());
+ semaphore = TBasicExecutorPool::TSemaphore::GetSemaphore(1);
+ VALUES_EQUAL(1, semaphore.ConverToI64());
- executorPool->SetThreadCount(halfSize);
- TTestSenderActor* actors[size];
- TActorId actorIds[size];
- for (size_t i = 0; i < size; ++i) {
- actors[i] = new TTestSenderActor();
- actorIds[i] = actorSystem.Register(actors[i]);
+ for (i64 value = -1'000'000; value <= 1'000'000; ++value) {
+ VALUES_EQUAL(TBasicExecutorPool::TSemaphore::GetSemaphore(value).ConverToI64(), value);
}
- const int testCount = 2;
-
- TExecutorPoolStats poolStats[testCount];
- TVector<TExecutorThreadStats> statsCopy[testCount];
-
- for (size_t testIdx = 0; testIdx < testCount; ++testIdx) {
- for (size_t i = 0; i < size; ++i) {
- actors[i]->Start(actors[i]->SelfId(), msgCount);
+ for (i8 sleepThreads = -10; sleepThreads <= 10; ++sleepThreads) {
+
+ semaphore = TBasicExecutorPool::TSemaphore();
+ semaphore.CurrentSleepThreadCount = sleepThreads;
+ i64 initialValue = semaphore.ConverToI64();
+
+ semaphore = TBasicExecutorPool::TSemaphore::GetSemaphore(initialValue - 1);
+ VALUES_EQUAL(-1, semaphore.OldSemaphore);
+
+ i64 value = initialValue;
+ value -= 100;
+ for (i32 expected = -100; expected <= 100; ++expected) {
+ semaphore = TBasicExecutorPool::TSemaphore::GetSemaphore(value);
+ UNIT_ASSERT_VALUES_EQUAL_C(expected, semaphore.OldSemaphore, (i64)semaphore.OldSemaphore
+ << ' ' << (i64)semaphore.CurrentSleepThreadCount
+ << ' ' << (i64)semaphore.CurrentThreadCount);
+ UNIT_ASSERT_VALUES_EQUAL_C(sleepThreads, semaphore.CurrentSleepThreadCount, (i64)semaphore.OldSemaphore
+ << ' ' << (i64)semaphore.CurrentSleepThreadCount
+ << ' ' << (i64)semaphore.CurrentThreadCount);
+ semaphore = TBasicExecutorPool::TSemaphore();
+ semaphore.OldSemaphore = expected;
+ semaphore.CurrentSleepThreadCount = sleepThreads;
+ UNIT_ASSERT_VALUES_EQUAL(semaphore.ConverToI64(), value);
+ value++;
}
- for (size_t i = 0; i < size; ++i) {
- actorSystem.Send(actorIds[i], new TEvMsg());
- }
-
- Sleep(TDuration::MilliSeconds(100));
- for (size_t i = 0; i < size; ++i) {
- actors[i]->Stop();
+ for (i32 expected = 101; expected >= -101; --expected) {
+ semaphore = TBasicExecutorPool::TSemaphore::GetSemaphore(value);
+ UNIT_ASSERT_VALUES_EQUAL_C(expected, semaphore.OldSemaphore, (i64)semaphore.OldSemaphore
+ << ' ' << (i64)semaphore.CurrentSleepThreadCount
+ << ' ' << (i64)semaphore.CurrentThreadCount);
+ UNIT_ASSERT_VALUES_EQUAL_C(sleepThreads, semaphore.CurrentSleepThreadCount, (i64)semaphore.OldSemaphore
+ << ' ' << (i64)semaphore.CurrentSleepThreadCount
+ << ' ' << (i64)semaphore.CurrentThreadCount);
+ value--;
}
-
- executorPool->GetCurrentStats(poolStats[testIdx], statsCopy[testIdx]);
}
- for (size_t i = 1; i <= halfSize; ++i) {
- UNIT_ASSERT_UNEQUAL(statsCopy[0][i].ReceivedEvents, statsCopy[1][i].ReceivedEvents);
- }
-
- for (size_t i = halfSize + 1; i <= size; ++i) {
- UNIT_ASSERT_EQUAL(statsCopy[0][i].ReceivedEvents, statsCopy[1][i].ReceivedEvents);
- }
-
- executorPool->SetThreadCount(size);
-
- for (size_t testIdx = 0; testIdx < testCount; ++testIdx) {
- for (size_t i = 0; i < size; ++i) {
- actors[i]->Start(actors[i]->SelfId(), msgCount);
- }
- for (size_t i = 0; i < size; ++i) {
- actorSystem.Send(actorIds[i], new TEvMsg());
- }
-
- Sleep(TDuration::MilliSeconds(100));
-
- for (size_t i = 0; i < size; ++i) {
- actors[i]->Stop();
- }
-
- executorPool->GetCurrentStats(poolStats[testIdx], statsCopy[testIdx]);
- }
-
- for (size_t i = 1; i <= size; ++i) {
- UNIT_ASSERT_UNEQUAL(statsCopy[0][i].ReceivedEvents, statsCopy[1][i].ReceivedEvents);
- }
- }
-
- Y_UNIT_TEST(ChangeCount) {
- const size_t msgCount = 1e3;
- const size_t size = 4;
- const size_t halfSize = size / 2;
- TBasicExecutorPool* executorPool = new TBasicExecutorPool(0, size, 50);
-
- auto begin = TInstant::Now();
-
- auto setup = GetActorSystemSetup(executorPool);
- TActorSystem actorSystem(setup);
- actorSystem.Start();
- executorPool->SetThreadCount(halfSize);
-
- TTestSenderActor* actors[size];
- TActorId actorIds[size];
- for (size_t i = 0; i < size; ++i) {
- actors[i] = new TTestSenderActor();
- actorIds[i] = actorSystem.Register(actors[i]);
- }
-
- for (size_t i = 0; i < size; ++i) {
- actors[i]->Start(actorIds[i], msgCount);
- }
- for (size_t i = 0; i < size; ++i) {
- actorSystem.Send(actorIds[i], new TEvMsg());
- }
-
- const i32 N = 6;
- const i32 threadsCouns[N] = { 1, 3, 2, 3, 1, 4 };
-
- ui64 counter = 0;
-
- TTestSenderActor* changerActor = new TTestSenderActor([&]{
- executorPool->SetThreadCount(threadsCouns[counter]);
- counter++;
- if (counter == N) {
- counter = 0;
- }
- });
- TActorId changerActorId = actorSystem.Register(changerActor);
- changerActor->Start(changerActorId, msgCount);
- actorSystem.Send(changerActorId, new TEvMsg());
-
- while (true) {
- size_t maxCounter = 0;
- for (size_t i = 0; i < size; ++i) {
- maxCounter = Max(maxCounter, actors[i]->GetCounter());
- }
-
- if (maxCounter == 0) {
- break;
- }
-
- auto now = TInstant::Now();
- UNIT_ASSERT_C(now - begin < TDuration::Seconds(5), "Max counter is " << maxCounter);
-
- Sleep(TDuration::MilliSeconds(1));
- }
-
- changerActor->Stop();
+ //UNIT_ASSERT_VALUES_EQUAL_C(-1, TBasicExecutorPool::TSemaphore::GetSemaphore(value-1).OldSemaphore);
}
Y_UNIT_TEST(CheckCompleteOne) {
@@ -433,3 +358,182 @@ Y_UNIT_TEST_SUITE(BasicExecutorPool) {
UNIT_ASSERT_VALUES_EQUAL(stats[0].MailboxPushedOutBySoftPreemption, 0);
}
}
+
+Y_UNIT_TEST_SUITE(ChangingThreadsCountInBasicExecutorPool) {
+
+ struct TMockState {
+ void ActorDo() {}
+ };
+
+ struct TTestActors {
+ const size_t Count;
+ TArrayHolder<TTestSenderActor*> Actors;
+ TArrayHolder<TActorId> ActorIds;
+
+ TTestActors(size_t count)
+ : Count(count)
+ , Actors(new TTestSenderActor*[count])
+ , ActorIds(new TActorId[count])
+ { }
+
+ void Start(TActorSystem &actorSystem, size_t msgCount) {
+ for (size_t i = 0; i < Count; ++i) {
+ Actors[i]->Start(Actors[i]->SelfId(), msgCount);
+ }
+ for (size_t i = 0; i < Count; ++i) {
+ actorSystem.Send(ActorIds[i], new TEvMsg());
+ }
+ }
+
+ void Stop() {
+ for (size_t i = 0; i < Count; ++i) {
+ Actors[i]->Stop();
+ }
+ }
+ };
+
+ template <typename TState = TMockState>
+ struct TTestCtx {
+ const size_t MaxThreadCount;
+ const size_t SendingMessageCount;
+ std::unique_ptr<TBasicExecutorPool> ExecutorPool;
+ THolder<TActorSystemSetup> Setup;
+ TActorSystem ActorSystem;
+
+ TState State;
+
+ TTestCtx(size_t maxThreadCount, size_t sendingMessageCount)
+ : MaxThreadCount(maxThreadCount)
+ , SendingMessageCount(sendingMessageCount)
+ , ExecutorPool(new TBasicExecutorPool(0, MaxThreadCount, 50))
+ , Setup(GetActorSystemSetup(ExecutorPool.get()))
+ , ActorSystem(Setup)
+ {
+ }
+
+ TTestCtx(size_t maxThreadCount, size_t sendingMessageCount, const TState &state)
+ : MaxThreadCount(maxThreadCount)
+ , SendingMessageCount(sendingMessageCount)
+ , ExecutorPool(new TBasicExecutorPool(0, MaxThreadCount, 50))
+ , Setup(GetActorSystemSetup(ExecutorPool.get()))
+ , ActorSystem(Setup)
+ , State(state)
+ {
+ }
+
+ ~TTestCtx() {
+ ExecutorPool.release();
+ }
+
+ TTestActors RegisterCheckActors(size_t actorCount) {
+ TTestActors res(actorCount);
+ for (size_t i = 0; i < actorCount; ++i) {
+ res.Actors[i] = new TTestSenderActor([&] {
+ State.ActorDo();
+ });
+ res.ActorIds[i] = ActorSystem.Register(res.Actors[i]);
+ }
+ return res;
+ }
+ };
+
+ struct TCheckingInFlightState {
+ TAtomic ExpectedMaximum = 0;
+ TAtomic CurrentInFlight = 0;
+
+ void ActorStartProcessing() {
+ ui32 inFlight = AtomicIncrement(CurrentInFlight);
+ ui32 maximum = AtomicGet(ExpectedMaximum);
+ if (maximum) {
+ UNIT_ASSERT_C(inFlight <= maximum, "inFlight# " << inFlight << " maximum# " << maximum);
+ }
+ }
+
+ void ActorStopProcessing() {
+ AtomicDecrement(CurrentInFlight);
+ }
+
+ void ActorDo() {
+ ActorStartProcessing();
+ NanoSleep(1'000'000);
+ ActorStopProcessing();
+ }
+ };
+
+ Y_UNIT_TEST(DecreaseIncreaseThreadCount) {
+ const size_t msgCount = 1e2;
+ const size_t size = 4;
+ const size_t testCount = 2;
+ TTestCtx<TCheckingInFlightState> ctx(size, msgCount);
+ ctx.ActorSystem.Start();
+
+ TVector<TExecutorThreadStats> statsCopy[testCount];
+
+ TTestActors testActors = ctx.RegisterCheckActors(size);
+
+ const size_t N = 6;
+ const size_t threadsCounts[N] = { 1, 3, 2, 3, 1, 4 };
+ for (ui32 idx = 0; idx < 4 * N; ++idx) {
+ size_t currentThreadCount = threadsCounts[idx];
+ ctx.ExecutorPool->SetThreadCount(currentThreadCount);
+ AtomicSet(ctx.State.ExpectedMaximum, currentThreadCount);
+
+ for (size_t testIdx = 0; testIdx < testCount; ++testIdx) {
+ testActors.Start(ctx.ActorSystem, msgCount);
+ Sleep(TDuration::MilliSeconds(100));
+ testActors.Stop();
+ }
+ Sleep(TDuration::MilliSeconds(10));
+ }
+ ctx.ActorSystem.Stop();
+ }
+
+ Y_UNIT_TEST(ContiniousChangingThreadCount) {
+ const size_t msgCount = 1e2;
+ const size_t size = 4;
+
+ auto begin = TInstant::Now();
+ TTestCtx<TCheckingInFlightState> ctx(size, msgCount, TCheckingInFlightState{msgCount});
+ ctx.ActorSystem.Start();
+ TTestActors testActors = ctx.RegisterCheckActors(size);
+
+ testActors.Start(ctx.ActorSystem, msgCount);
+
+ const size_t N = 6;
+ const size_t threadsCouns[N] = { 1, 3, 2, 3, 1, 4 };
+
+ ui64 counter = 0;
+
+ TTestSenderActor* changerActor = new TTestSenderActor([&]{
+ ctx.State.ActorStartProcessing();
+ AtomicSet(ctx.State.ExpectedMaximum, 0);
+ ctx.ExecutorPool->SetThreadCount(threadsCouns[counter]);
+ NanoSleep(10'000'000);
+ AtomicSet(ctx.State.ExpectedMaximum, threadsCouns[counter]);
+ counter++;
+ if (counter == N) {
+ counter = 0;
+ }
+ ctx.State.ActorStopProcessing();
+ });
+ TActorId changerActorId = ctx.ActorSystem.Register(changerActor);
+ changerActor->Start(changerActorId, msgCount);
+ ctx.ActorSystem.Send(changerActorId, new TEvMsg());
+
+ while (true) {
+ size_t maxCounter = 0;
+ for (size_t i = 0; i < size; ++i) {
+ maxCounter = Max(maxCounter, testActors.Actors[i]->GetCounter());
+ }
+ if (maxCounter == 0) {
+ break;
+ }
+ auto now = TInstant::Now();
+ UNIT_ASSERT_C(now - begin < TDuration::Seconds(5), "Max counter is " << maxCounter);
+ Sleep(TDuration::MilliSeconds(1));
+ }
+
+ changerActor->Stop();
+ ctx.ActorSystem.Stop();
+ }
+}
diff --git a/library/cpp/actors/core/executor_pool_united.cpp b/library/cpp/actors/core/executor_pool_united.cpp
index dac6245635..a2cb269280 100644
--- a/library/cpp/actors/core/executor_pool_united.cpp
+++ b/library/cpp/actors/core/executor_pool_united.cpp
@@ -7,6 +7,7 @@
#include "mailbox.h"
#include "scheduler_queue.h"
#include <library/cpp/actors/util/affinity.h>
+#include <library/cpp/actors/util/cpu_load_log.h>
#include <library/cpp/actors/util/datetime.h>
#include <library/cpp/actors/util/futex.h>
#include <library/cpp/actors/util/intrinsics.h>
@@ -953,6 +954,8 @@ namespace NActors {
// Thread-safe per pool stats
// NOTE: It's guaranteed that cpu never executes two instance of the same pool
TVector<TExecutorThreadStats> PoolStats;
+ TCpuLoadLog<1024> LoadLog;
+
// Configuration
TCpuId CpuId;
@@ -1000,7 +1003,9 @@ namespace NActors {
}
bool ActiveWait(ui64 spinThresholdTs, TPoolId& result) {
- ui64 deadline = GetCycleCountFast() + spinThresholdTs;
+ ui64 ts = GetCycleCountFast();
+ LoadLog.RegisterBusyPeriod(ts);
+ ui64 deadline = ts + spinThresholdTs;
while (GetCycleCountFast() < deadline) {
for (ui32 i = 0; i < 12; ++i) {
TPoolId current = State.CurrentPool();
@@ -1008,6 +1013,7 @@ namespace NActors {
SpinLockPause();
} else {
result = current;
+ LoadLog.RegisterIdlePeriod(GetCycleCountFast());
return true; // wakeup
}
}
@@ -1269,15 +1275,25 @@ namespace NActors {
if (Pools[pool].IsUnited()) {
ui64 ElapsedTs = 0;
ui64 ParkedTs = 0;
+ TStackVec<TCpuLoadLog<1024>*, 128> logs;
+ ui64 worstActivationTimeUs = 0;
for (TCpu* cpu : Pools[pool].WakeOrderCpus) {
- const TExecutorThreadStats& cpuStats = cpu->PoolStats[pool];
+ TExecutorThreadStats& cpuStats = cpu->PoolStats[pool];
ElapsedTs += cpuStats.ElapsedTicks;
ParkedTs += cpuStats.ParkedTicks;
+ worstActivationTimeUs = Max(worstActivationTimeUs, cpuStats.WorstActivationTimeUs);
+ cpuStats.WorstActivationTimeUs = 0;
+ logs.push_back(&cpu->LoadLog);
}
+ ui64 minPeriodTs = Min(ui64(Us2Ts(Balancer->GetPeriodUs())), ui64((1024ull-2ull)*64ull*128ull*1024ull));
+ ui64 estimatedTs = MinusOneCpuEstimator.MaxLatencyIncreaseWithOneLessCpu(
+ &logs[0], logs.size(), ts, minPeriodTs);
TBalancerStats stats;
stats.Ts = ts;
stats.CpuUs = Ts2Us(ElapsedTs);
stats.IdleUs = Ts2Us(ParkedTs);
+ stats.ExpectedLatencyIncreaseUs = Ts2Us(estimatedTs);
+ stats.WorstActivationTimeUs = worstActivationTimeUs;
Balancer->SetPoolStats(pool, stats);
}
}
@@ -1332,11 +1348,13 @@ namespace NActors {
return result;
}
wctx.AddElapsedCycles(IActor::ACTOR_SYSTEM, timeTracker.Elapsed());
+ cpu.LoadLog.RegisterBusyPeriod(GetCycleCountFast());
bool wakeup;
do {
wakeup = cpu.BlockedWait(result, Config.Balancer.PeriodUs * 1000);
wctx.AddParkedCycles(timeTracker.Elapsed());
} while (!wakeup);
+ cpu.LoadLog.RegisterIdlePeriod(GetCycleCountFast());
return result;
}
diff --git a/library/cpp/actors/core/executor_pool_united.h b/library/cpp/actors/core/executor_pool_united.h
index a090ba2466..0895b06462 100644
--- a/library/cpp/actors/core/executor_pool_united.h
+++ b/library/cpp/actors/core/executor_pool_united.h
@@ -8,6 +8,7 @@
#include <library/cpp/actors/util/unordered_cache.h>
#include <library/cpp/monlib/dynamic_counters/counters.h>
+#include <library/cpp/actors/util/cpu_load_log.h>
#include <library/cpp/actors/util/unordered_cache.h>
#include <library/cpp/containers/stack_vector/stack_vec.h>
@@ -34,6 +35,7 @@ namespace NActors {
TCpuAllocationConfig Allocation;
volatile bool StopFlag = false;
+ TMinusOneCpuEstimator<1024> MinusOneCpuEstimator;
public:
TUnitedWorkers(
diff --git a/library/cpp/actors/core/executor_pool_united_ut.cpp b/library/cpp/actors/core/executor_pool_united_ut.cpp
index a1595d8588..88b04e6472 100644
--- a/library/cpp/actors/core/executor_pool_united_ut.cpp
+++ b/library/cpp/actors/core/executor_pool_united_ut.cpp
@@ -111,6 +111,10 @@ struct TRoundRobinBalancer: public IBalancer {
State->Load(assigned, current);
State->AssignPool(NextPool[assigned]);
}
+
+ ui64 GetPeriodUs() override {
+ return 1000;
+ }
};
void AddUnitedPool(THolder<TActorSystemSetup>& setup, ui32 concurrency = 0) {
diff --git a/library/cpp/actors/core/executor_thread.cpp b/library/cpp/actors/core/executor_thread.cpp
index 446b651efd..4271dadab2 100644
--- a/library/cpp/actors/core/executor_thread.cpp
+++ b/library/cpp/actors/core/executor_thread.cpp
@@ -87,7 +87,7 @@ namespace NActors {
}
template <class T>
- inline TString SafeTypeName(T* t) {
+ inline TString SafeTypeName(const T* t) {
if (t == nullptr) {
return "nullptr";
}
@@ -98,11 +98,7 @@ namespace NActors {
}
}
- inline TString ActorTypeName(const IActor* actor, ui32 activityType) {
- return actor ? SafeTypeName(actor) : ("activityType_" + ToString(activityType) + " (destroyed)");
- }
-
- inline void LwTraceSlowDelivery(IEventHandle* ev, const IActor* actor, ui32 poolId, const TActorId& currentRecipient,
+ inline void LwTraceSlowDelivery(IEventHandle* ev, const std::type_info* actorType, ui32 poolId, const TActorId& currentRecipient,
double delivMs, double sinceActivationMs, ui32 eventsExecutedBefore) {
const auto baseEv = (ev && ev->HasEvent()) ? ev->GetBase() : nullptr;
LWPROBE(EventSlowDelivery,
@@ -112,10 +108,10 @@ namespace NActors {
eventsExecutedBefore,
baseEv ? SafeTypeName(baseEv) : (ev ? ToString(ev->Type) : TString("nullptr")),
currentRecipient.ToString(),
- SafeTypeName(actor));
+ SafeTypeName(actorType));
}
- inline void LwTraceSlowEvent(IEventHandle* ev, ui32 evTypeForTracing, const IActor* actor, ui32 poolId, ui32 activityType,
+ inline void LwTraceSlowEvent(IEventHandle* ev, ui32 evTypeForTracing, const std::type_info* actorType, ui32 poolId,
const TActorId& currentRecipient, double eventMs) {
// Event could have been destroyed by actor->Receive();
const auto baseEv = (ev && ev->HasEvent()) ? ev->GetBase() : nullptr;
@@ -124,7 +120,7 @@ namespace NActors {
eventMs,
baseEv ? SafeTypeName(baseEv) : ToString(evTypeForTracing),
currentRecipient.ToString(),
- ActorTypeName(actor, activityType));
+ SafeTypeName(actorType));
}
template <typename TMailbox>
@@ -137,6 +133,7 @@ namespace NActors {
NHPTimer::STime hpprev = hpstart;
IActor* actor = nullptr;
+ const std::type_info* actorType = nullptr;
ui32 prevActivityType = std::numeric_limits<ui32>::max();
TActorId recipient;
for (ui32 executed = 0; executed < Ctx.EventsPerMailbox; ++executed) {
@@ -148,6 +145,9 @@ namespace NActors {
TActorContext ctx(*mailbox, *this, hpprev, recipient);
TlsActivationContext = &ctx;
+ // Since actor is not null there should be no exceptions
+ actorType = &typeid(*actor);
+
#ifdef USE_ACTOR_CALLSTACK
TCallstack::GetTlsCallstack() = ev->Callstack;
TCallstack::GetTlsCallstack().SetLinesToSkip();
@@ -165,7 +165,7 @@ namespace NActors {
i64 usecDeliv = Ctx.AddEventDeliveryStats(ev->SendTime, hpprev);
if (usecDeliv > 5000) {
double sinceActivationMs = NHPTimer::GetSeconds(hpprev - hpstart) * 1000.0;
- LwTraceSlowDelivery(ev.Get(), actor, Ctx.PoolId, CurrentRecipient, NHPTimer::GetSeconds(hpprev - ev->SendTime) * 1000.0, sinceActivationMs, executed);
+ LwTraceSlowDelivery(ev.Get(), actorType, Ctx.PoolId, CurrentRecipient, NHPTimer::GetSeconds(hpprev - ev->SendTime) * 1000.0, sinceActivationMs, executed);
}
ui32 evTypeForTracing = ev->Type;
@@ -191,7 +191,7 @@ namespace NActors {
hpnow = GetCycleCountFast();
NHPTimer::STime elapsed = Ctx.AddEventProcessingStats(hpprev, hpnow, activityType, CurrentActorScheduledEventsCounter);
if (elapsed > 1000000) {
- LwTraceSlowEvent(ev.Get(), evTypeForTracing, actor, Ctx.PoolId, activityType, CurrentRecipient, NHPTimer::GetSeconds(elapsed) * 1000.0);
+ LwTraceSlowEvent(ev.Get(), evTypeForTracing, actorType, Ctx.PoolId, CurrentRecipient, NHPTimer::GetSeconds(elapsed) * 1000.0);
}
// The actor might have been destroyed
@@ -200,6 +200,8 @@ namespace NActors {
CurrentRecipient = TActorId();
} else {
+ actorType = nullptr;
+
TAutoPtr<IEventHandle> nonDelivered = ev->ForwardOnNondelivery(TEvents::TEvUndelivered::ReasonActorUnknown);
if (nonDelivered.Get()) {
ActorSystem->Send(nonDelivered);
@@ -223,7 +225,7 @@ namespace NActors {
CyclesToDuration(hpnow - hpstart),
Ctx.WorkerId,
recipient.ToString(),
- SafeTypeName(actor));
+ SafeTypeName(actorType));
break;
}
@@ -239,7 +241,7 @@ namespace NActors {
CyclesToDuration(hpnow - hpstart),
Ctx.WorkerId,
recipient.ToString(),
- SafeTypeName(actor));
+ SafeTypeName(actorType));
break;
}
@@ -254,7 +256,7 @@ namespace NActors {
CyclesToDuration(hpnow - hpstart),
Ctx.WorkerId,
recipient.ToString(),
- SafeTypeName(actor));
+ SafeTypeName(actorType));
break;
}
} else {
diff --git a/library/cpp/actors/core/harmonizer.cpp b/library/cpp/actors/core/harmonizer.cpp
new file mode 100644
index 0000000000..f318d8909c
--- /dev/null
+++ b/library/cpp/actors/core/harmonizer.cpp
@@ -0,0 +1,431 @@
+#include "harmonizer.h"
+
+#include "probes.h"
+#include "actorsystem.h"
+
+#include <library/cpp/actors/util/cpu_load_log.h>
+#include <library/cpp/actors/util/datetime.h>
+#include <library/cpp/actors/util/intrinsics.h>
+
+#include <util/system/spinlock.h>
+
+#include <algorithm>
+
+namespace NActors {
+
+LWTRACE_USING(ACTORLIB_PROVIDER);
+
+constexpr bool CheckBinaryPower(ui64 value) {
+ return !(value & (value - 1));
+}
+
+struct TValueHistory {
+ static constexpr ui64 HistoryBufferSize = 8;
+ static_assert(CheckBinaryPower(HistoryBufferSize));
+
+ double History[HistoryBufferSize] = {0.0};
+ ui64 HistoryIdx = 0;
+ ui64 LastTs = Max<ui64>();
+ double LastUs = 0.0;
+ double AccumulatedUs = 0.0;
+ ui64 AccumulatedTs = 0;
+
+ template <bool WithTail=false>
+ double GetAvgPartForLastSeconds(ui8 seconds) {
+ double sum = AccumulatedUs;
+ size_t idx = HistoryIdx;
+ ui8 leftSeconds = seconds;
+ do {
+ idx--;
+ leftSeconds--;
+ if (idx >= HistoryBufferSize) {
+ idx = HistoryBufferSize - 1;
+ }
+ if (WithTail || leftSeconds) {
+ sum += History[idx];
+ } else {
+ ui64 tsInSecond = Us2Ts(1'000'000.0);
+ sum += History[idx] * (tsInSecond - AccumulatedTs) / tsInSecond;
+ }
+ } while (leftSeconds);
+ double duration = 1'000'000.0 * seconds + (WithTail ? Ts2Us(AccumulatedTs): 0.0);
+ double avg = sum / duration;
+ return avg;
+ }
+
+ double GetAvgPart() {
+ return GetAvgPartForLastSeconds<true>(HistoryBufferSize);
+ }
+
+ void Register(ui64 ts, double valueUs) {
+ if (ts < LastTs) {
+ LastTs = ts;
+ LastUs = valueUs;
+ AccumulatedUs = 0.0;
+ AccumulatedTs = 0;
+ return;
+ }
+ ui64 lastTs = std::exchange(LastTs, ts);
+ ui64 dTs = ts - lastTs;
+ double lastUs = std::exchange(LastUs, valueUs);
+ double dUs = valueUs - lastUs;
+ LWPROBE(RegisterValue, ts, lastTs, dTs, Us2Ts(8'000'000.0), valueUs, lastUs, dUs);
+
+ if (dTs > Us2Ts(8'000'000.0)) {
+ dUs = dUs * 1'000'000.0 / Ts2Us(dTs);
+ for (size_t idx = 0; idx < HistoryBufferSize; ++idx) {
+ History[idx] = dUs;
+ }
+ AccumulatedUs = 0.0;
+ AccumulatedTs = 0;
+ return;
+ }
+
+ while (dTs > 0) {
+ if (AccumulatedTs + dTs < Us2Ts(1'000'000.0)) {
+ AccumulatedTs += dTs;
+ AccumulatedUs += dUs;
+ break;
+ } else {
+ ui64 addTs = Us2Ts(1'000'000.0) - AccumulatedTs;
+ double addUs = dUs * addTs / dTs;
+ dTs -= addTs;
+ dUs -= addUs;
+ History[HistoryIdx] = AccumulatedUs + addUs;
+ HistoryIdx = (HistoryIdx + 1) % HistoryBufferSize;
+ AccumulatedUs = 0.0;
+ AccumulatedTs = 0;
+ }
+ }
+ }
+};
+
+struct TThreadInfo {
+ TValueHistory Consumed;
+ TValueHistory Booked;
+};
+
+struct TPoolInfo {
+ std::vector<TThreadInfo> ThreadInfo;
+ IExecutorPool* Pool = nullptr;
+ i16 DefaultThreadCount = 0;
+ i16 MinThreadCount = 0;
+ i16 MaxThreadCount = 0;
+ i16 Priority = 0;
+ NMonitoring::TDynamicCounters::TCounterPtr AvgPingCounter;
+ NMonitoring::TDynamicCounters::TCounterPtr AvgPingCounterWithSmallWindow;
+ ui32 MaxAvgPingUs = 0;
+ ui64 LastUpdateTs = 0;
+
+ TAtomic LastFlags = 0; // 0 - isNeedy; 1 - isStarved; 2 - isHoggish
+ TAtomic IncreasingThreadsByNeedyState = 0;
+ TAtomic DecreasingThreadsByStarvedState = 0;
+ TAtomic DecreasingThreadsByHoggishState = 0;
+
+ bool IsBeingStopped(i16 threadIdx);
+ double GetBooked(i16 threadIdx);
+ double GetlastSecondPoolBooked(i16 threadIdx);
+ double GetConsumed(i16 threadIdx);
+ double GetlastSecondPoolConsumed(i16 threadIdx);
+ void PullStats(ui64 ts);
+ i16 GetThreadCount();
+ void SetThreadCount(i16 threadCount);
+ bool IsAvgPingGood();
+};
+
+bool TPoolInfo::IsBeingStopped(i16 threadIdx) {
+ return Pool->IsThreadBeingStopped(threadIdx);
+}
+
+double TPoolInfo::GetBooked(i16 threadIdx) {
+ if ((size_t)threadIdx < ThreadInfo.size()) {
+ return ThreadInfo[threadIdx].Booked.GetAvgPart();
+ }
+ return 0.0;
+}
+
+double TPoolInfo::GetlastSecondPoolBooked(i16 threadIdx) {
+ if ((size_t)threadIdx < ThreadInfo.size()) {
+ return ThreadInfo[threadIdx].Booked.GetAvgPartForLastSeconds(1);
+ }
+ return 0.0;
+}
+
+double TPoolInfo::GetConsumed(i16 threadIdx) {
+ if ((size_t)threadIdx < ThreadInfo.size()) {
+ return ThreadInfo[threadIdx].Consumed.GetAvgPart();
+ }
+ return 0.0;
+}
+
+double TPoolInfo::GetlastSecondPoolConsumed(i16 threadIdx) {
+ if ((size_t)threadIdx < ThreadInfo.size()) {
+ return ThreadInfo[threadIdx].Consumed.GetAvgPartForLastSeconds(1);
+ }
+ return 0.0;
+}
+
+#define UNROLL_HISTORY(history) (history)[0], (history)[1], (history)[2], (history)[3], (history)[4], (history)[5], (history)[6], (history)[7]
+void TPoolInfo::PullStats(ui64 ts) {
+ for (i16 threadIdx = 0; threadIdx < MaxThreadCount; ++threadIdx) {
+ TThreadInfo &threadInfo = ThreadInfo[threadIdx];
+ threadInfo.Consumed.Register(ts, Pool->GetThreadConsumedUs(threadIdx));
+ LWPROBE(SavedValues, Pool->PoolId, Pool->GetName(), "consumed", UNROLL_HISTORY(threadInfo.Consumed.History));
+ threadInfo.Booked.Register(ts, Pool->GetThreadBookedUs(threadIdx));
+ LWPROBE(SavedValues, Pool->PoolId, Pool->GetName(), "booked", UNROLL_HISTORY(threadInfo.Booked.History));
+ }
+}
+#undef UNROLL_HISTORY
+
+i16 TPoolInfo::GetThreadCount() {
+ return Pool->GetThreadCount();
+}
+
+void TPoolInfo::SetThreadCount(i16 threadCount) {
+ Pool->SetThreadCount(threadCount);
+}
+
+bool TPoolInfo::IsAvgPingGood() {
+ bool res = true;
+ if (AvgPingCounter) {
+ res &= *AvgPingCounter > MaxAvgPingUs;
+ }
+ if (AvgPingCounterWithSmallWindow) {
+ res &= *AvgPingCounterWithSmallWindow > MaxAvgPingUs;
+ }
+ return res;
+}
+
+class THarmonizer: public IHarmonizer {
+private:
+ std::atomic<bool> IsDisabled = false;
+ TSpinLock Lock;
+ std::atomic<ui64> NextHarmonizeTs = 0;
+ std::vector<TPoolInfo> Pools;
+ std::vector<ui16> PriorityOrder;
+
+ void PullStats(ui64 ts);
+ void HarmonizeImpl(ui64 ts);
+ void CalculatePriorityOrder();
+public:
+ THarmonizer(ui64 ts);
+ virtual ~THarmonizer();
+ double Rescale(double value) const;
+ void Harmonize(ui64 ts) override;
+ void DeclareEmergency(ui64 ts) override;
+ void AddPool(IExecutorPool* pool, TSelfPingInfo *pingInfo) override;
+ void Enable(bool enable) override;
+ TPoolHarmonizedStats GetPoolStats(i16 poolId) const override;
+};
+
+THarmonizer::THarmonizer(ui64 ts) {
+ NextHarmonizeTs = ts;
+}
+
+THarmonizer::~THarmonizer() {
+}
+
+double THarmonizer::Rescale(double value) const {
+ return Max(0.0, Min(1.0, value * (1.0/0.9)));
+}
+
+void THarmonizer::PullStats(ui64 ts) {
+ for (TPoolInfo &pool : Pools) {
+ pool.PullStats(ts);
+ }
+}
+
+Y_FORCE_INLINE bool IsStarved(double consumed, double booked) {
+ return Max(consumed, booked) > 0.1 && consumed < booked * 0.7;
+}
+
+Y_FORCE_INLINE bool IsHoggish(double booked, ui16 currentThreadCount) {
+ return booked < currentThreadCount - 1;
+}
+
+void THarmonizer::HarmonizeImpl(ui64 ts) {
+ bool isStarvedPresent = false;
+ double booked = 0.0;
+ double consumed = 0.0;
+ double lastSecondBooked = 0.0;
+ i64 beingStopped = 0;
+ i64 total = 0;
+ TStackVec<size_t, 8> needyPools;
+ TStackVec<size_t, 8> hoggishPools;
+ for (size_t poolIdx = 0; poolIdx < Pools.size(); ++poolIdx) {
+ TPoolInfo& pool = Pools[poolIdx];
+ total += pool.DefaultThreadCount;
+ double poolBooked = 0.0;
+ double poolConsumed = 0.0;
+ double lastSecondPoolBooked = 0.0;
+ double lastSecondPoolConsumed = 0.0;
+ beingStopped += pool.Pool->GetBlockingThreadCount();
+ for (i16 threadIdx = 0; threadIdx < pool.MaxThreadCount; ++threadIdx) {
+ poolBooked += Rescale(pool.GetBooked(threadIdx));
+ lastSecondPoolBooked += Rescale(pool.GetlastSecondPoolBooked(threadIdx));
+ poolConsumed += Rescale(pool.GetConsumed(threadIdx));
+ lastSecondPoolConsumed += Rescale(pool.GetlastSecondPoolConsumed(threadIdx));
+ }
+ bool isStarved = IsStarved(poolConsumed, poolBooked) || IsStarved(lastSecondPoolConsumed, lastSecondPoolBooked);
+ if (isStarved) {
+ isStarvedPresent = true;
+ }
+ ui32 currentThreadCount = pool.GetThreadCount();
+ bool isNeedy = pool.IsAvgPingGood() && poolBooked >= currentThreadCount;
+ if (pool.AvgPingCounter) {
+ if (pool.LastUpdateTs + Us2Ts(3'000'000ull) > ts) {
+ isNeedy = false;
+ } else {
+ pool.LastUpdateTs = ts;
+ }
+ }
+ if (isNeedy) {
+ needyPools.push_back(poolIdx);
+ }
+ bool isHoggish = IsHoggish(poolBooked, currentThreadCount)
+ || IsHoggish(lastSecondPoolBooked, currentThreadCount);
+ if (isHoggish) {
+ hoggishPools.push_back(poolIdx);
+ }
+ booked += poolBooked;
+ consumed += poolConsumed;
+ AtomicSet(pool.LastFlags, (i64)isNeedy | ((i64)isStarved << 1) | ((i64)isHoggish << 2));
+ LWPROBE(HarmonizeCheckPool, poolIdx, pool.Pool->GetName(), poolBooked, poolConsumed, lastSecondPoolBooked, lastSecondPoolConsumed, pool.GetThreadCount(), pool.MaxThreadCount, isStarved, isNeedy, isHoggish);
+ }
+ double budget = total - Max(booked, lastSecondBooked);
+ if (budget < -0.1) {
+ isStarvedPresent = true;
+ }
+ double overbooked = consumed - booked;
+ if (isStarvedPresent) {
+        // last_starved_at_consumed_value = sum of consumed over all pools;
+        // TODO(cthulhu): use this as a limit, smoothly drive that limit towards total,
+        // and use it instead of total
+ if (beingStopped && beingStopped >= overbooked) {
+ // do nothing
+ } else {
+ TStackVec<size_t> reorder;
+ for (size_t i = 0; i < Pools.size(); ++i) {
+ reorder.push_back(i);
+ }
+ for (ui16 poolIdx : PriorityOrder) {
+ TPoolInfo &pool = Pools[poolIdx];
+ i64 threadCount = pool.GetThreadCount();
+ if (threadCount > pool.DefaultThreadCount) {
+ pool.SetThreadCount(threadCount - 1);
+ AtomicIncrement(pool.DecreasingThreadsByStarvedState);
+ overbooked--;
+ LWPROBE(HarmonizeOperation, poolIdx, pool.Pool->GetName(), "decrease", threadCount - 1, pool.DefaultThreadCount, pool.MaxThreadCount);
+ if (overbooked < 1) {
+ break;
+ }
+ }
+ }
+ }
+ } else {
+ for (size_t needyPoolIdx : needyPools) {
+ TPoolInfo &pool = Pools[needyPoolIdx];
+ if (budget >= 1.0) {
+ i64 threadCount = pool.GetThreadCount();
+ if (threadCount + 1 <= pool.MaxThreadCount) {
+ AtomicIncrement(pool.IncreasingThreadsByNeedyState);
+ pool.SetThreadCount(threadCount + 1);
+ budget -= 1.0;
+ LWPROBE(HarmonizeOperation, needyPoolIdx, pool.Pool->GetName(), "increase", threadCount + 1, pool.DefaultThreadCount, pool.MaxThreadCount);
+ }
+ }
+ }
+ }
+ for (size_t hoggishPoolIdx : hoggishPools) {
+ TPoolInfo &pool = Pools[hoggishPoolIdx];
+ i64 threadCount = pool.GetThreadCount();
+ if (threadCount > pool.MinThreadCount) {
+ AtomicIncrement(pool.DecreasingThreadsByHoggishState);
+ LWPROBE(HarmonizeOperation, hoggishPoolIdx, pool.Pool->GetName(), "decrease", threadCount - 1, pool.DefaultThreadCount, pool.MaxThreadCount);
+ pool.SetThreadCount(threadCount - 1);
+ }
+ }
+}
+
+void THarmonizer::CalculatePriorityOrder() {
+ PriorityOrder.resize(Pools.size());
+ Iota(PriorityOrder.begin(), PriorityOrder.end(), 0);
+ Sort(PriorityOrder.begin(), PriorityOrder.end(), [&] (i16 lhs, i16 rhs) {
+ if (Pools[lhs].Priority != Pools[rhs].Priority) {
+ return Pools[lhs].Priority < Pools[rhs].Priority;
+ }
+ return Pools[lhs].Pool->PoolId > Pools[rhs].Pool->PoolId;
+ });
+}
+
+void THarmonizer::Harmonize(ui64 ts) {
+ if (IsDisabled || NextHarmonizeTs > ts || !Lock.TryAcquire()) {
+ LWPROBE(TryToHarmonizeFailed, ts, NextHarmonizeTs, IsDisabled, false);
+ return;
+ }
+ // Check again under the lock
+ if (IsDisabled) {
+ LWPROBE(TryToHarmonizeFailed, ts, NextHarmonizeTs, IsDisabled, true);
+ Lock.Release();
+ return;
+ }
+    // We will never reach this line while disabled
+ ui64 previousNextHarmonizeTs = NextHarmonizeTs.exchange(ts + Us2Ts(1'000'000ull));
+ LWPROBE(TryToHarmonizeSuccess, ts, NextHarmonizeTs, previousNextHarmonizeTs);
+
+ if (PriorityOrder.empty()) {
+ CalculatePriorityOrder();
+ }
+
+ PullStats(ts);
+ HarmonizeImpl(ts);
+
+ Lock.Release();
+}
+
+void THarmonizer::DeclareEmergency(ui64 ts) {
+ NextHarmonizeTs = ts;
+}
+
+void THarmonizer::AddPool(IExecutorPool* pool, TSelfPingInfo *pingInfo) {
+ TGuard<TSpinLock> guard(Lock);
+ TPoolInfo poolInfo;
+ poolInfo.Pool = pool;
+ poolInfo.DefaultThreadCount = pool->GetDefaultThreadCount();
+ poolInfo.MinThreadCount = pool->GetMinThreadCount();
+ poolInfo.MaxThreadCount = pool->GetMaxThreadCount();
+ poolInfo.ThreadInfo.resize(poolInfo.MaxThreadCount);
+ poolInfo.Priority = pool->GetPriority();
+ pool->SetThreadCount(poolInfo.DefaultThreadCount);
+ if (pingInfo) {
+ poolInfo.AvgPingCounter = pingInfo->AvgPingCounter;
+ poolInfo.AvgPingCounterWithSmallWindow = pingInfo->AvgPingCounterWithSmallWindow;
+ poolInfo.MaxAvgPingUs = pingInfo->MaxAvgPingUs;
+ }
+ Pools.push_back(poolInfo);
+ PriorityOrder.clear();
+};
+
+void THarmonizer::Enable(bool enable) {
+ TGuard<TSpinLock> guard(Lock);
+    IsDisabled = !enable;
+}
+
+IHarmonizer* MakeHarmonizer(ui64 ts) {
+ return new THarmonizer(ts);
+}
+
+TPoolHarmonizedStats THarmonizer::GetPoolStats(i16 poolId) const {
+ const TPoolInfo &pool = Pools[poolId];
+ ui64 flags = RelaxedLoad(&pool.LastFlags);
+ return TPoolHarmonizedStats {
+ .IncreasingThreadsByNeedyState = static_cast<ui64>(RelaxedLoad(&pool.IncreasingThreadsByNeedyState)),
+ .DecreasingThreadsByStarvedState = static_cast<ui64>(RelaxedLoad(&pool.DecreasingThreadsByStarvedState)),
+ .DecreasingThreadsByHoggishState = static_cast<ui64>(RelaxedLoad(&pool.DecreasingThreadsByHoggishState)),
+ .IsNeedy = static_cast<bool>(flags & 1),
+ .IsStarved = static_cast<bool>(flags & 2),
+ .IsHoggish = static_cast<bool>(flags & 4),
+ };
+}
+
+}
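The core of the new harmonizer.cpp is HarmonizeImpl(): per-thread consumed/booked microseconds are pulled into one-second history buckets (TValueHistory), each pool is classified as starved (consumed well below booked), needy (booked at or above the current thread count and the self-ping check passes) or hoggish (booked below threadCount - 1), and threads are then taken away from over-provisioned pools or granted to needy ones while a whole-CPU budget remains. A reduced, self-contained sketch of that classification and budgeting step, using plain structs in place of IExecutorPool and the thresholds from the code above (the pool numbers are made up):

    #include <algorithm>
    #include <cstdio>
    #include <vector>

    struct TPoolSketch {
        const char* Name;
        int Threads, DefaultThreads, MinThreads, MaxThreads;
        double Booked, Consumed; // in whole CPUs, already rescaled
    };

    bool IsStarved(double consumed, double booked) {
        return std::max(consumed, booked) > 0.1 && consumed < booked * 0.7;
    }
    bool IsHoggish(double booked, int threads) {
        return booked < threads - 1;
    }

    int main() {
        std::vector<TPoolSketch> pools = {
            {"System", 2, 2, 1, 4, 2.3, 2.2}, // fully loaded, wants another thread
            {"Batch",  4, 4, 1, 4, 1.1, 1.0}, // holds far more threads than it uses
        };
        double total = 0, booked = 0;
        for (auto& p : pools) { total += p.DefaultThreads; booked += p.Booked; }
        double budget = total - booked; // spare whole CPUs available for needy pools
        for (auto& p : pools) {
            bool starved = IsStarved(p.Consumed, p.Booked);
            bool needy = !starved && p.Booked >= p.Threads;
            bool hoggish = IsHoggish(p.Booked, p.Threads);
            if (needy && budget >= 1.0 && p.Threads + 1 <= p.MaxThreads) {
                ++p.Threads; budget -= 1.0;      // grant a thread out of the budget
            } else if (hoggish && p.Threads > p.MinThreads) {
                --p.Threads;                     // shrink an underused pool
            }
            std::printf("%s: threads=%d starved=%d needy=%d hoggish=%d\n",
                        p.Name, p.Threads, starved, needy, hoggish);
        }
        return 0;
    }

The real implementation additionally walks pools in priority order when a starved pool is present, which this sketch omits.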
diff --git a/library/cpp/actors/core/harmonizer.h b/library/cpp/actors/core/harmonizer.h
new file mode 100644
index 0000000000..61f13e43ac
--- /dev/null
+++ b/library/cpp/actors/core/harmonizer.h
@@ -0,0 +1,30 @@
+#pragma once
+
+#include "defs.h"
+#include "config.h"
+
+namespace NActors {
+ class IExecutorPool;
+
+ struct TPoolHarmonizedStats {
+ ui64 IncreasingThreadsByNeedyState = 0;
+ ui64 DecreasingThreadsByStarvedState = 0;
+ ui64 DecreasingThreadsByHoggishState = 0;
+ bool IsNeedy = false;
+ bool IsStarved = false;
+ bool IsHoggish = false;
+ };
+
+ // Pool cpu harmonizer
+ class IHarmonizer {
+ public:
+ virtual ~IHarmonizer() {}
+ virtual void Harmonize(ui64 ts) = 0;
+ virtual void DeclareEmergency(ui64 ts) = 0;
+ virtual void AddPool(IExecutorPool* pool, TSelfPingInfo *pingInfo = nullptr) = 0;
+ virtual void Enable(bool enable) = 0;
+ virtual TPoolHarmonizedStats GetPoolStats(i16 poolId) const = 0;
+ };
+
+ IHarmonizer* MakeHarmonizer(ui64 ts);
+}
diff --git a/library/cpp/actors/core/mon_stats.h b/library/cpp/actors/core/mon_stats.h
index 6d482926d1..117d2ad41d 100644
--- a/library/cpp/actors/core/mon_stats.h
+++ b/library/cpp/actors/core/mon_stats.h
@@ -60,6 +60,14 @@ namespace NActors {
struct TExecutorPoolStats {
ui64 MaxUtilizationTime = 0;
+ ui64 IncreasingThreadsByNeedyState = 0;
+ ui64 DecreasingThreadsByStarvedState = 0;
+ ui64 DecreasingThreadsByHoggishState = 0;
+ i16 WrongWakenedThreadCount = 0;
+ i16 CurrentThreadCount = 0;
+ bool IsNeedy = false;
+ bool IsStarved = false;
+ bool IsHoggish = false;
};
struct TExecutorThreadStats {
@@ -69,6 +77,7 @@ namespace NActors {
ui64 NonDeliveredEvents = 0;
ui64 EmptyMailboxActivation = 0;
ui64 CpuNs = 0; // nanoseconds thread was executing on CPU (accounts for preemption)
+ ui64 WorstActivationTimeUs = 0;
NHPTimer::STime ElapsedTicks = 0;
NHPTimer::STime ParkedTicks = 0;
NHPTimer::STime BlockedTicks = 0;
@@ -111,6 +120,9 @@ namespace NActors {
NonDeliveredEvents += RelaxedLoad(&other.NonDeliveredEvents);
EmptyMailboxActivation += RelaxedLoad(&other.EmptyMailboxActivation);
CpuNs += RelaxedLoad(&other.CpuNs);
+ RelaxedStore(
+ &WorstActivationTimeUs,
+ std::max(RelaxedLoad(&WorstActivationTimeUs), RelaxedLoad(&other.WorstActivationTimeUs)));
ElapsedTicks += RelaxedLoad(&other.ElapsedTicks);
ParkedTicks += RelaxedLoad(&other.ParkedTicks);
BlockedTicks += RelaxedLoad(&other.BlockedTicks);
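The new WorstActivationTimeUs field is merged with a relaxed-atomic maximum (RelaxedStore of the larger of the two values) rather than summed like the other fields; that is fine here because the aggregation runs on a single collecting thread. When several writers could race on such a maximum, the usual standard-C++ shape is a compare-exchange loop; a small hedged sketch of that variant, using std::atomic instead of the library's RelaxedLoad/RelaxedStore helpers:

    #include <atomic>
    #include <cstdint>
    #include <cstdio>

    // Keep the running maximum of an observed value using relaxed atomics.
    void UpdateMaxRelaxed(std::atomic<uint64_t>& worst, uint64_t observed) {
        uint64_t current = worst.load(std::memory_order_relaxed);
        while (observed > current &&
               !worst.compare_exchange_weak(current, observed, std::memory_order_relaxed)) {
            // compare_exchange_weak refreshes 'current' on failure; retry until done.
        }
    }

    int main() {
        std::atomic<uint64_t> worstActivationTimeUs{0};
        for (uint64_t sample : {120u, 90u, 4500u, 300u}) {
            UpdateMaxRelaxed(worstActivationTimeUs, sample);
        }
        std::printf("worst activation: %llu us\n",
                    (unsigned long long)worstActivationTimeUs.load());
        return 0;
    }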
diff --git a/library/cpp/actors/core/probes.h b/library/cpp/actors/core/probes.h
index 4912d6dd26..11bbf81287 100644
--- a/library/cpp/actors/core/probes.h
+++ b/library/cpp/actors/core/probes.h
@@ -166,6 +166,30 @@
PROBE(MoveCpu, GROUPS("PoolCpuBalancer"), \
TYPES(ui32, ui64, TString, TString, ui32), \
NAMES("fromPoolId", "toPoolId", "fromPool", "toPool", "cpu")) \
+ PROBE(ThreadCount, GROUPS("BasicThreadPool"), \
+ TYPES(ui32, TString, ui32, ui32, ui32, ui32), \
+ NAMES("poolId", "pool", "threacCount", "minThreadCount", "maxThreadCount", "defaultThreadCount")) \
+ PROBE(HarmonizeCheckPool, GROUPS("Harmonizer"), \
+ TYPES(ui32, TString, double, double, double, double, ui32, ui32, bool, bool, bool), \
+ NAMES("poolId", "pool", "booked", "consumed", "lastSecondBooked", "lastSecondConsumed", "threadCount", "maxThreadCount", "isStarved", "isNeedy", "isHoggish")) \
+ PROBE(HarmonizeOperation, GROUPS("Harmonizer"), \
+ TYPES(ui32, TString, TString, ui32, ui32, ui32), \
+ NAMES("poolId", "pool", "operation", "newCount", "minCount", "maxCount")) \
+ PROBE(TryToHarmonize, GROUPS("Harmonizer"), \
+ TYPES(ui32, TString), \
+ NAMES("poolId", "pool")) \
+ PROBE(SavedValues, GROUPS("Harmonizer"), \
+ TYPES(ui32, TString, TString, double, double, double, double, double, double, double, double), \
+ NAMES("poolId", "pool", "valueName", "[0]", "[1]", "[2]", "[3]", "[4]", "[5]", "[6]", "[7]")) \
+ PROBE(RegisterValue, GROUPS("Harmonizer"), \
+ TYPES(ui64, ui64, ui64, ui64, double, double, double), \
+ NAMES("ts", "lastTs", "dTs", "8sTs", "us", "lastUs", "dUs")) \
+ PROBE(TryToHarmonizeFailed, GROUPS("Harmonizer"), \
+ TYPES(ui64, ui64, bool, bool), \
+ NAMES("ts", "nextHarmonizeTs", "isDisabled", "withLock")) \
+ PROBE(TryToHarmonizeSuccess, GROUPS("Harmonizer"), \
+ TYPES(ui64, ui64, ui64), \
+ NAMES("ts", "nextHarmonizeTs", "previousNextHarmonizeTs")) \
/**/
LWTRACE_DECLARE_PROVIDER(ACTORLIB_PROVIDER)
diff --git a/library/cpp/actors/core/worker_context.h b/library/cpp/actors/core/worker_context.h
index b4c37a7629..384a13c5ee 100644
--- a/library/cpp/actors/core/worker_context.h
+++ b/library/cpp/actors/core/worker_context.h
@@ -95,6 +95,7 @@ namespace NActors {
i64 ts = deliveredTs > scheduleTs ? deliveredTs - scheduleTs : 0;
double usec = NHPTimer::GetSeconds(ts) * 1000000.0;
Stats->ActivationTimeHistogram.Add(usec);
+ Stats->WorstActivationTimeUs = Max(Stats->WorstActivationTimeUs, (ui64)usec);
return usec;
}
diff --git a/library/cpp/actors/helpers/pool_stats_collector.h b/library/cpp/actors/helpers/pool_stats_collector.h
index 61d0b45780..b1217b1d63 100644
--- a/library/cpp/actors/helpers/pool_stats_collector.h
+++ b/library/cpp/actors/helpers/pool_stats_collector.h
@@ -124,6 +124,15 @@ private:
NMonitoring::TDynamicCounters::TCounterPtr MailboxPushedOutBySoftPreemption;
NMonitoring::TDynamicCounters::TCounterPtr MailboxPushedOutByTime;
NMonitoring::TDynamicCounters::TCounterPtr MailboxPushedOutByEventCount;
+ NMonitoring::TDynamicCounters::TCounterPtr WrongWakenedThreadCount;
+ NMonitoring::TDynamicCounters::TCounterPtr CurrentThreadCount;
+ NMonitoring::TDynamicCounters::TCounterPtr IsNeedy;
+ NMonitoring::TDynamicCounters::TCounterPtr IsStarved;
+ NMonitoring::TDynamicCounters::TCounterPtr IsHoggish;
+ NMonitoring::TDynamicCounters::TCounterPtr IncreasingThreadsByNeedyState;
+ NMonitoring::TDynamicCounters::TCounterPtr DecreasingThreadsByStarvedState;
+ NMonitoring::TDynamicCounters::TCounterPtr DecreasingThreadsByHoggishState;
+
THistogramCounters LegacyActivationTimeHistogram;
NMonitoring::THistogramPtr ActivationTimeHistogram;
@@ -167,6 +176,14 @@ private:
MailboxPushedOutBySoftPreemption = PoolGroup->GetCounter("MailboxPushedOutBySoftPreemption", true);
MailboxPushedOutByTime = PoolGroup->GetCounter("MailboxPushedOutByTime", true);
MailboxPushedOutByEventCount = PoolGroup->GetCounter("MailboxPushedOutByEventCount", true);
+ WrongWakenedThreadCount = PoolGroup->GetCounter("WrongWakenedThreadCount", true);
+ CurrentThreadCount = PoolGroup->GetCounter("CurrentThreadCount", false);
+ IsNeedy = PoolGroup->GetCounter("IsNeedy", false);
+ IsStarved = PoolGroup->GetCounter("IsStarved", false);
+ IsHoggish = PoolGroup->GetCounter("IsHoggish", false);
+ IncreasingThreadsByNeedyState = PoolGroup->GetCounter("IncreasingThreadsByNeedyState", true);
+ DecreasingThreadsByStarvedState = PoolGroup->GetCounter("DecreasingThreadsByStarvedState", true);
+ DecreasingThreadsByHoggishState = PoolGroup->GetCounter("DecreasingThreadsByHoggishState", true);
LegacyActivationTimeHistogram.Init(PoolGroup.Get(), "ActivationTime", "usec", 5*1000*1000);
ActivationTimeHistogram = PoolGroup->GetHistogram(
@@ -203,6 +220,14 @@ private:
*MailboxPushedOutBySoftPreemption = stats.MailboxPushedOutBySoftPreemption;
*MailboxPushedOutByTime = stats.MailboxPushedOutByTime;
*MailboxPushedOutByEventCount = stats.MailboxPushedOutByEventCount;
+ *WrongWakenedThreadCount = poolStats.WrongWakenedThreadCount;
+ *CurrentThreadCount = poolStats.CurrentThreadCount;
+ *IsNeedy = poolStats.IsNeedy;
+ *IsStarved = poolStats.IsStarved;
+ *IsHoggish = poolStats.IsHoggish;
+ *IncreasingThreadsByNeedyState = poolStats.IncreasingThreadsByNeedyState;
+ *DecreasingThreadsByStarvedState = poolStats.DecreasingThreadsByStarvedState;
+ *DecreasingThreadsByHoggishState = poolStats.DecreasingThreadsByHoggishState;
LegacyActivationTimeHistogram.Set(stats.ActivationTimeHistogram);
ActivationTimeHistogram->Reset();
diff --git a/library/cpp/actors/helpers/selfping_actor.cpp b/library/cpp/actors/helpers/selfping_actor.cpp
index f9bfaf8dc0..dc383f8c4c 100644
--- a/library/cpp/actors/helpers/selfping_actor.cpp
+++ b/library/cpp/actors/helpers/selfping_actor.cpp
@@ -61,10 +61,14 @@ struct TAvgOperation {
class TSelfPingActor : public TActorBootstrapped<TSelfPingActor> {
private:
const TDuration SendInterval;
- const NMonitoring::TDynamicCounters::TCounterPtr Counter;
+ const NMonitoring::TDynamicCounters::TCounterPtr MaxPingCounter;
+ const NMonitoring::TDynamicCounters::TCounterPtr AvgPingCounter;
+ const NMonitoring::TDynamicCounters::TCounterPtr AvgPingCounterWithSmallWindow;
const NMonitoring::TDynamicCounters::TCounterPtr CalculationTimeCounter;
- NSlidingWindow::TSlidingWindow<NSlidingWindow::TMaxOperation<ui64>> SlidingWindow;
+ NSlidingWindow::TSlidingWindow<NSlidingWindow::TMaxOperation<ui64>> MaxPingSlidingWindow;
+ NSlidingWindow::TSlidingWindow<TAvgOperation<ui64>> AvgPingSlidingWindow;
+ NSlidingWindow::TSlidingWindow<TAvgOperation<ui64>> AvgPingSmallSlidingWindow;
NSlidingWindow::TSlidingWindow<TAvgOperation<ui64>> CalculationSlidingWindow;
THPTimer Timer;
@@ -74,12 +78,19 @@ public:
return SELF_PING_ACTOR;
}
- TSelfPingActor(TDuration sendInterval, const NMonitoring::TDynamicCounters::TCounterPtr& counter,
+ TSelfPingActor(TDuration sendInterval,
+ const NMonitoring::TDynamicCounters::TCounterPtr& maxPingCounter,
+ const NMonitoring::TDynamicCounters::TCounterPtr& avgPingCounter,
+ const NMonitoring::TDynamicCounters::TCounterPtr& avgPingSmallWindowCounter,
const NMonitoring::TDynamicCounters::TCounterPtr& calculationTimeCounter)
: SendInterval(sendInterval)
- , Counter(counter)
+ , MaxPingCounter(maxPingCounter)
+ , AvgPingCounter(avgPingCounter)
+ , AvgPingCounterWithSmallWindow(avgPingSmallWindowCounter)
, CalculationTimeCounter(calculationTimeCounter)
- , SlidingWindow(TDuration::Seconds(15), 100)
+ , MaxPingSlidingWindow(TDuration::Seconds(15), 100)
+ , AvgPingSlidingWindow(TDuration::Seconds(15), 100)
+ , AvgPingSmallSlidingWindow(TDuration::Seconds(1), 100)
, CalculationSlidingWindow(TDuration::Seconds(15), 100)
{
}
@@ -154,11 +165,23 @@ public:
const double passedTime = hpNow - e.TimeStart;
const ui64 delayUs = passedTime > 0.0 ? static_cast<ui64>(passedTime * 1e6) : 0;
- *Counter = SlidingWindow.Update(delayUs, now);
+ if (MaxPingCounter) {
+ *MaxPingCounter = MaxPingSlidingWindow.Update(delayUs, now);
+ }
+ if (AvgPingCounter) {
+ auto res = AvgPingSlidingWindow.Update({1, delayUs}, now);
+ *AvgPingCounter = double(res.Sum) / double(res.Count + 1);
+ }
+ if (AvgPingCounterWithSmallWindow) {
+ auto res = AvgPingSmallSlidingWindow.Update({1, delayUs}, now);
+ *AvgPingCounterWithSmallWindow = double(res.Sum) / double(res.Count + 1);
+ }
- ui64 d = MeasureTaskDurationNs();
- auto res = CalculationSlidingWindow.Update({1, d}, now);
- *CalculationTimeCounter = double(res.Sum) / double(res.Count + 1);
+ if (CalculationTimeCounter) {
+ ui64 d = MeasureTaskDurationNs();
+ auto res = CalculationSlidingWindow.Update({1, d}, now);
+ *CalculationTimeCounter = double(res.Sum) / double(res.Count + 1);
+ }
SchedulePing(ctx, hpNow);
}
@@ -174,10 +197,12 @@ private:
IActor* CreateSelfPingActor(
TDuration sendInterval,
- const NMonitoring::TDynamicCounters::TCounterPtr& counter,
+ const NMonitoring::TDynamicCounters::TCounterPtr& maxPingCounter,
+ const NMonitoring::TDynamicCounters::TCounterPtr& avgPingCounter,
+ const NMonitoring::TDynamicCounters::TCounterPtr& avgPingSmallWindowCounter,
const NMonitoring::TDynamicCounters::TCounterPtr& calculationTimeCounter)
{
- return new TSelfPingActor(sendInterval, counter, calculationTimeCounter);
+ return new TSelfPingActor(sendInterval, maxPingCounter, avgPingCounter, avgPingSmallWindowCounter, calculationTimeCounter);
}
} // NActors
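selfping_actor.cpp now maintains three windows instead of one: the original 15-second maximum plus 15-second and 1-second averages that feed the harmonizer's AvgPingCounter and AvgPingCounterWithSmallWindow, each computed as res.Sum / (res.Count + 1) so an empty window never divides by zero. A standalone sketch of the same rolling average over a time window, without the NSlidingWindow machinery (deque-based and purely illustrative):

    #include <cstdint>
    #include <cstdio>
    #include <deque>

    // Rolling average of delay samples over a fixed time window.
    class TRollingAvg {
    public:
        explicit TRollingAvg(uint64_t windowUs) : WindowUs(windowUs) {}

        double Update(uint64_t nowUs, uint64_t delayUs) {
            Samples.push_back({nowUs, delayUs});
            Sum += delayUs;
            // Drop samples that fell out of the window.
            while (!Samples.empty() && Samples.front().TimeUs + WindowUs < nowUs) {
                Sum -= Samples.front().DelayUs;
                Samples.pop_front();
            }
            // "+ 1" mirrors the code above: never divide by zero on an empty window.
            return double(Sum) / double(Samples.size() + 1);
        }

    private:
        struct TSample { uint64_t TimeUs, DelayUs; };
        std::deque<TSample> Samples;
        uint64_t Sum = 0;
        uint64_t WindowUs;
    };

    int main() {
        TRollingAvg avg(1'000'000); // 1-second window, like AvgPingSmallSlidingWindow
        std::printf("%.1f\n", avg.Update(100, 40));
        std::printf("%.1f\n", avg.Update(500'000, 80));
        std::printf("%.1f\n", avg.Update(1'600'000, 20)); // first two samples expire
        return 0;
    }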
diff --git a/library/cpp/actors/helpers/selfping_actor.h b/library/cpp/actors/helpers/selfping_actor.h
index d7d07f9fa8..a976a4f425 100644
--- a/library/cpp/actors/helpers/selfping_actor.h
+++ b/library/cpp/actors/helpers/selfping_actor.h
@@ -7,7 +7,9 @@ namespace NActors {
NActors::IActor* CreateSelfPingActor(
TDuration sendInterval,
- const NMonitoring::TDynamicCounters::TCounterPtr& counter,
+ const NMonitoring::TDynamicCounters::TCounterPtr& maxPingCounter,
+ const NMonitoring::TDynamicCounters::TCounterPtr& avgPingCounter,
+ const NMonitoring::TDynamicCounters::TCounterPtr& avgPingSmallWindowCounter,
const NMonitoring::TDynamicCounters::TCounterPtr& calculationTimeCounter);
} // NActors
diff --git a/library/cpp/actors/helpers/selfping_actor_ut.cpp b/library/cpp/actors/helpers/selfping_actor_ut.cpp
index 459635fa24..542f817755 100644
--- a/library/cpp/actors/helpers/selfping_actor_ut.cpp
+++ b/library/cpp/actors/helpers/selfping_actor_ut.cpp
@@ -22,13 +22,17 @@ Y_UNIT_TEST_SUITE(TSelfPingTest) {
NMonitoring::TDynamicCounters::TCounterPtr counter(new NMonitoring::TCounterForPtr());
NMonitoring::TDynamicCounters::TCounterPtr counter2(new NMonitoring::TCounterForPtr());
+ NMonitoring::TDynamicCounters::TCounterPtr counter3(new NMonitoring::TCounterForPtr());
+ NMonitoring::TDynamicCounters::TCounterPtr counter4(new NMonitoring::TCounterForPtr());
auto actor = CreateSelfPingActor(
TDuration::MilliSeconds(100), // sendInterval (unused in test)
- counter, counter2);
+ counter, counter2, counter3, counter4);
UNIT_ASSERT_VALUES_EQUAL(counter->Val(), 0);
UNIT_ASSERT_VALUES_EQUAL(counter2->Val(), 0);
+ UNIT_ASSERT_VALUES_EQUAL(counter3->Val(), 0);
+ UNIT_ASSERT_VALUES_EQUAL(counter4->Val(), 0);
const TActorId actorId = runtime->Register(actor);
Y_UNUSED(actorId);
diff --git a/library/cpp/actors/util/CMakeLists.txt b/library/cpp/actors/util/CMakeLists.txt
index 40d958d75e..233e1fe0fc 100644
--- a/library/cpp/actors/util/CMakeLists.txt
+++ b/library/cpp/actors/util/CMakeLists.txt
@@ -12,6 +12,7 @@ target_link_libraries(cpp-actors-util PUBLIC
contrib-libs-cxxsupp
yutil
cpp-deprecated-atomic
+ library-cpp-pop_count
)
target_sources(cpp-actors-util PRIVATE
${CMAKE_SOURCE_DIR}/library/cpp/actors/util/affinity.cpp
diff --git a/library/cpp/actors/util/cpu_load_log.h b/library/cpp/actors/util/cpu_load_log.h
new file mode 100644
index 0000000000..e4ae612246
--- /dev/null
+++ b/library/cpp/actors/util/cpu_load_log.h
@@ -0,0 +1,227 @@
+#pragma once
+
+#include "defs.h"
+#include <library/cpp/deprecated/atomic/atomic.h>
+#include <library/cpp/pop_count/popcount.h>
+
+static constexpr ui64 BitDurationNs = 131'072; // A power of 2
+
+template <ui64 DataSize>
+struct TCpuLoadLog {
+ static constexpr ui64 BitsSize = DataSize * 64;
+ TAtomic LastTimeNs = 0;
+ ui64 Data[DataSize];
+
+ TCpuLoadLog() {
+ LastTimeNs = 0;
+ for (size_t i = 0; i < DataSize; ++i) {
+ Data[i] = 0;
+ }
+ }
+
+ TCpuLoadLog(ui64 timeNs) {
+ LastTimeNs = timeNs;
+ for (size_t i = 0; i < DataSize; ++i) {
+ Data[i] = 0;
+ }
+ }
+
+ void RegisterBusyPeriod(ui64 timeNs) {
+ RegisterBusyPeriod<true>(timeNs, AtomicGet(LastTimeNs));
+ }
+
+ template <bool ModifyLastTime>
+ void RegisterBusyPeriod(ui64 timeNs, ui64 lastTimeNs) {
+ timeNs |= 1ull;
+ if (timeNs < lastTimeNs) {
+ for (ui64 i = 0; i < DataSize; ++i) {
+ AtomicSet(Data[i], ~0ull);
+ }
+ if (ModifyLastTime) {
+ AtomicSet(LastTimeNs, timeNs);
+ }
+ return;
+ }
+ const ui64 lastIdx = timeNs / BitDurationNs;
+ const ui64 curIdx = lastTimeNs / BitDurationNs;
+ ui64 firstElementIdx = curIdx / 64;
+ const ui64 firstBitIdx = curIdx % 64;
+ const ui64 lastElementIdx = lastIdx / 64;
+ const ui64 lastBitIdx = lastIdx % 64;
+ if (firstElementIdx == lastElementIdx) {
+ ui64 prevValue = 0;
+ if (firstBitIdx != 0) {
+ prevValue = AtomicGet(Data[firstElementIdx % DataSize]);
+ }
+ const ui64 bits = (((~0ull) << (firstBitIdx + (63-lastBitIdx))) >> (63-lastBitIdx));
+ const ui64 newValue = prevValue | bits;
+ AtomicSet(Data[firstElementIdx % DataSize], newValue);
+ if (ModifyLastTime) {
+ AtomicSet(LastTimeNs, timeNs);
+ }
+ return;
+ }
+ // process the first element
+ ui64 prevValue = 0;
+ if (firstBitIdx != 0) {
+ prevValue = AtomicGet(Data[firstElementIdx % DataSize]);
+ }
+ const ui64 bits = ((~0ull) << firstBitIdx);
+ const ui64 newValue = (prevValue | bits);
+ AtomicSet(Data[firstElementIdx % DataSize], newValue);
+ ++firstElementIdx;
+ // process the fully filled elements
+ const ui64 firstLoop = firstElementIdx / DataSize;
+ const ui64 lastLoop = lastElementIdx / DataSize;
+ const ui64 lastOffset = lastElementIdx % DataSize;
+ if (firstLoop < lastLoop) {
+ for (ui64 i = firstElementIdx % DataSize; i < DataSize; ++i) {
+ AtomicSet(Data[i], ~0ull);
+ }
+ for (ui64 i = 0; i < lastOffset; ++i) {
+ AtomicSet(Data[i], ~0ull);
+ }
+ } else {
+ for (ui64 i = firstElementIdx % DataSize; i < lastOffset; ++i) {
+ AtomicSet(Data[i], ~0ull);
+ }
+ }
+ // process the last element
+ const ui64 newValue2 = ((~0ull) >> (63-lastBitIdx));
+ AtomicSet(Data[lastOffset], newValue2);
+ if (ModifyLastTime) {
+ AtomicSet(LastTimeNs, timeNs);
+ }
+ }
+
+ void RegisterIdlePeriod(ui64 timeNs) {
+ timeNs &= ~1ull;
+ ui64 lastTimeNs = AtomicGet(LastTimeNs);
+ if (timeNs < lastTimeNs) {
+            // Fast check first, slower check later
+            if ((timeNs | 1ull) < lastTimeNs) {
+                // Time goes back, don't panic, just mark the whole array 'busy'
+ for (ui64 i = 0; i < DataSize; ++i) {
+ AtomicSet(Data[i], ~0ull);
+ }
+ AtomicSet(LastTimeNs, timeNs);
+ return;
+ }
+ }
+ const ui64 curIdx = lastTimeNs / BitDurationNs;
+ const ui64 lastIdx = timeNs / BitDurationNs;
+ ui64 firstElementIdx = curIdx / 64;
+ const ui64 lastElementIdx = lastIdx / 64;
+ if (firstElementIdx >= lastElementIdx) {
+ AtomicSet(LastTimeNs, timeNs);
+ return;
+ }
+ // process the first partially filled element
+ ++firstElementIdx;
+ // process all other elements
+ const ui64 firstLoop = firstElementIdx / DataSize;
+ const ui64 lastLoop = lastElementIdx / DataSize;
+ const ui64 lastOffset = lastElementIdx % DataSize;
+ if (firstLoop < lastLoop) {
+ for (ui64 i = firstElementIdx % DataSize; i < DataSize; ++i) {
+ AtomicSet(Data[i], 0);
+ }
+ for (ui64 i = 0; i <= lastOffset; ++i) {
+ AtomicSet(Data[i], 0);
+ }
+ } else {
+ for (ui64 i = firstElementIdx % DataSize; i <= lastOffset; ++i) {
+ AtomicSet(Data[i], 0);
+ }
+ }
+ AtomicSet(LastTimeNs, timeNs);
+ }
+};
+
+template <ui64 DataSize>
+struct TMinusOneCpuEstimator {
+ static constexpr ui64 BitsSize = DataSize * 64;
+ ui64 BeginDelayIdx;
+ ui64 EndDelayIdx;
+ ui64 Idle;
+ ui64 Delay[BitsSize];
+
+ ui64 MaxLatencyIncreaseWithOneLessCpu(TCpuLoadLog<DataSize>** logs, i64 logCount, ui64 timeNs, ui64 periodNs) {
+ Y_VERIFY(logCount > 0);
+ ui64 endTimeNs = timeNs;
+
+ ui64 lastTimeNs = timeNs;
+ for (i64 log_idx = 0; log_idx < logCount; ++log_idx) {
+ ui64 x = AtomicGet(logs[log_idx]->LastTimeNs);
+ if ((x & 1) == 1) {
+ lastTimeNs = Min(lastTimeNs, x);
+ } else {
+ logs[log_idx]->template RegisterBusyPeriod<false>(endTimeNs, x);
+ }
+ }
+ const ui64 beginTimeNs = periodNs < timeNs ? timeNs - periodNs : 0;
+
+ ui64 beginIdx = beginTimeNs / BitDurationNs;
+ ui64 lastIdx = lastTimeNs / BitDurationNs;
+ ui64 beginElementIdx = beginIdx / 64;
+ ui64 lastElementIdx = lastIdx / 64;
+
+ BeginDelayIdx = 0;
+ EndDelayIdx = 0;
+ Idle = 0;
+ ui64 maxDelay = 0;
+ ui64 bucket = 0;
+ for (ui64 idx = beginElementIdx; idx <= lastElementIdx; ++idx) {
+ ui64 i = idx % DataSize;
+ ui64 input = AtomicGet(logs[0]->Data[i]);
+ ui64 all_busy = ~0ull;
+ for (i64 log_idx = 1; log_idx < logCount; ++log_idx) {
+ ui64 x = AtomicGet(logs[log_idx]->Data[i]);
+ all_busy &= x;
+ }
+ if (!input) {
+ if (!bucket) {
+ Idle += 64 - PopCount(all_busy);
+ continue;
+ }
+ }
+ for (i64 bit_idx = 0; bit_idx < 64; ++bit_idx) {
+ ui64 x = (1ull << bit_idx);
+ if (all_busy & x) {
+ if (input & x) {
+ // Push into the queue
+ bucket++;
+ Delay[EndDelayIdx] = EndDelayIdx;
+ ++EndDelayIdx;
+ } else {
+ // All busy
+ }
+ } else {
+ if (input & x) {
+ // Move success
+ } else {
+ if (bucket) {
+ // Remove from the queue
+ bucket--;
+ ui64 stored = Delay[BeginDelayIdx];
+ ++BeginDelayIdx;
+ ui64 delay = EndDelayIdx - stored;
+ maxDelay = Max(maxDelay, delay);
+ //Cerr << "bit_idx: " << bit_idx << " stored: " << stored << " delay: " << delay << Endl;
+ } else {
+ Idle++;
+ }
+ }
+ }
+ }
+ }
+ if (bucket) {
+ ui64 stored = Delay[BeginDelayIdx];
+ ui64 delay = EndDelayIdx - stored;
+ maxDelay = Max(maxDelay, delay);
+ //Cerr << "last stored: " << stored << " delay: " << delay << Endl;
+ }
+ return maxDelay * BitDurationNs;
+ }
+};
+
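TCpuLoadLog encodes a thread's recent activity as a circular bitmap: each bit covers BitDurationNs (131072 ns) of wall-clock time, 1 means busy and 0 means idle, and RegisterBusyPeriod / RegisterIdlePeriod fill in the bits between the previously recorded timestamp and the new one (the low bit of LastTimeNs doubles as a busy/idle flag). TMinusOneCpuEstimator then replays those bitmaps column by column, queuing any work the remaining CPUs could not have absorbed, to estimate how much extra latency one less CPU would have cost. A toy, non-atomic sketch of just the bit-marking step (single producer, no wraparound handling; all names here are illustrative):

    #include <cstdint>
    #include <cstdio>

    constexpr uint64_t BitDurationNs = 131'072;
    constexpr uint64_t Words = 4; // 4 * 64 bits ~= 33.5 ms of history

    struct TToyLoadLog {
        uint64_t Bits[Words] = {};

        // Mark every bit overlapped by [fromNs, toNs] as busy; assumes fromNs <= toNs.
        void MarkBusy(uint64_t fromNs, uint64_t toNs) {
            for (uint64_t idx = fromNs / BitDurationNs; idx <= toNs / BitDurationNs; ++idx) {
                Bits[(idx / 64) % Words] |= (1ull << (idx % 64));
            }
        }

        double BusyShare() const {
            unsigned busy = 0;
            for (uint64_t w : Bits) {
                busy += __builtin_popcountll(w); // mirrors PopCount() in the library code
            }
            return double(busy) / (Words * 64);
        }
    };

    int main() {
        TToyLoadLog log;
        log.MarkBusy(0, 10 * BitDurationNs);                  // ~1.3 ms of work
        log.MarkBusy(100 * BitDurationNs, 120 * BitDurationNs);
        std::printf("busy share of the window: %.2f\n", log.BusyShare());
        return 0;
    }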
diff --git a/library/cpp/actors/util/cpu_load_log_ut.cpp b/library/cpp/actors/util/cpu_load_log_ut.cpp
new file mode 100644
index 0000000000..7109123c6e
--- /dev/null
+++ b/library/cpp/actors/util/cpu_load_log_ut.cpp
@@ -0,0 +1,275 @@
+#include "cpu_load_log.h"
+
+#include <library/cpp/testing/unittest/registar.h>
+#include <util/random/random.h>
+#include <util/system/hp_timer.h>
+#include <util/system/sanitizers.h>
+#include <util/system/thread.h>
+
+Y_UNIT_TEST_SUITE(CpuLoadLog) {
+
+ TString PrintBits(ui64 x) {
+ TStringStream str;
+ for (ui64 i = 0; i < 64; ++i) {
+ if (x & (1ull << i)) {
+ str << "1";
+ } else {
+ str << "0";
+ }
+ }
+ return str.Str();
+ }
+
+ Y_UNIT_TEST(FillAll) {
+ TCpuLoadLog<5> log(100*BitDurationNs);
+ log.RegisterBusyPeriod(101*BitDurationNs);
+ log.RegisterBusyPeriod(163*BitDurationNs);
+ log.RegisterBusyPeriod(164*BitDurationNs);
+ log.RegisterBusyPeriod(165*BitDurationNs);
+ log.RegisterBusyPeriod(331*BitDurationNs);
+ log.RegisterBusyPeriod(340*BitDurationNs);
+ log.RegisterBusyPeriod(420*BitDurationNs);
+ log.RegisterBusyPeriod(511*BitDurationNs);
+ //for (ui64 i = 0; i < 5; ++i) {
+ // Cerr << "i: " << i << " bits: " << PrintBits(log.Data[i]) << Endl;
+ //}
+ for (ui64 i = 0; i < 5; ++i) {
+ UNIT_ASSERT_C((ui64(log.Data[i]) == ~ui64(0)), "Unequal at " << i << "\n got: " << PrintBits(log.Data[i])
+ << "\n expected: " << PrintBits(~ui64(0)));
+ }
+ }
+
+ Y_UNIT_TEST(PartialFill) {
+ TCpuLoadLog<5> log(0*BitDurationNs);
+ UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[0]), PrintBits(0b0ull));
+ log.RegisterBusyPeriod(0*BitDurationNs);
+ UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[0]), PrintBits(0b1ull));
+ log.RegisterBusyPeriod(0*BitDurationNs);
+ UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[0]), PrintBits(0b1ull));
+ log.RegisterBusyPeriod(1*BitDurationNs/2);
+ UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[0]), PrintBits(0b1ull));
+ log.RegisterBusyPeriod(1*BitDurationNs);
+ UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[0]), PrintBits(0b11ull));
+ log.RegisterIdlePeriod(3*BitDurationNs);
+ UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[0]), PrintBits(0b11ull));
+ log.RegisterBusyPeriod(3*BitDurationNs);
+ UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[0]), PrintBits(0b1011ull));
+ log.RegisterBusyPeriod(63*BitDurationNs);
+ UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[0]), PrintBits((~0ull)^0b0100ull));
+ UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[1]), PrintBits(0b0ull));
+ log.RegisterBusyPeriod(128*BitDurationNs);
+ UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[0]), PrintBits((~0ull)^0b0100ull));
+ UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[1]), PrintBits(~0ull));
+ UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[2]), PrintBits(0b1ull));
+ log.RegisterBusyPeriod(1*BitDurationNs);
+ UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[0]), PrintBits(~0ull));
+ UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[1]), PrintBits(~0ull));
+ UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[2]), PrintBits(~0ull));
+ UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[3]), PrintBits(~0ull));
+ UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[4]), PrintBits(~0ull));
+ log.RegisterBusyPeriod(2*BitDurationNs);
+ UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[0]), PrintBits(~0ull));
+ UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[1]), PrintBits(~0ull));
+ UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[2]), PrintBits(~0ull));
+ UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[3]), PrintBits(~0ull));
+ UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[4]), PrintBits(~0ull));
+ log.RegisterBusyPeriod(64*BitDurationNs);
+ UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[0]), PrintBits(~0ull));
+ UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[1]), PrintBits(0b1ull));
+ UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[2]), PrintBits(~0ull));
+ UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[3]), PrintBits(~0ull));
+ UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[4]), PrintBits(~0ull));
+ log.RegisterIdlePeriod(128*BitDurationNs);
+ UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[0]), PrintBits(~0ull));
+ UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[1]), PrintBits(0b1ull));
+ UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[2]), PrintBits(0ull));
+ UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[3]), PrintBits(~0ull));
+ UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[4]), PrintBits(~0ull));
+ log.RegisterIdlePeriod(192*BitDurationNs);
+ UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[0]), PrintBits(~0ull));
+ UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[1]), PrintBits(0b1ull));
+ UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[2]), PrintBits(0ull));
+ UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[3]), PrintBits(0ull));
+ UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[4]), PrintBits(~0ull));
+ log.RegisterBusyPeriod(192*BitDurationNs);
+ UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[0]), PrintBits(~0ull));
+ UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[1]), PrintBits(0b1ull));
+ UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[2]), PrintBits(0ull));
+ UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[3]), PrintBits(0b1ull));
+ UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[4]), PrintBits(~0ull));
+ log.RegisterIdlePeriod((192+5*64-1)*BitDurationNs);
+ UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[0]), PrintBits(0ull));
+ UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[1]), PrintBits(0ull));
+ UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[2]), PrintBits(0ull));
+ UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[3]), PrintBits(0b1ull));
+ UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[4]), PrintBits(0ull));
+ log.RegisterIdlePeriod((192+15*64)*BitDurationNs);
+ UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[0]), PrintBits(0ull));
+ UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[1]), PrintBits(0ull));
+ UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[2]), PrintBits(0ull));
+ UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[3]), PrintBits(0ull));
+ UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[4]), PrintBits(0ull));
+ }
+
+ Y_UNIT_TEST(Estimator) {
+ TCpuLoadLog<5> *log[10];
+ log[0] = new TCpuLoadLog<5>(0*BitDurationNs);
+ log[1] = new TCpuLoadLog<5>(0*BitDurationNs);
+ TMinusOneCpuEstimator<5> estimator;
+
+
+ for (ui64 i = 0; i < 5*64; i+=2) {
+ log[0]->RegisterIdlePeriod(i*BitDurationNs);
+ log[0]->RegisterBusyPeriod(i*BitDurationNs);
+ }
+ log[0]->RegisterIdlePeriod((5*64-2)*BitDurationNs);
+ UNIT_ASSERT_VALUES_EQUAL(PrintBits(log[0]->Data[0]),
+ PrintBits(0b0101010101010101010101010101010101010101010101010101010101010101ull));
+ UNIT_ASSERT_VALUES_EQUAL(PrintBits(log[0]->Data[4]),
+ PrintBits(0b0101010101010101010101010101010101010101010101010101010101010101ull));
+ for (ui64 i = 0; i < 5*64-1; i+=2) {
+ log[1]->RegisterIdlePeriod((i+1)*BitDurationNs);
+ log[1]->RegisterBusyPeriod((i+1)*BitDurationNs);
+ }
+ log[1]->RegisterIdlePeriod((5*64-2+1)*BitDurationNs);
+ UNIT_ASSERT_VALUES_EQUAL(PrintBits(log[1]->Data[0]),
+ PrintBits(0b1010101010101010101010101010101010101010101010101010101010101010ull));
+ UNIT_ASSERT_VALUES_EQUAL(PrintBits(log[1]->Data[4]),
+ PrintBits(0b1010101010101010101010101010101010101010101010101010101010101010ull));
+
+ ui64 value = estimator.MaxLatencyIncreaseWithOneLessCpu(log, 2, (5*64)*BitDurationNs-1, 3*64*BitDurationNs);
+ UNIT_ASSERT_VALUES_EQUAL(value/BitDurationNs, 1);
+
+ value = estimator.MaxLatencyIncreaseWithOneLessCpu(log, 2, (5*64+10)*BitDurationNs, 3*64*BitDurationNs);
+ UNIT_ASSERT_VALUES_EQUAL(value/BitDurationNs, 12);
+
+ delete log[0];
+ delete log[1];
+ }
+
+ Y_UNIT_TEST(Estimator2) {
+ TCpuLoadLog<5> *log[2];
+ log[0] = new TCpuLoadLog<5>(0*BitDurationNs);
+ log[1] = new TCpuLoadLog<5>(0*BitDurationNs);
+ TMinusOneCpuEstimator<5> estimator;
+
+ for (ui64 i = 0; i < 5*64; i+=2) {
+ log[0]->RegisterIdlePeriod(i*BitDurationNs);
+ log[0]->RegisterBusyPeriod(i*BitDurationNs);
+ }
+ for (ui64 i = 0; i < 5; ++i) {
+ UNIT_ASSERT_VALUES_EQUAL(PrintBits(log[0]->Data[i]),
+ PrintBits(0b0101010101010101010101010101010101010101010101010101010101010101ull));
+ }
+ for (ui64 i = 0; i < 5*64-1; i+=2) {
+ log[1]->RegisterIdlePeriod((i+1)*BitDurationNs);
+ log[1]->RegisterBusyPeriod((i+1)*BitDurationNs);
+ }
+ for (ui64 i = 0; i < 5; ++i) {
+ UNIT_ASSERT_VALUES_EQUAL(PrintBits(log[1]->Data[i]),
+ PrintBits(0b1010101010101010101010101010101010101010101010101010101010101010ull));
+ }
+
+ log[0]->Data[2] = ~0ull;
+ ui64 value = estimator.MaxLatencyIncreaseWithOneLessCpu(log, 2, (5*64-1)*BitDurationNs, 3*64*BitDurationNs);
+ UNIT_ASSERT_VALUES_EQUAL(value/BitDurationNs, 32);
+
+ delete log[0];
+ delete log[1];
+ }
+
+ Y_UNIT_TEST(Estimator3) {
+ TCpuLoadLog<5> *log[3];
+ log[0] = new TCpuLoadLog<5>(0*BitDurationNs);
+ log[1] = new TCpuLoadLog<5>(0*BitDurationNs);
+ log[2] = new TCpuLoadLog<5>(0*BitDurationNs);
+ TMinusOneCpuEstimator<5> estimator;
+
+ for (ui64 i = 0; i < 5*64; i+=8) {
+ log[0]->RegisterIdlePeriod(i*BitDurationNs);
+ log[0]->RegisterBusyPeriod((i+3)*BitDurationNs);
+ log[1]->RegisterIdlePeriod(i*BitDurationNs);
+ log[1]->RegisterBusyPeriod((i+3)*BitDurationNs);
+ log[2]->RegisterIdlePeriod(i*BitDurationNs);
+ log[2]->RegisterBusyPeriod((i+3)*BitDurationNs);
+ }
+ for (ui64 i = 0; i < 5; ++i) {
+ for (ui64 n = 0; n < 3; ++n) {
+ UNIT_ASSERT_VALUES_EQUAL_C(PrintBits(log[n]->Data[i]),
+ PrintBits(0b0000111100001111000011110000111100001111000011110000111100001111ull),
+ " i: " << i << " n: " << n);
+ }
+ }
+
+ ui64 value = estimator.MaxLatencyIncreaseWithOneLessCpu(log, 3, (5*64-5)*BitDurationNs, 3*64*BitDurationNs);
+ UNIT_ASSERT_VALUES_EQUAL(value/BitDurationNs, 4);
+
+ delete log[0];
+ delete log[1];
+ delete log[2];
+ }
+ /*
+ class TWorkerThread : public ISimpleThread {
+ private:
+ std::function<void()> Func;
+ double Time = 0.0;
+
+ public:
+ TWorkerThread(std::function<void()> func)
+ : Func(std::move(func))
+ { }
+
+ double GetTime() const {
+ return Time;
+ }
+
+ static THolder<TWorkerThread> Spawn(std::function<void()> func) {
+ THolder<TWorkerThread> thread = MakeHolder<TWorkerThread>(std::move(func));
+ thread->Start();
+ return thread;
+ }
+
+ private:
+ void* ThreadProc() noexcept override {
+ THPTimer timer;
+ Func();
+ Time = timer.Passed();
+ return nullptr;
+ }
+ };
+
+ void DoConcurrentPushPop(size_t threads, ui64 perThreadCount) {
+ // Concurrency factor 4 is up to 16 threads
+
+ auto workerFunc = [&](size_t threadIndex) {
+ };
+
+ TVector<THolder<TWorkerThread>> workers(threads);
+ for (size_t i = 0; i < threads; ++i) {
+ workers[i] = TWorkerThread::Spawn([workerFunc, i]() {
+ workerFunc(i);
+ });
+ }
+
+ double maxTime = 0;
+ for (size_t i = 0; i < threads; ++i) {
+ workers[i]->Join();
+ maxTime = Max(maxTime, workers[i]->GetTime());
+ }
+
+ UNIT_ASSERT_VALUES_EQUAL(popped, 0u);
+
+ Cerr << "Concurrent with " << threads << " threads: " << maxTime << " seconds" << Endl;
+ }
+
+ void DoConcurrentPushPop_3times(size_t threads, ui64 perThreadCount) {
+ for (size_t i = 0; i < 3; ++i) {
+ DoConcurrentPushPop(threads, perThreadCount);
+ }
+ }
+
+ static constexpr ui64 PER_THREAD_COUNT = NSan::PlainOrUnderSanitizer(1000000, 100000);
+
+ Y_UNIT_TEST(ConcurrentPushPop_1thread) { DoConcurrentPushPop_3times(1, PER_THREAD_COUNT); }
+ */
+}
diff --git a/library/cpp/actors/util/thread_load_log.h b/library/cpp/actors/util/thread_load_log.h
new file mode 100644
index 0000000000..b4b34d47bb
--- /dev/null
+++ b/library/cpp/actors/util/thread_load_log.h
@@ -0,0 +1,363 @@
+#pragma once
+
+#include "defs.h"
+
+#include <util/system/types.h>
+
+#include <type_traits>
+#include <algorithm>
+#include <atomic>
+#include <limits>
+#include <queue>
+
+template <ui64 TIME_SLOT_COUNT, ui64 TIME_SLOT_LENGTH_NS = 131'072, typename Type = std::uint8_t>
+class TThreadLoad {
+public:
+ using TimeSlotType = Type;
+
+private:
+ static constexpr auto TIME_SLOT_MAX_VALUE = std::numeric_limits<TimeSlotType>::max();
+ static constexpr ui64 TIME_SLOT_PART_COUNT = TIME_SLOT_MAX_VALUE + 1;
+ static constexpr auto TIME_SLOT_PART_LENGTH_NS = TIME_SLOT_LENGTH_NS / TIME_SLOT_PART_COUNT;
+
+ template <typename T>
+ static void AtomicAddBound(std::atomic<T>& val, i64 inc) {
+ if (inc == 0) {
+ return;
+ }
+
+ auto newVal = val.load();
+ auto oldVal = newVal;
+
+ do {
+ static constexpr auto MAX_VALUE = std::numeric_limits<T>::max();
+
+ if (oldVal >= MAX_VALUE) {
+ return;
+ }
+ newVal = std::min<i64>(MAX_VALUE, static_cast<i64>(oldVal) + inc);
+ } while (!val.compare_exchange_weak(oldVal, newVal));
+ }
+
+ template <typename T>
+ static void AtomicSubBound(std::atomic<T>& val, i64 sub) {
+ if (sub == 0) {
+ return;
+ }
+
+ auto newVal = val.load();
+ auto oldVal = newVal;
+
+ do {
+ if (oldVal == 0) {
+ return;
+ }
+ newVal = std::max<i64>(0, static_cast<i64>(oldVal) - sub);
+ } while (!val.compare_exchange_weak(oldVal, newVal));
+ }
+
+ void UpdateCompleteTimeSlots(ui64 firstSlotNumber, ui64 lastSlotNumber, TimeSlotType timeSlotValue) {
+ ui32 firstSlotIndex = firstSlotNumber % TIME_SLOT_COUNT;
+ ui32 lastSlotIndex = lastSlotNumber % TIME_SLOT_COUNT;
+
+ const ui64 firstTimeSlotsPass = firstSlotNumber / TIME_SLOT_COUNT;
+ const ui64 lastTimeSlotsPass = lastSlotNumber / TIME_SLOT_COUNT;
+
+ if (firstTimeSlotsPass == lastTimeSlotsPass) {
+ // first and last time slots are in the same pass
+ for (auto slotNumber = firstSlotNumber + 1; slotNumber < lastSlotNumber; ++slotNumber) {
+ auto slotIndex = slotNumber % TIME_SLOT_COUNT;
+ TimeSlots[slotIndex] = timeSlotValue;
+ }
+ } else if (firstTimeSlotsPass + 1 == lastTimeSlotsPass) {
+ for (auto slotIndex = (firstSlotNumber + 1) % TIME_SLOT_COUNT; firstSlotIndex < slotIndex && slotIndex < TIME_SLOT_COUNT; ++slotIndex) {
+ TimeSlots[slotIndex] = timeSlotValue;
+ }
+ for (auto slotIndex = 0u; slotIndex < lastSlotIndex; ++slotIndex) {
+ TimeSlots[slotIndex] = timeSlotValue;
+ }
+ } else {
+ for (auto slotIndex = 0u; slotIndex < TIME_SLOT_COUNT; ++slotIndex) {
+ TimeSlots[slotIndex] = timeSlotValue;
+ }
+ }
+ }
+
+public:
+ std::atomic<ui64> LastTimeNs;
+ std::atomic<TimeSlotType> TimeSlots[TIME_SLOT_COUNT];
+ std::atomic<bool> LastRegisteredPeriodIsBusy = false;
+
+ explicit TThreadLoad(ui64 timeNs = 0) {
+ static_assert(std::is_unsigned<TimeSlotType>::value);
+
+ LastTimeNs = timeNs;
+ for (size_t i = 0; i < TIME_SLOT_COUNT; ++i) {
+ TimeSlots[i] = 0;
+ }
+ }
+
+ static constexpr auto GetTimeSlotCount() {
+ return TIME_SLOT_COUNT;
+ }
+
+ static constexpr auto GetTimeSlotLengthNs() {
+ return TIME_SLOT_LENGTH_NS;
+ }
+
+ static constexpr auto GetTimeSlotPartLengthNs() {
+ return TIME_SLOT_PART_LENGTH_NS;
+ }
+
+ static constexpr auto GetTimeSlotPartCount() {
+ return TIME_SLOT_PART_COUNT;
+ }
+
+ static constexpr auto GetTimeSlotMaxValue() {
+ return TIME_SLOT_MAX_VALUE;
+ }
+
+ static constexpr auto GetTimeWindowLengthNs() {
+ return TIME_SLOT_COUNT * TIME_SLOT_LENGTH_NS;
+ }
+
+ void RegisterBusyPeriod(ui64 timeNs) {
+ RegisterBusyPeriod<true>(timeNs, LastTimeNs.load());
+ }
+
+ template <bool ModifyLastTime>
+ void RegisterBusyPeriod(ui64 timeNs, ui64 lastTimeNs) {
+ LastRegisteredPeriodIsBusy = true;
+
+ if (timeNs < lastTimeNs) {
+ // when time goes back, mark all time slots as 'free'
+ for (size_t i = 0u; i < TIME_SLOT_COUNT; ++i) {
+ TimeSlots[i] = 0;
+ }
+
+ if (ModifyLastTime) {
+ LastTimeNs = timeNs;
+ }
+
+ return;
+ }
+
+ // lastTimeNs <= timeNs
+ ui64 firstSlotNumber = lastTimeNs / TIME_SLOT_LENGTH_NS;
+ ui32 firstSlotIndex = firstSlotNumber % TIME_SLOT_COUNT;
+ ui64 lastSlotNumber = timeNs / TIME_SLOT_LENGTH_NS;
+ ui32 lastSlotIndex = lastSlotNumber % TIME_SLOT_COUNT;
+
+ if (firstSlotNumber == lastSlotNumber) {
+ ui32 slotLengthNs = timeNs - lastTimeNs;
+ ui32 slotPartsCount = (slotLengthNs + TIME_SLOT_PART_LENGTH_NS - 1) / TIME_SLOT_PART_LENGTH_NS;
+ AtomicAddBound(TimeSlots[firstSlotIndex], slotPartsCount);
+
+ if (ModifyLastTime) {
+ LastTimeNs = timeNs;
+ }
+ return;
+ }
+
+ ui32 firstSlotLengthNs = TIME_SLOT_LENGTH_NS - (lastTimeNs % TIME_SLOT_LENGTH_NS);
+ ui32 firstSlotPartsCount = (firstSlotLengthNs + TIME_SLOT_PART_LENGTH_NS - 1) / TIME_SLOT_PART_LENGTH_NS;
+ ui32 lastSlotLengthNs = timeNs % TIME_SLOT_LENGTH_NS;
+ ui32 lastSlotPartsCount = (lastSlotLengthNs + TIME_SLOT_PART_LENGTH_NS - 1) / TIME_SLOT_PART_LENGTH_NS;
+
+ // process first time slot
+ AtomicAddBound(TimeSlots[firstSlotIndex], firstSlotPartsCount);
+
+ // process complete time slots
+ UpdateCompleteTimeSlots(firstSlotNumber, lastSlotNumber, TIME_SLOT_MAX_VALUE);
+
+ // process last time slot
+ AtomicAddBound(TimeSlots[lastSlotIndex], lastSlotPartsCount);
+
+ if (ModifyLastTime) {
+ LastTimeNs = timeNs;
+ }
+ }
+
+ void RegisterIdlePeriod(ui64 timeNs) {
+ LastRegisteredPeriodIsBusy = false;
+
+ ui64 lastTimeNs = LastTimeNs.load();
+ if (timeNs < lastTimeNs) {
+ // when time goes back, mark all time slots as 'busy'
+ for (size_t i = 0u; i < TIME_SLOT_COUNT; ++i) {
+ TimeSlots[i] = TIME_SLOT_MAX_VALUE;
+ }
+ LastTimeNs = timeNs;
+ return;
+ }
+
+ // lastTimeNs <= timeNs
+ ui64 firstSlotNumber = lastTimeNs / TIME_SLOT_LENGTH_NS;
+ ui32 firstSlotIndex = firstSlotNumber % TIME_SLOT_COUNT;
+ ui64 lastSlotNumber = timeNs / TIME_SLOT_LENGTH_NS;
+ ui32 lastSlotIndex = lastSlotNumber % TIME_SLOT_COUNT;
+
+ if (firstSlotNumber == lastSlotNumber) {
+ ui32 slotLengthNs = timeNs - lastTimeNs;
+ ui32 slotPartsCount = slotLengthNs / TIME_SLOT_PART_LENGTH_NS;
+
+ AtomicSubBound(TimeSlots[firstSlotIndex], slotPartsCount);
+
+ LastTimeNs = timeNs;
+ return;
+ }
+
+ ui32 firstSlotLengthNs = TIME_SLOT_LENGTH_NS - (lastTimeNs % TIME_SLOT_LENGTH_NS);
+ ui32 firstSlotPartsCount = (firstSlotLengthNs + TIME_SLOT_PART_LENGTH_NS - 1) / TIME_SLOT_PART_LENGTH_NS;
+ ui32 lastSlotLengthNs = timeNs % TIME_SLOT_LENGTH_NS;
+ ui32 lastSlotPartsCount = (lastSlotLengthNs + TIME_SLOT_PART_LENGTH_NS - 1) / TIME_SLOT_PART_LENGTH_NS;
+
+ // process first time slot
+ AtomicSubBound(TimeSlots[firstSlotIndex], firstSlotPartsCount);
+
+ // process complete time slots
+ UpdateCompleteTimeSlots(firstSlotNumber, lastSlotNumber, 0);
+
+ // process last time slot
+ AtomicSubBound(TimeSlots[lastSlotIndex], lastSlotPartsCount);
+
+ LastTimeNs = timeNs;
+ }
+};
+
+class TMinusOneThreadEstimator {
+private:
+ template <typename T, int MaxSize>
+ class TArrayQueue {
+ public:
+ bool empty() const {
+ return FrontIndex == -1;
+ }
+
+ bool full() const {
+ return (RearIndex + 1) % MaxSize == FrontIndex;
+ }
+
+ T& front() {
+ return Data[FrontIndex];
+ }
+
+ bool push(T &&t) {
+ if (full()) {
+ return false;
+ }
+
+ if (FrontIndex == -1) {
+ FrontIndex = 0;
+ }
+
+ RearIndex = (RearIndex + 1) % MaxSize;
+ Data[RearIndex] = std::move(t);
+ return true;
+ }
+
+ bool pop() {
+ if (empty()) {
+ return false;
+ }
+
+ if (FrontIndex == RearIndex) {
+ FrontIndex = RearIndex = -1;
+ } else {
+ FrontIndex = (FrontIndex + 1) % MaxSize;
+ }
+
+ return true;
+ }
+
+ private:
+ int FrontIndex = -1;
+ int RearIndex = -1;
+ T Data[MaxSize];
+ };
+
+public:
+ template <typename T>
+ ui64 MaxLatencyIncreaseWithOneLessCpu(T **threadLoads, ui32 threadCount, ui64 timeNs, ui64 periodNs) {
+ Y_VERIFY(threadCount > 0);
+
+ struct TTimeSlotData {
+ typename T::TimeSlotType Load;
+ ui64 Index;
+ };
+
+ ui64 lastTimeNs = timeNs;
+ for (auto threadIndex = 0u; threadIndex < threadCount; ++threadIndex) {
+ if (threadLoads[threadIndex]->LastRegisteredPeriodIsBusy.load()) {
+ lastTimeNs = std::min(lastTimeNs, threadLoads[threadIndex]->LastTimeNs.load());
+ } else {
+ // make interval [lastTimeNs, timeNs] 'busy'
+ threadLoads[threadIndex]->template RegisterBusyPeriod<false>(timeNs, threadLoads[threadIndex]->LastTimeNs.load());
+ }
+ }
+
+ periodNs = std::min(T::GetTimeWindowLengthNs(), periodNs);
+
+ ui64 beginTimeNs = periodNs < timeNs ? timeNs - periodNs : 0;
+
+ ui64 firstSlotNumber = beginTimeNs / T::GetTimeSlotLengthNs();
+ ui64 lastSlotNumber = (lastTimeNs + T::GetTimeSlotLengthNs() - 1) / T::GetTimeSlotLengthNs();
+
+ ui64 maxTimeSlotShiftCount = 0u;
+ TArrayQueue<TTimeSlotData, T::GetTimeSlotCount()> firstThreadLoadDataQueue;
+
+ for (auto slotNumber = firstSlotNumber; slotNumber < lastSlotNumber; ++slotNumber) {
+ ui64 slotIndex = slotNumber % T::GetTimeSlotCount();
+
+ typename T::TimeSlotType firstThreadTimeSlotValue = threadLoads[0]->TimeSlots[slotIndex].load();
+
+ // distribute previous load of the first thread by other threads
+ auto foundIdleThread = false;
+
+ for (auto threadIndex = 1u; threadIndex < threadCount; ++threadIndex) {
+ typename T::TimeSlotType thisThreadAvailableTimeSlotLoad = threadLoads[threadIndex]->GetTimeSlotMaxValue() - threadLoads[threadIndex]->TimeSlots[slotIndex].load();
+
+ while (!firstThreadLoadDataQueue.empty() && thisThreadAvailableTimeSlotLoad > 0) {
+ auto& firstThreadLoadData = firstThreadLoadDataQueue.front();
+
+ auto distributedLoad = std::min(thisThreadAvailableTimeSlotLoad, firstThreadLoadData.Load);
+
+ thisThreadAvailableTimeSlotLoad -= distributedLoad;
+ firstThreadLoadData.Load -= distributedLoad;
+
+ if (firstThreadLoadData.Load == 0) {
+ auto timeSlotShiftCount = slotIndex - firstThreadLoadData.Index;
+ maxTimeSlotShiftCount = std::max(maxTimeSlotShiftCount, timeSlotShiftCount);
+ auto res = firstThreadLoadDataQueue.pop();
+ Y_VERIFY(res);
+ }
+ }
+
+ if (thisThreadAvailableTimeSlotLoad == threadLoads[threadIndex]->GetTimeSlotMaxValue()) {
+ foundIdleThread = true;
+ }
+ }
+
+ // distribute current load of the first thread by other threads
+ if (firstThreadTimeSlotValue > 0) {
+ if (foundIdleThread) {
+                    // The current load of the first thread can be
+                    // moved to the idle thread, so there is nothing to do
+ } else {
+ // The current load of the first thread can be later
+ // processed by the following time slots of other threads
+ auto res = firstThreadLoadDataQueue.push({firstThreadTimeSlotValue, slotIndex});
+ Y_VERIFY(res);
+ }
+ }
+ }
+
+ if (!firstThreadLoadDataQueue.empty()) {
+ const auto& timeSlotData = firstThreadLoadDataQueue.front();
+ auto timeSlotShiftCount = T::GetTimeSlotCount() - timeSlotData.Index;
+ maxTimeSlotShiftCount = std::max(maxTimeSlotShiftCount, timeSlotShiftCount);
+ }
+
+ return maxTimeSlotShiftCount * T::GetTimeSlotLengthNs();
+ }
+};
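thread_load_log.h refines the single busy bit per slot of cpu_load_log.h into an 8- or 16-bit saturating counter of busy "slot parts", updated with the AtomicAddBound / AtomicSubBound compare-exchange loops above so the value never leaves [0, max] under concurrent updates. A minimal standalone version of that saturating add, expressed with std::atomic (names are illustrative):

    #include <algorithm>
    #include <atomic>
    #include <cstdint>
    #include <cstdio>
    #include <limits>

    // Add 'inc' to an atomic counter but never let it exceed its maximum value.
    template <typename T>
    void AtomicAddBound(std::atomic<T>& val, int64_t inc) {
        if (inc <= 0) {
            return;
        }
        constexpr int64_t maxValue = std::numeric_limits<T>::max();
        T oldVal = val.load();
        T newVal;
        do {
            if (oldVal >= maxValue) {
                return; // already saturated
            }
            newVal = static_cast<T>(std::min<int64_t>(maxValue, int64_t(oldVal) + inc));
        } while (!val.compare_exchange_weak(oldVal, newVal));
    }

    int main() {
        std::atomic<uint8_t> slot{250};
        AtomicAddBound(slot, 10); // saturates at 255 instead of wrapping around to 4
        std::printf("slot = %u\n", unsigned(slot.load()));
        return 0;
    }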
diff --git a/library/cpp/actors/util/thread_load_log_ut.cpp b/library/cpp/actors/util/thread_load_log_ut.cpp
new file mode 100644
index 0000000000..20e776cff6
--- /dev/null
+++ b/library/cpp/actors/util/thread_load_log_ut.cpp
@@ -0,0 +1,966 @@
+#include "thread_load_log.h"
+
+#include <library/cpp/testing/unittest/registar.h>
+
+#include <util/random/random.h>
+#include <util/system/hp_timer.h>
+#include <util/system/thread.h>
+#include <util/system/types.h>
+#include <util/system/sanitizers.h>
+
+#include <limits>
+
+Y_UNIT_TEST_SUITE(ThreadLoadLog) {
+
+ Y_UNIT_TEST(TThreadLoad8BitSlotType) {
+ constexpr auto timeWindowLengthNs = 5368709120ull; // 5 * 2 ^ 30 ~5 sec
+ constexpr auto timeSlotLengthNs = 524288ull; // 2 ^ 19 ns ~ 512 usec
+ constexpr auto timeSlotCount = timeWindowLengthNs / timeSlotLengthNs;
+
+ using TSlotType = std::uint8_t;
+ using T = TThreadLoad<timeSlotCount, timeSlotLengthNs, TSlotType>;
+
+ UNIT_ASSERT_VALUES_EQUAL(T::GetTimeWindowLengthNs(), timeWindowLengthNs);
+ UNIT_ASSERT_VALUES_EQUAL(T::GetTimeSlotLengthNs(), timeSlotLengthNs);
+ UNIT_ASSERT_VALUES_EQUAL(T::GetTimeSlotCount(), timeSlotCount);
+ UNIT_ASSERT_VALUES_EQUAL(T::GetTimeSlotMaxValue(), std::numeric_limits<TSlotType>::max());
+ UNIT_ASSERT_VALUES_EQUAL(T::GetTimeSlotPartCount(), (ui64)std::numeric_limits<TSlotType>::max() + 1);
+ UNIT_ASSERT_VALUES_EQUAL(T::GetTimeSlotPartLengthNs(), T::GetTimeSlotLengthNs() / T::GetTimeSlotPartCount());
+ }
+
+ Y_UNIT_TEST(TThreadLoad16BitSlotType) {
+ constexpr auto timeWindowLengthNs = 5368709120ull; // 5 * 2 ^ 30 ~5 sec
+ constexpr auto timeSlotLengthNs = 524288ull; // 2 ^ 19 ns ~ 512 usec
+ constexpr auto timeSlotCount = timeWindowLengthNs / timeSlotLengthNs;
+
+ using TSlotType = std::uint16_t;
+ using T = TThreadLoad<timeSlotCount, timeSlotLengthNs, TSlotType>;
+
+ UNIT_ASSERT_VALUES_EQUAL(T::GetTimeWindowLengthNs(), timeWindowLengthNs);
+ UNIT_ASSERT_VALUES_EQUAL(T::GetTimeSlotLengthNs(), timeSlotLengthNs);
+ UNIT_ASSERT_VALUES_EQUAL(T::GetTimeSlotCount(), timeSlotCount);
+ UNIT_ASSERT_VALUES_EQUAL(T::GetTimeSlotMaxValue(), std::numeric_limits<TSlotType>::max());
+ UNIT_ASSERT_VALUES_EQUAL(T::GetTimeSlotPartCount(), (ui64)std::numeric_limits<TSlotType>::max() + 1);
+ UNIT_ASSERT_VALUES_EQUAL(T::GetTimeSlotPartLengthNs(), T::GetTimeSlotLengthNs() / T::GetTimeSlotPartCount());
+ }
+
+ Y_UNIT_TEST(TThreadLoad8BitSlotTypeWindowBusy) {
+ constexpr auto timeWindowLengthNs = 5368709120ull; // 5 * 2 ^ 30 ~5 sec
+ constexpr auto timeSlotLengthNs = 524288ull; // 2 ^ 19 ns ~ 512 usec
+ constexpr auto timeSlotCount = timeWindowLengthNs / timeSlotLengthNs;
+
+ using TSlotType = std::uint8_t;
+ using T = TThreadLoad<timeSlotCount, timeSlotLengthNs, TSlotType>;
+
+ T threadLoad;
+ threadLoad.RegisterBusyPeriod(T::GetTimeWindowLengthNs());
+
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), T::GetTimeWindowLengthNs());
+ for (auto slotIndex = 0u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) {
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), T::GetTimeSlotMaxValue());
+ }
+ }
+
+ Y_UNIT_TEST(TThreadLoad16BitSlotTypeWindowBusy) {
+ constexpr auto timeWindowLengthNs = 5368709120ull; // 5 * 2 ^ 30 ~5 sec
+ constexpr auto timeSlotLengthNs = 524288ull; // 2 ^ 19 ns ~ 512 usec
+ constexpr auto timeSlotCount = timeWindowLengthNs / timeSlotLengthNs;
+
+ using TSlotType = std::uint16_t;
+ using T = TThreadLoad<timeSlotCount, timeSlotLengthNs, TSlotType>;
+
+ T threadLoad;
+ threadLoad.RegisterBusyPeriod(T::GetTimeWindowLengthNs());
+
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), T::GetTimeWindowLengthNs());
+ for (auto slotIndex = 0u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) {
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), T::GetTimeSlotMaxValue());
+ }
+ }
+
+ Y_UNIT_TEST(TThreadLoadRegisterBusyPeriodFirstTimeSlot1) {
+ TThreadLoad<38400> threadLoad;
+
+ for (auto slotIndex = 0u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) {
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0);
+ }
+
+ ui64 timeNs = threadLoad.GetTimeSlotLengthNs() - 1;
+ threadLoad.RegisterBusyPeriod(timeNs);
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[0].load(), threadLoad.GetTimeSlotMaxValue());
+
+ for (auto slotIndex = 1u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) {
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0);
+ }
+
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), timeNs);
+ }
+
+ Y_UNIT_TEST(TThreadLoadRegisterBusyPeriodFirstTimeSlot2) {
+ using T = TThreadLoad<38400>;
+
+ ui32 startNs = 2 * T::GetTimeSlotPartLengthNs();
+ T threadLoad(startNs);
+
+ for (auto slotIndex = 0u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) {
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0);
+ }
+
+ ui64 timeNs = 3 * T::GetTimeSlotPartLengthNs() - 1;
+ threadLoad.RegisterBusyPeriod(timeNs);
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[0].load(), 1);
+
+ for (auto slotIndex = 1u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) {
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0);
+ }
+
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), timeNs);
+ }
+
+ Y_UNIT_TEST(TThreadLoadRegisterBusyPeriodFirstTimeSlot3) {
+ TThreadLoad<38400> threadLoad;
+
+ for (auto slotIndex = 0u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) {
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0);
+ }
+
+ ui64 timeNs = threadLoad.GetTimeSlotLengthNs();
+ threadLoad.RegisterBusyPeriod(timeNs);
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[0].load(), threadLoad.GetTimeSlotMaxValue());
+
+ for (auto slotIndex = 1u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) {
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0);
+ }
+
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), timeNs);
+ }
+
+ Y_UNIT_TEST(TThreadLoadRegisterBusyPeriodFirstTimeSlot4) {
+ using T = TThreadLoad<38400>;
+
+ ui32 startNs = 2 * T::GetTimeSlotPartLengthNs();
+ T threadLoad(startNs);
+
+ for (auto slotIndex = 0u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) {
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0);
+ }
+
+ ui64 timeNs = 3 * T::GetTimeSlotPartLengthNs();
+ threadLoad.RegisterBusyPeriod(timeNs);
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[0].load(), (timeNs - startNs) / T::GetTimeSlotPartLengthNs());
+
+ for (auto slotIndex = 1u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) {
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0);
+ }
+
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), timeNs);
+ }
+
+ Y_UNIT_TEST(TThreadLoadRegisterBusyPeriodFirstTwoTimeSlots1) {
+ TThreadLoad<38400> threadLoad;
+
+ for (auto slotIndex = 0u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) {
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0);
+ }
+
+ ui64 timeNs = 2 * threadLoad.GetTimeSlotLengthNs() - 1;
+ threadLoad.RegisterBusyPeriod(timeNs);
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[0].load(), threadLoad.GetTimeSlotMaxValue());
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[1].load(), threadLoad.GetTimeSlotMaxValue());
+
+ for (auto slotIndex = 2u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) {
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0);
+ }
+
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), timeNs);
+ }
+
+ Y_UNIT_TEST(TThreadLoadRegisterBusyPeriodFirstTwoTimeSlots2) {
+ TThreadLoad<38400> threadLoad;
+
+ for (auto slotIndex = 0u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) {
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0);
+ }
+
+ ui64 timeNs = 2 * threadLoad.GetTimeSlotLengthNs();
+ threadLoad.RegisterBusyPeriod(timeNs);
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[0].load(), threadLoad.GetTimeSlotMaxValue());
+        UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[1].load(), threadLoad.GetTimeSlotMaxValue());
+
+ for (auto slotIndex = 2u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) {
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0);
+ }
+
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), timeNs);
+ }
+
+ Y_UNIT_TEST(TThreadLoadRegisterBusyPeriodFirstThreeTimeSlots1) {
+ TThreadLoad<38400> threadLoad;
+
+ for (auto slotIndex = 0u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) {
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0);
+ }
+
+ ui64 timeNs = 3 * threadLoad.GetTimeSlotLengthNs() - 1;
+ threadLoad.RegisterBusyPeriod(timeNs);
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[0].load(), threadLoad.GetTimeSlotMaxValue());
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[1].load(), threadLoad.GetTimeSlotMaxValue());
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[2].load(), threadLoad.GetTimeSlotMaxValue());
+
+ for (auto slotIndex = 3u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) {
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0);
+ }
+
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), timeNs);
+ }
+
+ Y_UNIT_TEST(TThreadLoadRegisterBusyPeriodFirstThreeTimeSlots2) {
+ TThreadLoad<38400> threadLoad;
+
+ for (auto slotIndex = 0u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) {
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0);
+ }
+
+ ui64 timeNs = 3 * threadLoad.GetTimeSlotLengthNs();
+ threadLoad.RegisterBusyPeriod(timeNs);
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[0].load(), threadLoad.GetTimeSlotMaxValue());
+        UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[1].load(), threadLoad.GetTimeSlotMaxValue());
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[2].load(), threadLoad.GetTimeSlotMaxValue());
+
+ for (auto slotIndex = 3u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) {
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0);
+ }
+
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), timeNs);
+ }
+
+ Y_UNIT_TEST(TThreadLoadRegisterBusyPeriodFirstThreeTimeSlots3) {
+ using T = TThreadLoad<38400>;
+
+ ui32 startNs = 3 * T::GetTimeSlotPartLengthNs();
+ T threadLoad(startNs);
+
+ for (auto slotIndex = 0u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) {
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0);
+ }
+
+ ui64 timeNs = 0;
+ threadLoad.RegisterBusyPeriod(timeNs);
+ for (auto slotIndex = 0u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) {
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0);
+ }
+
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), timeNs);
+ }
+
+ Y_UNIT_TEST(TThreadLoadRegisterIdlePeriodFirstTimeSlot1) {
+ using T = TThreadLoad<38400>;
+
+ ui64 timeNs = T::GetTimeSlotPartLengthNs();
+ T threadLoad(timeNs);
+
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), timeNs);
+ for (auto slotIndex = 0u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) {
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0);
+ }
+
+ timeNs = 2 * T::GetTimeSlotPartLengthNs();
+ threadLoad.RegisterBusyPeriod(timeNs);
+
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), timeNs);
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[0].load(), 1);
+ for (auto slotIndex = 1u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) {
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0);
+ }
+
+ timeNs = 3 * T::GetTimeSlotPartLengthNs();
+ threadLoad.RegisterIdlePeriod(timeNs);
+
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), timeNs);
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[0].load(), 0);
+ for (auto slotIndex = 1u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) {
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0);
+ }
+
+ timeNs = 4 * T::GetTimeSlotPartLengthNs();
+ threadLoad.RegisterBusyPeriod(timeNs);
+
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), timeNs);
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[0].load(), 1);
+ for (auto slotIndex = 1u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) {
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0);
+ }
+ }
+
+ Y_UNIT_TEST(TThreadLoadRegisterIdlePeriodFirstTimeSlot2) {
+ using T = TThreadLoad<38400>;
+
+ ui64 timeNs = T::GetTimeSlotPartLengthNs();
+ T threadLoad(timeNs);
+
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), timeNs);
+ for (auto slotIndex = 0u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) {
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0);
+ }
+
+ timeNs = 2 * T::GetTimeSlotPartLengthNs();
+ threadLoad.RegisterBusyPeriod(timeNs);
+
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), timeNs);
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[0].load(), 1);
+ for (auto slotIndex = 1u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) {
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0);
+ }
+
+ timeNs = 3 * T::GetTimeSlotPartLengthNs() - 1;
+ threadLoad.RegisterIdlePeriod(timeNs);
+
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), timeNs);
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[0].load(), 1);
+ for (auto slotIndex = 1u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) {
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0);
+ }
+
+ timeNs = 4 * T::GetTimeSlotPartLengthNs();
+ threadLoad.RegisterBusyPeriod(timeNs);
+
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), timeNs);
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[0].load(), 3);
+ for (auto slotIndex = 1u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) {
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0);
+ }
+ }
+
+ Y_UNIT_TEST(TThreadLoadRegisterIdlePeriodFirstTimeSlot3) {
+ using T = TThreadLoad<38400>;
+
+ ui64 timeNs = T::GetTimeSlotPartLengthNs();
+ T threadLoad(timeNs);
+
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), timeNs);
+ for (auto slotIndex = 0u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) {
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0);
+ }
+
+ timeNs = 2 * T::GetTimeSlotPartLengthNs();
+ threadLoad.RegisterBusyPeriod(timeNs);
+
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), timeNs);
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[0].load(), 1);
+ for (auto slotIndex = 1u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) {
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0);
+ }
+
+ timeNs = 3 * T::GetTimeSlotPartLengthNs() - 1;
+ threadLoad.RegisterIdlePeriod(timeNs);
+
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), timeNs);
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[0].load(), 1);
+ for (auto slotIndex = 1u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) {
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0);
+ }
+
+ timeNs = 4 * T::GetTimeSlotPartLengthNs() - 2;
+ threadLoad.RegisterIdlePeriod(timeNs);
+
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), timeNs);
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[0].load(), 1);
+ for (auto slotIndex = 1u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) {
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0);
+ }
+
+ timeNs = 5 * T::GetTimeSlotPartLengthNs();
+ threadLoad.RegisterBusyPeriod(timeNs);
+
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), timeNs);
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[0].load(), 3);
+ for (auto slotIndex = 1u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) {
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0);
+ }
+ }
+
+ Y_UNIT_TEST(TThreadLoadRegisterIdlePeriodFirstTwoTimeSlots1) {
+ using T = TThreadLoad<38400>;
+
+ T threadLoad;
+
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), 0);
+ for (auto slotIndex = 0u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) {
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0);
+ }
+
+ ui64 timeNs = threadLoad.GetTimeSlotLengthNs();
+ threadLoad.RegisterIdlePeriod(timeNs);
+
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), timeNs);
+ for (auto slotIndex = 0u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) {
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0);
+ }
+
+ timeNs = 2 * threadLoad.GetTimeSlotLengthNs();
+ threadLoad.RegisterBusyPeriod(timeNs);
+
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[0].load(), 0);
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[1].load(), threadLoad.GetTimeSlotMaxValue());
+ for (auto slotIndex = 2u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) {
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0);
+ }
+ }
+
+ Y_UNIT_TEST(TThreadLoadRegisterIdlePeriodFirstTwoTimeSlots2) {
+ using T = TThreadLoad<38400>;
+
+ T threadLoad;
+
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), 0);
+ for (auto slotIndex = 0u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) {
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0);
+ }
+
+ ui64 timeNs = threadLoad.GetTimeSlotLengthNs() - 1;
+ threadLoad.RegisterIdlePeriod(timeNs);
+
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), timeNs);
+ for (auto slotIndex = 0u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) {
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0);
+ }
+
+ timeNs = 2 * threadLoad.GetTimeSlotLengthNs();
+ threadLoad.RegisterBusyPeriod(timeNs);
+
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), timeNs);
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[0].load(), 1);
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[1].load(), threadLoad.GetTimeSlotMaxValue());
+ for (auto slotIndex = 2u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) {
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0);
+ }
+ }
+
+ Y_UNIT_TEST(TThreadLoadRegisterIdlePeriodFirstTwoTimeSlots3) {
+ using T = TThreadLoad<38400>;
+
+ T threadLoad;
+
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), 0);
+ for (auto slotIndex = 0u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) {
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0);
+ }
+
+ ui64 timeNs = threadLoad.GetTimeSlotLengthNs() - 1;
+ threadLoad.RegisterIdlePeriod(timeNs);
+
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), timeNs);
+ for (auto slotIndex = 0u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) {
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0);
+ }
+
+ timeNs = 2 * threadLoad.GetTimeSlotLengthNs() - 1;
+ threadLoad.RegisterBusyPeriod(timeNs);
+
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), timeNs);
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[0].load(), 1);
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[1].load(), threadLoad.GetTimeSlotMaxValue());
+ for (auto slotIndex = 2u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) {
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0);
+ }
+ }
+
+ Y_UNIT_TEST(TThreadLoadRegisterIdlePeriodFirstThreeTimeSlots1) {
+ using T = TThreadLoad<38400>;
+
+ T threadLoad;
+
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), 0);
+ for (auto slotIndex = 0u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) {
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0);
+ }
+
+ ui64 timeNs = threadLoad.GetTimeSlotLengthNs();
+ threadLoad.RegisterBusyPeriod(timeNs);
+
+ timeNs = 2 * threadLoad.GetTimeSlotLengthNs();
+ threadLoad.RegisterIdlePeriod(timeNs);
+
+ timeNs = 3 * threadLoad.GetTimeSlotLengthNs();
+ threadLoad.RegisterBusyPeriod(timeNs);
+
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), timeNs);
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[0].load(), threadLoad.GetTimeSlotMaxValue());
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[1].load(), 0);
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[2].load(), threadLoad.GetTimeSlotMaxValue());
+ for (auto slotIndex = 3u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) {
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0);
+ }
+ }
+
+ Y_UNIT_TEST(TThreadLoadRegisterIdlePeriodFirstThreeTimeSlots2) {
+ using T = TThreadLoad<38400>;
+
+ T threadLoad;
+
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), 0);
+ for (auto slotIndex = 0u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) {
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0);
+ }
+
+ ui64 timeNs = threadLoad.GetTimeSlotLengthNs();
+ threadLoad.RegisterBusyPeriod(timeNs);
+
+ timeNs = 3 * threadLoad.GetTimeSlotLengthNs();
+ threadLoad.RegisterIdlePeriod(timeNs);
+
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), timeNs);
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[0].load(), threadLoad.GetTimeSlotMaxValue());
+ for (auto slotIndex = 1u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) {
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0);
+ }
+ }
+
+ Y_UNIT_TEST(TThreadLoadRegisterIdlePeriodFirstThreeTimeSlots3) {
+ using T = TThreadLoad<38400>;
+
+ T threadLoad;
+
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), 0);
+ for (auto slotIndex = 0u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) {
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0);
+ }
+
+ ui64 timeNs = threadLoad.GetTimeSlotLengthNs();
+ threadLoad.RegisterIdlePeriod(timeNs);
+
+ timeNs = 3 * threadLoad.GetTimeSlotLengthNs();
+ threadLoad.RegisterBusyPeriod(timeNs);
+
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), timeNs);
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[0].load(), 0);
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[1].load(), threadLoad.GetTimeSlotMaxValue());
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[2].load(), threadLoad.GetTimeSlotMaxValue());
+ for (auto slotIndex = 3u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) {
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0);
+ }
+ }
+
+ Y_UNIT_TEST(TThreadLoadRegisterIdlePeriodFirstThreeTimeSlots4) {
+ using T = TThreadLoad<38400>;
+
+ T threadLoad;
+
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), 0);
+ for (auto slotIndex = 0u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) {
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0);
+ }
+
+ ui64 timeNs = threadLoad.GetTimeSlotLengthNs() + 2 * threadLoad.GetTimeSlotPartLengthNs();
+ threadLoad.RegisterIdlePeriod(timeNs);
+
+ timeNs = 3 * threadLoad.GetTimeSlotLengthNs();
+ threadLoad.RegisterBusyPeriod(timeNs);
+
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), timeNs);
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[0].load(), 0);
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[1].load(), threadLoad.GetTimeSlotPartCount() - 2);
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[2].load(), threadLoad.GetTimeSlotMaxValue());
+ for (auto slotIndex = 3u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) {
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0);
+ }
+ }
+
+ Y_UNIT_TEST(TThreadLoadRegisterIdlePeriodFirstThreeTimeSlots5) {
+ using T = TThreadLoad<38400>;
+
+ T threadLoad;
+
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), 0);
+ for (auto slotIndex = 0u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) {
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0);
+ }
+
+ ui64 timeNs = 2 * threadLoad.GetTimeSlotLengthNs();
+ threadLoad.RegisterBusyPeriod(timeNs);
+
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), timeNs);
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[0].load(), threadLoad.GetTimeSlotMaxValue());
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[1].load(), threadLoad.GetTimeSlotMaxValue());
+ for (auto slotIndex = 2u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) {
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0);
+ }
+
+ timeNs = timeNs + threadLoad.GetTimeWindowLengthNs() + threadLoad.GetTimeSlotLengthNs();
+ threadLoad.RegisterIdlePeriod(timeNs);
+
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), timeNs);
+ for (auto slotIndex = 0u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) {
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0);
+ }
+ }
+
+ Y_UNIT_TEST(TThreadLoadRegisterIdlePeriodOverTimeWindow) {
+ constexpr auto timeWindowLengthNs = 5368709120ull; // 5 * 2 ^ 30 ~5 sec
+ constexpr auto timeSlotLengthNs = 524288ull; // 2 ^ 19 ns ~ 512 usec
+ constexpr auto timeSlotCount = timeWindowLengthNs / timeSlotLengthNs;
+
+ using T = TThreadLoad<timeSlotCount, timeSlotLengthNs, std::uint8_t>;
+
+ T threadLoad;
+
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), 0);
+ for (auto slotIndex = 0u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) {
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0);
+ }
+
+ ui64 timeNs = 5 * threadLoad.GetTimeSlotLengthNs();
+ threadLoad.RegisterBusyPeriod(timeNs);
+
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), timeNs);
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[0].load(), threadLoad.GetTimeSlotMaxValue());
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[1].load(), threadLoad.GetTimeSlotMaxValue());
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[2].load(), threadLoad.GetTimeSlotMaxValue());
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[3].load(), threadLoad.GetTimeSlotMaxValue());
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[4].load(), threadLoad.GetTimeSlotMaxValue());
+ for (auto slotIndex = 5u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) {
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0);
+ }
+
+ timeNs = timeNs + threadLoad.GetTimeWindowLengthNs() - 3 * threadLoad.GetTimeSlotLengthNs();
+ threadLoad.RegisterIdlePeriod(timeNs);
+
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), timeNs);
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[0].load(), 0);
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[1].load(), 0);
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[2].load(), threadLoad.GetTimeSlotMaxValue());
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[3].load(), threadLoad.GetTimeSlotMaxValue());
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[4].load(), threadLoad.GetTimeSlotMaxValue());
+ for (auto slotIndex = 5u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) {
+ UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0);
+ }
+ }
+
+ Y_UNIT_TEST(MinusOneThreadEstimatorTwoThreadLoadsZeroShiftNs) {
+ constexpr auto timeWindowLengthNs = 5368709120ull; // 5 * 2 ^ 30 ~5 sec
+ constexpr auto timeSlotLengthNs = 524288ull; // 2 ^ 19 ns ~ 512 usec
+ constexpr auto timeSlotCount = timeWindowLengthNs / timeSlotLengthNs;
+
+ using T = TThreadLoad<timeSlotCount, timeSlotLengthNs, std::uint16_t>;
+
+ UNIT_ASSERT_VALUES_EQUAL(T::GetTimeSlotPartCount(), (ui64)std::numeric_limits<std::uint16_t>::max() + 1);
+
+ T *threadLoads[2];
+ threadLoads[0] = new T;
+ threadLoads[1] = new T;
+
+ for (ui64 i = 1; i < timeSlotCount; i += 2) {
+ threadLoads[0]->RegisterIdlePeriod(i * T::GetTimeSlotLengthNs());
+ threadLoads[0]->RegisterBusyPeriod((i + 1) * T::GetTimeSlotLengthNs());
+ }
+
+ for (ui64 i = 1; i < timeSlotCount; i += 2) {
+ threadLoads[1]->RegisterBusyPeriod(i * T::GetTimeSlotLengthNs());
+ threadLoads[1]->RegisterIdlePeriod((i + 1) * T::GetTimeSlotLengthNs());
+ }
+
+ TMinusOneThreadEstimator estimator;
+ ui64 value = estimator.MaxLatencyIncreaseWithOneLessCpu(threadLoads, 2, T::GetTimeWindowLengthNs(), T::GetTimeWindowLengthNs());
+ UNIT_ASSERT_VALUES_EQUAL(value, 0);
+
+ delete threadLoads[0];
+ delete threadLoads[1];
+ }
+
+ Y_UNIT_TEST(MinusOneThreadEstimatorTwoThreadLoadsOneTimeSlotShift1) {
+ constexpr auto timeWindowLengthNs = 5368709120ull; // 5 * 2 ^ 30 ~5 sec
+ constexpr auto timeSlotLengthNs = 524288ull; // 2 ^ 19 ns ~ 512 usec
+ constexpr auto timeSlotCount = timeWindowLengthNs / timeSlotLengthNs;
+ constexpr auto threadCount = 2;
+
+ using T = TThreadLoad<timeSlotCount, timeSlotLengthNs, std::uint16_t>;
+
+ T *threadLoads[threadCount];
+
+ for (auto t = 0u; t < threadCount; ++t) {
+ threadLoads[t] = new T;
+
+ for (ui64 i = 2; i < threadLoads[t]->GetTimeSlotCount(); i += 2) {
+ threadLoads[t]->RegisterIdlePeriod((i - 1) * T::GetTimeSlotLengthNs());
+ threadLoads[t]->RegisterBusyPeriod(i * T::GetTimeSlotLengthNs());
+ }
+
+ threadLoads[t]->RegisterIdlePeriod((threadLoads[t]->GetTimeSlotCount() - 1) * T::GetTimeSlotLengthNs());
+ threadLoads[t]->RegisterBusyPeriod(threadLoads[t]->GetTimeSlotCount() * T::GetTimeSlotLengthNs());
+
+ for (ui64 s = 0; s < threadLoads[t]->GetTimeSlotCount(); ++s) {
+ if (s % 2 == 1) {
+ UNIT_ASSERT_VALUES_EQUAL_C(threadLoads[t]->TimeSlots[s].load(), T::GetTimeSlotMaxValue(), ToString(s).c_str());
+ } else {
+ UNIT_ASSERT_VALUES_EQUAL_C(threadLoads[t]->TimeSlots[s].load(), 0, ToString(s).c_str());
+ }
+ }
+ }
+
+ TMinusOneThreadEstimator estimator;
+ auto result = estimator.MaxLatencyIncreaseWithOneLessCpu(threadLoads, threadCount, T::GetTimeWindowLengthNs(), T::GetTimeWindowLengthNs());
+
+ for (ui64 t = 0; t < threadCount; ++t) {
+ for (ui64 s = 0; s < threadLoads[t]->GetTimeSlotCount(); ++s) {
+ if (s % 2 == 1) {
+ UNIT_ASSERT_VALUES_EQUAL_C(threadLoads[t]->TimeSlots[s].load(), T::GetTimeSlotMaxValue(), ToString(s).c_str());
+ } else {
+ UNIT_ASSERT_VALUES_EQUAL_C(threadLoads[t]->TimeSlots[s].load(), 0, ToString(s).c_str());
+ }
+ }
+ }
+
+ UNIT_ASSERT_VALUES_EQUAL(result, T::GetTimeSlotLengthNs());
+
+ for (auto t = 0u; t < threadCount; ++t) {
+ delete threadLoads[t];
+ }
+ }
+
+ Y_UNIT_TEST(MinusOneThreadEstimatorTwoThreadLoadsOneTimeSlotShift2) {
+ constexpr auto timeWindowLengthNs = 5368709120ull; // 5 * 2 ^ 30 ~5 sec
+ constexpr auto timeSlotLengthNs = 524288ull; // 2 ^ 19 ns ~ 512 usec
+ constexpr auto timeSlotCount = timeWindowLengthNs / timeSlotLengthNs;
+ constexpr auto threadCount = 2;
+
+ using T = TThreadLoad<timeSlotCount, timeSlotLengthNs, std::uint16_t>;
+
+ T *threadLoads[threadCount];
+
+ for (auto t = 0u; t < threadCount; ++t) {
+ threadLoads[t] = new T;
+
+ for (ui64 i = 2; i < threadLoads[t]->GetTimeSlotCount(); i += 2) {
+ threadLoads[t]->RegisterBusyPeriod((i - 1) * T::GetTimeSlotLengthNs());
+ threadLoads[t]->RegisterIdlePeriod(i * T::GetTimeSlotLengthNs());
+ }
+
+ threadLoads[t]->RegisterBusyPeriod((threadLoads[t]->GetTimeSlotCount() - 1) * T::GetTimeSlotLengthNs());
+ threadLoads[t]->RegisterIdlePeriod(threadLoads[t]->GetTimeSlotCount() * T::GetTimeSlotLengthNs());
+
+ for (ui64 s = 0; s < threadLoads[t]->GetTimeSlotCount(); ++s) {
+ if (s % 2 == 0) {
+ UNIT_ASSERT_VALUES_EQUAL_C(threadLoads[t]->TimeSlots[s].load(), T::GetTimeSlotMaxValue(), ToString(s).c_str());
+ } else {
+ UNIT_ASSERT_VALUES_EQUAL_C(threadLoads[t]->TimeSlots[s].load(), 0, ToString(s).c_str());
+ }
+ }
+ }
+
+ TMinusOneThreadEstimator estimator;
+ auto result = estimator.MaxLatencyIncreaseWithOneLessCpu(threadLoads, threadCount, T::GetTimeWindowLengthNs(), T::GetTimeWindowLengthNs());
+
+ for (ui64 t = 0; t < threadCount; ++t) {
+ for (ui64 s = 0; s < threadLoads[t]->GetTimeSlotCount(); ++s) {
+ if (s % 2 == 0) {
+ UNIT_ASSERT_VALUES_EQUAL_C(threadLoads[t]->TimeSlots[s].load(), T::GetTimeSlotMaxValue(), ToString(s).c_str());
+ } else {
+ UNIT_ASSERT_VALUES_EQUAL_C(threadLoads[t]->TimeSlots[s].load(), 0, ToString(s).c_str());
+ }
+ }
+ }
+
+ UNIT_ASSERT_VALUES_EQUAL(result, T::GetTimeSlotLengthNs());
+
+ for (auto t = 0u; t < threadCount; ++t) {
+ delete threadLoads[t];
+ }
+ }
+
+ Y_UNIT_TEST(MinusOneThreadEstimatorTwoThreadLoadsTwoTimeSlotsShift1) {
+ constexpr auto timeWindowLengthNs = 5368709120ull; // 5 * 2 ^ 30 ~5 sec
+ constexpr auto timeSlotLengthNs = 524288ull; // 2 ^ 19 ns ~ 512 usec
+ constexpr auto timeSlotCount = timeWindowLengthNs / timeSlotLengthNs;
+ constexpr auto threadCount = 2;
+
+ using T = TThreadLoad<timeSlotCount, timeSlotLengthNs, std::uint16_t>;
+
+ T *threadLoads[threadCount];
+
+ for (auto t = 0u; t < threadCount; ++t) {
+ threadLoads[t] = new T;
+
+ for (ui64 i = 4; i < threadLoads[t]->GetTimeSlotCount(); i += 4) {
+ threadLoads[t]->RegisterIdlePeriod((i - 2) * T::GetTimeSlotLengthNs());
+ threadLoads[t]->RegisterBusyPeriod(i * T::GetTimeSlotLengthNs());
+ }
+
+ threadLoads[t]->RegisterIdlePeriod((threadLoads[t]->GetTimeSlotCount() - 2) * T::GetTimeSlotLengthNs());
+ threadLoads[t]->RegisterBusyPeriod(threadLoads[t]->GetTimeSlotCount() * T::GetTimeSlotLengthNs());
+
+ for (ui64 s = 0; s < threadLoads[t]->GetTimeSlotCount(); ++s) {
+ if (s % 4 == 2 || s % 4 == 3) {
+ UNIT_ASSERT_VALUES_EQUAL_C(threadLoads[t]->TimeSlots[s].load(), T::GetTimeSlotMaxValue(), ToString(s).c_str());
+ } else {
+ UNIT_ASSERT_VALUES_EQUAL_C(threadLoads[t]->TimeSlots[s].load(), 0, ToString(s).c_str());
+ }
+ }
+ }
+
+ TMinusOneThreadEstimator estimator;
+ auto result = estimator.MaxLatencyIncreaseWithOneLessCpu(threadLoads, threadCount, T::GetTimeWindowLengthNs(), T::GetTimeWindowLengthNs());
+
+ for (ui64 t = 0; t < threadCount; ++t) {
+ for (ui64 s = 0; s < threadLoads[t]->GetTimeSlotCount(); ++s) {
+ if (s % 4 == 2 || s % 4 == 3) {
+ UNIT_ASSERT_VALUES_EQUAL_C(threadLoads[t]->TimeSlots[s].load(), T::GetTimeSlotMaxValue(), ToString(s).c_str());
+ } else {
+ UNIT_ASSERT_VALUES_EQUAL(threadLoads[t]->TimeSlots[s].load(), 0);
+ }
+ }
+ }
+
+ UNIT_ASSERT_VALUES_EQUAL(result, 2 * T::GetTimeSlotLengthNs());
+
+ for (auto t = 0u; t < threadCount; ++t) {
+ delete threadLoads[t];
+ }
+ }
+
+ Y_UNIT_TEST(MinusOneThreadEstimatorTwoThreadLoadsTwoTimeSlotsShift2) {
+ constexpr auto timeWindowLengthNs = 5368709120ull; // 5 * 2 ^ 30 ~5 sec
+ constexpr auto timeSlotLengthNs = 524288ull; // 2 ^ 19 ns ~ 512 usec
+ constexpr auto timeSlotCount = timeWindowLengthNs / timeSlotLengthNs;
+ constexpr auto threadCount = 2;
+
+ using T = TThreadLoad<timeSlotCount, timeSlotLengthNs, std::uint16_t>;
+
+ T *threadLoads[threadCount];
+
+ for (auto t = 0u; t < threadCount; ++t) {
+ threadLoads[t] = new T;
+
+ for (ui64 i = 4; i < threadLoads[t]->GetTimeSlotCount(); i += 4) {
+ threadLoads[t]->RegisterBusyPeriod((i - 2) * T::GetTimeSlotLengthNs());
+ threadLoads[t]->RegisterIdlePeriod(i * T::GetTimeSlotLengthNs());
+ }
+
+ threadLoads[t]->RegisterBusyPeriod((threadLoads[t]->GetTimeSlotCount() - 2) * T::GetTimeSlotLengthNs());
+ threadLoads[t]->RegisterIdlePeriod(threadLoads[t]->GetTimeSlotCount() * T::GetTimeSlotLengthNs());
+
+ for (ui64 s = 0; s < threadLoads[t]->GetTimeSlotCount(); ++s) {
+ if (s % 4 == 0 || s % 4 == 1) {
+ UNIT_ASSERT_VALUES_EQUAL_C(threadLoads[t]->TimeSlots[s].load(), T::GetTimeSlotMaxValue(), ToString(s).c_str());
+ } else {
+ UNIT_ASSERT_VALUES_EQUAL_C(threadLoads[t]->TimeSlots[s].load(), 0, ToString(s).c_str());
+ }
+ }
+ }
+
+ TMinusOneThreadEstimator estimator;
+ auto result = estimator.MaxLatencyIncreaseWithOneLessCpu(threadLoads, threadCount, T::GetTimeWindowLengthNs(), T::GetTimeWindowLengthNs());
+
+ for (ui64 t = 0; t < threadCount; ++t) {
+ for (ui64 s = 0; s < threadLoads[t]->GetTimeSlotCount(); ++s) {
+ if (s % 4 == 0 || s % 4 == 1) {
+ UNIT_ASSERT_VALUES_EQUAL_C(threadLoads[t]->TimeSlots[s].load(), T::GetTimeSlotMaxValue(), ToString(s).c_str());
+ } else {
+ UNIT_ASSERT_VALUES_EQUAL_C(threadLoads[t]->TimeSlots[s].load(), 0, ToString(s).c_str());
+ }
+ }
+ }
+
+ UNIT_ASSERT_VALUES_EQUAL(result, 2 * T::GetTimeSlotLengthNs());
+
+ for (auto t = 0u; t < threadCount; ++t) {
+ delete threadLoads[t];
+ }
+ }
+
+ Y_UNIT_TEST(MinusOneThreadEstimatorTwoThreadLoadsTwoTimeSlotsShift3) {
+ constexpr auto timeWindowLengthNs = 5368709120ull; // 5 * 2 ^ 30 ~5 sec
+ constexpr auto timeSlotLengthNs = 524288ull; // 2 ^ 19 ns ~ 512 usec
+ constexpr auto timeSlotCount = timeWindowLengthNs / timeSlotLengthNs;
+ constexpr auto threadCount = 2;
+
+ using T = TThreadLoad<timeSlotCount, timeSlotLengthNs, std::uint16_t>;
+
+ T *threadLoads[threadCount];
+
+ for (auto t = 0u; t < threadCount; ++t) {
+ threadLoads[t] = new T;
+
+ auto timeNs = T::GetTimeWindowLengthNs() - 1.5 * T::GetTimeSlotLengthNs();
+ threadLoads[t]->RegisterIdlePeriod(timeNs);
+ UNIT_ASSERT_VALUES_EQUAL(threadLoads[t]->LastTimeNs.load(), timeNs);
+
+ timeNs = T::GetTimeWindowLengthNs();
+ threadLoads[t]->RegisterBusyPeriod(timeNs);
+ UNIT_ASSERT_VALUES_EQUAL(threadLoads[t]->LastTimeNs.load(), timeNs);
+
+ for (ui64 s = 0; s + 2 < threadLoads[t]->GetTimeSlotCount(); ++s) {
+ UNIT_ASSERT_VALUES_EQUAL_C(threadLoads[t]->TimeSlots[s].load(), 0, ToString(s).c_str());
+ }
+
+ UNIT_ASSERT_VALUES_EQUAL(threadLoads[t]->TimeSlots[timeSlotCount - 2].load(), T::GetTimeSlotPartCount() / 2);
+ UNIT_ASSERT_VALUES_EQUAL(threadLoads[t]->TimeSlots[timeSlotCount - 1].load(), T::GetTimeSlotMaxValue());
+ }
+
+ TMinusOneThreadEstimator estimator;
+ auto result = estimator.MaxLatencyIncreaseWithOneLessCpu(threadLoads, threadCount, T::GetTimeWindowLengthNs(), T::GetTimeWindowLengthNs());
+
+ for (auto t = 0u; t < threadCount; ++t) {
+ for (ui64 s = 0; s + 2 < threadLoads[t]->GetTimeSlotCount(); ++s) {
+ UNIT_ASSERT_VALUES_EQUAL_C(threadLoads[t]->TimeSlots[s].load(), 0, ToString(s).c_str());
+ }
+
+ UNIT_ASSERT_VALUES_EQUAL(threadLoads[t]->TimeSlots[timeSlotCount - 2].load(), T::GetTimeSlotPartCount() / 2);
+ UNIT_ASSERT_VALUES_EQUAL(threadLoads[t]->TimeSlots[timeSlotCount - 1].load(), T::GetTimeSlotMaxValue());
+ }
+
+ UNIT_ASSERT_VALUES_EQUAL(result, 2 * T::GetTimeSlotLengthNs());
+
+ for (auto t = 0u; t < threadCount; ++t) {
+ delete threadLoads[t];
+ }
+ }
+
+ Y_UNIT_TEST(MinusOneThreadEstimator16ThreadLoadsAllTimeSlots) {
+ constexpr auto timeWindowLengthNs = 5368709120ull; // 5 * 2 ^ 30 ~5 sec
+ constexpr auto timeSlotLengthNs = 524288ull; // 2 ^ 19 ns ~ 512 usec
+ constexpr auto timeSlotCount = timeWindowLengthNs / timeSlotLengthNs;
+ constexpr auto threadCount = 16;
+ constexpr auto estimatesCount = 16;
+
+ using T = TThreadLoad<timeSlotCount, timeSlotLengthNs, std::uint16_t>;
+
+ for (auto e = 0u; e < estimatesCount; ++e) {
+ T *threadLoads[threadCount];
+
+ for (auto t = 0u; t < threadCount; ++t) {
+ threadLoads[t] = new T;
+ auto timeNs = threadLoads[t]->GetTimeWindowLengthNs();
+ threadLoads[t]->RegisterBusyPeriod(timeNs);
+
+ UNIT_ASSERT_VALUES_EQUAL(threadLoads[t]->LastTimeNs.load(), timeNs);
+ for (ui64 s = 0; s < threadLoads[t]->GetTimeSlotCount(); ++s) {
+ UNIT_ASSERT_VALUES_EQUAL_C(threadLoads[t]->TimeSlots[s].load(), T::GetTimeSlotMaxValue(), ToString(s).c_str());
+ }
+ }
+
+ ui64 result = 0;
+ {
+ THPTimer timer;
+ TMinusOneThreadEstimator estimator;
+ result = estimator.MaxLatencyIncreaseWithOneLessCpu(threadLoads, threadCount, T::GetTimeWindowLengthNs(), T::GetTimeWindowLengthNs());
+ // output in microseconds
+ auto passed = timer.Passed() * 1000000;
+ Y_UNUSED(passed);
+ // Cerr << "timer : " << passed << " " << __LINE__ << Endl;
+ }
+
+ for (ui64 t = 0; t < threadCount; ++t) {
+ UNIT_ASSERT_VALUES_EQUAL(threadLoads[t]->LastTimeNs.load(), T::GetTimeWindowLengthNs());
+ for (ui64 s = 0; s < threadLoads[t]->GetTimeSlotCount(); ++s) {
+ UNIT_ASSERT_VALUES_EQUAL_C(threadLoads[t]->TimeSlots[s].load(), T::GetTimeSlotMaxValue(), ToString(s).c_str());
+ }
+ }
+
+ UNIT_ASSERT_VALUES_EQUAL(result, T::GetTimeWindowLengthNs());
+
+ for (auto t = 0u; t < threadCount; ++t) {
+ delete threadLoads[t];
+ }
+ }
+ }
+}
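The assertions above rely on the slot-part arithmetic of TThreadLoad: a slot counter counts "parts", with GetTimeSlotPartCount() equal to the maximum slot value plus one and GetTimeSlotPartLengthNs() equal to the slot length divided by the part count. A small compile-time check of the 8-bit configuration used in these tests (illustrative only):

#include <cstdint>
#include <limits>

// Illustrative arithmetic for the 8-bit slot configuration used above.
constexpr std::uint64_t TimeSlotLengthNs = 524288ull; // 2^19 ns
constexpr std::uint64_t TimeSlotPartCount =
    static_cast<std::uint64_t>(std::numeric_limits<std::uint8_t>::max()) + 1; // 256
constexpr std::uint64_t TimeSlotPartLengthNs = TimeSlotLengthNs / TimeSlotPartCount;
static_assert(TimeSlotPartLengthNs == 2048, "each 8-bit slot part covers 2048 ns");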
diff --git a/library/cpp/mime/types/mime.cpp b/library/cpp/mime/types/mime.cpp
index e4cbcc86eb..74eeabea48 100644
--- a/library/cpp/mime/types/mime.cpp
+++ b/library/cpp/mime/types/mime.cpp
@@ -250,5 +250,6 @@ const char* MimeNames[MIME_MAX] = {
"woff", // MIME_WOFF // 43
"woff2", // MIME_WOFF2 // 44
"ttf", // MIME_TTF // 45
- "webmanifest" // MIME_WEBMANIFEST // 46
+ "webmanifest", // MIME_WEBMANIFEST // 46
+ "cbor", // MIME_CBOR // 47
};
diff --git a/library/cpp/string_utils/CMakeLists.txt b/library/cpp/string_utils/CMakeLists.txt
index d256782733..bbdcba85d9 100644
--- a/library/cpp/string_utils/CMakeLists.txt
+++ b/library/cpp/string_utils/CMakeLists.txt
@@ -7,6 +7,7 @@
add_subdirectory(base64)
+add_subdirectory(csv)
add_subdirectory(indent_text)
add_subdirectory(levenshtein_diff)
add_subdirectory(parse_size)
diff --git a/library/cpp/string_utils/csv/CMakeLists.txt b/library/cpp/string_utils/csv/CMakeLists.txt
new file mode 100644
index 0000000000..7dffad3566
--- /dev/null
+++ b/library/cpp/string_utils/csv/CMakeLists.txt
@@ -0,0 +1,17 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(cpp-string_utils-csv)
+target_link_libraries(cpp-string_utils-csv PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+)
+target_sources(cpp-string_utils-csv PRIVATE
+ ${CMAKE_SOURCE_DIR}/library/cpp/string_utils/csv/csv.cpp
+)
diff --git a/library/cpp/string_utils/csv/csv.cpp b/library/cpp/string_utils/csv/csv.cpp
new file mode 100644
index 0000000000..218473c62c
--- /dev/null
+++ b/library/cpp/string_utils/csv/csv.cpp
@@ -0,0 +1,82 @@
+#include "csv.h"
+
+TStringBuf NCsvFormat::CsvSplitter::Consume() {
+ if (Begin == End) {
+ return nullptr;
+ }
+ TString::iterator TokenStart = Begin;
+ TString::iterator TokenEnd = Begin;
+ if (Quote == '\0') {
+ while (1) {
+ if (TokenEnd == End || *TokenEnd == Delimeter) {
+ Begin = TokenEnd;
+ return TStringBuf(TokenStart, TokenEnd);
+ }
+ ++TokenEnd;
+ }
+ } else {
+ bool Escape = false;
+ if (*Begin == Quote) {
+ Escape = true;
+ ++TokenStart;
+ ++TokenEnd;
+ Y_ENSURE(TokenStart != End, TStringBuf("RFC4180 violation: quotation mark must be followed by something"));
+ }
+ while (1) {
+ if (TokenEnd == End || (!Escape && *TokenEnd == Delimeter)) {
+ Begin = TokenEnd;
+ return TStringBuf(TokenStart, TokenEnd);
+ } else if (*TokenEnd == Quote) {
+ Y_ENSURE(Escape, TStringBuf("RFC4180 violation: quotation mark must be in the escaped string only"));
+ if (TokenEnd + 1 == End) {
+ Begin = TokenEnd + 1;
+ } else if (*(TokenEnd + 1) == Delimeter) {
+ Begin = TokenEnd + 1;
+ } else if (*(TokenEnd + 1) == Quote) {
+ CustomStringBufs.push_back(TStringBuf(TokenStart, (TokenEnd + 1)));
+ TokenEnd += 2;
+ TokenStart = TokenEnd;
+ continue;
+ } else {
+ Y_ENSURE(false, TStringBuf("RFC4180 violation: in escaped string quotation mark must be followed by a delimiter, EOL or another quotation mark"));
+ }
+ if (CustomStringBufs.size()) {
+ CustomString.clear();
+ for (auto CustomStringBuf : CustomStringBufs) {
+ CustomString += TString{ CustomStringBuf };
+ }
+ CustomString += TString{ TStringBuf(TokenStart, TokenEnd) };
+ CustomStringBufs.clear();
+ return TStringBuf(CustomString);
+ } else {
+ return TStringBuf(TokenStart, TokenEnd);
+ }
+ }
+ ++TokenEnd;
+ }
+ }
+};
+
+TString NCsvFormat::TLinesSplitter::ConsumeLine() {
+ bool Escape = false;
+ TString result;
+ TString line;
+ while (Input.ReadLine(line)) {
+ for (auto it = line.begin(); it != line.end(); ++it) {
+ if (*it == Quote) {
+ Escape = !Escape;
+ }
+ }
+ if (!result) {
+ result = line;
+ } else {
+ result += line;
+ }
+ if (!Escape) {
+ break;
+ } else {
+ result += "\n";
+ }
+ }
+ return result;
+};
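The doubled-quote branch above accumulates the pieces of a quoted token in CustomStringBufs and glues them back together through CustomString. A small illustrative example of the resulting behaviour (the function name is hypothetical; the include path is assumed from the library layout):

#include <library/cpp/string_utils/csv/csv.h>

// Illustrative only: how a field with RFC 4180 doubled quotes is reassembled.
inline void EscapedQuoteSketch() {
    TString line = "\"he said \"\"hi\"\"\",next";
    NCsvFormat::CsvSplitter splitter(line);
    TStringBuf first = splitter.Consume();  // he said "hi"   (rebuilt through CustomString)
    splitter.Step();                        // advance past the delimiter
    TStringBuf second = splitter.Consume(); // next
    (void)first;
    (void)second;
}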
diff --git a/library/cpp/string_utils/csv/csv.h b/library/cpp/string_utils/csv/csv.h
new file mode 100644
index 0000000000..8cb96e6bb9
--- /dev/null
+++ b/library/cpp/string_utils/csv/csv.h
@@ -0,0 +1,64 @@
+#pragma once
+
+#include <util/generic/yexception.h>
+#include <util/generic/strbuf.h>
+#include <util/generic/vector.h>
+#include <util/stream/input.h>
+
+/*
+    Split a string according to RFC 4180 (CSV)
+*/
+
+namespace NCsvFormat {
+ class TLinesSplitter {
+ private:
+ IInputStream& Input;
+ const char Quote;
+ public:
+ TLinesSplitter(IInputStream& input, const char quote = '"')
+ : Input(input)
+ , Quote(quote) {
+ }
+ TString ConsumeLine();
+ };
+
+ class CsvSplitter {
+ public:
+ CsvSplitter(TString& data, const char delimeter = ',', const char quote = '"')
+ // quote = '\0' ignores quoting in values and words like simple split
+        // quote = '\0' disables quote handling, so values are split as in a plain delimiter split
+ , Quote(quote)
+ , Begin(data.begin())
+ , End(data.end())
+ {
+ }
+
+ bool Step() {
+ if (Begin == End) {
+ return false;
+ }
+ ++Begin;
+ return true;
+ }
+
+ TStringBuf Consume();
+ explicit operator TVector<TString>() {
+ TVector<TString> ret;
+
+ do {
+ TStringBuf buf = Consume();
+ ret.push_back(TString{buf});
+ } while (Step());
+
+ return ret;
+ }
+
+ private:
+ const char Delimeter;
+ const char Quote;
+ TString::iterator Begin;
+ const TString::const_iterator End;
+ TString CustomString;
+ TVector<TStringBuf> CustomStringBufs;
+ };
+}
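A short usage sketch of the two helpers declared above; the include path is assumed from the CMake target added in this commit, everything else follows the declarations (the function name is illustrative):

#include <library/cpp/string_utils/csv/csv.h>

#include <util/stream/str.h>

// Illustrative only: split one CSV line into fields and read multi-line records.
inline void CsvUsageSketch() {
    TString line = "a,\"b,with comma\",c";
    NCsvFormat::CsvSplitter splitter(line);
    TVector<TString> fields = static_cast<TVector<TString>>(splitter); // {"a", "b,with comma", "c"}

    // TLinesSplitter keeps reading physical lines while a quoted field is still open,
    // so the first logical record below spans two physical lines.
    TString data = "x,\"multi\nline\",y\n1,2,3\n";
    TStringInput input(data);
    NCsvFormat::TLinesSplitter lines(input);
    TString firstRecord = lines.ConsumeLine();  // x,"multi\nline",y  (quotes preserved)
    TString secondRecord = lines.ConsumeLine(); // 1,2,3
    (void)fields;
    (void)firstRecord;
    (void)secondRecord;
}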