| author | kruall <kruall@ydb.tech> | 2023-12-07 22:14:35 +0300 |
|---|---|---|
| committer | kruall <kruall@ydb.tech> | 2023-12-07 22:36:46 +0300 |
| commit | ab0991343189494d8e93e3eff2af16fc3a3ce561 (patch) | |
| tree | 189320e798cd8dbbce7ba195e1c625d3b52e7e0d | |
| parent | b94e74ac740f885d9ad76a2f845aeb5f6895bb6d (diff) | |
| download | ydb-ab0991343189494d8e93e3eff2af16fc3a3ce561.tar.gz | |
Remove UnitedPool, KIKIMR-18440
29 files changed, 14 insertions, 2912 deletions
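
After this change only the BASIC and IO executor pool types remain; `TUnitedWorkersConfig`, `TUnitedExecutorPoolConfig` and the CPU balancer are deleted, so `TCpuManagerConfig` carries just the `Basic` and `IO` pool vectors. Below is a minimal sketch (not part of the commit) of building such a config by hand; type and member names are taken from `ydb/library/actors/core/config.h` as changed in this commit, while the pool names and thread counts are purely illustrative:

```cpp
// Sketch only: the pool kinds that remain after UnitedPool removal.
// Names come from ydb/library/actors/core/config.h; values are illustrative.
#include <ydb/library/actors/core/config.h>

NActors::TCpuManagerConfig MakeConfigWithoutUnitedPools() {
    NActors::TCpuManagerConfig cpuManager;

    NActors::TBasicExecutorPoolConfig basic;
    basic.PoolId = 0;
    basic.PoolName = "System";
    basic.Threads = 4;                 // per-pool thread count (TExecutor.Threads analog)
    cpuManager.Basic.emplace_back(std::move(basic));

    NActors::TIOExecutorPoolConfig io;
    io.PoolId = 1;
    io.PoolName = "IO";
    io.Threads = 1;                    // assumed field, mirrors the BASIC pool's Threads
    cpuManager.IO.emplace_back(std::move(io));

    // Only Basic and IO contribute to the count now; United no longer exists.
    Y_ABORT_UNLESS(cpuManager.GetExecutorsCount() == 2);
    return cpuManager;
}
```

This mirrors what `CreateCpuManagerConfig()` now produces from `TActorSystemConfig`: the `UNITED` case in `AddExecutorPool()` and the whole `CreateUnitedWorkersConfig()` helper are removed in the diff below.
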
diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp index c09b59d292..ab88bc8bf0 100644 --- a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp +++ b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp @@ -190,7 +190,6 @@ #include <ydb/library/actors/core/events.h> #include <ydb/library/actors/core/executor_pool_basic.h> #include <ydb/library/actors/core/executor_pool_io.h> -#include <ydb/library/actors/core/executor_pool_united.h> #include <ydb/library/actors/core/log.h> #include <ydb/library/actors/core/log_settings.h> #include <ydb/library/actors/core/mon.h> @@ -286,7 +285,6 @@ void AddExecutorPool( const NKikimrConfig::TActorSystemConfig::TExecutor& poolConfig, const NKikimrConfig::TActorSystemConfig& systemConfig, ui32 poolId, - ui32& unitedThreads, const NKikimr::TAppData* appData) { const auto counters = GetServiceCounters(appData->Counters, "utils"); @@ -335,87 +333,19 @@ void AddExecutorPool( cpuManager.IO.emplace_back(std::move(io)); break; } - case NKikimrConfig::TActorSystemConfig::TExecutor::UNITED: { - TUnitedExecutorPoolConfig united; - united.PoolId = poolId; - united.PoolName = poolConfig.GetName(); - united.Concurrency = poolConfig.GetConcurrency(); - united.Weight = (NActors::TPoolWeight)poolConfig.GetWeight(); - united.Allowed = ParseAffinity(poolConfig.GetAffinity()); - if (poolConfig.HasTimePerMailboxMicroSecs()) { - united.TimePerMailbox = TDuration::MicroSeconds(poolConfig.GetTimePerMailboxMicroSecs()); - } else if (systemConfig.HasTimePerMailboxMicroSecs()) { - united.TimePerMailbox = TDuration::MicroSeconds(systemConfig.GetTimePerMailboxMicroSecs()); - } - if (poolConfig.HasEventsPerMailbox()) { - united.EventsPerMailbox = poolConfig.GetEventsPerMailbox(); - } else if (systemConfig.HasEventsPerMailbox()) { - united.EventsPerMailbox = systemConfig.GetEventsPerMailbox(); - } - Y_ABORT_UNLESS(united.EventsPerMailbox != 0); - united.Balancing.Cpus = poolConfig.GetThreads(); - united.Balancing.MinCpus = poolConfig.GetMinThreads(); - united.Balancing.MaxCpus = poolConfig.GetMaxThreads(); - united.Balancing.Priority = poolConfig.GetBalancingPriority(); - united.Balancing.ToleratedLatencyUs = poolConfig.GetToleratedLatencyUs(); - unitedThreads += united.Balancing.Cpus; - cpuManager.United.emplace_back(std::move(united)); - break; - } default: Y_ABORT(); } } -static TUnitedWorkersConfig CreateUnitedWorkersConfig(const NKikimrConfig::TActorSystemConfig::TUnitedWorkers& config, ui32 unitedThreads) { - TUnitedWorkersConfig result; - result.CpuCount = unitedThreads; - if (config.HasCpuCount()) { - result.CpuCount = config.GetCpuCount(); - } - if (config.HasSpinThresholdUs()) { - result.SpinThresholdUs = config.GetSpinThresholdUs(); - } - if (config.HasPoolLimitUs()) { - result.PoolLimitUs = config.GetPoolLimitUs(); - } - if (config.HasEventLimitUs()) { - result.EventLimitUs = config.GetEventLimitUs(); - } - if (config.HasLimitPrecisionUs()) { - result.LimitPrecisionUs = config.GetLimitPrecisionUs(); - } - if (config.HasFastWorkerPriority()) { - result.FastWorkerPriority = config.GetFastWorkerPriority(); - } - if (config.HasIdleWorkerPriority()) { - result.IdleWorkerPriority = config.GetIdleWorkerPriority(); - } - if (config.HasAffinity()) { - result.Allowed = ParseAffinity(config.GetAffinity()); - } - if (config.HasNoRealtime()) { - result.NoRealtime = config.GetNoRealtime(); - } - if (config.HasNoAffinity()) { - result.NoAffinity = config.GetNoAffinity(); - } - if 
(config.HasBalancerPeriodUs()) { - result.Balancer.PeriodUs = config.GetBalancerPeriodUs(); - } - return result; -} - static TCpuManagerConfig CreateCpuManagerConfig(const NKikimrConfig::TActorSystemConfig& config, const NKikimr::TAppData* appData) { TCpuManagerConfig cpuManager; - ui32 unitedThreads = 0; cpuManager.PingInfoByPool.resize(config.GetExecutor().size()); for (int poolId = 0; poolId < config.GetExecutor().size(); poolId++) { - AddExecutorPool(cpuManager, config.GetExecutor(poolId), config, poolId, unitedThreads, appData); + AddExecutorPool(cpuManager, config.GetExecutor(poolId), config, poolId, appData); } - cpuManager.UnitedWorkers = CreateUnitedWorkersConfig(config.GetUnitedWorkers(), unitedThreads); return cpuManager; } diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index 8cc3ff2f04..f9495d7150 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -61,7 +61,6 @@ message TActorSystemConfig { enum EType { BASIC = 1; IO = 2; - UNITED = 3; }; optional EType Type = 1; @@ -74,17 +73,9 @@ message TActorSystemConfig { optional uint32 EventsPerMailbox = 8; optional uint32 RealtimePriority = 9; - // Actorsystem 2.0: cpu sharing by different pools with preemption - optional uint32 Concurrency = 10; // Limits simultaneously running mailboxes of UNITED pool - optional uint32 Weight = 11; // Weight of UNITED pool in cpu-local scheduler (default value is NActors::DefPoolWeight) - - // Actorsystem 1.5: cpu balancing between pools + // Actorsystem 1.4 optional uint32 MinThreads = 12; // Lower balancing bound, should be at least 1, and not greater than `Threads` optional uint32 MaxThreads = 13; // Higher balancing bound, should be not lower than `Threads` - optional uint32 BalancingPriority = 14; // Priority of pool to obtain cpu due to balancing (higher is better) - optional uint64 ToleratedLatencyUs = 15; // p100-latency threshold indicating that more cpus are required by pool - - // Actorsystem 1.4 optional int32 Priority = 16; optional int32 MaxAvgPingDeviation = 17; } @@ -115,22 +106,6 @@ message TActorSystemConfig { optional uint32 EventsPerMailbox = 9; optional uint32 SelfPingInterval = 10; // in microseconds - message TUnitedWorkers { - optional uint32 CpuCount = 1; // Total CPUs running united workers (TExecutor.Threads analog), should be set to zero to use actorsystem 1.5, and >0 for actorsystem 2.0 - optional uint64 SpinThresholdUs = 2; // Limit for active spinning in case all pools became idle - optional uint64 PoolLimitUs = 3; // Soft limit on pool execution - optional uint64 EventLimitUs = 4; // Hard limit on last event execution exceeding pool limit - optional uint64 LimitPrecisionUs = 5; // Maximum delay of timer on limit excess (delay needed to avoid settimer syscall on every pool switch) - optional uint64 FastWorkerPriority = 6; // Real-time priority of workers not exceeding hard limits - optional uint64 IdleWorkerPriority = 7; // Real-time priority of standby workers waiting for hard preemption on timers (should be greater than FastWorkerPriority) - optional TAffinity Affinity = 8; // Cpu set for workers (every worker has affinity for exactly one cpu) - optional bool NoRealtime = 9; // Do not use RT-priority for worker threads - optional bool NoAffinity = 10; // Do not use affinity for worker threads - optional uint64 BalancerPeriodUs = 11; // Time between balancer steps (see default in NActors::TBalancerConfig) - } - - optional TUnitedWorkers UnitedWorkers = 11; - optional bool UseAutoConfig = 12; // Used only with 
UseAutoConfig; diff --git a/ydb/library/actors/core/CMakeLists.darwin-arm64.txt b/ydb/library/actors/core/CMakeLists.darwin-arm64.txt index d606663430..f85da8cdc6 100644 --- a/ydb/library/actors/core/CMakeLists.darwin-arm64.txt +++ b/ydb/library/actors/core/CMakeLists.darwin-arm64.txt @@ -55,7 +55,6 @@ target_sources(library-actors-core PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/actorsystem.cpp ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/ask.cpp ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/av_bootstrapped.cpp - ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/balancer.cpp ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/buffer.cpp ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/callstack.cpp ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/cpu_manager.cpp @@ -66,7 +65,6 @@ target_sources(library-actors-core PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/executor_pool_base.cpp ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/executor_pool_basic.cpp ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/executor_pool_io.cpp - ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/executor_pool_united.cpp ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/executor_thread.cpp ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/harmonizer.cpp ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/interconnect.cpp diff --git a/ydb/library/actors/core/CMakeLists.darwin-x86_64.txt b/ydb/library/actors/core/CMakeLists.darwin-x86_64.txt index d606663430..f85da8cdc6 100644 --- a/ydb/library/actors/core/CMakeLists.darwin-x86_64.txt +++ b/ydb/library/actors/core/CMakeLists.darwin-x86_64.txt @@ -55,7 +55,6 @@ target_sources(library-actors-core PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/actorsystem.cpp ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/ask.cpp ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/av_bootstrapped.cpp - ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/balancer.cpp ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/buffer.cpp ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/callstack.cpp ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/cpu_manager.cpp @@ -66,7 +65,6 @@ target_sources(library-actors-core PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/executor_pool_base.cpp ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/executor_pool_basic.cpp ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/executor_pool_io.cpp - ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/executor_pool_united.cpp ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/executor_thread.cpp ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/harmonizer.cpp ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/interconnect.cpp diff --git a/ydb/library/actors/core/CMakeLists.linux-aarch64.txt b/ydb/library/actors/core/CMakeLists.linux-aarch64.txt index 0c23452507..cf33f59969 100644 --- a/ydb/library/actors/core/CMakeLists.linux-aarch64.txt +++ b/ydb/library/actors/core/CMakeLists.linux-aarch64.txt @@ -56,7 +56,6 @@ target_sources(library-actors-core PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/actorsystem.cpp ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/ask.cpp ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/av_bootstrapped.cpp - ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/balancer.cpp ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/buffer.cpp ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/callstack.cpp ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/cpu_manager.cpp @@ -67,7 +66,6 @@ target_sources(library-actors-core PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/executor_pool_base.cpp ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/executor_pool_basic.cpp ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/executor_pool_io.cpp - 
${CMAKE_SOURCE_DIR}/ydb/library/actors/core/executor_pool_united.cpp ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/executor_thread.cpp ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/harmonizer.cpp ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/interconnect.cpp diff --git a/ydb/library/actors/core/CMakeLists.linux-x86_64.txt b/ydb/library/actors/core/CMakeLists.linux-x86_64.txt index 0c23452507..cf33f59969 100644 --- a/ydb/library/actors/core/CMakeLists.linux-x86_64.txt +++ b/ydb/library/actors/core/CMakeLists.linux-x86_64.txt @@ -56,7 +56,6 @@ target_sources(library-actors-core PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/actorsystem.cpp ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/ask.cpp ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/av_bootstrapped.cpp - ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/balancer.cpp ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/buffer.cpp ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/callstack.cpp ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/cpu_manager.cpp @@ -67,7 +66,6 @@ target_sources(library-actors-core PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/executor_pool_base.cpp ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/executor_pool_basic.cpp ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/executor_pool_io.cpp - ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/executor_pool_united.cpp ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/executor_thread.cpp ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/harmonizer.cpp ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/interconnect.cpp diff --git a/ydb/library/actors/core/CMakeLists.windows-x86_64.txt b/ydb/library/actors/core/CMakeLists.windows-x86_64.txt index d606663430..f85da8cdc6 100644 --- a/ydb/library/actors/core/CMakeLists.windows-x86_64.txt +++ b/ydb/library/actors/core/CMakeLists.windows-x86_64.txt @@ -55,7 +55,6 @@ target_sources(library-actors-core PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/actorsystem.cpp ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/ask.cpp ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/av_bootstrapped.cpp - ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/balancer.cpp ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/buffer.cpp ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/callstack.cpp ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/cpu_manager.cpp @@ -66,7 +65,6 @@ target_sources(library-actors-core PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/executor_pool_base.cpp ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/executor_pool_basic.cpp ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/executor_pool_io.cpp - ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/executor_pool_united.cpp ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/executor_thread.cpp ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/harmonizer.cpp ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/interconnect.cpp diff --git a/ydb/library/actors/core/actor_benchmark_helper.h b/ydb/library/actors/core/actor_benchmark_helper.h index 9307a117a8..074295840e 100644 --- a/ydb/library/actors/core/actor_benchmark_helper.h +++ b/ydb/library/actors/core/actor_benchmark_helper.h @@ -273,63 +273,30 @@ struct TActorBenchmark { setup->CpuManager.Basic.emplace_back(std::move(basic)); } - static void AddUnitedPool(THolder<TActorSystemSetup>& setup, ui32 concurrency, bool activateEveryEvent) { - TUnitedExecutorPoolConfig united; - united.PoolId = setup->GetExecutorsCount(); - united.PoolName = TStringBuilder() << "u" << united.PoolId; - united.Concurrency = concurrency; - united.TimePerMailbox = TDuration::Hours(1); - if (activateEveryEvent) { - united.EventsPerMailbox = 1; - } else { - 
united.EventsPerMailbox = ::Max<ui32>(); - } - setup->CpuManager.United.emplace_back(std::move(united)); - } - - static THolder<TActorSystemSetup> GetActorSystemSetup(ui32 unitedCpuCount, bool preemption) { + static THolder<TActorSystemSetup> GetActorSystemSetup() { auto setup = MakeHolder<NActors::TActorSystemSetup>(); setup->NodeId = 1; - setup->CpuManager.UnitedWorkers.CpuCount = unitedCpuCount; - setup->CpuManager.UnitedWorkers.SpinThresholdUs = TSettings::DefaultSpinThreshold; - setup->CpuManager.UnitedWorkers.NoRealtime = TSettings::DefaultNoRealtime; - if (preemption) { - setup->CpuManager.UnitedWorkers.PoolLimitUs = 500; - setup->CpuManager.UnitedWorkers.EventLimitUs = 100; - setup->CpuManager.UnitedWorkers.LimitPrecisionUs = 100; - } else { - setup->CpuManager.UnitedWorkers.PoolLimitUs = 100'000'000'000; - setup->CpuManager.UnitedWorkers.EventLimitUs = 10'000'000'000; - setup->CpuManager.UnitedWorkers.LimitPrecisionUs = 10'000'000'000; - } setup->Scheduler = new TBasicSchedulerThread(NActors::TSchedulerConfig(512, 0)); return setup; } enum class EPoolType { Basic, - United }; - static THolder<TActorSystemSetup> InitActorSystemSetup(EPoolType poolType, ui32 poolsCount, ui32 threads, bool activateEveryEvent, bool preemption) { + static THolder<TActorSystemSetup> InitActorSystemSetup(EPoolType poolType, ui32 poolsCount, ui32 threads, bool activateEveryEvent) { if (poolType == EPoolType::Basic) { - THolder<TActorSystemSetup> setup = GetActorSystemSetup(0, false); + THolder<TActorSystemSetup> setup = GetActorSystemSetup(); for (ui32 i = 0; i < poolsCount; ++i) { AddBasicPool(setup, threads, activateEveryEvent, 0); } return setup; - } else if (poolType == EPoolType::United) { - THolder<TActorSystemSetup> setup = GetActorSystemSetup(poolsCount * threads, preemption); - for (ui32 i = 0; i < poolsCount; ++i) { - AddUnitedPool(setup, threads, activateEveryEvent); - } - return setup; } Y_ABORT(); } static double BenchSendReceive(bool allocation, NActors::TMailboxType::EType mType, EPoolType poolType, ESendingType sendingType) { - THolder<TActorSystemSetup> setup = InitActorSystemSetup(poolType, 1, 1, false, false); + THolder<TActorSystemSetup> setup = InitActorSystemSetup(poolType, 1, 1, false); TActorSystem actorSystem(setup); actorSystem.Start(); @@ -359,7 +326,7 @@ struct TActorBenchmark { } static double BenchSendActivateReceive(ui32 poolsCount, ui32 threads, bool allocation, EPoolType poolType, ESendingType sendingType) { - THolder<TActorSystemSetup> setup = InitActorSystemSetup(poolType, poolsCount, threads, true, false); + THolder<TActorSystemSetup> setup = InitActorSystemSetup(poolType, poolsCount, threads, true); TActorSystem actorSystem(setup); actorSystem.Start(); @@ -402,7 +369,7 @@ struct TActorBenchmark { } static double BenchSendActivateReceiveWithMailboxNeighbours(ui32 MailboxNeighbourActors, EPoolType poolType, ESendingType sendingType) { - THolder<TActorSystemSetup> setup = InitActorSystemSetup(poolType, 1, 1, false, false); + THolder<TActorSystemSetup> setup = InitActorSystemSetup(poolType, 1, 1, false); TActorSystem actorSystem(setup); actorSystem.Start(); @@ -454,7 +421,7 @@ struct TActorBenchmark { }; static auto BenchContentedThreads(ui32 threads, ui32 actorsPairsCount, EPoolType poolType, ESendingType sendingType, TDuration testDuration = TDuration::Zero(), ui32 inFlight = 1) { - THolder<TActorSystemSetup> setup = InitActorSystemSetup(poolType, 1, threads, false, false); + THolder<TActorSystemSetup> setup = InitActorSystemSetup(poolType, 1, threads, false); 
TActorSystem actorSystem(setup); actorSystem.Start(); @@ -529,7 +496,7 @@ struct TActorBenchmark { } static auto BenchStarContentedThreads(ui32 threads, ui32 actorsPairsCount, EPoolType poolType, ESendingType sendingType, TDuration testDuration = TDuration::Zero(), ui32 starMultiply=10) { - THolder<TActorSystemSetup> setup = InitActorSystemSetup(poolType, 1, threads, true, false); + THolder<TActorSystemSetup> setup = InitActorSystemSetup(poolType, 1, threads, true); TActorSystem actorSystem(setup); actorSystem.Start(); diff --git a/ydb/library/actors/core/actor_ut.cpp b/ydb/library/actors/core/actor_ut.cpp index fe2fd98059..78aaad410c 100644 --- a/ydb/library/actors/core/actor_ut.cpp +++ b/ydb/library/actors/core/actor_ut.cpp @@ -25,7 +25,7 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) { using TSendReceiveActorParams = TActorBenchmark::TSendReceiveActorParams; Y_UNIT_TEST(WithSharedExecutors) { - THolder<TActorSystemSetup> setup = TActorBenchmark::GetActorSystemSetup(0, false); + THolder<TActorSystemSetup> setup = TActorBenchmark::GetActorSystemSetup(); TActorBenchmark::AddBasicPool(setup, 2, 1, 0); TActorBenchmark::AddBasicPool(setup, 2, 1, 1); @@ -88,7 +88,7 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) { } Y_UNIT_TEST(WithoutSharedExecutors) { - THolder<TActorSystemSetup> setup = TActorBenchmark::GetActorSystemSetup(0, false); + THolder<TActorSystemSetup> setup = TActorBenchmark::GetActorSystemSetup(); TActorBenchmark::AddBasicPool(setup, 2, 1, 0); TActorBenchmark::AddBasicPool(setup, 2, 1, 0); @@ -169,23 +169,6 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) { } } - Y_UNIT_TEST(SendReceive1Pool1ThreadAllocUnited) { - for (const auto& mType : TSettings::MailboxTypes) { - auto stats = TActorBenchmark::CountStats([mType] { - return TActorBenchmark::BenchSendReceive(true, mType, TActorBenchmark::EPoolType::United, ESendingType::Common); - }); - Cerr << stats.ToString() << " " << mType << Endl; - stats = TActorBenchmark::CountStats([mType] { - return TActorBenchmark::BenchSendReceive(true, mType, TActorBenchmark::EPoolType::United, ESendingType::Lazy); - }); - Cerr << stats.ToString() << " " << mType << " Lazy" << Endl; - stats = TActorBenchmark::CountStats([mType] { - return TActorBenchmark::BenchSendReceive(true, mType, TActorBenchmark::EPoolType::United, ESendingType::Tail); - }); - Cerr << stats.ToString() << " " << mType << " Tail" << Endl; - } - } - Y_UNIT_TEST(SendReceive1Pool1ThreadNoAlloc) { for (const auto& mType : TSettings::MailboxTypes) { auto stats = TActorBenchmark::CountStats([mType] { @@ -203,87 +186,38 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) { } } - Y_UNIT_TEST(SendReceive1Pool1ThreadNoAllocUnited) { - for (const auto& mType : TSettings::MailboxTypes) { - auto stats = TActorBenchmark::CountStats([mType] { - return TActorBenchmark::BenchSendReceive(false, mType, TActorBenchmark::EPoolType::United, ESendingType::Common); - }); - Cerr << stats.ToString() << " " << mType << Endl; - stats = TActorBenchmark::CountStats([mType] { - return TActorBenchmark::BenchSendReceive(false, mType, TActorBenchmark::EPoolType::United, ESendingType::Lazy); - }); - Cerr << stats.ToString() << " " << mType << " Lazy" << Endl; - stats = TActorBenchmark::CountStats([mType] { - return TActorBenchmark::BenchSendReceive(false, mType, TActorBenchmark::EPoolType::United, ESendingType::Tail); - }); - Cerr << stats.ToString() << " " << mType << " Tail" << Endl; - } - } - Y_UNIT_TEST(SendActivateReceive1Pool1ThreadAlloc) { TActorBenchmark::RunBenchSendActivateReceive(1, 1, true, TActorBenchmark::EPoolType::Basic); } - 
Y_UNIT_TEST(SendActivateReceive1Pool1ThreadAllocUnited) { - TActorBenchmark::RunBenchSendActivateReceive(1, 1, true, TActorBenchmark::EPoolType::United); - } - Y_UNIT_TEST(SendActivateReceive1Pool1ThreadNoAlloc) { TActorBenchmark::RunBenchSendActivateReceive(1, 1, false, TActorBenchmark::EPoolType::Basic); } - Y_UNIT_TEST(SendActivateReceive1Pool1ThreadNoAllocUnited) { - TActorBenchmark::RunBenchSendActivateReceive(1, 1, false, TActorBenchmark::EPoolType::United); - } - Y_UNIT_TEST(SendActivateReceive1Pool2ThreadsAlloc) { TActorBenchmark::RunBenchSendActivateReceive(1, 2, true, TActorBenchmark::EPoolType::Basic); } - Y_UNIT_TEST(SendActivateReceive1Pool2ThreadsAllocUnited) { - TActorBenchmark::RunBenchSendActivateReceive(1, 2, true, TActorBenchmark::EPoolType::United); - } - Y_UNIT_TEST(SendActivateReceive1Pool2ThreadsNoAlloc) { TActorBenchmark::RunBenchSendActivateReceive(1, 2, false, TActorBenchmark::EPoolType::Basic); } - Y_UNIT_TEST(SendActivateReceive1Pool2ThreadsNoAllocUnited) { - TActorBenchmark::RunBenchSendActivateReceive(1, 2, false, TActorBenchmark::EPoolType::United); - } - Y_UNIT_TEST(SendActivateReceive2Pool1ThreadAlloc) { TActorBenchmark::RunBenchSendActivateReceive(2, 1, true, TActorBenchmark::EPoolType::Basic); } - Y_UNIT_TEST(SendActivateReceive2Pool1ThreadAllocUnited) { - TActorBenchmark::RunBenchSendActivateReceive(2, 1, true, TActorBenchmark::EPoolType::United); - } - Y_UNIT_TEST(SendActivateReceive2Pool1ThreadNoAlloc) { TActorBenchmark::RunBenchSendActivateReceive(2, 1, false, TActorBenchmark::EPoolType::Basic); } - Y_UNIT_TEST(SendActivateReceive2Pool1ThreadNoAllocUnited) { - TActorBenchmark::RunBenchSendActivateReceive(2, 1, false, TActorBenchmark::EPoolType::United); - } - Y_UNIT_TEST(SendActivateReceive1Pool1Threads) { TActorBenchmark::RunBenchContentedThreads(1, TActorBenchmark::EPoolType::Basic); } - Y_UNIT_TEST(SendActivateReceive1Pool1ThreadsUnited) { TActorBenchmark::RunBenchContentedThreads(1, TActorBenchmark::EPoolType::United); } Y_UNIT_TEST(SendActivateReceive1Pool2Threads) { TActorBenchmark::RunBenchContentedThreads(2, TActorBenchmark::EPoolType::Basic); } - Y_UNIT_TEST(SendActivateReceive1Pool2ThreadsUnited) { TActorBenchmark::RunBenchContentedThreads(2, TActorBenchmark::EPoolType::United); } Y_UNIT_TEST(SendActivateReceive1Pool3Threads) { TActorBenchmark::RunBenchContentedThreads(3, TActorBenchmark::EPoolType::Basic); } - Y_UNIT_TEST(SendActivateReceive1Pool3ThreadsUnited) { TActorBenchmark::RunBenchContentedThreads(3, TActorBenchmark::EPoolType::United); } Y_UNIT_TEST(SendActivateReceive1Pool4Threads) { TActorBenchmark::RunBenchContentedThreads(4, TActorBenchmark::EPoolType::Basic); } - Y_UNIT_TEST(SendActivateReceive1Pool4ThreadsUnited) { TActorBenchmark::RunBenchContentedThreads(4, TActorBenchmark::EPoolType::United); } Y_UNIT_TEST(SendActivateReceive1Pool5Threads) { TActorBenchmark::RunBenchContentedThreads(5, TActorBenchmark::EPoolType::Basic); } - Y_UNIT_TEST(SendActivateReceive1Pool5ThreadsUnited) { TActorBenchmark::RunBenchContentedThreads(5, TActorBenchmark::EPoolType::United); } Y_UNIT_TEST(SendActivateReceive1Pool6Threads) { TActorBenchmark::RunBenchContentedThreads(6, TActorBenchmark::EPoolType::Basic); } - Y_UNIT_TEST(SendActivateReceive1Pool6ThreadsUnited) { TActorBenchmark::RunBenchContentedThreads(6, TActorBenchmark::EPoolType::United); } Y_UNIT_TEST(SendActivateReceive1Pool7Threads) { TActorBenchmark::RunBenchContentedThreads(7, TActorBenchmark::EPoolType::Basic); } - Y_UNIT_TEST(SendActivateReceive1Pool7ThreadsUnited) { 
TActorBenchmark::RunBenchContentedThreads(7, TActorBenchmark::EPoolType::United); } Y_UNIT_TEST(SendActivateReceive1Pool8Threads) { TActorBenchmark::RunBenchContentedThreads(8, TActorBenchmark::EPoolType::Basic); } - Y_UNIT_TEST(SendActivateReceive1Pool8ThreadsUnited) { TActorBenchmark::RunBenchContentedThreads(8, TActorBenchmark::EPoolType::United); } Y_UNIT_TEST(SendActivateReceiveCSV) { std::vector<ui32> threadsList; @@ -314,24 +248,6 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) { Cerr << stats.ToString() << " neighbourActors: " << neighbour << " Tail" << Endl; } } - - Y_UNIT_TEST(SendActivateReceiveWithMailboxNeighboursUnited) { - TVector<ui32> NeighbourActors = {0, 1, 2, 3, 4, 5, 6, 7, 8, 16, 32, 64, 128, 256}; - for (const auto& neighbour : NeighbourActors) { - auto stats = TActorBenchmark::CountStats([neighbour] { - return TActorBenchmark::BenchSendActivateReceiveWithMailboxNeighbours(neighbour, TActorBenchmark::EPoolType::United, ESendingType::Common); - }); - Cerr << stats.ToString() << " neighbourActors: " << neighbour << Endl; - stats = TActorBenchmark::CountStats([neighbour] { - return TActorBenchmark::BenchSendActivateReceiveWithMailboxNeighbours(neighbour, TActorBenchmark::EPoolType::United, ESendingType::Lazy); - }); - Cerr << stats.ToString() << " neighbourActors: " << neighbour << " Lazy" << Endl; - stats = TActorBenchmark::CountStats([neighbour] { - return TActorBenchmark::BenchSendActivateReceiveWithMailboxNeighbours(neighbour, TActorBenchmark::EPoolType::United, ESendingType::Tail); - }); - Cerr << stats.ToString() << " neighbourActors: " << neighbour << " Tail" << Endl; - } - } } Y_UNIT_TEST_SUITE(TestDecorator) { diff --git a/ydb/library/actors/core/actorsystem.h b/ydb/library/actors/core/actorsystem.h index e8144d8fae..326c4a07ee 100644 --- a/ydb/library/actors/core/actorsystem.h +++ b/ydb/library/actors/core/actorsystem.h @@ -2,7 +2,6 @@ #include "defs.h" -#include "balancer.h" #include "config.h" #include "event.h" #include "executor_pool.h" @@ -93,8 +92,6 @@ namespace NActors { ui32 ExecutorsCount = 0; TArrayHolder<TAutoPtr<IExecutorPool>> Executors; - TAutoPtr<IBalancer> Balancer; // main implementation will be implicitly created if not set - TCpuManagerConfig CpuManager; TAutoPtr<ISchedulerThread> Scheduler; diff --git a/ydb/library/actors/core/balancer.cpp b/ydb/library/actors/core/balancer.cpp deleted file mode 100644 index 517d376d1d..0000000000 --- a/ydb/library/actors/core/balancer.cpp +++ /dev/null @@ -1,311 +0,0 @@ -#include "balancer.h" - -#include "probes.h" - -#include <ydb/library/actors/util/cpu_load_log.h> -#include <ydb/library/actors/util/datetime.h> -#include <ydb/library/actors/util/intrinsics.h> - -#include <util/system/spinlock.h> - -#include <algorithm> - -namespace NActors { - LWTRACE_USING(ACTORLIB_PROVIDER); - - // Describes balancing-related state of pool, the most notable is `Importance` to add new cpu - struct TLevel { - // Balancer will try to give more cpu to overloaded pools - enum ELoadClass { - Underloaded = 0, - Moderate = 1, - Overloaded = 2, - }; - - double ScaleFactor; - ELoadClass LoadClass; - ui64 Importance; // pool with lower importance is allowed to pass cpu to pool with higher, but the opposite is forbidden - - TLevel() {} - - TLevel(const TBalancingConfig& cfg, TPoolId poolId, ui64 currentCpus, double cpuIdle, ui64 addLatencyUs, ui64 worstLatencyUs) { - ScaleFactor = double(currentCpus) / cfg.Cpus; - if ((worstLatencyUs + addLatencyUs) < 2000 && cpuIdle > 1.0) { // Uderload criterion, based on estimated latency w/o 1 cpu - 
LoadClass = Underloaded; - } else if (worstLatencyUs > 2000 || cpuIdle < 0.2) { // Overload criterion, based on latency - LoadClass = Overloaded; - } else { - LoadClass = Moderate; - } - Importance = MakeImportance(LoadClass, cfg.Priority, ScaleFactor, cpuIdle, poolId); - } - - private: - // Importance is simple ui64 value (from highest to lowest): - // 2 Bits: LoadClass - // 8 Bits: Priority - // 10 Bits: -ScaleFactor (for max-min fairness with weights equal to TBalancingConfig::Cpus) - // 10 Bits: -CpuIdle - // 6 Bits: PoolId - static ui64 MakeImportance(ELoadClass load, ui8 priority, double scaleFactor, double cpuIdle, TPoolId poolId) { - ui64 idle = std::clamp<i64>(1024 - cpuIdle * 512, 0, 1023); - ui64 scale = std::clamp<i64>(1024 - scaleFactor * 32, 0, 1023); - - Y_ABORT_UNLESS(ui64(load) < (1ull << 2ull)); - Y_ABORT_UNLESS(ui64(priority) < (1ull << 8ull)); - Y_ABORT_UNLESS(ui64(scale) < (1ull << 10ull)); - Y_ABORT_UNLESS(ui64(idle) < (1ull << 10ull)); - Y_ABORT_UNLESS(ui64(poolId) < (1ull << 6ull)); - - static_assert(ui64(MaxPools) <= (1ull << 6ull)); - - ui64 importance = - (ui64(load) << ui64(6 + 10 + 10 + 8)) | - (ui64(priority) << ui64(6 + 10 + 10)) | - (ui64(scale) << ui64(6 + 10)) | - (ui64(idle) << ui64(6)) | - ui64(poolId); - return importance; - } - }; - - // Main balancer implemenation - class TBalancer: public IBalancer { - private: - struct TCpu; - struct TPool; - - bool Disabled = true; - TSpinLock Lock; - ui64 NextBalanceTs; - TVector<TCpu> Cpus; // Indexed by CpuId, can have gaps - TVector<TPool> Pools; // Indexed by PoolId, can have gaps - TBalancerConfig Config; - - public: - - ui64 GetPeriodUs() override; - // Setup - TBalancer(const TBalancerConfig& config, const TVector<TUnitedExecutorPoolConfig>& unitedPools, ui64 ts); - bool AddCpu(const TCpuAllocation& cpuAlloc, TCpuState* cpu) override; - ~TBalancer(); - - // Balancing - bool TryLock(ui64 ts) override; - void SetPoolStats(TPoolId pool, const TBalancerStats& stats) override; - void Balance() override; - void Unlock() override; - - private: - void MoveCpu(TPool& from, TPool& to); - }; - - struct TBalancer::TPool { - TBalancingConfig Config; - TPoolId PoolId; - TString PoolName; - - // Input data for balancing - TBalancerStats Prev; - TBalancerStats Next; - - // Derived stats - double CpuLoad; - double CpuIdle; - - // Classification - // NOTE: We want to avoid passing cpu back and forth, so we must consider not only current level, - // NOTE: but expected levels after movements also - TLevel CurLevel; // Level with current amount of cpu - TLevel AddLevel; // Level after one cpu acception - TLevel SubLevel; // Level after one cpu donation - - // Balancing state - ui64 CurrentCpus = 0; // Total number of cpus assigned for this pool (zero means pools is not balanced) - ui64 PrevCpus = 0; // Cpus in last period - - explicit TPool(const TBalancingConfig& cfg = {}) - : Config(cfg) - {} - - void Configure(const TBalancingConfig& cfg, const TString& poolName) { - Config = cfg; - // Enforce constraints - if (Config.Cpus > 0) { - Config.MinCpus = std::clamp<ui32>(Config.MinCpus, 1, Config.Cpus); - Config.MaxCpus = Max<ui32>(Config.MaxCpus, Config.Cpus); - } else { - Y_ABORT_UNLESS(Config.Cpus == 0, - "Unexpected negative Config.Cpus# %" PRIi64, - (i64)Config.Cpus); - Config.MinCpus = 0; - Config.MaxCpus = 0; - } - PoolName = poolName; - } - }; - - struct TBalancer::TCpu { - TCpuState* State = nullptr; // Cpu state, nullptr means cpu is not used (gap) - TCpuAllocation Alloc; - TPoolId Current; - TPoolId Assigned; - }; - - 
TBalancer::TBalancer(const TBalancerConfig& config, const TVector<TUnitedExecutorPoolConfig>& unitedPools, ui64 ts) - : NextBalanceTs(ts) - , Config(config) - { - for (TPoolId pool = 0; pool < MaxPools; pool++) { - Pools.emplace_back(); - Pools.back().PoolId = pool; - } - for (const TUnitedExecutorPoolConfig& united : unitedPools) { - Pools[united.PoolId].Configure(united.Balancing, united.PoolName); - } - } - - TBalancer::~TBalancer() { - } - - bool TBalancer::AddCpu(const TCpuAllocation& cpuAlloc, TCpuState* state) { - // Setup - TCpuId cpuId = cpuAlloc.CpuId; - if (Cpus.size() <= cpuId) { - Cpus.resize(cpuId + 1); - } - TCpu& cpu = Cpus[cpuId]; - cpu.State = state; - cpu.Alloc = cpuAlloc; - - // Fill every pool with cpus up to TBalancingConfig::Cpus - TPoolId pool = 0; - for (TPool& p : Pools) { - if (p.CurrentCpus < p.Config.Cpus) { - p.CurrentCpus++; - break; - } - pool++; - } - if (pool != MaxPools) { // cpu under balancer control - state->SwitchPool(pool); - state->AssignPool(pool); - Disabled = false; - return true; - } - return false; // non-balanced cpu - } - - bool TBalancer::TryLock(ui64 ts) { - if (!Disabled && NextBalanceTs < ts && Lock.TryAcquire()) { - NextBalanceTs = ts + Us2Ts(Config.PeriodUs); - return true; - } - return false; - } - - void TBalancer::SetPoolStats(TPoolId pool, const TBalancerStats& stats) { - Y_ABORT_UNLESS(pool < MaxPools); - TPool& p = Pools[pool]; - p.Prev = p.Next; - p.Next = stats; - } - - void TBalancer::Balance() { - // Update every cpu state - for (TCpu& cpu : Cpus) { - if (cpu.State) { - cpu.State->Load(cpu.Assigned, cpu.Current); - if (cpu.Current < MaxPools && cpu.Current != cpu.Assigned) { - return; // previous movement has not been applied yet, wait - } - } - } - - // Process stats, classify and compute pool importance - TStackVec<TPool*, MaxPools> order; - for (TPool& pool : Pools) { - if (pool.Config.Cpus == 0) { - continue; // skip gaps (non-existent or non-united pools) - } - if (pool.Prev.Ts == 0 || pool.Prev.Ts >= pool.Next.Ts) { - return; // invalid stats - } - - // Compute derived stats - pool.CpuLoad = (pool.Next.CpuUs - pool.Prev.CpuUs) / Ts2Us(pool.Next.Ts - pool.Prev.Ts); - if (pool.Prev.IdleUs == ui64(-1) || pool.Next.IdleUs == ui64(-1)) { - pool.CpuIdle = pool.CurrentCpus - pool.CpuLoad; // for tests - } else { - pool.CpuIdle = (pool.Next.IdleUs - pool.Prev.IdleUs) / Ts2Us(pool.Next.Ts - pool.Prev.Ts); - } - - // Compute levels - pool.CurLevel = TLevel(pool.Config, pool.PoolId, pool.CurrentCpus, pool.CpuIdle, - pool.Next.ExpectedLatencyIncreaseUs, pool.Next.WorstActivationTimeUs); - pool.AddLevel = TLevel(pool.Config, pool.PoolId, pool.CurrentCpus + 1, pool.CpuIdle, - 0, pool.Next.WorstActivationTimeUs); // we expect taken cpu to became utilized - pool.SubLevel = TLevel(pool.Config, pool.PoolId, pool.CurrentCpus - 1, pool.CpuIdle - 1, - pool.Next.ExpectedLatencyIncreaseUs, pool.Next.WorstActivationTimeUs); - - // Prepare for balancing - pool.PrevCpus = pool.CurrentCpus; - order.push_back(&pool); - } - - // Sort pools by importance - std::sort(order.begin(), order.end(), [] (TPool* l, TPool* r) {return l->CurLevel.Importance < r->CurLevel.Importance; }); - for (TPool* pool : order) { - LWPROBE(PoolStats, pool->PoolId, pool->PoolName, pool->CurrentCpus, pool->CurLevel.LoadClass, pool->Config.Priority, pool->CurLevel.ScaleFactor, pool->CpuIdle, pool->CpuLoad, pool->CurLevel.Importance, pool->AddLevel.Importance, pool->SubLevel.Importance); - } - - // Move cpus from lower importance to higher importance pools - for (auto toIter = 
order.rbegin(); toIter != order.rend(); ++toIter) { - TPool& to = **toIter; - if (to.CurLevel.LoadClass == TLevel::Overloaded && // if pool is overloaded - to.CurrentCpus < to.Config.MaxCpus) // and constraints would not be violated - { - for (auto fromIter = order.begin(); (*fromIter)->CurLevel.Importance < to.CurLevel.Importance; ++fromIter) { - TPool& from = **fromIter; - if (from.CurrentCpus == from.PrevCpus && // if not balanced yet - from.CurrentCpus > from.Config.MinCpus && // and constraints would not be violated - from.SubLevel.Importance <= to.AddLevel.Importance) // and which of two pools is more important would not change after cpu movement - { - MoveCpu(from, to); - from.CurrentCpus--; - to.CurrentCpus++; - break; - } - } - } - } - } - - void TBalancer::MoveCpu(TBalancer::TPool& from, TBalancer::TPool& to) { - for (auto ci = Cpus.rbegin(), ce = Cpus.rend(); ci != ce; ci++) { - TCpu& cpu = *ci; - if (!cpu.State) { - continue; - } - if (cpu.Assigned == from.PoolId) { - cpu.State->AssignPool(to.PoolId); - cpu.Assigned = to.PoolId; - LWPROBE(MoveCpu, from.PoolId, to.PoolId, from.PoolName, to.PoolName, cpu.Alloc.CpuId); - return; - } - } - Y_ABORT(); - } - - void TBalancer::Unlock() { - Lock.Release(); - } - - ui64 TBalancer::GetPeriodUs() { - return Config.PeriodUs; - } - - IBalancer* MakeBalancer(const TBalancerConfig& config, const TVector<TUnitedExecutorPoolConfig>& unitedPools, ui64 ts) { - return new TBalancer(config, unitedPools, ts); - } -} diff --git a/ydb/library/actors/core/balancer.h b/ydb/library/actors/core/balancer.h deleted file mode 100644 index e1f6f33bf3..0000000000 --- a/ydb/library/actors/core/balancer.h +++ /dev/null @@ -1,30 +0,0 @@ -#pragma once - -#include "defs.h" -#include "config.h" -#include "cpu_state.h" - -namespace NActors { - // Per-pool statistics used by balancer - struct TBalancerStats { - ui64 Ts = 0; // Measurement timestamp - ui64 CpuUs = 0; // Total cpu microseconds consumed by pool on all cpus since start - ui64 IdleUs = ui64(-1); // Total cpu microseconds in spinning or waiting on futex - ui64 WorstActivationTimeUs = 0; - ui64 ExpectedLatencyIncreaseUs = 0; - }; - - // Pool cpu balancer - struct IBalancer { - virtual ~IBalancer() {} - virtual bool AddCpu(const TCpuAllocation& cpuAlloc, TCpuState* cpu) = 0; - virtual bool TryLock(ui64 ts) = 0; - virtual void SetPoolStats(TPoolId pool, const TBalancerStats& stats) = 0; - virtual void Balance() = 0; - virtual void Unlock() = 0; - virtual ui64 GetPeriodUs() = 0; - // TODO: add method for reconfiguration on fly - }; - - IBalancer* MakeBalancer(const TBalancerConfig& config, const TVector<TUnitedExecutorPoolConfig>& unitedPools, ui64 ts); -} diff --git a/ydb/library/actors/core/balancer_ut.cpp b/ydb/library/actors/core/balancer_ut.cpp deleted file mode 100644 index d8bf3da056..0000000000 --- a/ydb/library/actors/core/balancer_ut.cpp +++ /dev/null @@ -1,225 +0,0 @@ -#include "balancer.h" - -#include <ydb/library/actors/util/datetime.h> -#include <library/cpp/lwtrace/all.h> -#include <library/cpp/testing/unittest/registar.h> - -#include <util/stream/str.h> - -using namespace NActors; - -//////////////////////////////////////////////////////////////////////////////// - -Y_UNIT_TEST_SUITE(PoolCpuBalancer) { - struct TTest { - TCpuManagerConfig Config; - TCpuMask Available; - THolder<IBalancer> Balancer; - TVector<TCpuState> CpuStates; - TVector<ui64> CpuUs; - ui64 Now = 0; - - void SetCpuCount(size_t count) { - Config.UnitedWorkers.CpuCount = count; - for (TCpuId cpuId = 0; cpuId < count; cpuId++) { 
- Available.Set(cpuId); - } - } - - void AddPool(ui32 minCpus, ui32 cpus, ui32 maxCpus, ui8 priority = 0) { - TUnitedExecutorPoolConfig u; - u.PoolId = TPoolId(Config.United.size()); - u.Balancing.Cpus = cpus; - u.Balancing.MinCpus = minCpus; - u.Balancing.MaxCpus = maxCpus; - u.Balancing.Priority = priority; - Config.United.push_back(u); - } - - void Start() { - TCpuAllocationConfig allocation(Available, Config); - Balancer.Reset(MakeBalancer(Config.UnitedWorkers.Balancer, Config.United, 0)); - CpuStates.resize(allocation.Items.size()); // do not resize it later to avoid dangling pointers - CpuUs.resize(CpuStates.size()); - for (const TCpuAllocation& cpuAlloc : allocation.Items) { - bool added = Balancer->AddCpu(cpuAlloc, &CpuStates[cpuAlloc.CpuId]); - UNIT_ASSERT(added); - } - } - - void Balance(ui64 deltaTs, const TVector<ui64>& cpuUs) { - Now += deltaTs; - ui64 ts = Now; - if (Balancer->TryLock(ts)) { - for (TPoolId pool = 0; pool < cpuUs.size(); pool++) { - CpuUs[pool] += cpuUs[pool]; - TBalancerStats stats; - stats.Ts = ts; - stats.CpuUs = CpuUs[pool]; - Balancer->SetPoolStats(pool, stats); - } - Balancer->Balance(); - Balancer->Unlock(); - } - } - - void ApplyMovements() { - for (TCpuState& state : CpuStates) { - TPoolId current; - TPoolId assigned; - state.Load(assigned, current); - state.SwitchPool(assigned); - } - } - - static TString ToStr(const TVector<ui64>& values) { - TStringStream ss; - ss << "{"; - for (auto v : values) { - ss << " " << v; - } - ss << " }"; - return ss.Str(); - } - - void AssertPoolsCurrentCpus(const TVector<ui64>& cpuRequired) { - TVector<ui64> cpuCurrent; - cpuCurrent.resize(cpuRequired.size()); - for (TCpuState& state : CpuStates) { - TPoolId current; - TPoolId assigned; - state.Load(assigned, current); - cpuCurrent[current]++; - } - for (TPoolId pool = 0; pool < cpuRequired.size(); pool++) { - UNIT_ASSERT_C(cpuCurrent[pool] == cpuRequired[pool], - "cpu distribution mismatch, required " << ToStr(cpuRequired) << " but got " << ToStr(cpuCurrent)); - } - } - }; - - Y_UNIT_TEST(StartLwtrace) { - NLWTrace::StartLwtraceFromEnv(); - } - - Y_UNIT_TEST(AllOverloaded) { - TTest t; - int cpus = 10; - t.SetCpuCount(cpus); - t.AddPool(1, 1, 10); // pool=0 - t.AddPool(1, 2, 10); // pool=1 - t.AddPool(1, 3, 10); // pool=2 - t.AddPool(1, 4, 10); // pool=2 - t.Start(); - ui64 dts = 1.01 * Us2Ts(t.Config.UnitedWorkers.Balancer.PeriodUs); - ui64 totalCpuUs = cpus * Ts2Us(dts); // pretend every pool has consumed as whole actorsystem, overload - for (int i = 0; i < cpus; i++) { - t.Balance(dts, {totalCpuUs, totalCpuUs, totalCpuUs, totalCpuUs}); - t.ApplyMovements(); - } - t.AssertPoolsCurrentCpus({1, 2, 3, 4}); - } - - Y_UNIT_TEST(OneOverloaded) { - TTest t; - int cpus = 10; - t.SetCpuCount(cpus); - t.AddPool(1, 1, 10); // pool=0 - t.AddPool(1, 2, 10); // pool=1 - t.AddPool(1, 3, 10); // pool=2 - t.AddPool(1, 4, 10); // pool=2 - t.Start(); - ui64 dts = 1.01 * Us2Ts(t.Config.UnitedWorkers.Balancer.PeriodUs); - ui64 totalCpuUs = cpus * Ts2Us(dts); - for (int i = 0; i < cpus; i++) { - t.Balance(dts, {totalCpuUs, 0, 0, 0}); - t.ApplyMovements(); - } - t.AssertPoolsCurrentCpus({7, 1, 1, 1}); - for (int i = 0; i < cpus; i++) { - t.Balance(dts, {0, totalCpuUs, 0, 0}); - t.ApplyMovements(); - } - t.AssertPoolsCurrentCpus({1, 7, 1, 1}); - for (int i = 0; i < cpus; i++) { - t.Balance(dts, {0, 0, totalCpuUs, 0}); - t.ApplyMovements(); - } - t.AssertPoolsCurrentCpus({1, 1, 7, 1}); - for (int i = 0; i < cpus; i++) { - t.Balance(dts, {0, 0, 0, totalCpuUs}); - t.ApplyMovements(); - } - 
t.AssertPoolsCurrentCpus({1, 1, 1, 7}); - } - - Y_UNIT_TEST(TwoOverloadedFairness) { - TTest t; - int cpus = 10; - t.SetCpuCount(cpus); - t.AddPool(1, 1, 10); // pool=0 - t.AddPool(1, 2, 10); // pool=1 - t.AddPool(1, 3, 10); // pool=2 - t.AddPool(1, 4, 10); // pool=2 - t.Start(); - ui64 dts = 1.01 * Us2Ts(t.Config.UnitedWorkers.Balancer.PeriodUs); - ui64 totalCpuUs = cpus * Ts2Us(dts); - for (int i = 0; i < cpus; i++) { - t.Balance(dts, {totalCpuUs, totalCpuUs, 0, 0}); - t.ApplyMovements(); - } - t.AssertPoolsCurrentCpus({3, 5, 1, 1}); - for (int i = 0; i < cpus; i++) { - t.Balance(dts, {totalCpuUs, 0, totalCpuUs, 0}); - t.ApplyMovements(); - } - t.AssertPoolsCurrentCpus({2, 1, 6, 1}); - for (int i = 0; i < cpus; i++) { - t.Balance(dts, {totalCpuUs, 0, 0, totalCpuUs}); - t.ApplyMovements(); - } - t.AssertPoolsCurrentCpus({2, 1, 1, 6}); - for (int i = 0; i < cpus; i++) { - t.Balance(dts, {0, totalCpuUs, totalCpuUs, 0}); - t.ApplyMovements(); - } - t.AssertPoolsCurrentCpus({1, 3, 5, 1}); - for (int i = 0; i < cpus; i++) { - t.Balance(dts, {0, totalCpuUs, 0, totalCpuUs}); - t.ApplyMovements(); - } - t.AssertPoolsCurrentCpus({1, 3, 1, 5}); - for (int i = 0; i < cpus; i++) { - t.Balance(dts, {0, 0, totalCpuUs, totalCpuUs}); - t.ApplyMovements(); - } - t.AssertPoolsCurrentCpus({1, 1, 3, 5}); - } - - Y_UNIT_TEST(TwoOverloadedPriority) { - TTest t; - int cpus = 20; - t.SetCpuCount(cpus); - t.AddPool(1, 5, 20, 0); // pool=0 - t.AddPool(1, 5, 20, 1); // pool=1 - t.AddPool(1, 5, 20, 2); // pool=2 - t.AddPool(1, 5, 20, 3); // pool=3 - t.Start(); - ui64 dts = 1.01 * Us2Ts(t.Config.UnitedWorkers.Balancer.PeriodUs); - ui64 mErlang = Ts2Us(dts) / 1000; - for (int i = 0; i < cpus; i++) { - t.Balance(dts, {20000 * mErlang, 2500 * mErlang, 4500 * mErlang, 9500 * mErlang}); - t.ApplyMovements(); - } - t.AssertPoolsCurrentCpus({2, 3, 5, 10}); - t.Balance(dts, {20000 * mErlang, 2500 * mErlang, 4500 * mErlang, 8500 * mErlang}); - t.ApplyMovements(); - t.AssertPoolsCurrentCpus({3, 3, 5, 9}); - // NOTE: this operation require one move, but we do not make global analysis, so multiple steps (1->2 & 0->1) are required (can be optimized later) - for (int i = 0; i < 3; i++) { - t.Balance(dts, {20000 * mErlang, 2500 * mErlang, 5500 * mErlang, 8500 * mErlang}); - t.ApplyMovements(); - } - t.AssertPoolsCurrentCpus({2, 3, 6, 9}); - } -} diff --git a/ydb/library/actors/core/config.h b/ydb/library/actors/core/config.h index bcbc87653f..d70a917847 100644 --- a/ydb/library/actors/core/config.h +++ b/ydb/library/actors/core/config.h @@ -10,24 +10,6 @@ namespace NActors { - struct TBalancingConfig { - // Default cpu count (used during overload). Zero value disables this pool balancing - // 1) Sum of `Cpus` on all pools cannot be changed without restart - // (changing cpu mode between Shared and Assigned is not implemented yet) - // 2) This sum must be equal to TUnitedWorkersConfig::CpuCount, - // otherwise `CpuCount - SUM(Cpus)` cpus will be in Shared mode (i.e. 
actorsystem 2.0) - ui32 Cpus = 0; - - ui32 MinCpus = 0; // Lower balancing bound, should be at least 1, and not greater than `Cpus` - ui32 MaxCpus = 0; // Higher balancing bound, should be not lower than `Cpus` - ui8 Priority = 0; // Priority of pool to obtain cpu due to balancing (higher is better) - ui64 ToleratedLatencyUs = 0; // p100-latency threshold indicating that more cpus are required by pool - }; - - struct TBalancerConfig { - ui64 PeriodUs = 15000000; // Time between balancer steps - }; - enum class EASProfile { Default, LowCpuConsumption, @@ -62,40 +44,6 @@ namespace NActors { TCpuMask Affinity; // Executor thread affinity }; - struct TUnitedExecutorPoolConfig { - static constexpr TDuration DEFAULT_TIME_PER_MAILBOX = TDuration::MilliSeconds(10); - static constexpr ui32 DEFAULT_EVENTS_PER_MAILBOX = 100; - - ui32 PoolId = 0; - TString PoolName; - - // Resource sharing - ui32 Concurrency = 0; // Limits simultaneously running mailboxes count if set to non-zero value (do not set if Balancing.Cpus != 0) - TPoolWeight Weight = 0; // Weight in fair cpu-local pool scheduler - TCpuMask Allowed; // Allowed CPUs for workers to run this pool on (ignored if balancer works, i.e. actorsystem 1.5) - - // Single mailbox execution limits - TDuration TimePerMailbox = DEFAULT_TIME_PER_MAILBOX; - ui32 EventsPerMailbox = DEFAULT_EVENTS_PER_MAILBOX; - - // Long-term balancing - TBalancingConfig Balancing; - }; - - struct TUnitedWorkersConfig { - ui32 CpuCount = 0; // Total CPUs running united workers (i.e. TBasicExecutorPoolConfig::Threads analog); set to zero to disable united workers - ui64 SpinThresholdUs = 100; // Limit for active spinning in case all pools became idle - ui64 PoolLimitUs = 500; // Soft limit on pool execution - ui64 EventLimitUs = 100; // Hard limit on last event execution exceeding pool limit - ui64 LimitPrecisionUs = 100; // Maximum delay of timer on limit excess (delay needed to avoid settimer syscall on every pool switch) - ui64 FastWorkerPriority = 10; // Real-time priority of workers not exceeding hard limits - ui64 IdleWorkerPriority = 20; // Real-time priority of standby workers waiting for hard preemption on timers (should be greater than FastWorkerPriority) - TCpuMask Allowed; // Allowed CPUs for workers to run on (every worker has affinity for exactly one cpu) - bool NoRealtime = false; // For environments w/o permissions for RT-threads - bool NoAffinity = false; // For environments w/o permissions for cpu affinity - TBalancerConfig Balancer; - }; - struct TSelfPingInfo { NMonitoring::TDynamicCounters::TCounterPtr AvgPingCounter; NMonitoring::TDynamicCounters::TCounterPtr AvgPingCounterWithSmallWindow; @@ -103,14 +51,12 @@ namespace NActors { }; struct TCpuManagerConfig { - TUnitedWorkersConfig UnitedWorkers; TVector<TBasicExecutorPoolConfig> Basic; TVector<TIOExecutorPoolConfig> IO; - TVector<TUnitedExecutorPoolConfig> United; TVector<TSelfPingInfo> PingInfoByPool; ui32 GetExecutorsCount() const { - return Basic.size() + IO.size() + United.size(); + return Basic.size() + IO.size(); } TString GetPoolName(ui32 poolId) const { @@ -124,11 +70,6 @@ namespace NActors { return p.PoolName; } } - for (const auto& p : United) { - if (p.PoolId == poolId) { - return p.PoolName; - } - } Y_ABORT("undefined pool id: %" PRIu32, (ui32)poolId); } @@ -143,11 +84,6 @@ namespace NActors { return p.Threads; } } - for (const auto& p : United) { - if (p.PoolId == poolId) { - return p.Concurrency ? 
p.Concurrency : UnitedWorkers.CpuCount; - } - } return {}; } @@ -220,41 +156,4 @@ namespace NActors { } }; - struct TCpuAllocationConfig { - TVector<TCpuAllocation> Items; - - TCpuAllocationConfig(const TCpuMask& available, const TCpuManagerConfig& cfg) { - for (const TUnitedExecutorPoolConfig& pool : cfg.United) { - Y_ABORT_UNLESS(pool.PoolId < MaxPools, "wrong PoolId of united executor pool: %s(%d)", - pool.PoolName.c_str(), (pool.PoolId)); - } - ui32 allocated[MaxPools] = {0}; - for (TCpuId cpu = 0; cpu < available.Size() && Items.size() < cfg.UnitedWorkers.CpuCount; cpu++) { - if (available.IsSet(cpu)) { - TCpuAllocation item; - item.CpuId = cpu; - for (const TUnitedExecutorPoolConfig& pool : cfg.United) { - if (cfg.UnitedWorkers.Allowed.IsEmpty() || cfg.UnitedWorkers.Allowed.IsSet(cpu)) { - if (pool.Allowed.IsEmpty() || pool.Allowed.IsSet(cpu)) { - item.AllowedPools.emplace_back(pool.PoolId, pool.Weight); - allocated[pool.PoolId]++; - } - } - } - if (!item.AllowedPools.empty()) { - Items.push_back(item); - } - } - } - for (const TUnitedExecutorPoolConfig& pool : cfg.United) { - Y_ABORT_UNLESS(allocated[pool.PoolId] > 0, "unable to allocate cpu for united executor pool: %s(%d)", - pool.PoolName.c_str(), (pool.PoolId)); - } - } - - operator bool() const { - return !Items.empty(); - } - }; - } diff --git a/ydb/library/actors/core/cpu_manager.cpp b/ydb/library/actors/core/cpu_manager.cpp index 24b3161e3c..1898904bb3 100644 --- a/ydb/library/actors/core/cpu_manager.cpp +++ b/ydb/library/actors/core/cpu_manager.cpp @@ -3,23 +3,16 @@ #include "executor_pool_basic.h" #include "executor_pool_io.h" -#include "executor_pool_united.h" namespace NActors { LWTRACE_USING(ACTORLIB_PROVIDER); TCpuManager::TCpuManager(THolder<TActorSystemSetup>& setup) : ExecutorPoolCount(setup->GetExecutorsCount()) - , Balancer(setup->Balancer) , Config(setup->CpuManager) { if (setup->Executors) { // Explicit mode w/o united pools Executors.Reset(setup->Executors.Release()); - for (ui32 excIdx = 0; excIdx != ExecutorPoolCount; ++excIdx) { - IExecutorPool* pool = Executors[excIdx].Get(); - Y_ABORT_UNLESS(dynamic_cast<TUnitedExecutorPool*>(pool) == nullptr, - "united executor pool is prohibited in explicit mode of NActors::TCpuManager"); - } } else { Setup(); } @@ -28,14 +21,6 @@ namespace NActors { void TCpuManager::Setup() { TAffinity available; available.Current(); - TCpuAllocationConfig allocation(available, Config); - - if (allocation) { - if (!Balancer) { - Balancer.Reset(MakeBalancer(Config.UnitedWorkers.Balancer, Config.United, GetCycleCountFast())); - } - UnitedWorkers.Reset(new TUnitedWorkers(Config.UnitedWorkers, Config.United, allocation, Balancer.Get())); - } ui64 ts = GetCycleCountFast(); Harmonizer.Reset(MakeHarmonizer(ts)); @@ -53,9 +38,6 @@ namespace NActors { } void TCpuManager::PrepareStart(TVector<NSchedulerQueue::TReader*>& scheduleReaders, TActorSystem* actorSystem) { - if (UnitedWorkers) { - UnitedWorkers->Prepare(actorSystem, scheduleReaders); - } for (ui32 excIdx = 0; excIdx != ExecutorPoolCount; ++excIdx) { NSchedulerQueue::TReader* readers; ui32 readersCount = 0; @@ -67,9 +49,6 @@ namespace NActors { } void TCpuManager::Start() { - if (UnitedWorkers) { - UnitedWorkers->Start(); - } for (ui32 excIdx = 0; excIdx != ExecutorPoolCount; ++excIdx) { Executors[excIdx]->Start(); } @@ -79,18 +58,12 @@ namespace NActors { for (ui32 excIdx = 0; excIdx != ExecutorPoolCount; ++excIdx) { Executors[excIdx]->PrepareStop(); } - if (UnitedWorkers) { - UnitedWorkers->PrepareStop(); - } } void 
TCpuManager::Shutdown() { for (ui32 excIdx = 0; excIdx != ExecutorPoolCount; ++excIdx) { Executors[excIdx]->Shutdown(); } - if (UnitedWorkers) { - UnitedWorkers->Shutdown(); - } for (ui32 round = 0, done = 0; done < ExecutorPoolCount && round < 3; ++round) { done = 0; for (ui32 excIdx = 0; excIdx != ExecutorPoolCount; ++excIdx) { @@ -112,7 +85,6 @@ namespace NActors { } } Executors.Destroy(); - UnitedWorkers.Destroy(); } IExecutorPool* TCpuManager::CreateExecutorPool(ui32 poolId) { @@ -126,12 +98,6 @@ namespace NActors { return new TIOExecutorPool(cfg); } } - for (TUnitedExecutorPoolConfig& cfg : Config.United) { - if (cfg.PoolId == poolId) { - IExecutorPool* result = new TUnitedExecutorPool(cfg, UnitedWorkers.Get()); - return result; - } - } Y_ABORT("missing PoolId: %d", int(poolId)); } diff --git a/ydb/library/actors/core/cpu_manager.h b/ydb/library/actors/core/cpu_manager.h index 26ba97aa39..29e86170c9 100644 --- a/ydb/library/actors/core/cpu_manager.h +++ b/ydb/library/actors/core/cpu_manager.h @@ -2,8 +2,6 @@ #include "harmonizer.h" #include "executor_pool.h" -#include "executor_pool_united_workers.h" -#include "balancer.h" namespace NActors { struct TActorSystemSetup; @@ -11,8 +9,6 @@ namespace NActors { class TCpuManager : public TNonCopyable { const ui32 ExecutorPoolCount; TArrayHolder<TAutoPtr<IExecutorPool>> Executors; - THolder<TUnitedWorkers> UnitedWorkers; - THolder<IBalancer> Balancer; THolder<IHarmonizer> Harmonizer; TCpuManagerConfig Config; diff --git a/ydb/library/actors/core/executor_pool_united.cpp b/ydb/library/actors/core/executor_pool_united.cpp deleted file mode 100644 index ef3f0830ea..0000000000 --- a/ydb/library/actors/core/executor_pool_united.cpp +++ /dev/null @@ -1,1455 +0,0 @@ -#include "executor_pool_united.h" -#include "executor_pool_united_workers.h" - -#include "actor.h" -#include "balancer.h" -#include "cpu_state.h" -#include "executor_thread.h" -#include "probes.h" -#include "mailbox.h" -#include "scheduler_queue.h" -#include <ydb/library/actors/util/affinity.h> -#include <ydb/library/actors/util/cpu_load_log.h> -#include <ydb/library/actors/util/datetime.h> -#include <ydb/library/actors/util/futex.h> -#include <ydb/library/actors/util/intrinsics.h> -#include <ydb/library/actors/util/timerfd.h> - -#include <util/system/datetime.h> -#include <util/system/hp_timer.h> - -#include <algorithm> - -namespace NActors { - LWTRACE_USING(ACTORLIB_PROVIDER); - - struct TUnitedWorkers::TWorker: public TNonCopyable { - TAutoPtr<TExecutorThread> Thread; - volatile TThreadId ThreadId = UnknownThreadId; - NSchedulerQueue::TQueueType SchedulerQueue; - }; - - struct TUnitedWorkers::TPool: public TNonCopyable { - TAtomic Waiters = 0; // Number of idle cpus, waiting for activations in this pool - char Padding[64 - sizeof(TAtomic)]; - - TUnorderedCache<ui32, 512, 4> Activations; // MPMC-queue for mailbox activations - TAtomic Active = 0; // Number of mailboxes ready for execution or currently executing - TAtomic Tokens = 0; // Pending tokens (token is required for worker to start execution, guarantees concurrency limit and activation availability) - volatile bool StopFlag = false; - - // Configuration - TPoolId PoolId; - TAtomicBase Concurrency; // Max concurrent workers running this pool - IExecutorPool* ExecutorPool; - TMailboxTable* MailboxTable; - ui64 TimePerMailboxTs; - ui32 EventsPerMailbox; - - // Cpus this pool is allowed to run on - // Cpus are specified in wake order - TStackVec<TCpu*, 15> WakeOrderCpus; - - ~TPool() { - while (Activations.Pop(0)) {} - } - - void 
Stop() { - AtomicStore(&StopFlag, true); - } - - bool IsUnited() const { - return WakeOrderCpus.size(); - } - - // Add activation of newly scheduled mailbox. Returns generated token (unless concurrency is exceeded) - bool PushActivation(ui32 activation, ui64 revolvingCounter) { - Activations.Push(activation, revolvingCounter); - TAtomicBase active = AtomicIncrement(Active); - if (active <= Concurrency) { // token generated - AtomicIncrement(Tokens); - return true; - } - return false; - } - - template <bool Relaxed> - static bool TryAcquireTokenImpl(TAtomic* tokens) { - while (true) { - TAtomicBase value; - if constexpr (Relaxed) { - value = RelaxedLoad(tokens); - } else { - value = AtomicLoad(tokens); - } - if (value > 0) { - if (AtomicCas(tokens, value - 1, value)) { - return true; // token acquired - } - } else { - return false; // no more tokens - } - } - } - - // Try acquire pending token. Must be done before execution - bool TryAcquireToken() { - return TryAcquireTokenImpl<false>(&Tokens); - } - - // Try acquire pending token. Must be done before execution - bool TryAcquireTokenRelaxed() { - return TryAcquireTokenImpl<true>(&Tokens); - } - - // Get activation. Requires acquired token. - void BeginExecution(ui32& activation, ui64 revolvingCounter) { - while (!RelaxedLoad(&StopFlag)) { - if (activation = Activations.Pop(++revolvingCounter)) { - return; - } - SpinLockPause(); - } - activation = 0; // should stop - } - - // End currently active execution and start new one if token is available. - // Reuses token if it's not destroyed. - // Returned `true` means successful switch, `activation` is filled. - // Returned `false` means execution has ended, no need to call StopExecution() - bool NextExecution(ui32& activation, ui64 revolvingCounter) { - if (AtomicDecrement(Active) >= Concurrency) { // reuse just released token - BeginExecution(activation, revolvingCounter); - return true; - } else if (TryAcquireToken()) { // another token acquired - BeginExecution(activation, revolvingCounter); - return true; - } - return false; // no more tokens available - } - - // Stop active execution. 
Returns released token (unless it is destroyed) - bool StopExecution() { - TAtomicBase active = AtomicDecrement(Active); - if (active >= Concurrency) { // token released - AtomicIncrement(Tokens); - return true; - } - return false; // token destroyed - } - - // Switch worker context into this pool - void Switch(TWorkerContext& wctx, ui64 softDeadlineTs, TExecutorThreadStats& stats) { - wctx.Switch(ExecutorPool, MailboxTable, TimePerMailboxTs, EventsPerMailbox, softDeadlineTs, &stats); - } - }; - - class TPoolScheduler { - class TSchedulable { - // Lower PoolBits store PoolId - // All other higher bits store virtual runtime in cycles - using TValue = ui64; - TValue Value; - - static constexpr ui64 PoolIdMask = ui64((1ull << PoolBits) - 1); - static constexpr ui64 VRunTsMask = ~PoolIdMask; - - public: - explicit TSchedulable(TPoolId poolId = MaxPools, ui64 vrunts = 0) - : Value((poolId & PoolIdMask) | (vrunts & VRunTsMask)) - {} - - TPoolId GetPoolId() const { - return Value & PoolIdMask; - } - - ui64 GetVRunTs() const { - // Do not truncate pool id - // NOTE: it decrease accuracy, but improves performance - return Value; - } - - ui64 GetPreciseVRunTs() const { - return Value & VRunTsMask; - } - - void SetVRunTs(ui64 vrunts) { - Value = (Value & PoolIdMask) | (vrunts & VRunTsMask); - } - - void Account(ui64 base, ui64 ts) { - // Add at least minimum amount to change Value - SetVRunTs(base + Max(ts, PoolIdMask + 1)); - } - }; - - // For min-heap of Items - struct TCmp { - bool operator()(TSchedulable lhs, TSchedulable rhs) const { - return lhs.GetVRunTs() > rhs.GetVRunTs(); - } - }; - - TPoolId Size = 0; // total number of pools on this cpu - TPoolId Current = 0; // index of current pool in `Items` - - // At the beginning `Current` items are orginized as binary min-heap -- ready to be scheduled - // The rest `Size - Current` items are unordered (required to keep track of last vrunts) - TSchedulable Items[MaxPools]; // virtual runtime in cycles for each pool - ui64 MinVRunTs = 0; // virtual runtime used by waking pools (system's vrunts) - ui64 Ts = 0; // real timestamp of current execution start (for accounting) - - // Maps PoolId into it's inverse weight - ui64 InvWeights[MaxPools]; - static constexpr ui64 VRunTsOverflow = ui64(1ull << 62ull) / MaxPoolWeight; - - public: - void AddPool(TPoolId pool, TPoolWeight weight) { - Items[Size] = TSchedulable(pool, MinVRunTs); - Size++; - InvWeights[pool] = MaxPoolWeight / std::clamp(weight ? weight : DefPoolWeight, MinPoolWeight, MaxPoolWeight); - } - - // Iterate over pools in scheduling order - // should be used in construction: - // for (TPoolId pool = Begin(); pool != End(); pool = Next()) - TPoolId Begin() { - // Wrap vruntime around to avoid overflow, if required - if (Y_UNLIKELY(MinVRunTs >= VRunTsOverflow)) { - for (TPoolId i = 0; i < Size; i++) { - ui64 ts = Items[i].GetPreciseVRunTs(); - Items[i].SetVRunTs(ts >= VRunTsOverflow ? 
ts - VRunTsOverflow : 0); - } - MinVRunTs -= VRunTsOverflow; - } - Current = Size; - std::make_heap(Items, Items + Current, TCmp()); - return Next(); - } - - constexpr TPoolId End() const { - return MaxPools; - } - - TPoolId Next() { - if (Current > 0) { - std::pop_heap(Items, Items + Current, TCmp()); - Current--; - return CurrentPool(); - } else { - return End(); - } - } - - // Scheduling was successful, we are going to run CurrentPool() - void Scheduled() { - MinVRunTs = Max(MinVRunTs, Items[Current].GetPreciseVRunTs()); - // NOTE: Ts is propagated on Account() to avoid gaps - } - - // Schedule specific pool that woke up cpu after idle - void ScheduledAfterIdle(TPoolId pool, ui64 ts) { - if (Y_UNLIKELY(ts < Ts)) { // anomaly: time goes backwards (e.g. rdtsc is reset to zero on cpu reset) - Ts = ts; // just skip anomalous time slice - return; - } - MinVRunTs += (ts - Ts) * (MaxPoolWeight / DefPoolWeight); // propagate system's vrunts to blur difference between pools - Ts = ts; // propagate time w/o accounting to any pool - - // Set specified pool as current, it requires scan - for (Current = 0; Current < Size && pool != Items[Current].GetPoolId(); Current++) {} - Y_ABORT_UNLESS(Current < Size); - } - - // Account currently running pool till now (ts) - void Account(ui64 ts) { - // Skip time slice for the first run and when time goes backwards (e.g. rdtsc is reset to zero on cpu reset) - if (Y_LIKELY(Ts > 0 && Ts <= ts)) { - TPoolId pool = CurrentPool(); - Y_ABORT_UNLESS(pool < MaxPools); - Items[Current].Account(MinVRunTs, (ts - Ts) * InvWeights[pool]); - } - Ts = ts; // propagate time - } - - TPoolId CurrentPool() const { - return Items[Current].GetPoolId(); - } - }; - - // Cyclic array of timers for idle workers to wait for hard preemption on - struct TIdleQueue: public TNonCopyable { - TArrayHolder<TTimerFd> Timers; - size_t Size; - TAtomic EnqueueCounter = 0; - TAtomic DequeueCounter = 0; - - explicit TIdleQueue(size_t size) - : Timers(new TTimerFd[size]) - , Size(size) - {} - - void Stop() { - for (size_t i = 0; i < Size; i++) { - Timers[i].Wake(); - } - } - - // Returns timer which new idle-worker should wait for - TTimerFd* Enqueue() { - return &Timers[AtomicGetAndIncrement(EnqueueCounter) % Size]; - } - - // Returns timer that hard preemption should trigger to wake idle-worker - TTimerFd* Dequeue() { - return &Timers[AtomicGetAndIncrement(DequeueCounter) % Size]; - } - }; - - // Base class for cpu-local managers that help workers on single cpu to cooperate - struct TCpuLocalManager: public TThrRefBase { - TUnitedWorkers* United; - - explicit TCpuLocalManager(TUnitedWorkers* united) - : United(united) - {} - - virtual TWorkerId WorkerCount() const = 0; - virtual void AddWorker(TWorkerId workerId) = 0; - virtual void Stop() = 0; - }; - - // Represents cpu with single associated worker that is able to execute any pool. 
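
The TPoolScheduler removed above is a weighted fair scheduler: pools are tried in order of increasing virtual runtime, and the pool that runs is charged for its cpu time scaled by the inverse of its weight. A simplified standalone sketch of one selection-and-accounting step, with illustrative names and without the PoolId bit-packing or overflow wrap-around of the removed code:

```cpp
#include <algorithm>
#include <cstdint>
#include <vector>

// Simplified sketch of weighted virtual-runtime pool selection, illustrative only.
struct TPoolVRun {
    uint32_t PoolId;
    uint64_t VRunTs;      // accumulated virtual runtime, in cycles
    uint64_t InvWeight;   // low-weight pools get a large inverse weight and age faster
};

struct TByVRun {
    // Greater-than comparator turns std::*_heap (a max-heap) into a min-heap by
    // VRunTs, mirroring TCmp above.
    bool operator()(const TPoolVRun& l, const TPoolVRun& r) const {
        return l.VRunTs > r.VRunTs;
    }
};

// One scheduling step: pick the pool with the smallest virtual runtime and charge it
// for `ranCycles` of execution. Assumes `pools` is non-empty.
uint32_t PickAndAccount(std::vector<TPoolVRun>& pools, uint64_t ranCycles) {
    std::make_heap(pools.begin(), pools.end(), TByVRun());
    std::pop_heap(pools.begin(), pools.end(), TByVRun());   // smallest moves to the back
    TPoolVRun& chosen = pools.back();
    chosen.VRunTs += ranCycles * chosen.InvWeight;
    return chosen.PoolId;
}
```
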
- // It always executes pool assigned by balancer and switch pool only if assigned pool has changed - struct TAssignedCpu: public TCpuLocalManager { - bool Started = false; - - TAssignedCpu(TUnitedWorkers* united) - : TCpuLocalManager(united) - {} - - TWorkerId WorkerCount() const override { - return 1; - } - - void AddWorker(TWorkerId workerId) override { - Y_UNUSED(workerId); - } - - ui32 GetReadyActivation(TWorkerContext& wctx, ui64 revolvingCounter) { - ui32 activation; - if (Y_UNLIKELY(!Started)) { - Started = true; - } else if (Y_UNLIKELY(United->IsPoolReassigned(wctx))) { - United->StopExecution(wctx.PoolId); // stop current execution and switch pool if reassigned - } else if (United->NextExecution(wctx.PoolId, activation, revolvingCounter)) { - return activation; // another activation from currently executing pool (or 0 if stopped) - } - - // Switch to another pool, it blocks until token is acquired - if (Y_UNLIKELY(!SwitchPool(wctx))) { - return 0; // stopped - } - United->SwitchPool(wctx, 0); - United->BeginExecution(wctx.PoolId, activation, revolvingCounter); - return activation; - } - - void Stop() override { - } - - private: - // Sets next pool to run, and acquires token, blocks if there are no tokens - bool SwitchPool(TWorkerContext& wctx) { - if (Y_UNLIKELY(United->IsStopped())) { - return false; - } - - // Run balancer (if it's time to) - United->Balance(); - - // Select pool to execute - wctx.PoolId = United->AssignedPool(wctx); - Y_ABORT_UNLESS(wctx.PoolId != CpuShared); - if (United->TryAcquireToken(wctx.PoolId)) { - return true; - } - - // No more work -- wait for activations (spinning, then blocked) - wctx.PoolId = United->Idle(wctx.PoolId, wctx); - - // Wakeup or stop occured - if (Y_UNLIKELY(wctx.PoolId == CpuStopped)) { - return false; - } - return true; // United->Idle() has already acquired token - } - }; - - // Lock-free data structure that help workers on single cpu to discover their state and do hard preemptions - struct TSharedCpu: public TCpuLocalManager { - // Current lease - volatile TLease::TValue CurrentLease; - char Padding1[64 - sizeof(TLease)]; - - // Slow pools - // the highest bit: 1=wait-for-slow-workers mode 0=else - // any lower bit (poolId is bit position): 1=pool-is-slow 0=pool-is-fast - volatile TPoolsMask SlowPoolsMask = 0; - char Padding2[64 - sizeof(TPoolsMask)]; - - // Must be accessed under never expiring lease to avoid races - TPoolScheduler PoolSched; - TWorkerId FastWorker = MaxWorkers; - TTimerFd* PreemptionTimer = nullptr; - ui64 HardPreemptionTs = 0; - bool Started = false; - - TIdleQueue IdleQueue; - - struct TConfig { - const TCpuId CpuId; - const TWorkerId Workers; - ui64 SoftLimitTs; - ui64 HardLimitTs; - ui64 EventLimitTs; - ui64 LimitPrecisionTs; - const int IdleWorkerPriority; - const int FastWorkerPriority; - const bool NoRealtime; - const bool NoAffinity; - const TCpuAllocation CpuAlloc; - - TConfig(const TCpuAllocation& allocation, const TUnitedWorkersConfig& united) - : CpuId(allocation.CpuId) - , Workers(allocation.AllowedPools.size() + 1) - , SoftLimitTs(Us2Ts(united.PoolLimitUs)) - , HardLimitTs(Us2Ts(united.PoolLimitUs + united.EventLimitUs)) - , EventLimitTs(Us2Ts(united.EventLimitUs)) - , LimitPrecisionTs(Us2Ts(united.LimitPrecisionUs)) - , IdleWorkerPriority(std::clamp<ui64>(united.IdleWorkerPriority ? united.IdleWorkerPriority : 20, 1, 99)) - , FastWorkerPriority(std::clamp<ui64>(united.FastWorkerPriority ? 
united.FastWorkerPriority : 10, 1, IdleWorkerPriority - 1)) - , NoRealtime(united.NoRealtime) - , NoAffinity(united.NoAffinity) - , CpuAlloc(allocation) - {} - }; - - TConfig Config; - TVector<TWorkerId> Workers; - - TSharedCpu(const TConfig& cfg, TUnitedWorkers* united) - : TCpuLocalManager(united) - , IdleQueue(cfg.Workers) - , Config(cfg) - { - for (const auto& pa : Config.CpuAlloc.AllowedPools) { - PoolSched.AddPool(pa.PoolId, pa.Weight); - } - } - - TWorkerId WorkerCount() const override { - return Config.Workers; - } - - void AddWorker(TWorkerId workerId) override { - if (Workers.empty()) { - // Grant lease to the first worker - AtomicStore(&CurrentLease, TLease(workerId, NeverExpire).Value); - } - Workers.push_back(workerId); - } - - ui32 GetReadyActivation(TWorkerContext& wctx, ui64 revolvingCounter) { - ui32 activation; - if (!wctx.Lease.IsNeverExpiring()) { - if (wctx.SoftDeadlineTs < GetCycleCountFast()) { // stop if lease has expired or is near to be expired - United->StopExecution(wctx.PoolId); - } else if (United->NextExecution(wctx.PoolId, activation, revolvingCounter)) { - return activation; // another activation from currently executing pool (or 0 if stopped) - } - } - - // Switch to another pool, it blocks until token is acquired - if (Y_UNLIKELY(!SwitchPool(wctx))) { - return 0; // stopped - } - United->BeginExecution(wctx.PoolId, activation, revolvingCounter); - return activation; - } - - void Stop() override { - IdleQueue.Stop(); - } - - private: - enum EPriority { - IdlePriority, // highest (real-time, Config.IdleWorkerPriority) - FastPriority, // normal (real-time, Config.FastWorkerPriority) - SlowPriority, // lowest (not real-time) - }; - - enum EWorkerAction { - // Fast-worker - ExecuteFast, - WaitForSlow, - - // Slow-worker - BecameIdle, - WakeFast, - - // Idle-worker - BecameFast, - Standby, - - // Common - Stopped, - }; - - // Thread-safe; should be called from worker - // Blocks for idle-workers; sets lease and next pool to run - bool SwitchPool(TWorkerContext& wctx) { - TTimerFd* idleTimer = nullptr; - while (true) { - if (DisablePreemptionAndTryExtend(wctx.Lease)) { // if fast-worker - if (Y_UNLIKELY(!Started)) { - SetPriority(0, FastPriority); - Started = true; - } - while (true) { - switch (FastWorkerAction(wctx)) { - case ExecuteFast: - United->SwitchPool(wctx, wctx.Lease.GetPreciseExpireTs() - Config.EventLimitTs); - EnablePreemptionAndGrant(wctx.Lease); - return true; - case WaitForSlow: - FastWorkerSleep(GetCycleCountFast() + Config.SoftLimitTs); - break; - case Stopped: return false; - default: Y_ABORT(); - } - } - } else if (wctx.Lease.IsNeverExpiring()) { // if idle-worker - switch (IdleWorkerAction(idleTimer, wctx.Lease.GetWorkerId())) { - case BecameFast: - SetPriority(0, FastPriority); - break; // try acquire new lease - case Standby: - if (!idleTimer) { - idleTimer = IdleQueue.Enqueue(); - } - SetPriority(0, IdlePriority); - idleTimer->Wait(); - break; - case Stopped: return false; - default: Y_ABORT(); - } - } else { // lease has expired and hard preemption occured, so we are executing in a slow-worker - wctx.IncrementPreemptedEvents(); - switch (SlowWorkerAction(wctx.PoolId)) { - case WakeFast: - WakeFastWorker(); - [[fallthrough]]; // no break; pass through - case BecameIdle: - wctx.Lease = wctx.Lease.NeverExpire(); - wctx.PoolId = MaxPools; - idleTimer = nullptr; - break; - case Stopped: return false; - default: Y_ABORT(); - } - } - } - } - - enum ETryRunPool { - RunFastPool, - RunSlowPool, - NoTokens, - }; - - ETryRunPool TryRun(TPoolId 
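
The fast/idle/slow worker split above hinges on CurrentLease: the owning worker id and an expiration timestamp live in a single atomic word, so ownership and deadline are changed together by one compare-and-swap. A rough sketch of that packing, playing the same role as DisablePreemptionAndTryExtend in the removed code; the field widths and names here are arbitrary and only illustrative:

```cpp
#include <atomic>
#include <cstdint>

// Rough sketch of a (workerId, expireTs) lease packed into one 64-bit word.
constexpr uint64_t WorkerBits  = 8;
constexpr uint64_t WorkerMask  = (uint64_t(1) << WorkerBits) - 1;
constexpr uint64_t NeverExpire = ~uint64_t(0) >> WorkerBits;

constexpr uint64_t MakeLease(uint64_t workerId, uint64_t expireTs) {
    return (expireTs << WorkerBits) | (workerId & WorkerMask);
}

std::atomic<uint64_t> CurrentLease{MakeLease(0, NeverExpire)};

// Fast worker: disable preemption by extending its own lease to "never expire".
// Fails if the lease was already revoked, e.g. by a hard preemption.
bool DisablePreemptionAndTryExtend(uint64_t myLease) {
    uint64_t expected = myLease;
    return CurrentLease.compare_exchange_strong(
        expected, MakeLease(myLease & WorkerMask, NeverExpire));
}
```
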
pool) { - while (true) { - // updates WaitPoolsFlag in SlowPoolsMask according to scheduled pool slowness - TPoolsMask slow = AtomicLoad(&SlowPoolsMask); - if ((1ull << pool) & slow) { // we are about to execute slow pool (fast-worker will just wait, token is NOT required) - if (slow & WaitPoolsFlag) { - return RunSlowPool; // wait flag is already set - } else { - if (AtomicCas(&SlowPoolsMask, slow | WaitPoolsFlag, slow)) { // try set wait flag - return RunSlowPool; // wait flag has been successfully set - } - } - } else { // we are about to execute fast pool, token required - if (slow & WaitPoolsFlag) { // reset wait flag if required - if (AtomicCas(&SlowPoolsMask, slow & ~WaitPoolsFlag, slow)) { // try reset wait flag - return United->TryAcquireToken(pool) ? RunFastPool : NoTokens; // wait flag has been successfully reset - } - } else { - return United->TryAcquireToken(pool) ? RunFastPool : NoTokens; // wait flag is already reset - } - } - } - } - - EWorkerAction FastWorkerAction(TWorkerContext& wctx) { - if (Y_UNLIKELY(United->IsStopped())) { - return Stopped; - } - - // Account current pool - ui64 ts = GetCycleCountFast(); - PoolSched.Account(ts); - - // Select next pool to execute - for (wctx.PoolId = PoolSched.Begin(); wctx.PoolId != PoolSched.End(); wctx.PoolId = PoolSched.Next()) { - switch (TryRun(wctx.PoolId)) { - case RunFastPool: - PoolSched.Scheduled(); - wctx.Lease = PostponePreemption(wctx.Lease.GetWorkerId(), ts); - return ExecuteFast; - case RunSlowPool: - PoolSched.Scheduled(); - ResetPreemption(wctx.Lease.GetWorkerId(), ts); // there is no point in preemption during wait - return WaitForSlow; - case NoTokens: // concurrency limit reached, or no more work in pool - break; // just try next pool (if any) - } - } - - // No more work, no slow-workers -- wait for activations (active, then blocked) - wctx.PoolId = United->Idle(CpuShared, wctx); - - // Wakeup or stop occured - if (Y_UNLIKELY(wctx.PoolId == CpuStopped)) { - return Stopped; - } - ts = GetCycleCountFast(); - PoolSched.ScheduledAfterIdle(wctx.PoolId, ts); - wctx.Lease = PostponePreemption(wctx.Lease.GetWorkerId(), ts); - return ExecuteFast; // United->Idle() has already acquired token - } - - EWorkerAction IdleWorkerAction(TTimerFd* idleTimer, TWorkerId workerId) { - if (Y_UNLIKELY(United->IsStopped())) { - return Stopped; - } - if (!idleTimer) { // either worker start or became idle -- hard preemption is not required - return Standby; - } - - TLease lease = TLease(AtomicLoad(&CurrentLease)); - ui64 ts = GetCycleCountFast(); - if (lease.GetExpireTs() < ts) { // current lease has expired - if (TryBeginHardPreemption(lease)) { - SetPoolIsSlowFlag(PoolSched.CurrentPool()); - TWorkerId preempted = lease.GetWorkerId(); - SetPriority(United->GetWorkerThreadId(preempted), SlowPriority); - LWPROBE(HardPreemption, Config.CpuId, PoolSched.CurrentPool(), preempted, workerId); - EndHardPreemption(workerId); - return BecameFast; - } else { - // Lease has been changed just now, no way we need preemption right now, so no retry needed - return Standby; - } - } else { - // Lease has not expired yet (maybe never expiring lease) - return Standby; - } - } - - EWorkerAction SlowWorkerAction(TPoolId pool) { - if (Y_UNLIKELY(United->IsStopped())) { - return Stopped; - } - while (true) { - TPoolsMask slow = AtomicLoad(&SlowPoolsMask); - if (slow & (1ull << pool)) { - if (slow == (1ull << pool) & WaitPoolsFlag) { // the last slow pool is going to became fast - if (AtomicCas(&SlowPoolsMask, 0, slow)) { // reset both pool-is-slow flag and 
WaitPoolsFlag - return WakeFast; - } - } else { // there are (a) several slow-worker or (b) one slow-worker w/o waiting fast-worker - if (AtomicCas(&SlowPoolsMask, slow & ~(1ull << pool), slow)) { // reset pool-is-slow flag - return BecameIdle; - } - } - } else { - // SlowWorkerAction has been called between TryBeginHardPreemption and SetPoolIsSlowFlag - // flag for this pool is not set yet, but we can be sure pool is slow: - // - because SlowWorkerAction has been called; - // - this mean lease has expired and hard preemption occured. - // So just wait other worker to call SetPoolIsSlowFlag - LWPROBE(SlowWorkerActionRace, Config.CpuId, pool, slow); - } - } - } - - void SetPoolIsSlowFlag(TPoolId pool) { - while (true) { - TPoolsMask slow = AtomicLoad(&SlowPoolsMask); - if ((slow & (1ull << pool)) == 0) { // if pool is fast - if (AtomicCas(&SlowPoolsMask, slow | (1ull << pool), slow)) { // set pool-is-slow flag - return; - } - } else { - Y_ABORT("two slow-workers executing the same pool on the same core"); - return; // pool is already slow - } - } - } - - bool TryBeginHardPreemption(TLease lease) { - return AtomicCas(&CurrentLease, HardPreemptionLease, lease); - } - - void EndHardPreemption(TWorkerId to) { - ATOMIC_COMPILER_BARRIER(); - if (!AtomicCas(&CurrentLease, TLease(to, NeverExpire), HardPreemptionLease)) { - Y_ABORT("hard preemption failed"); - } - } - - bool DisablePreemptionAndTryExtend(TLease lease) { - return AtomicCas(&CurrentLease, lease.NeverExpire(), lease); - } - - void EnablePreemptionAndGrant(TLease lease) { - ATOMIC_COMPILER_BARRIER(); - if (!AtomicCas(&CurrentLease, lease, lease.NeverExpire())) { - Y_ABORT("lease grant failed"); - } - } - - void FastWorkerSleep(ui64 deadlineTs) { - while (true) { - TPoolsMask slow = AtomicLoad(&SlowPoolsMask); - if ((slow & WaitPoolsFlag) == 0) { - return; // woken by WakeFast action - } - ui64 ts = GetCycleCountFast(); - if (deadlineTs <= ts) { - if (AtomicCas(&SlowPoolsMask, slow & ~WaitPoolsFlag, slow)) { // try reset wait flag - return; // wait flag has been successfully reset after timeout - } - } else { // should wait - ui64 timeoutNs = Ts2Ns(deadlineTs - ts); -#ifdef _linux_ - timespec timeout; - timeout.tv_sec = timeoutNs / 1'000'000'000; - timeout.tv_nsec = timeoutNs % 1'000'000'000; - SysFutex(FastWorkerFutex(), FUTEX_WAIT_PRIVATE, FastWorkerFutexValue(slow), &timeout, nullptr, 0); -#else - NanoSleep(timeoutNs); // non-linux wake is not supported, cpu will go idle on slow -> fast switch -#endif - } - } - } - - void WakeFastWorker() { -#ifdef _linux_ - SysFutex(FastWorkerFutex(), FUTEX_WAKE_PRIVATE, 1, nullptr, nullptr, 0); -#endif - } - -#ifdef _linux_ - ui32* FastWorkerFutex() { - // Actually we wait on one highest bit, but futex value size is 4 bytes on all platforms - static_assert(sizeof(TPoolsMask) >= 4, "cannot be used as futex value on linux"); - return (ui32*)&SlowPoolsMask + 1; // higher 32 bits (little endian assumed) - } - - ui32 FastWorkerFutexValue(TPoolsMask slow) { - return ui32(slow >> 32); // higher 32 bits - } -#endif - - void SetPriority(TThreadId tid, EPriority priority) { - if (Config.NoRealtime) { - return; - } -#ifdef _linux_ - int policy; - struct sched_param param; - switch (priority) { - case IdlePriority: - policy = SCHED_FIFO; - param.sched_priority = Config.IdleWorkerPriority; - break; - case FastPriority: - policy = SCHED_FIFO; - param.sched_priority = Config.FastWorkerPriority; - break; - case SlowPriority: - policy = SCHED_OTHER; - param.sched_priority = 0; - break; - } - int ret = 
sched_setscheduler(tid, policy, &param); - switch (ret) { - case 0: return; - case EINVAL: - Y_ABORT("sched_setscheduler(%" PRIu64 ", %d, %d) -> EINVAL", tid, policy, param.sched_priority); - case EPERM: - // Requirements: - // * CAP_SYS_NICE capability to run real-time processes and set cpu affinity. - // Either run under root or set application capabilities: - // sudo setcap cap_sys_nice=eip BINARY - // * Non-zero rt-runtime (in case cgroups are used). - // Either (a) disable global limit on RT processes bandwidth: - // sudo sysctl -w kernel.sched_rt_runtime_us=-1 - // Or (b) set non-zero rt-runtime for your cgroup: - // echo -1 > /sys/fs/cgroup/cpu/[cgroup]/cpu.rt_runtime_us - // (also set the same value for every parent cgroup) - // https://www.kernel.org/doc/Documentation/scheduler/sched-rt-group.txt - Y_ABORT("sched_setscheduler(%" PRIu64 ", %d, %d) -> EPERM", tid, policy, param.sched_priority); - case ESRCH: - Y_ABORT("sched_setscheduler(%" PRIu64 ", %d, %d) -> ESRCH", tid, policy, param.sched_priority); - default: - Y_ABORT("sched_setscheduler(%" PRIu64 ", %d, %d) -> %d", tid, policy, param.sched_priority, ret); - } -#else - Y_UNUSED(tid); - Y_UNUSED(priority); -#endif - } - - void ResetPreemption(TWorkerId fastWorkerId, ui64 ts) { - if (Y_UNLIKELY(!PreemptionTimer)) { - return; - } - if (FastWorker == fastWorkerId && HardPreemptionTs > 0) { - PreemptionTimer->Reset(); - LWPROBE(ResetPreemptionTimer, Config.CpuId, FastWorker, PreemptionTimer->Fd, Ts2Ms(ts), Ts2Ms(HardPreemptionTs)); - HardPreemptionTs = 0; - } - } - - TLease PostponePreemption(TWorkerId fastWorkerId, ui64 ts) { - // Select new timer after hard preemption - if (FastWorker != fastWorkerId) { - FastWorker = fastWorkerId; - PreemptionTimer = IdleQueue.Dequeue(); - HardPreemptionTs = 0; - } - - ui64 hardPreemptionTs = ts + Config.HardLimitTs; - if (hardPreemptionTs > HardPreemptionTs) { - // Reset timer (at most once in TickIntervalTs, sacrifice precision) - HardPreemptionTs = hardPreemptionTs + Config.LimitPrecisionTs; - PreemptionTimer->Set(HardPreemptionTs); - LWPROBE(SetPreemptionTimer, Config.CpuId, FastWorker, PreemptionTimer->Fd, Ts2Ms(ts), Ts2Ms(HardPreemptionTs)); - } - - return TLease(fastWorkerId, hardPreemptionTs); - } - }; - - // Proxy for start and switching TUnitedExecutorPool-s on single cpu via GetReadyActivation() - // (does not implement any other method in IExecutorPool) - class TCpuExecutorPool: public IExecutorPool { - const TString Name; - - public: - explicit TCpuExecutorPool(const TString& name) - : IExecutorPool(MaxPools) - , Name(name) - {} - - TString GetName() const override { - return Name; - } - - void SetRealTimeMode() const override { - // derived classes controls rt-priority - do nothing - } - - // Should never be called - void ReclaimMailbox(TMailboxType::EType, ui32, TWorkerId, ui64) override { Y_ABORT(); } - TMailboxHeader *ResolveMailbox(ui32) override { Y_ABORT(); } - void Schedule(TInstant, TAutoPtr<IEventHandle>, ISchedulerCookie*, TWorkerId) override { Y_ABORT(); } - void Schedule(TMonotonic, TAutoPtr<IEventHandle>, ISchedulerCookie*, TWorkerId) override { Y_ABORT(); } - void Schedule(TDuration, TAutoPtr<IEventHandle>, ISchedulerCookie*, TWorkerId) override { Y_ABORT(); } - bool Send(TAutoPtr<IEventHandle>&) override { Y_ABORT(); } - bool SpecificSend(TAutoPtr<IEventHandle>&) override { Y_ABORT(); } - void ScheduleActivation(ui32) override { Y_ABORT(); } - void SpecificScheduleActivation(ui32) override { Y_ABORT(); } - void ScheduleActivationEx(ui32, ui64) override { Y_ABORT(); } -
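
The EPERM comment above documents what the removed SetPriority() needed from the host: CAP_SYS_NICE and a non-zero real-time runtime budget when cgroups are used. For readers unfamiliar with the call, a minimal Linux-only sketch of the same sched_setscheduler(2) pattern, with simplified error handling; the function and parameter names here are illustrative:

```cpp
#ifdef __linux__
#include <sched.h>
#include <cstdio>

// Minimal Linux-only sketch: put a thread (by tid, 0 = calling thread) into
// SCHED_FIFO at the given priority, or back to SCHED_OTHER when rtPriority == 0.
// Needs CAP_SYS_NICE (e.g. `sudo setcap cap_sys_nice=eip BINARY`) and a non-zero
// rt-runtime budget when cgroups are in use, as noted above.
bool SetRealtimePriority(pid_t tid, int rtPriority) {
    sched_param param{};
    int policy = rtPriority > 0 ? SCHED_FIFO : SCHED_OTHER;
    param.sched_priority = rtPriority > 0 ? rtPriority : 0;
    if (sched_setscheduler(tid, policy, &param) != 0) {
        std::perror("sched_setscheduler");  // typically EPERM without CAP_SYS_NICE
        return false;
    }
    return true;
}
#endif
```
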
TActorId Register(IActor*, TMailboxType::EType, ui64, const TActorId&) override { Y_ABORT(); } - TActorId Register(IActor*, TMailboxHeader*, ui32, const TActorId&) override { Y_ABORT(); } - void Prepare(TActorSystem*, NSchedulerQueue::TReader**, ui32*) override { Y_ABORT(); } - void Start() override { Y_ABORT(); } - void PrepareStop() override { Y_ABORT(); } - void Shutdown() override { Y_ABORT(); } - bool Cleanup() override { Y_ABORT(); } - }; - - // Proxy executor pool working with cpu-local scheduler (aka actorsystem 2.0) - class TSharedCpuExecutorPool: public TCpuExecutorPool { - TSharedCpu* Local; - TIntrusivePtr<TAffinity> SingleCpuAffinity; // no migration support yet - public: - explicit TSharedCpuExecutorPool(TSharedCpu* local, const TUnitedWorkersConfig& config) - : TCpuExecutorPool("u-" + ToString(local->Config.CpuId)) - , Local(local) - , SingleCpuAffinity(config.NoAffinity ? nullptr : new TAffinity(TCpuMask(local->Config.CpuId))) - {} - - TAffinity* Affinity() const override { - return SingleCpuAffinity.Get(); - } - - ui32 GetReadyActivation(TWorkerContext& wctx, ui64 revolvingCounter) override { - return Local->GetReadyActivation(wctx, revolvingCounter); - } - }; - - // Proxy executor pool working with balancer and assigned pools (aka actorsystem 1.5) - class TAssignedCpuExecutorPool: public TCpuExecutorPool { - TAssignedCpu* Local; - TIntrusivePtr<TAffinity> CpuAffinity; - public: - explicit TAssignedCpuExecutorPool(TAssignedCpu* local, const TUnitedWorkersConfig& config) - : TCpuExecutorPool("United") - , Local(local) - , CpuAffinity(config.NoAffinity ? nullptr : new TAffinity(config.Allowed)) - {} - - TAffinity* Affinity() const override { - return CpuAffinity.Get(); - } - - ui32 GetReadyActivation(TWorkerContext& wctx, ui64 revolvingCounter) override { - return Local->GetReadyActivation(wctx, revolvingCounter); - } - }; - - // Representation of a single cpu and it's state visible to other cpus and pools - struct TUnitedWorkers::TCpu: public TNonCopyable { - struct TScopedWaiters { - TCpu& Cpu; - TPool* AssignedPool; // nullptr if CpuShared - - // Subscribe on wakeups from allowed pools - TScopedWaiters(TCpu& cpu, TPool* assignedPool) : Cpu(cpu), AssignedPool(assignedPool) { - if (!AssignedPool) { - for (TPool* pool : Cpu.AllowedPools) { - AtomicIncrement(pool->Waiters); - } - } else { - AtomicIncrement(AssignedPool->Waiters); - } - } - - // Unsubscribe from pools we've subscribed on - ~TScopedWaiters() { - if (!AssignedPool) { - for (TPool* pool : Cpu.AllowedPools) { - AtomicDecrement(pool->Waiters); - } - } else { - AtomicDecrement(AssignedPool->Waiters); - } - } - }; - - // Current cpu state important for other cpus and balancer - TCpuState State; - - // Thread-safe per pool stats - // NOTE: It's guaranteed that cpu never executes two instance of the same pool - TVector<TExecutorThreadStats> PoolStats; - TCpuLoadLog<1024> LoadLog; - - - // Configuration - TCpuId CpuId; - THolder<TCpuLocalManager> LocalManager; - THolder<TCpuExecutorPool> ExecutorPool; - - // Pools allowed to run on this cpu - TStackVec<TPool*, 15> AllowedPools; - - void Stop() { - if (LocalManager) { - State.Stop(); - LocalManager->Stop(); - } - } - - bool StartSpinning(TUnitedWorkers* united, TPool* assignedPool, TPoolId& result) { - // Mark cpu as idle - if (Y_UNLIKELY(!State.StartSpinning())) { - result = CpuStopped; - return true; - } - - // Avoid using multiple atomic seq_cst loads in cycle, use barrier once and relaxed ops - AtomicBarrier(); - - // Check there is no pending tokens (can be released 
before Waiters increment) - if (!assignedPool) { - for (TPool* pool : AllowedPools) { - if (pool->TryAcquireTokenRelaxed()) { - result = WakeWithTokenAcquired(united, pool->PoolId); - return true; // token acquired or stop - } - } - } else { - if (assignedPool->TryAcquireTokenRelaxed()) { - result = WakeWithTokenAcquired(united, assignedPool->PoolId); - return true; // token acquired or stop - } - } - - // At this point we can be sure wakeup won't be lost - // So we can actively spin or block w/o checking for pending tokens - return false; - } - - bool ActiveWait(ui64 spinThresholdTs, TPoolId& result) { - ui64 ts = GetCycleCountFast(); - LoadLog.RegisterBusyPeriod(ts); - ui64 deadline = ts + spinThresholdTs; - while (GetCycleCountFast() < deadline) { - for (ui32 i = 0; i < 12; ++i) { - TPoolId current = State.CurrentPool(); - if (current == CpuSpinning) { - SpinLockPause(); - } else { - result = current; - LoadLog.RegisterIdlePeriod(GetCycleCountFast()); - return true; // wakeup - } - } - } - return false; // spin threshold exceeded, no wakeups - } - - bool StartBlocking(TPoolId& result) { - // Switch into blocked state - if (State.StartBlocking()) { - result = State.CurrentPool(); - return true; - } else { - return false; - } - } - - bool BlockedWait(TPoolId& result, ui64 timeoutNs) { - return State.Block(timeoutNs, result); - } - - void SwitchPool(TPoolId pool) { - return State.SwitchPool(pool); - } - - private: - TPoolId WakeWithTokenAcquired(TUnitedWorkers* united, TPoolId token) { - switch (State.WakeWithTokenAcquired(token)) { - case TCpuState::Woken: // we've got token and successfully woken up this cpu - // NOTE: sending thread may also wakeup another worker, which wont be able to acquire token and will go idle (it's ok) - return token; - case TCpuState::NotIdle: { // wakeup event has also occured - TPoolId wakeup = State.CurrentPool(); - if (wakeup != token) { // token and wakeup for different pools - united->TryWake(wakeup); // rewake another cpu to avoid losing wakeup - } - return token; - } - case TCpuState::Forbidden: - Y_ABORT(); - case TCpuState::Stopped: - return CpuStopped; - } - } - }; - - TUnitedWorkers::TUnitedWorkers( - const TUnitedWorkersConfig& config, - const TVector<TUnitedExecutorPoolConfig>& unitedPools, - const TCpuAllocationConfig& allocation, - IBalancer* balancer) - : Balancer(balancer) - , Config(config) - , Allocation(allocation) - { - // Find max pool id and initialize pools - PoolCount = 0; - for (const TCpuAllocation& cpuAlloc : allocation.Items) { - for (const auto& pa : cpuAlloc.AllowedPools) { - PoolCount = Max<size_t>(PoolCount, pa.PoolId + 1); - } - } - Pools.Reset(new TPool[PoolCount]); - - // Find max cpu id and initialize cpus - CpuCount = 0; - for (const TCpuAllocation& cpuAlloc : allocation.Items) { - CpuCount = Max<size_t>(CpuCount, cpuAlloc.CpuId + 1); - } - Cpus.Reset(new TCpu[CpuCount]); - - // Setup allocated cpus - // NOTE: leave gaps for not allocated cpus (default-initialized) - WorkerCount = 0; - for (const TCpuAllocation& cpuAlloc : allocation.Items) { - TCpu& cpu = Cpus[cpuAlloc.CpuId]; - cpu.CpuId = cpuAlloc.CpuId; - cpu.PoolStats.resize(PoolCount); // NOTE: also may have gaps - for (const auto& pa : cpuAlloc.AllowedPools) { - cpu.AllowedPools.emplace_back(&Pools[pa.PoolId]); - } - - // Setup balancing and cpu-local manager - if (!Balancer->AddCpu(cpuAlloc, &cpu.State)) { - cpu.State.SwitchPool(0); // set initial state to non-idle to avoid losing wakeups on start - cpu.State.AssignPool(CpuShared); - TSharedCpu* local = new 
TSharedCpu(TSharedCpu::TConfig(cpuAlloc, Config), this); - cpu.LocalManager.Reset(local); - cpu.ExecutorPool.Reset(new TSharedCpuExecutorPool(local, Config)); - } else { - TAssignedCpu* local = new TAssignedCpu(this); - cpu.LocalManager.Reset(local); - cpu.ExecutorPool.Reset(new TAssignedCpuExecutorPool(local, Config)); - } - WorkerCount += cpu.LocalManager->WorkerCount(); - } - - // Initialize workers - Workers.Reset(new TWorker[WorkerCount]); - - // Setup pools - // NOTE: leave gaps for not united pools (default-initialized) - for (const TUnitedExecutorPoolConfig& cfg : unitedPools) { - TPool& pool = Pools[cfg.PoolId]; - Y_ABORT_UNLESS(cfg.PoolId < MaxPools); - pool.PoolId = cfg.PoolId; - pool.Concurrency = cfg.Concurrency ? cfg.Concurrency : Config.CpuCount; - pool.ExecutorPool = nullptr; // should be set later using SetupPool() - pool.MailboxTable = nullptr; // should be set later using SetupPool() - pool.TimePerMailboxTs = DurationToCycles(cfg.TimePerMailbox); - pool.EventsPerMailbox = cfg.EventsPerMailbox; - - // Reinitialize per cpu pool stats with right MaxActivityType - for (const TCpuAllocation& cpuAlloc : allocation.Items) { - TCpu& cpu = Cpus[cpuAlloc.CpuId]; - cpu.PoolStats[cfg.PoolId] = TExecutorThreadStats(); - } - - // Setup WakeOrderCpus: left to right exclusive cpus, then left to right shared cpus. - // Waking exclusive cpus first reduce load on shared cpu and improve latency isolation, which is - // the point of using exclusive cpu. But note that number of actively spinning idle cpus may increase, - // so cpu consumption on light load is higher. - for (const TCpuAllocation& cpuAlloc : allocation.Items) { - TCpu& cpu = Cpus[cpuAlloc.CpuId]; - if (cpu.AllowedPools.size() == 1 && cpu.AllowedPools[0] == &pool) { - pool.WakeOrderCpus.emplace_back(&cpu); - } - } - for (const TCpuAllocation& cpuAlloc : allocation.Items) { - TCpu& cpu = Cpus[cpuAlloc.CpuId]; - if (cpu.AllowedPools.size() > 1 && cpuAlloc.HasPool(pool.PoolId)) { - pool.WakeOrderCpus.emplace_back(&cpu); - } - } - } - } - - TUnitedWorkers::~TUnitedWorkers() { - } - - void TUnitedWorkers::Prepare(TActorSystem* actorSystem, TVector<NSchedulerQueue::TReader*>& scheduleReaders) { - // Setup allocated cpus - // NOTE: leave gaps for not allocated cpus (default-initialized) - TWorkerId workers = 0; - for (TCpuId cpuId = 0; cpuId < CpuCount; cpuId++) { - TCpu& cpu = Cpus[cpuId]; - - // Setup cpu-local workers - if (cpu.LocalManager) { - for (i16 i = 0; i < cpu.LocalManager->WorkerCount(); i++) { - TWorkerId workerId = workers++; - cpu.LocalManager->AddWorker(workerId); - - // Setup worker - Y_ABORT_UNLESS(workerId < WorkerCount); - Workers[workerId].Thread.Reset(new TExecutorThread( - workerId, - cpu.CpuId, - actorSystem, - cpu.ExecutorPool.Get(), // use cpu-local manager as proxy executor for all workers on cpu - nullptr, // MailboxTable is pool-specific, will be set on pool switch - cpu.ExecutorPool->GetName())); - // NOTE: TWorker::ThreadId will be initialized after in Start() - - scheduleReaders.push_back(&Workers[workerId].SchedulerQueue.Reader); - } - } - } - } - - void TUnitedWorkers::Start() { - for (TWorkerId workerId = 0; workerId < WorkerCount; workerId++) { - Workers[workerId].Thread->Start(); - } - for (TWorkerId workerId = 0; workerId < WorkerCount; workerId++) { - AtomicStore(&Workers[workerId].ThreadId, Workers[workerId].Thread->GetThreadId()); - } - } - - inline TThreadId TUnitedWorkers::GetWorkerThreadId(TWorkerId workerId) const { - volatile TThreadId* threadId = &Workers[workerId].ThreadId; -#ifdef 
_linux_ - while (AtomicLoad(threadId) == UnknownThreadId) { - NanoSleep(1000); - } -#endif - return AtomicLoad(threadId); - } - - inline NSchedulerQueue::TWriter* TUnitedWorkers::GetScheduleWriter(TWorkerId workerId) const { - return &Workers[workerId].SchedulerQueue.Writer; - } - - void TUnitedWorkers::SetupPool(TPoolId pool, IExecutorPool* executorPool, TMailboxTable* mailboxTable) { - Pools[pool].ExecutorPool = executorPool; - Pools[pool].MailboxTable = mailboxTable; - } - - void TUnitedWorkers::PrepareStop() { - AtomicStore(&StopFlag, true); - for (TPoolId pool = 0; pool < PoolCount; pool++) { - Pools[pool].Stop(); - } - for (TCpuId cpuId = 0; cpuId < CpuCount; cpuId++) { - Cpus[cpuId].Stop(); - } - } - - void TUnitedWorkers::Shutdown() { - for (TWorkerId workerId = 0; workerId < WorkerCount; workerId++) { - Workers[workerId].Thread->Join(); - } - } - - inline void TUnitedWorkers::PushActivation(TPoolId pool, ui32 activation, ui64 revolvingCounter) { - if (Pools[pool].PushActivation(activation, revolvingCounter)) { // token generated - TryWake(pool); - } - } - - inline bool TUnitedWorkers::TryAcquireToken(TPoolId pool) { - return Pools[pool].TryAcquireToken(); - } - - inline void TUnitedWorkers::TryWake(TPoolId pool) { - // Avoid using multiple atomic seq_cst loads in cycle, use barrier once - AtomicBarrier(); - - // Scan every allowed cpu in pool's wakeup order and try to wake the first idle cpu - if (RelaxedLoad(&Pools[pool].Waiters) > 0) { - for (TCpu* cpu : Pools[pool].WakeOrderCpus) { - if (cpu->State.WakeWithoutToken(pool) == TCpuState::Woken) { - return; // successful wake up - } - } - } - - // Cpu has not been woken up - } - - inline void TUnitedWorkers::BeginExecution(TPoolId pool, ui32& activation, ui64 revolvingCounter) { - Pools[pool].BeginExecution(activation, revolvingCounter); - } - - inline bool TUnitedWorkers::NextExecution(TPoolId pool, ui32& activation, ui64 revolvingCounter) { - return Pools[pool].NextExecution(activation, revolvingCounter); - } - - inline void TUnitedWorkers::StopExecution(TPoolId pool) { - if (Pools[pool].StopExecution()) { // pending token - TryWake(pool); - } - } - - inline void TUnitedWorkers::Balance() { - ui64 ts = GetCycleCountFast(); - if (Balancer->TryLock(ts)) { - for (TPoolId pool = 0; pool < PoolCount; pool++) { - if (Pools[pool].IsUnited()) { - ui64 ElapsedTs = 0; - ui64 ParkedTs = 0; - TStackVec<TCpuLoadLog<1024>*, 128> logs; - ui64 worstActivationTimeUs = 0; - for (TCpu* cpu : Pools[pool].WakeOrderCpus) { - TExecutorThreadStats& cpuStats = cpu->PoolStats[pool]; - ElapsedTs += cpuStats.ElapsedTicks; - ParkedTs += cpuStats.ParkedTicks; - worstActivationTimeUs = Max(worstActivationTimeUs, cpuStats.WorstActivationTimeUs); - AtomicStore<decltype(cpuStats.WorstActivationTimeUs)>(&cpuStats.WorstActivationTimeUs, 0ul); - logs.push_back(&cpu->LoadLog); - } - ui64 minPeriodTs = Min(ui64(Us2Ts(Balancer->GetPeriodUs())), ui64((1024ull-2ull)*64ull*128ull*1024ull)); - ui64 estimatedTs = MinusOneCpuEstimator.MaxLatencyIncreaseWithOneLessCpu( - &logs[0], logs.size(), ts, minPeriodTs); - TBalancerStats stats; - stats.Ts = ts; - stats.CpuUs = Ts2Us(ElapsedTs); - stats.IdleUs = Ts2Us(ParkedTs); - stats.ExpectedLatencyIncreaseUs = Ts2Us(estimatedTs); - stats.WorstActivationTimeUs = worstActivationTimeUs; - Balancer->SetPoolStats(pool, stats); - } - } - Balancer->Balance(); - Balancer->Unlock(); - } - } - - inline TPoolId TUnitedWorkers::AssignedPool(TWorkerContext& wctx) { - return Cpus[wctx.CpuId].State.AssignedPool(); - } - - inline bool 
TUnitedWorkers::IsPoolReassigned(TWorkerContext& wctx) { - return Cpus[wctx.CpuId].State.IsPoolReassigned(wctx.PoolId); - } - - inline void TUnitedWorkers::SwitchPool(TWorkerContext& wctx, ui64 softDeadlineTs) { - Pools[wctx.PoolId].Switch(wctx, softDeadlineTs, Cpus[wctx.CpuId].PoolStats[wctx.PoolId]); - Cpus[wctx.CpuId].SwitchPool(wctx.PoolId); - } - - TPoolId TUnitedWorkers::Idle(TPoolId assigned, TWorkerContext& wctx) { - wctx.SwitchToIdle(); - - TPoolId result; - TTimeTracker timeTracker; - TCpu& cpu = Cpus[wctx.CpuId]; - TPool* assignedPool = assigned == CpuShared ? nullptr : &Pools[assigned]; - TCpu::TScopedWaiters scopedWaiters(cpu, assignedPool); - while (true) { - if (cpu.StartSpinning(this, assignedPool, result)) { - break; // token already acquired (or stop) - } - result = WaitSequence(cpu, wctx, timeTracker); - if (Y_UNLIKELY(result == CpuStopped) || TryAcquireToken(result)) { - break; // token acquired (or stop) - } - } - - wctx.AddElapsedCycles(ActorSystemIndex, timeTracker.Elapsed()); - return result; - } - - TPoolId TUnitedWorkers::WaitSequence(TCpu& cpu, TWorkerContext& wctx, TTimeTracker& timeTracker) { - TPoolId result; - if (cpu.ActiveWait(Us2Ts(Config.SpinThresholdUs), result)) { - wctx.AddElapsedCycles(ActorSystemIndex, timeTracker.Elapsed()); - return result; - } - if (cpu.StartBlocking(result)) { - wctx.AddElapsedCycles(ActorSystemIndex, timeTracker.Elapsed()); - return result; - } - wctx.AddElapsedCycles(ActorSystemIndex, timeTracker.Elapsed()); - cpu.LoadLog.RegisterBusyPeriod(GetCycleCountFast()); - bool wakeup; - do { - wakeup = cpu.BlockedWait(result, Config.Balancer.PeriodUs * 1000); - wctx.AddParkedCycles(timeTracker.Elapsed()); - } while (!wakeup); - cpu.LoadLog.RegisterIdlePeriod(GetCycleCountFast()); - return result; - } - - void TUnitedWorkers::GetCurrentStats(TPoolId pool, TVector<TExecutorThreadStats>& statsCopy) const { - size_t idx = 1; - statsCopy.resize(idx + Pools[pool].WakeOrderCpus.size()); - for (TCpu* cpu : Pools[pool].WakeOrderCpus) { - TExecutorThreadStats& s = statsCopy[idx++]; - s = TExecutorThreadStats(); - s.Aggregate(cpu->PoolStats[pool]); - } - } - - TUnitedExecutorPool::TUnitedExecutorPool(const TUnitedExecutorPoolConfig& cfg, TUnitedWorkers* united) - : TExecutorPoolBaseMailboxed(cfg.PoolId) - , United(united) - , PoolName(cfg.PoolName) - { - United->SetupPool(TPoolId(cfg.PoolId), this, MailboxTable.Get()); - } - - void TUnitedExecutorPool::Prepare(TActorSystem* actorSystem, NSchedulerQueue::TReader** scheduleReaders, ui32* scheduleSz) { - ActorSystem = actorSystem; - - // Schedule readers are initialized through TUnitedWorkers::Prepare - *scheduleReaders = nullptr; - *scheduleSz = 0; - } - - void TUnitedExecutorPool::Start() { - // workers are actually started in TUnitedWorkers::Start() - } - - void TUnitedExecutorPool::PrepareStop() { - } - - void TUnitedExecutorPool::Shutdown() { - // workers are actually joined in TUnitedWorkers::Shutdown() - } - - TAffinity* TUnitedExecutorPool::Affinity() const { - Y_ABORT(); // should never be called, TCpuExecutorPool is used instead - } - - ui32 TUnitedExecutorPool::GetThreads() const { - return 0; - } - - ui32 TUnitedExecutorPool::GetReadyActivation(TWorkerContext&, ui64) { - Y_ABORT(); // should never be called, TCpu*ExecutorPool is used instead - } - - inline void TUnitedExecutorPool::ScheduleActivation(ui32 activation) { - TUnitedExecutorPool::ScheduleActivationEx(activation, AtomicIncrement(ActivationsRevolvingCounter)); - } - - inline void 
TUnitedExecutorPool::SpecificScheduleActivation(ui32 activation) { - TUnitedExecutorPool::ScheduleActivation(activation); - } - - inline void TUnitedExecutorPool::ScheduleActivationEx(ui32 activation, ui64 revolvingCounter) { - United->PushActivation(PoolId, activation, revolvingCounter); - } - - void TUnitedExecutorPool::Schedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie, TWorkerId workerId) { - TUnitedExecutorPool::Schedule(deadline - ActorSystem->Timestamp(), ev, cookie, workerId); - } - - void TUnitedExecutorPool::Schedule(TMonotonic deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie, TWorkerId workerId) { - Y_DEBUG_ABORT_UNLESS(workerId < United->GetWorkerCount()); - const auto current = ActorSystem->Monotonic(); - if (deadline < current) { - deadline = current; - } - United->GetScheduleWriter(workerId)->Push(deadline.MicroSeconds(), ev.Release(), cookie); - } - - void TUnitedExecutorPool::Schedule(TDuration delta, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie, TWorkerId workerId) { - Y_DEBUG_ABORT_UNLESS(workerId < United->GetWorkerCount()); - const auto deadline = ActorSystem->Monotonic() + delta; - United->GetScheduleWriter(workerId)->Push(deadline.MicroSeconds(), ev.Release(), cookie); - } - - void TUnitedExecutorPool::GetCurrentStats(TExecutorPoolStats& poolStats, TVector<TExecutorThreadStats>& statsCopy) const { - Y_UNUSED(poolStats); - if (statsCopy.empty()) { - statsCopy.resize(1); - } - statsCopy[0] = TExecutorThreadStats(); - statsCopy[0].Aggregate(Stats); - United->GetCurrentStats(PoolId, statsCopy); - } -} diff --git a/ydb/library/actors/core/executor_pool_united.h b/ydb/library/actors/core/executor_pool_united.h deleted file mode 100644 index cd38162769..0000000000 --- a/ydb/library/actors/core/executor_pool_united.h +++ /dev/null @@ -1,48 +0,0 @@ -#pragma once - -#include "actorsystem.h" -#include "balancer.h" -#include "scheduler_queue.h" -#include "executor_pool_base.h" - -#include <ydb/library/actors/util/unordered_cache.h> - -#include <library/cpp/monlib/dynamic_counters/counters.h> -#include <ydb/library/actors/util/cpu_load_log.h> -#include <ydb/library/actors/util/unordered_cache.h> -#include <library/cpp/containers/stack_vector/stack_vec.h> - -#include <util/generic/noncopyable.h> - -namespace NActors { - class TMailboxTable; - - class TUnitedExecutorPool: public TExecutorPoolBaseMailboxed { - TUnitedWorkers* United; - const TString PoolName; - TAtomic ActivationsRevolvingCounter = 0; - public: - TUnitedExecutorPool(const TUnitedExecutorPoolConfig& cfg, TUnitedWorkers* united); - - void Prepare(TActorSystem* actorSystem, NSchedulerQueue::TReader** scheduleReaders, ui32* scheduleSz) override; - void Start() override; - void PrepareStop() override; - void Shutdown() override; - - TAffinity* Affinity() const override; - ui32 GetThreads() const override; - ui32 GetReadyActivation(TWorkerContext& wctx, ui64 revolvingReadCounter) override; - void ScheduleActivation(ui32 activation) override; - void SpecificScheduleActivation(ui32 activation) override; - void ScheduleActivationEx(ui32 activation, ui64 revolvingWriteCounter) override; - void Schedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie, TWorkerId workerId) override; - void Schedule(TMonotonic deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie, TWorkerId workerId) override; - void Schedule(TDuration delta, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie, TWorkerId workerId) override; - - void GetCurrentStats(TExecutorPoolStats& 
poolStats, TVector<TExecutorThreadStats>& statsCopy) const override; - - TString GetName() const override { - return PoolName; - } - }; -} diff --git a/ydb/library/actors/core/executor_pool_united_ut.cpp b/ydb/library/actors/core/executor_pool_united_ut.cpp deleted file mode 100644 index 16c3a49711..0000000000 --- a/ydb/library/actors/core/executor_pool_united_ut.cpp +++ /dev/null @@ -1,341 +0,0 @@ -#include "actorsystem.h" -#include "executor_pool_basic.h" -#include "hfunc.h" -#include "scheduler_basic.h" - -#include <ydb/library/actors/util/should_continue.h> - -#include <library/cpp/testing/unittest/registar.h> -#include <ydb/library/actors/protos/unittests.pb.h> - -using namespace NActors; - -//////////////////////////////////////////////////////////////////////////////// - -struct TEvMsg : public NActors::TEventBase<TEvMsg, 10347> { - DEFINE_SIMPLE_LOCAL_EVENT(TEvMsg, "ExecutorPoolTest: Msg"); -}; - -//////////////////////////////////////////////////////////////////////////////// - -inline ui64 DoTimedWork(ui64 workUs) { - ui64 startUs = ThreadCPUTime(); - ui64 endUs = startUs + workUs; - ui64 nowUs = startUs; - do { - ui64 endTs = GetCycleCountFast() + Us2Ts(endUs - nowUs); - while (GetCycleCountFast() <= endTs) {} - nowUs = ThreadCPUTime(); - } while (nowUs <= endUs); - return nowUs - startUs; -} - -class TTestSenderActor : public IActorCallback { -private: - using EActivityType = IActor::EActivityType ; - using EActorActivity = IActor::EActorActivity; - -private: - TAtomic Counter; - TActorId Receiver; - - std::function<void(void)> Action; - -public: - TTestSenderActor(std::function<void(void)> action = [](){}, - EActivityType activityType = EActorActivity::OTHER) - : IActorCallback(static_cast<TReceiveFunc>(&TTestSenderActor::Execute), activityType) - , Action(action) - {} - - void Start(TActorId receiver, size_t count) { - AtomicSet(Counter, count); - Receiver = receiver; - } - - void Stop() { - while (true) { - if (GetCounter() == 0) { - break; - } - - Sleep(TDuration::MilliSeconds(1)); - } - } - - size_t GetCounter() const { - return AtomicGet(Counter); - } - -private: - STFUNC(Execute) { - switch (ev->GetTypeRewrite()) { - hFunc(TEvMsg, Handle); - } - } - - void Handle(TEvMsg::TPtr &ev) { - Y_UNUSED(ev); - Action(); - TAtomicBase count = AtomicDecrement(Counter); - Y_ABORT_UNLESS(count != Max<TAtomicBase>()); - if (count) { - Send(Receiver, new TEvMsg()); - } - } -}; - -// Single cpu balancer that switches pool on every activation; not thread-safe -struct TRoundRobinBalancer: public IBalancer { - TCpuState* State; - TMap<TPoolId, TPoolId> NextPool; - - bool AddCpu(const TCpuAllocation& cpuAlloc, TCpuState* cpu) override { - State = cpu; - TPoolId prev = cpuAlloc.AllowedPools.rbegin()->PoolId; - for (auto& p : cpuAlloc.AllowedPools) { - NextPool[prev] = p.PoolId; - prev = p.PoolId; - } - return true; - } - - bool TryLock(ui64) override { return true; } - void SetPoolStats(TPoolId, const TBalancerStats&) override {} - void Unlock() override {} - - void Balance() override { - TPoolId assigned; - TPoolId current; - State->Load(assigned, current); - State->AssignPool(NextPool[assigned]); - } - - ui64 GetPeriodUs() override { - return 1000; - } -}; - -void AddUnitedPool(THolder<TActorSystemSetup>& setup, ui32 concurrency = 0) { - TUnitedExecutorPoolConfig united; - united.PoolId = setup->GetExecutorsCount(); - united.Concurrency = concurrency; - setup->CpuManager.United.emplace_back(std::move(united)); -} - -THolder<TActorSystemSetup> GetActorSystemSetup(ui32 cpuCount) { - auto 
setup = MakeHolder<NActors::TActorSystemSetup>(); - setup->NodeId = 1; - setup->CpuManager.UnitedWorkers.CpuCount = cpuCount; - setup->CpuManager.UnitedWorkers.NoRealtime = true; // unavailable in test environment - setup->Scheduler = new TBasicSchedulerThread(NActors::TSchedulerConfig(512, 0)); - return setup; -} - -Y_UNIT_TEST_SUITE(UnitedExecutorPool) { - -#ifdef _linux_ - - Y_UNIT_TEST(OnePoolManyCpus) { - const size_t msgCount = 1e4; - auto setup = GetActorSystemSetup(4); - AddUnitedPool(setup); - TActorSystem actorSystem(setup); - actorSystem.Start(); - - auto begin = TInstant::Now(); - - auto actor = new TTestSenderActor(); - auto actorId = actorSystem.Register(actor); - actor->Start(actor->SelfId(), msgCount); - actorSystem.Send(actorId, new TEvMsg()); - - while (actor->GetCounter()) { - auto now = TInstant::Now(); - UNIT_ASSERT_C(now - begin < TDuration::Seconds(5), "Counter is " << actor->GetCounter()); - - Sleep(TDuration::MilliSeconds(1)); - } - - TVector<TExecutorThreadStats> stats; - TExecutorPoolStats poolStats; - actorSystem.GetPoolStats(0, poolStats, stats); - // Sum all per-thread counters into the 0th element - for (ui32 idx = 1; idx < stats.size(); ++idx) { - stats[0].Aggregate(stats[idx]); - } - - UNIT_ASSERT_VALUES_EQUAL(stats[0].SentEvents, msgCount - 1); - UNIT_ASSERT_VALUES_EQUAL(stats[0].ReceivedEvents, msgCount); - //UNIT_ASSERT_VALUES_EQUAL(stats[0].PreemptedEvents, 0); // depends on execution time and system load, so may be non-zero - UNIT_ASSERT_VALUES_EQUAL(stats[0].NonDeliveredEvents, 0); - UNIT_ASSERT_VALUES_EQUAL(stats[0].EmptyMailboxActivation, 0); - //UNIT_ASSERT_VALUES_EQUAL(stats[0].CpuUs, 0); // depends on total duration of test, so undefined - UNIT_ASSERT(stats[0].ElapsedTicks > 0); - //UNIT_ASSERT(stats[0].ParkedTicks == 0); // per-pool parked time does not make sense for united pools - UNIT_ASSERT_VALUES_EQUAL(stats[0].BlockedTicks, 0); - UNIT_ASSERT(stats[0].ActivationTimeHistogram.TotalSamples >= msgCount / TBasicExecutorPoolConfig::DEFAULT_EVENTS_PER_MAILBOX); - UNIT_ASSERT_VALUES_EQUAL(stats[0].EventDeliveryTimeHistogram.TotalSamples, msgCount); - UNIT_ASSERT_VALUES_EQUAL(stats[0].EventProcessingCountHistogram.TotalSamples, msgCount); - UNIT_ASSERT(stats[0].EventProcessingTimeHistogram.TotalSamples > 0); - UNIT_ASSERT(stats[0].ElapsedTicksByActivity[NActors::TActorTypeOperator::GetOtherActivityIndex()] > 0); - UNIT_ASSERT_VALUES_EQUAL(stats[0].ReceivedEventsByActivity[NActors::TActorTypeOperator::GetOtherActivityIndex()], msgCount); - UNIT_ASSERT_VALUES_EQUAL(stats[0].ActorsAliveByActivity[NActors::TActorTypeOperator::GetOtherActivityIndex()], 1); - UNIT_ASSERT_VALUES_EQUAL(stats[0].ScheduledEventsByActivity[NActors::TActorTypeOperator::GetOtherActivityIndex()], 0); - UNIT_ASSERT_VALUES_EQUAL(stats[0].PoolActorRegistrations, 1); - UNIT_ASSERT_VALUES_EQUAL(stats[0].PoolDestroyedActors, 0); - UNIT_ASSERT_VALUES_EQUAL(stats[0].PoolAllocatedMailboxes, 4095); // one line - UNIT_ASSERT(stats[0].MailboxPushedOutByTime + stats[0].MailboxPushedOutByEventCount + stats[0].MailboxPushedOutBySoftPreemption >= msgCount / TBasicExecutorPoolConfig::DEFAULT_EVENTS_PER_MAILBOX); - } - - Y_UNIT_TEST(ManyPoolsOneSharedCpu) { - const size_t msgCount = 1e4; - const size_t pools = 4; - auto setup = GetActorSystemSetup(1); - for (size_t pool = 0; pool < pools; pool++) { - AddUnitedPool(setup); - } - TActorSystem actorSystem(setup); - actorSystem.Start(); - - auto begin = TInstant::Now(); - - TVector<TTestSenderActor*> actors; - for (size_t pool = 0; pool < pools; 
pool++) { - auto actor = new TTestSenderActor(); - auto actorId = actorSystem.Register(actor, TMailboxType::HTSwap, pool); - actor->Start(actor->SelfId(), msgCount); - actorSystem.Send(actorId, new TEvMsg()); - actors.push_back(actor); - } - - while (true) { - size_t left = 0; - for (auto actor : actors) { - left += actor->GetCounter(); - } - if (left == 0) { - break; - } - auto now = TInstant::Now(); - UNIT_ASSERT_C(now - begin < TDuration::Seconds(5), "left " << left); - Sleep(TDuration::MilliSeconds(1)); - } - - for (size_t pool = 0; pool < pools; pool++) { - TVector<TExecutorThreadStats> stats; - TExecutorPoolStats poolStats; - actorSystem.GetPoolStats(pool, poolStats, stats); - // Sum all per-thread counters into the 0th element - for (ui32 idx = 1; idx < stats.size(); ++idx) { - stats[0].Aggregate(stats[idx]); - } - - UNIT_ASSERT_VALUES_EQUAL(stats[0].ReceivedEvents, msgCount); - UNIT_ASSERT_VALUES_EQUAL(stats[0].PoolActorRegistrations, 1); - } - } - - Y_UNIT_TEST(ManyPoolsOneAssignedCpu) { - const size_t msgCount = 1e4; - const size_t pools = 4; - auto setup = GetActorSystemSetup(1); - setup->Balancer.Reset(new TRoundRobinBalancer()); - for (size_t pool = 0; pool < pools; pool++) { - AddUnitedPool(setup); - } - TActorSystem actorSystem(setup); - actorSystem.Start(); - - auto begin = TInstant::Now(); - - TVector<TTestSenderActor*> actors; - for (size_t pool = 0; pool < pools; pool++) { - auto actor = new TTestSenderActor(); - auto actorId = actorSystem.Register(actor, TMailboxType::HTSwap, pool); - actor->Start(actor->SelfId(), msgCount); - actorSystem.Send(actorId, new TEvMsg()); - actors.push_back(actor); - } - - while (true) { - size_t left = 0; - for (auto actor : actors) { - left += actor->GetCounter(); - } - if (left == 0) { - break; - } - auto now = TInstant::Now(); - UNIT_ASSERT_C(now - begin < TDuration::Seconds(5), "left " << left); - Sleep(TDuration::MilliSeconds(1)); - } - - for (size_t pool = 0; pool < pools; pool++) { - TVector<TExecutorThreadStats> stats; - TExecutorPoolStats poolStats; - actorSystem.GetPoolStats(pool, poolStats, stats); - // Sum all per-thread counters into the 0th element - for (ui32 idx = 1; idx < stats.size(); ++idx) { - stats[0].Aggregate(stats[idx]); - } - - UNIT_ASSERT_VALUES_EQUAL(stats[0].ReceivedEvents, msgCount); - UNIT_ASSERT_VALUES_EQUAL(stats[0].PoolActorRegistrations, 1); - } - } - - Y_UNIT_TEST(ManyPoolsOneCpuSlowEvents) { - const size_t msgCount = 3; - const size_t pools = 4; - auto setup = GetActorSystemSetup(1); - for (size_t pool = 0; pool < pools; pool++) { - AddUnitedPool(setup); - } - TActorSystem actorSystem(setup); - actorSystem.Start(); - - auto begin = TInstant::Now(); - - TVector<TTestSenderActor*> actors; - for (size_t pool = 0; pool < pools; pool++) { - auto actor = new TTestSenderActor([]() { - DoTimedWork(100'000); - }); - auto actorId = actorSystem.Register(actor, TMailboxType::HTSwap, pool); - actor->Start(actor->SelfId(), msgCount); - actorSystem.Send(actorId, new TEvMsg()); - actors.push_back(actor); - } - - while (true) { - size_t left = 0; - for (auto actor : actors) { - left += actor->GetCounter(); - } - if (left == 0) { - break; - } - auto now = TInstant::Now(); - UNIT_ASSERT_C(now - begin < TDuration::Seconds(15), "left " << left); - Sleep(TDuration::MilliSeconds(1)); - } - - for (size_t pool = 0; pool < pools; pool++) { - TVector<TExecutorThreadStats> stats; - TExecutorPoolStats poolStats; - actorSystem.GetPoolStats(pool, poolStats, stats); - // Sum all per-thread counters into the 0th element - for (ui32 idx = 
1; idx < stats.size(); ++idx) { - stats[0].Aggregate(stats[idx]); - } - - UNIT_ASSERT_VALUES_EQUAL(stats[0].ReceivedEvents, msgCount); - UNIT_ASSERT_VALUES_EQUAL(stats[0].PreemptedEvents, msgCount); // every 100ms event should be preempted - UNIT_ASSERT_VALUES_EQUAL(stats[0].PoolActorRegistrations, 1); - } - } - -#endif - -} diff --git a/ydb/library/actors/core/executor_pool_united_workers.h b/ydb/library/actors/core/executor_pool_united_workers.h deleted file mode 100644 index 7b57e3d8f9..0000000000 --- a/ydb/library/actors/core/executor_pool_united_workers.h +++ /dev/null @@ -1,105 +0,0 @@ -#pragma once - -#include "defs.h" -#include "balancer.h" -#include "scheduler_queue.h" - -#include <ydb/library/actors/actor_type/indexes.h> -#include <ydb/library/actors/util/cpu_load_log.h> -#include <ydb/library/actors/util/datetime.h> -#include <util/generic/noncopyable.h> - -namespace NActors { - class TActorSystem; - class TMailboxTable; - - class TUnitedWorkers: public TNonCopyable { - struct TWorker; - struct TPool; - struct TCpu; - - i16 WorkerCount; - TArrayHolder<TWorker> Workers; // indexed by WorkerId - size_t PoolCount; - TArrayHolder<TPool> Pools; // indexed by PoolId, so may include not used (not united) pools - size_t CpuCount; - TArrayHolder<TCpu> Cpus; // indexed by CpuId, so may include not allocated CPUs - - IBalancer* Balancer; // external pool cpu balancer - - TUnitedWorkersConfig Config; - TCpuAllocationConfig Allocation; - - volatile bool StopFlag = false; - TMinusOneCpuEstimator<1024> MinusOneCpuEstimator; - const ui32 ActorSystemIndex = NActors::TActorTypeOperator::GetActorSystemIndex(); - public: - TUnitedWorkers( - const TUnitedWorkersConfig& config, - const TVector<TUnitedExecutorPoolConfig>& unitedPools, - const TCpuAllocationConfig& allocation, - IBalancer* balancer); - ~TUnitedWorkers(); - void Prepare(TActorSystem* actorSystem, TVector<NSchedulerQueue::TReader*>& scheduleReaders); - void Start(); - void PrepareStop(); - void Shutdown(); - - bool IsStopped() const { - return RelaxedLoad(&StopFlag); - } - - TWorkerId GetWorkerCount() const { - return WorkerCount; - } - - // Returns thread id of a worker - TThreadId GetWorkerThreadId(TWorkerId workerId) const; - - // Returns per worker schedule writers - NSchedulerQueue::TWriter* GetScheduleWriter(TWorkerId workerId) const; - - // Sets executor for specified pool - void SetupPool(TPoolId pool, IExecutorPool* executorPool, TMailboxTable* mailboxTable); - - // Add activation of newly scheduled mailbox and wake cpu to execute it if required - void PushActivation(TPoolId pool, ui32 activation, ui64 revolvingCounter); - - // Try acquire pending token. 
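
The PushActivation/TryWake/Idle trio declared here avoids lost wakeups by having an idle cpu subscribe as a waiter first and then re-check for pending tokens before blocking, while the producer publishes the token before looking for waiters. A condition-variable sketch of that ordering, reduced to a single pool; the removed code keeps per-cpu state and blocks with a timeout (see BlockedWait), so this is illustrative only:

```cpp
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <mutex>

// Sketch of the subscribe-then-recheck wakeup protocol, illustrative only.
class TPoolWaitQueue {
    std::atomic<int> Tokens{0};
    std::atomic<int> Waiters{0};
    std::mutex Lock;
    std::condition_variable WakeUp;

public:
    void PushActivation() {                       // producer side
        Tokens.fetch_add(1);                      // publish the token first
        if (Waiters.load() > 0) {
            WakeUp.notify_one();                  // wake at most one idle worker
        }
    }

    void WaitForToken() {                         // worker side
        Waiters.fetch_add(1);                     // subscribe first...
        while (!TryAcquireToken()) {              // ...then re-check: a token may have
            std::unique_lock<std::mutex> guard(Lock);           // arrived before we subscribed
            WakeUp.wait_for(guard, std::chrono::milliseconds(1)); // timed, like BlockedWait
        }
        Waiters.fetch_sub(1);
    }

private:
    bool TryAcquireToken() {
        int value = Tokens.load();
        while (value > 0 && !Tokens.compare_exchange_weak(value, value - 1)) {
        }
        return value > 0;
    }
};
```
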
Must be done before execution - bool TryAcquireToken(TPoolId pool); - - // Try to wake idle cpu waiting for tokens on specified pool - void TryWake(TPoolId pool); - - // Get activation from pool; requires pool's token - void BeginExecution(TPoolId pool, ui32& activation, ui64 revolvingCounter); - - // Stop currently active execution and start new one if token is available - // NOTE: Reuses token if it's not destroyed - bool NextExecution(TPoolId pool, ui32& activation, ui64 revolvingCounter); - - // Stop active execution - void StopExecution(TPoolId pool); - - // Runs balancer to assign pools to cpus - void Balance(); - - // Returns pool to be executed by worker or `CpuShared` - TPoolId AssignedPool(TWorkerContext& wctx); - - // Checks if balancer has assigned another pool for worker's cpu - bool IsPoolReassigned(TWorkerContext& wctx); - - // Switch worker context into specified pool - void SwitchPool(TWorkerContext& wctx, ui64 softDeadlineTs); - - // Wait for tokens from any pool allowed on specified cpu - TPoolId Idle(TPoolId assigned, TWorkerContext& wctx); - - // Fill stats for specified pool - void GetCurrentStats(TPoolId pool, TVector<TExecutorThreadStats>& statsCopy) const; - - private: - TPoolId WaitSequence(TCpu& cpu, TWorkerContext& wctx, TTimeTracker& timeTracker); - }; -} diff --git a/ydb/library/actors/core/executor_thread.cpp b/ydb/library/actors/core/executor_thread.cpp index fb1f968888..29d75ef0c0 100644 --- a/ydb/library/actors/core/executor_thread.cpp +++ b/ydb/library/actors/core/executor_thread.cpp @@ -43,7 +43,6 @@ namespace NActors { , ExecutorPool(executorPool) , Ctx(workerId, cpuId) , ThreadName(threadName) - , IsUnitedWorker(true) , TimePerMailbox(timePerMailbox) , EventsPerMailbox(eventsPerMailbox) { @@ -67,7 +66,6 @@ namespace NActors { , AvailableExecutorPools(executorPools) , Ctx(workerId, 0) , ThreadName(threadName) - , IsUnitedWorker(false) , TimePerMailbox(timePerMailbox) , EventsPerMailbox(eventsPerMailbox) , SoftProcessingDurationTs(softProcessingDurationTs) @@ -79,7 +77,7 @@ namespace NActors { { } void TExecutorThread::UnregisterActor(TMailboxHeader* mailbox, TActorId actorId) { - Y_DEBUG_ABORT_UNLESS(IsUnitedWorker || actorId.PoolID() == ExecutorPool->PoolId && ExecutorPool->ResolveMailbox(actorId.Hint()) == mailbox); + Y_DEBUG_ABORT_UNLESS(actorId.PoolID() == ExecutorPool->PoolId && ExecutorPool->ResolveMailbox(actorId.Hint()) == mailbox); IActor* actor = mailbox->DetachActor(actorId.LocalId()); Ctx.DecrementActorsAliveByActivity(actor->GetActivityType()); DyingActors.push_back(THolder(actor)); diff --git a/ydb/library/actors/core/executor_thread.h b/ydb/library/actors/core/executor_thread.h index 83e323b4a8..79d4001b21 100644 --- a/ydb/library/actors/core/executor_thread.h +++ b/ydb/library/actors/core/executor_thread.h @@ -99,7 +99,6 @@ namespace NActors { ui64 RevolvingWriteCounter = 0; const TString ThreadName; volatile TThreadId ThreadId = UnknownThreadId; - bool IsUnitedWorker = false; TDuration TimePerMailbox; ui32 EventsPerMailbox; diff --git a/ydb/library/actors/core/ut/CMakeLists.darwin-arm64.txt b/ydb/library/actors/core/ut/CMakeLists.darwin-arm64.txt index 72ca15e72e..98e9cdcae1 100644 --- a/ydb/library/actors/core/ut/CMakeLists.darwin-arm64.txt +++ b/ydb/library/actors/core/ut/CMakeLists.darwin-arm64.txt @@ -33,11 +33,9 @@ target_sources(ydb-library-actors-core-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/actorsystem_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/performance_ut.cpp 
  ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/ask_ut.cpp
-  ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/balancer_ut.cpp
  ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/event_pb_payload_ut.cpp
  ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/event_pb_ut.cpp
  ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/executor_pool_basic_ut.cpp
-  ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/executor_pool_united_ut.cpp
  ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/log_ut.cpp
  ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/mon_ut.cpp
  ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/scheduler_actor_ut.cpp
diff --git a/ydb/library/actors/core/ut/CMakeLists.darwin-x86_64.txt b/ydb/library/actors/core/ut/CMakeLists.darwin-x86_64.txt
index 784612191d..d20fbe3e95 100644
--- a/ydb/library/actors/core/ut/CMakeLists.darwin-x86_64.txt
+++ b/ydb/library/actors/core/ut/CMakeLists.darwin-x86_64.txt
@@ -34,11 +34,9 @@ target_sources(ydb-library-actors-core-ut PRIVATE
  ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/actorsystem_ut.cpp
  ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/performance_ut.cpp
  ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/ask_ut.cpp
-  ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/balancer_ut.cpp
  ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/event_pb_payload_ut.cpp
  ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/event_pb_ut.cpp
  ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/executor_pool_basic_ut.cpp
-  ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/executor_pool_united_ut.cpp
  ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/log_ut.cpp
  ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/mon_ut.cpp
  ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/scheduler_actor_ut.cpp
diff --git a/ydb/library/actors/core/ut/CMakeLists.linux-aarch64.txt b/ydb/library/actors/core/ut/CMakeLists.linux-aarch64.txt
index 260aa2596f..e47e5555b1 100644
--- a/ydb/library/actors/core/ut/CMakeLists.linux-aarch64.txt
+++ b/ydb/library/actors/core/ut/CMakeLists.linux-aarch64.txt
@@ -37,11 +37,9 @@ target_sources(ydb-library-actors-core-ut PRIVATE
  ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/actorsystem_ut.cpp
  ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/performance_ut.cpp
  ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/ask_ut.cpp
-  ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/balancer_ut.cpp
  ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/event_pb_payload_ut.cpp
  ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/event_pb_ut.cpp
  ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/executor_pool_basic_ut.cpp
-  ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/executor_pool_united_ut.cpp
  ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/log_ut.cpp
  ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/mon_ut.cpp
  ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/scheduler_actor_ut.cpp
diff --git a/ydb/library/actors/core/ut/CMakeLists.linux-x86_64.txt b/ydb/library/actors/core/ut/CMakeLists.linux-x86_64.txt
index 4adb4cbe7e..4610ee7de3 100644
--- a/ydb/library/actors/core/ut/CMakeLists.linux-x86_64.txt
+++ b/ydb/library/actors/core/ut/CMakeLists.linux-x86_64.txt
@@ -38,11 +38,9 @@ target_sources(ydb-library-actors-core-ut PRIVATE
  ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/actorsystem_ut.cpp
  ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/performance_ut.cpp
  ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/ask_ut.cpp
-  ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/balancer_ut.cpp
  ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/event_pb_payload_ut.cpp
  ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/event_pb_ut.cpp
  ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/executor_pool_basic_ut.cpp
-  ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/executor_pool_united_ut.cpp
  ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/log_ut.cpp
  ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/mon_ut.cpp
  ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/scheduler_actor_ut.cpp
diff --git a/ydb/library/actors/core/ut/CMakeLists.windows-x86_64.txt b/ydb/library/actors/core/ut/CMakeLists.windows-x86_64.txt
index 56c30fa8ef..c3834bf41d 100644
--- a/ydb/library/actors/core/ut/CMakeLists.windows-x86_64.txt
+++ b/ydb/library/actors/core/ut/CMakeLists.windows-x86_64.txt
@@ -27,11 +27,9 @@ target_sources(ydb-library-actors-core-ut PRIVATE
  ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/actorsystem_ut.cpp
  ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/performance_ut.cpp
  ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/ask_ut.cpp
-  ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/balancer_ut.cpp
  ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/event_pb_payload_ut.cpp
  ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/event_pb_ut.cpp
  ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/executor_pool_basic_ut.cpp
-  ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/executor_pool_united_ut.cpp
  ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/log_ut.cpp
  ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/mon_ut.cpp
  ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/scheduler_actor_ut.cpp
diff --git a/ydb/library/actors/core/ut/ya.make b/ydb/library/actors/core/ut/ya.make
index 1af7984cce..e1a225e528 100644
--- a/ydb/library/actors/core/ut/ya.make
+++ b/ydb/library/actors/core/ut/ya.make
@@ -30,11 +30,9 @@ SRCS(
    actorsystem_ut.cpp
    performance_ut.cpp
    ask_ut.cpp
-    balancer_ut.cpp
    event_pb_payload_ut.cpp
    event_pb_ut.cpp
    executor_pool_basic_ut.cpp
-    executor_pool_united_ut.cpp
    log_ut.cpp
    mon_ut.cpp
    scheduler_actor_ut.cpp
diff --git a/ydb/library/actors/core/ya.make b/ydb/library/actors/core/ya.make
index 9f6a4b7b2b..f1d9abc3f8 100644
--- a/ydb/library/actors/core/ya.make
+++ b/ydb/library/actors/core/ya.make
@@ -27,8 +27,6 @@ SRCS(
    ask.cpp
    ask.h
    av_bootstrapped.cpp
-    balancer.h
-    balancer.cpp
    buffer.cpp
    buffer.h
    callstack.cpp
@@ -53,8 +51,6 @@ SRCS(
    executor_pool_basic.h
    executor_pool_io.cpp
    executor_pool_io.h
-    executor_pool_united.cpp
-    executor_pool_united.h
    executor_thread.cpp
    executor_thread.h
    harmonizer.cpp
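Note: the per-thread statistics aggregation used by the deleted united-pool tests is not specific to the united pool and still applies to the remaining executor pools. A minimal sketch, assuming TActorSystem::GetPoolStats and TExecutorThreadStats::Aggregate keep the signatures visible in the removed tests above, and using a hypothetical pool id 0:

    // Illustrative only: fold per-thread counters of pool 0 into stats[0].
    TVector<TExecutorThreadStats> stats;
    TExecutorPoolStats poolStats;
    actorSystem.GetPoolStats(0, poolStats, stats);
    for (ui32 idx = 1; idx < stats.size(); ++idx) {
        stats[0].Aggregate(stats[idx]); // accumulates ReceivedEvents, registrations, etc.
    }
    // stats[0] now holds pool-wide totals, e.g. stats[0].ReceivedEvents.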