author     kruall <kruall@ydb.tech>  2023-12-07 22:14:35 +0300
committer  kruall <kruall@ydb.tech>  2023-12-07 22:36:46 +0300
commit     ab0991343189494d8e93e3eff2af16fc3a3ce561
tree       189320e798cd8dbbce7ba195e1c625d3b52e7e0d
parent     b94e74ac740f885d9ad76a2f845aeb5f6895bb6d
download   ydb-ab0991343189494d8e93e3eff2af16fc3a3ce561.tar.gz
Remove UnitedPool, KIKIMR-18440
-rw-r--r--  ydb/core/driver_lib/run/kikimr_services_initializers.cpp | 72
-rw-r--r--  ydb/core/protos/config.proto | 27
-rw-r--r--  ydb/library/actors/core/CMakeLists.darwin-arm64.txt | 2
-rw-r--r--  ydb/library/actors/core/CMakeLists.darwin-x86_64.txt | 2
-rw-r--r--  ydb/library/actors/core/CMakeLists.linux-aarch64.txt | 2
-rw-r--r--  ydb/library/actors/core/CMakeLists.linux-x86_64.txt | 2
-rw-r--r--  ydb/library/actors/core/CMakeLists.windows-x86_64.txt | 2
-rw-r--r--  ydb/library/actors/core/actor_benchmark_helper.h | 49
-rw-r--r--  ydb/library/actors/core/actor_ut.cpp | 88
-rw-r--r--  ydb/library/actors/core/actorsystem.h | 3
-rw-r--r--  ydb/library/actors/core/balancer.cpp | 311
-rw-r--r--  ydb/library/actors/core/balancer.h | 30
-rw-r--r--  ydb/library/actors/core/balancer_ut.cpp | 225
-rw-r--r--  ydb/library/actors/core/config.h | 103
-rw-r--r--  ydb/library/actors/core/cpu_manager.cpp | 34
-rw-r--r--  ydb/library/actors/core/cpu_manager.h | 4
-rw-r--r--  ydb/library/actors/core/executor_pool_united.cpp | 1455
-rw-r--r--  ydb/library/actors/core/executor_pool_united.h | 48
-rw-r--r--  ydb/library/actors/core/executor_pool_united_ut.cpp | 341
-rw-r--r--  ydb/library/actors/core/executor_pool_united_workers.h | 105
-rw-r--r--  ydb/library/actors/core/executor_thread.cpp | 4
-rw-r--r--  ydb/library/actors/core/executor_thread.h | 1
-rw-r--r--  ydb/library/actors/core/ut/CMakeLists.darwin-arm64.txt | 2
-rw-r--r--  ydb/library/actors/core/ut/CMakeLists.darwin-x86_64.txt | 2
-rw-r--r--  ydb/library/actors/core/ut/CMakeLists.linux-aarch64.txt | 2
-rw-r--r--  ydb/library/actors/core/ut/CMakeLists.linux-x86_64.txt | 2
-rw-r--r--  ydb/library/actors/core/ut/CMakeLists.windows-x86_64.txt | 2
-rw-r--r--  ydb/library/actors/core/ut/ya.make | 2
-rw-r--r--  ydb/library/actors/core/ya.make | 4
29 files changed, 14 insertions(+), 2912 deletions(-)
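
For orientation before the per-file diffs: after this removal the actor system is configured with basic and IO pools only. Below is a minimal, hedged C++ sketch of such a setup, modeled on the TActorBenchmark helpers changed in this commit; the function name, pool names, thread counts, and the scheduler header are illustrative assumptions, not code from the commit.

    // Hedged sketch: assumes the post-commit NActors API, where TCpuManagerConfig
    // carries only Basic and IO pool vectors (UnitedWorkers/United no longer exist).
    #include <ydb/library/actors/core/actorsystem.h>
    #include <ydb/library/actors/core/config.h>
    #include <ydb/library/actors/core/scheduler_basic.h>   // assumed header for TBasicSchedulerThread
    #include <util/string/builder.h>

    using namespace NActors;

    THolder<TActorSystemSetup> MakeBasicOnlySetup(ui32 poolCount, ui32 threadsPerPool) {
        auto setup = MakeHolder<TActorSystemSetup>();
        setup->NodeId = 1;
        for (ui32 i = 0; i < poolCount; ++i) {
            TBasicExecutorPoolConfig basic;
            basic.PoolId = setup->GetExecutorsCount();                  // next free pool id
            basic.PoolName = TStringBuilder() << "basic-" << basic.PoolId;
            basic.Threads = threadsPerPool;
            setup->CpuManager.Basic.emplace_back(std::move(basic));
        }
        setup->Scheduler = new TBasicSchedulerThread(TSchedulerConfig(512, 0));
        return setup;
    }
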
diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
index c09b59d292..ab88bc8bf0 100644
--- a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
+++ b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
@@ -190,7 +190,6 @@
#include <ydb/library/actors/core/events.h>
#include <ydb/library/actors/core/executor_pool_basic.h>
#include <ydb/library/actors/core/executor_pool_io.h>
-#include <ydb/library/actors/core/executor_pool_united.h>
#include <ydb/library/actors/core/log.h>
#include <ydb/library/actors/core/log_settings.h>
#include <ydb/library/actors/core/mon.h>
@@ -286,7 +285,6 @@ void AddExecutorPool(
const NKikimrConfig::TActorSystemConfig::TExecutor& poolConfig,
const NKikimrConfig::TActorSystemConfig& systemConfig,
ui32 poolId,
- ui32& unitedThreads,
const NKikimr::TAppData* appData)
{
const auto counters = GetServiceCounters(appData->Counters, "utils");
@@ -335,87 +333,19 @@ void AddExecutorPool(
cpuManager.IO.emplace_back(std::move(io));
break;
}
- case NKikimrConfig::TActorSystemConfig::TExecutor::UNITED: {
- TUnitedExecutorPoolConfig united;
- united.PoolId = poolId;
- united.PoolName = poolConfig.GetName();
- united.Concurrency = poolConfig.GetConcurrency();
- united.Weight = (NActors::TPoolWeight)poolConfig.GetWeight();
- united.Allowed = ParseAffinity(poolConfig.GetAffinity());
- if (poolConfig.HasTimePerMailboxMicroSecs()) {
- united.TimePerMailbox = TDuration::MicroSeconds(poolConfig.GetTimePerMailboxMicroSecs());
- } else if (systemConfig.HasTimePerMailboxMicroSecs()) {
- united.TimePerMailbox = TDuration::MicroSeconds(systemConfig.GetTimePerMailboxMicroSecs());
- }
- if (poolConfig.HasEventsPerMailbox()) {
- united.EventsPerMailbox = poolConfig.GetEventsPerMailbox();
- } else if (systemConfig.HasEventsPerMailbox()) {
- united.EventsPerMailbox = systemConfig.GetEventsPerMailbox();
- }
- Y_ABORT_UNLESS(united.EventsPerMailbox != 0);
- united.Balancing.Cpus = poolConfig.GetThreads();
- united.Balancing.MinCpus = poolConfig.GetMinThreads();
- united.Balancing.MaxCpus = poolConfig.GetMaxThreads();
- united.Balancing.Priority = poolConfig.GetBalancingPriority();
- united.Balancing.ToleratedLatencyUs = poolConfig.GetToleratedLatencyUs();
- unitedThreads += united.Balancing.Cpus;
- cpuManager.United.emplace_back(std::move(united));
- break;
- }
default:
Y_ABORT();
}
}
-static TUnitedWorkersConfig CreateUnitedWorkersConfig(const NKikimrConfig::TActorSystemConfig::TUnitedWorkers& config, ui32 unitedThreads) {
- TUnitedWorkersConfig result;
- result.CpuCount = unitedThreads;
- if (config.HasCpuCount()) {
- result.CpuCount = config.GetCpuCount();
- }
- if (config.HasSpinThresholdUs()) {
- result.SpinThresholdUs = config.GetSpinThresholdUs();
- }
- if (config.HasPoolLimitUs()) {
- result.PoolLimitUs = config.GetPoolLimitUs();
- }
- if (config.HasEventLimitUs()) {
- result.EventLimitUs = config.GetEventLimitUs();
- }
- if (config.HasLimitPrecisionUs()) {
- result.LimitPrecisionUs = config.GetLimitPrecisionUs();
- }
- if (config.HasFastWorkerPriority()) {
- result.FastWorkerPriority = config.GetFastWorkerPriority();
- }
- if (config.HasIdleWorkerPriority()) {
- result.IdleWorkerPriority = config.GetIdleWorkerPriority();
- }
- if (config.HasAffinity()) {
- result.Allowed = ParseAffinity(config.GetAffinity());
- }
- if (config.HasNoRealtime()) {
- result.NoRealtime = config.GetNoRealtime();
- }
- if (config.HasNoAffinity()) {
- result.NoAffinity = config.GetNoAffinity();
- }
- if (config.HasBalancerPeriodUs()) {
- result.Balancer.PeriodUs = config.GetBalancerPeriodUs();
- }
- return result;
-}
-
static TCpuManagerConfig CreateCpuManagerConfig(const NKikimrConfig::TActorSystemConfig& config,
const NKikimr::TAppData* appData)
{
TCpuManagerConfig cpuManager;
- ui32 unitedThreads = 0;
cpuManager.PingInfoByPool.resize(config.GetExecutor().size());
for (int poolId = 0; poolId < config.GetExecutor().size(); poolId++) {
- AddExecutorPool(cpuManager, config.GetExecutor(poolId), config, poolId, unitedThreads, appData);
+ AddExecutorPool(cpuManager, config.GetExecutor(poolId), config, poolId, appData);
}
- cpuManager.UnitedWorkers = CreateUnitedWorkersConfig(config.GetUnitedWorkers(), unitedThreads);
return cpuManager;
}
diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto
index 8cc3ff2f04..f9495d7150 100644
--- a/ydb/core/protos/config.proto
+++ b/ydb/core/protos/config.proto
@@ -61,7 +61,6 @@ message TActorSystemConfig {
enum EType {
BASIC = 1;
IO = 2;
- UNITED = 3;
};
optional EType Type = 1;
@@ -74,17 +73,9 @@ message TActorSystemConfig {
optional uint32 EventsPerMailbox = 8;
optional uint32 RealtimePriority = 9;
- // Actorsystem 2.0: cpu sharing by different pools with preemption
- optional uint32 Concurrency = 10; // Limits simultaneously running mailboxes of UNITED pool
- optional uint32 Weight = 11; // Weight of UNITED pool in cpu-local scheduler (default value is NActors::DefPoolWeight)
-
- // Actorsystem 1.5: cpu balancing between pools
+ // Actorsystem 1.4
optional uint32 MinThreads = 12; // Lower balancing bound, should be at least 1, and not greater than `Threads`
optional uint32 MaxThreads = 13; // Higher balancing bound, should be not lower than `Threads`
- optional uint32 BalancingPriority = 14; // Priority of pool to obtain cpu due to balancing (higher is better)
- optional uint64 ToleratedLatencyUs = 15; // p100-latency threshold indicating that more cpus are required by pool
-
- // Actorsystem 1.4
optional int32 Priority = 16;
optional int32 MaxAvgPingDeviation = 17;
}
@@ -115,22 +106,6 @@ message TActorSystemConfig {
optional uint32 EventsPerMailbox = 9;
optional uint32 SelfPingInterval = 10; // in microseconds
- message TUnitedWorkers {
- optional uint32 CpuCount = 1; // Total CPUs running united workers (TExecutor.Threads analog), should be set to zero to use actorsystem 1.5, and >0 for actorsystem 2.0
- optional uint64 SpinThresholdUs = 2; // Limit for active spinning in case all pools became idle
- optional uint64 PoolLimitUs = 3; // Soft limit on pool execution
- optional uint64 EventLimitUs = 4; // Hard limit on last event execution exceeding pool limit
- optional uint64 LimitPrecisionUs = 5; // Maximum delay of timer on limit excess (delay needed to avoid settimer syscall on every pool switch)
- optional uint64 FastWorkerPriority = 6; // Real-time priority of workers not exceeding hard limits
- optional uint64 IdleWorkerPriority = 7; // Real-time priority of standby workers waiting for hard preemption on timers (should be greater than FastWorkerPriority)
- optional TAffinity Affinity = 8; // Cpu set for workers (every worker has affinity for exactly one cpu)
- optional bool NoRealtime = 9; // Do not use RT-priority for worker threads
- optional bool NoAffinity = 10; // Do not use affinity for worker threads
- optional uint64 BalancerPeriodUs = 11; // Time between balancer steps (see default in NActors::TBalancerConfig)
- }
-
- optional TUnitedWorkers UnitedWorkers = 11;
-
optional bool UseAutoConfig = 12;
// Used only with UseAutoConfig;
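
User-facing effect of the proto change above: with UNITED gone from TExecutor.EType and the TUnitedWorkers message removed, an actor-system config lists only BASIC and IO executors. A hedged text-format sketch (pool names and thread counts are illustrative, not taken from this commit):

    Executor { Type: BASIC Name: "System" Threads: 2 }
    Executor { Type: BASIC Name: "User" Threads: 4 }
    Executor { Type: IO Name: "IO" Threads: 1 }
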
diff --git a/ydb/library/actors/core/CMakeLists.darwin-arm64.txt b/ydb/library/actors/core/CMakeLists.darwin-arm64.txt
index d606663430..f85da8cdc6 100644
--- a/ydb/library/actors/core/CMakeLists.darwin-arm64.txt
+++ b/ydb/library/actors/core/CMakeLists.darwin-arm64.txt
@@ -55,7 +55,6 @@ target_sources(library-actors-core PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/actors/core/actorsystem.cpp
${CMAKE_SOURCE_DIR}/ydb/library/actors/core/ask.cpp
${CMAKE_SOURCE_DIR}/ydb/library/actors/core/av_bootstrapped.cpp
- ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/balancer.cpp
${CMAKE_SOURCE_DIR}/ydb/library/actors/core/buffer.cpp
${CMAKE_SOURCE_DIR}/ydb/library/actors/core/callstack.cpp
${CMAKE_SOURCE_DIR}/ydb/library/actors/core/cpu_manager.cpp
@@ -66,7 +65,6 @@ target_sources(library-actors-core PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/actors/core/executor_pool_base.cpp
${CMAKE_SOURCE_DIR}/ydb/library/actors/core/executor_pool_basic.cpp
${CMAKE_SOURCE_DIR}/ydb/library/actors/core/executor_pool_io.cpp
- ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/executor_pool_united.cpp
${CMAKE_SOURCE_DIR}/ydb/library/actors/core/executor_thread.cpp
${CMAKE_SOURCE_DIR}/ydb/library/actors/core/harmonizer.cpp
${CMAKE_SOURCE_DIR}/ydb/library/actors/core/interconnect.cpp
diff --git a/ydb/library/actors/core/CMakeLists.darwin-x86_64.txt b/ydb/library/actors/core/CMakeLists.darwin-x86_64.txt
index d606663430..f85da8cdc6 100644
--- a/ydb/library/actors/core/CMakeLists.darwin-x86_64.txt
+++ b/ydb/library/actors/core/CMakeLists.darwin-x86_64.txt
@@ -55,7 +55,6 @@ target_sources(library-actors-core PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/actors/core/actorsystem.cpp
${CMAKE_SOURCE_DIR}/ydb/library/actors/core/ask.cpp
${CMAKE_SOURCE_DIR}/ydb/library/actors/core/av_bootstrapped.cpp
- ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/balancer.cpp
${CMAKE_SOURCE_DIR}/ydb/library/actors/core/buffer.cpp
${CMAKE_SOURCE_DIR}/ydb/library/actors/core/callstack.cpp
${CMAKE_SOURCE_DIR}/ydb/library/actors/core/cpu_manager.cpp
@@ -66,7 +65,6 @@ target_sources(library-actors-core PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/actors/core/executor_pool_base.cpp
${CMAKE_SOURCE_DIR}/ydb/library/actors/core/executor_pool_basic.cpp
${CMAKE_SOURCE_DIR}/ydb/library/actors/core/executor_pool_io.cpp
- ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/executor_pool_united.cpp
${CMAKE_SOURCE_DIR}/ydb/library/actors/core/executor_thread.cpp
${CMAKE_SOURCE_DIR}/ydb/library/actors/core/harmonizer.cpp
${CMAKE_SOURCE_DIR}/ydb/library/actors/core/interconnect.cpp
diff --git a/ydb/library/actors/core/CMakeLists.linux-aarch64.txt b/ydb/library/actors/core/CMakeLists.linux-aarch64.txt
index 0c23452507..cf33f59969 100644
--- a/ydb/library/actors/core/CMakeLists.linux-aarch64.txt
+++ b/ydb/library/actors/core/CMakeLists.linux-aarch64.txt
@@ -56,7 +56,6 @@ target_sources(library-actors-core PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/actors/core/actorsystem.cpp
${CMAKE_SOURCE_DIR}/ydb/library/actors/core/ask.cpp
${CMAKE_SOURCE_DIR}/ydb/library/actors/core/av_bootstrapped.cpp
- ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/balancer.cpp
${CMAKE_SOURCE_DIR}/ydb/library/actors/core/buffer.cpp
${CMAKE_SOURCE_DIR}/ydb/library/actors/core/callstack.cpp
${CMAKE_SOURCE_DIR}/ydb/library/actors/core/cpu_manager.cpp
@@ -67,7 +66,6 @@ target_sources(library-actors-core PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/actors/core/executor_pool_base.cpp
${CMAKE_SOURCE_DIR}/ydb/library/actors/core/executor_pool_basic.cpp
${CMAKE_SOURCE_DIR}/ydb/library/actors/core/executor_pool_io.cpp
- ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/executor_pool_united.cpp
${CMAKE_SOURCE_DIR}/ydb/library/actors/core/executor_thread.cpp
${CMAKE_SOURCE_DIR}/ydb/library/actors/core/harmonizer.cpp
${CMAKE_SOURCE_DIR}/ydb/library/actors/core/interconnect.cpp
diff --git a/ydb/library/actors/core/CMakeLists.linux-x86_64.txt b/ydb/library/actors/core/CMakeLists.linux-x86_64.txt
index 0c23452507..cf33f59969 100644
--- a/ydb/library/actors/core/CMakeLists.linux-x86_64.txt
+++ b/ydb/library/actors/core/CMakeLists.linux-x86_64.txt
@@ -56,7 +56,6 @@ target_sources(library-actors-core PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/actors/core/actorsystem.cpp
${CMAKE_SOURCE_DIR}/ydb/library/actors/core/ask.cpp
${CMAKE_SOURCE_DIR}/ydb/library/actors/core/av_bootstrapped.cpp
- ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/balancer.cpp
${CMAKE_SOURCE_DIR}/ydb/library/actors/core/buffer.cpp
${CMAKE_SOURCE_DIR}/ydb/library/actors/core/callstack.cpp
${CMAKE_SOURCE_DIR}/ydb/library/actors/core/cpu_manager.cpp
@@ -67,7 +66,6 @@ target_sources(library-actors-core PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/actors/core/executor_pool_base.cpp
${CMAKE_SOURCE_DIR}/ydb/library/actors/core/executor_pool_basic.cpp
${CMAKE_SOURCE_DIR}/ydb/library/actors/core/executor_pool_io.cpp
- ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/executor_pool_united.cpp
${CMAKE_SOURCE_DIR}/ydb/library/actors/core/executor_thread.cpp
${CMAKE_SOURCE_DIR}/ydb/library/actors/core/harmonizer.cpp
${CMAKE_SOURCE_DIR}/ydb/library/actors/core/interconnect.cpp
diff --git a/ydb/library/actors/core/CMakeLists.windows-x86_64.txt b/ydb/library/actors/core/CMakeLists.windows-x86_64.txt
index d606663430..f85da8cdc6 100644
--- a/ydb/library/actors/core/CMakeLists.windows-x86_64.txt
+++ b/ydb/library/actors/core/CMakeLists.windows-x86_64.txt
@@ -55,7 +55,6 @@ target_sources(library-actors-core PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/actors/core/actorsystem.cpp
${CMAKE_SOURCE_DIR}/ydb/library/actors/core/ask.cpp
${CMAKE_SOURCE_DIR}/ydb/library/actors/core/av_bootstrapped.cpp
- ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/balancer.cpp
${CMAKE_SOURCE_DIR}/ydb/library/actors/core/buffer.cpp
${CMAKE_SOURCE_DIR}/ydb/library/actors/core/callstack.cpp
${CMAKE_SOURCE_DIR}/ydb/library/actors/core/cpu_manager.cpp
@@ -66,7 +65,6 @@ target_sources(library-actors-core PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/actors/core/executor_pool_base.cpp
${CMAKE_SOURCE_DIR}/ydb/library/actors/core/executor_pool_basic.cpp
${CMAKE_SOURCE_DIR}/ydb/library/actors/core/executor_pool_io.cpp
- ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/executor_pool_united.cpp
${CMAKE_SOURCE_DIR}/ydb/library/actors/core/executor_thread.cpp
${CMAKE_SOURCE_DIR}/ydb/library/actors/core/harmonizer.cpp
${CMAKE_SOURCE_DIR}/ydb/library/actors/core/interconnect.cpp
diff --git a/ydb/library/actors/core/actor_benchmark_helper.h b/ydb/library/actors/core/actor_benchmark_helper.h
index 9307a117a8..074295840e 100644
--- a/ydb/library/actors/core/actor_benchmark_helper.h
+++ b/ydb/library/actors/core/actor_benchmark_helper.h
@@ -273,63 +273,30 @@ struct TActorBenchmark {
setup->CpuManager.Basic.emplace_back(std::move(basic));
}
- static void AddUnitedPool(THolder<TActorSystemSetup>& setup, ui32 concurrency, bool activateEveryEvent) {
- TUnitedExecutorPoolConfig united;
- united.PoolId = setup->GetExecutorsCount();
- united.PoolName = TStringBuilder() << "u" << united.PoolId;
- united.Concurrency = concurrency;
- united.TimePerMailbox = TDuration::Hours(1);
- if (activateEveryEvent) {
- united.EventsPerMailbox = 1;
- } else {
- united.EventsPerMailbox = ::Max<ui32>();
- }
- setup->CpuManager.United.emplace_back(std::move(united));
- }
-
- static THolder<TActorSystemSetup> GetActorSystemSetup(ui32 unitedCpuCount, bool preemption) {
+ static THolder<TActorSystemSetup> GetActorSystemSetup() {
auto setup = MakeHolder<NActors::TActorSystemSetup>();
setup->NodeId = 1;
- setup->CpuManager.UnitedWorkers.CpuCount = unitedCpuCount;
- setup->CpuManager.UnitedWorkers.SpinThresholdUs = TSettings::DefaultSpinThreshold;
- setup->CpuManager.UnitedWorkers.NoRealtime = TSettings::DefaultNoRealtime;
- if (preemption) {
- setup->CpuManager.UnitedWorkers.PoolLimitUs = 500;
- setup->CpuManager.UnitedWorkers.EventLimitUs = 100;
- setup->CpuManager.UnitedWorkers.LimitPrecisionUs = 100;
- } else {
- setup->CpuManager.UnitedWorkers.PoolLimitUs = 100'000'000'000;
- setup->CpuManager.UnitedWorkers.EventLimitUs = 10'000'000'000;
- setup->CpuManager.UnitedWorkers.LimitPrecisionUs = 10'000'000'000;
- }
setup->Scheduler = new TBasicSchedulerThread(NActors::TSchedulerConfig(512, 0));
return setup;
}
enum class EPoolType {
Basic,
- United
};
- static THolder<TActorSystemSetup> InitActorSystemSetup(EPoolType poolType, ui32 poolsCount, ui32 threads, bool activateEveryEvent, bool preemption) {
+ static THolder<TActorSystemSetup> InitActorSystemSetup(EPoolType poolType, ui32 poolsCount, ui32 threads, bool activateEveryEvent) {
if (poolType == EPoolType::Basic) {
- THolder<TActorSystemSetup> setup = GetActorSystemSetup(0, false);
+ THolder<TActorSystemSetup> setup = GetActorSystemSetup();
for (ui32 i = 0; i < poolsCount; ++i) {
AddBasicPool(setup, threads, activateEveryEvent, 0);
}
return setup;
- } else if (poolType == EPoolType::United) {
- THolder<TActorSystemSetup> setup = GetActorSystemSetup(poolsCount * threads, preemption);
- for (ui32 i = 0; i < poolsCount; ++i) {
- AddUnitedPool(setup, threads, activateEveryEvent);
- }
- return setup;
}
Y_ABORT();
}
static double BenchSendReceive(bool allocation, NActors::TMailboxType::EType mType, EPoolType poolType, ESendingType sendingType) {
- THolder<TActorSystemSetup> setup = InitActorSystemSetup(poolType, 1, 1, false, false);
+ THolder<TActorSystemSetup> setup = InitActorSystemSetup(poolType, 1, 1, false);
TActorSystem actorSystem(setup);
actorSystem.Start();
@@ -359,7 +326,7 @@ struct TActorBenchmark {
}
static double BenchSendActivateReceive(ui32 poolsCount, ui32 threads, bool allocation, EPoolType poolType, ESendingType sendingType) {
- THolder<TActorSystemSetup> setup = InitActorSystemSetup(poolType, poolsCount, threads, true, false);
+ THolder<TActorSystemSetup> setup = InitActorSystemSetup(poolType, poolsCount, threads, true);
TActorSystem actorSystem(setup);
actorSystem.Start();
@@ -402,7 +369,7 @@ struct TActorBenchmark {
}
static double BenchSendActivateReceiveWithMailboxNeighbours(ui32 MailboxNeighbourActors, EPoolType poolType, ESendingType sendingType) {
- THolder<TActorSystemSetup> setup = InitActorSystemSetup(poolType, 1, 1, false, false);
+ THolder<TActorSystemSetup> setup = InitActorSystemSetup(poolType, 1, 1, false);
TActorSystem actorSystem(setup);
actorSystem.Start();
@@ -454,7 +421,7 @@ struct TActorBenchmark {
};
static auto BenchContentedThreads(ui32 threads, ui32 actorsPairsCount, EPoolType poolType, ESendingType sendingType, TDuration testDuration = TDuration::Zero(), ui32 inFlight = 1) {
- THolder<TActorSystemSetup> setup = InitActorSystemSetup(poolType, 1, threads, false, false);
+ THolder<TActorSystemSetup> setup = InitActorSystemSetup(poolType, 1, threads, false);
TActorSystem actorSystem(setup);
actorSystem.Start();
@@ -529,7 +496,7 @@ struct TActorBenchmark {
}
static auto BenchStarContentedThreads(ui32 threads, ui32 actorsPairsCount, EPoolType poolType, ESendingType sendingType, TDuration testDuration = TDuration::Zero(), ui32 starMultiply=10) {
- THolder<TActorSystemSetup> setup = InitActorSystemSetup(poolType, 1, threads, true, false);
+ THolder<TActorSystemSetup> setup = InitActorSystemSetup(poolType, 1, threads, true);
TActorSystem actorSystem(setup);
actorSystem.Start();
diff --git a/ydb/library/actors/core/actor_ut.cpp b/ydb/library/actors/core/actor_ut.cpp
index fe2fd98059..78aaad410c 100644
--- a/ydb/library/actors/core/actor_ut.cpp
+++ b/ydb/library/actors/core/actor_ut.cpp
@@ -25,7 +25,7 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) {
using TSendReceiveActorParams = TActorBenchmark::TSendReceiveActorParams;
Y_UNIT_TEST(WithSharedExecutors) {
- THolder<TActorSystemSetup> setup = TActorBenchmark::GetActorSystemSetup(0, false);
+ THolder<TActorSystemSetup> setup = TActorBenchmark::GetActorSystemSetup();
TActorBenchmark::AddBasicPool(setup, 2, 1, 0);
TActorBenchmark::AddBasicPool(setup, 2, 1, 1);
@@ -88,7 +88,7 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) {
}
Y_UNIT_TEST(WithoutSharedExecutors) {
- THolder<TActorSystemSetup> setup = TActorBenchmark::GetActorSystemSetup(0, false);
+ THolder<TActorSystemSetup> setup = TActorBenchmark::GetActorSystemSetup();
TActorBenchmark::AddBasicPool(setup, 2, 1, 0);
TActorBenchmark::AddBasicPool(setup, 2, 1, 0);
@@ -169,23 +169,6 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) {
}
}
- Y_UNIT_TEST(SendReceive1Pool1ThreadAllocUnited) {
- for (const auto& mType : TSettings::MailboxTypes) {
- auto stats = TActorBenchmark::CountStats([mType] {
- return TActorBenchmark::BenchSendReceive(true, mType, TActorBenchmark::EPoolType::United, ESendingType::Common);
- });
- Cerr << stats.ToString() << " " << mType << Endl;
- stats = TActorBenchmark::CountStats([mType] {
- return TActorBenchmark::BenchSendReceive(true, mType, TActorBenchmark::EPoolType::United, ESendingType::Lazy);
- });
- Cerr << stats.ToString() << " " << mType << " Lazy" << Endl;
- stats = TActorBenchmark::CountStats([mType] {
- return TActorBenchmark::BenchSendReceive(true, mType, TActorBenchmark::EPoolType::United, ESendingType::Tail);
- });
- Cerr << stats.ToString() << " " << mType << " Tail" << Endl;
- }
- }
-
Y_UNIT_TEST(SendReceive1Pool1ThreadNoAlloc) {
for (const auto& mType : TSettings::MailboxTypes) {
auto stats = TActorBenchmark::CountStats([mType] {
@@ -203,87 +186,38 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) {
}
}
- Y_UNIT_TEST(SendReceive1Pool1ThreadNoAllocUnited) {
- for (const auto& mType : TSettings::MailboxTypes) {
- auto stats = TActorBenchmark::CountStats([mType] {
- return TActorBenchmark::BenchSendReceive(false, mType, TActorBenchmark::EPoolType::United, ESendingType::Common);
- });
- Cerr << stats.ToString() << " " << mType << Endl;
- stats = TActorBenchmark::CountStats([mType] {
- return TActorBenchmark::BenchSendReceive(false, mType, TActorBenchmark::EPoolType::United, ESendingType::Lazy);
- });
- Cerr << stats.ToString() << " " << mType << " Lazy" << Endl;
- stats = TActorBenchmark::CountStats([mType] {
- return TActorBenchmark::BenchSendReceive(false, mType, TActorBenchmark::EPoolType::United, ESendingType::Tail);
- });
- Cerr << stats.ToString() << " " << mType << " Tail" << Endl;
- }
- }
-
Y_UNIT_TEST(SendActivateReceive1Pool1ThreadAlloc) {
TActorBenchmark::RunBenchSendActivateReceive(1, 1, true, TActorBenchmark::EPoolType::Basic);
}
- Y_UNIT_TEST(SendActivateReceive1Pool1ThreadAllocUnited) {
- TActorBenchmark::RunBenchSendActivateReceive(1, 1, true, TActorBenchmark::EPoolType::United);
- }
-
Y_UNIT_TEST(SendActivateReceive1Pool1ThreadNoAlloc) {
TActorBenchmark::RunBenchSendActivateReceive(1, 1, false, TActorBenchmark::EPoolType::Basic);
}
- Y_UNIT_TEST(SendActivateReceive1Pool1ThreadNoAllocUnited) {
- TActorBenchmark::RunBenchSendActivateReceive(1, 1, false, TActorBenchmark::EPoolType::United);
- }
-
Y_UNIT_TEST(SendActivateReceive1Pool2ThreadsAlloc) {
TActorBenchmark::RunBenchSendActivateReceive(1, 2, true, TActorBenchmark::EPoolType::Basic);
}
- Y_UNIT_TEST(SendActivateReceive1Pool2ThreadsAllocUnited) {
- TActorBenchmark::RunBenchSendActivateReceive(1, 2, true, TActorBenchmark::EPoolType::United);
- }
-
Y_UNIT_TEST(SendActivateReceive1Pool2ThreadsNoAlloc) {
TActorBenchmark::RunBenchSendActivateReceive(1, 2, false, TActorBenchmark::EPoolType::Basic);
}
- Y_UNIT_TEST(SendActivateReceive1Pool2ThreadsNoAllocUnited) {
- TActorBenchmark::RunBenchSendActivateReceive(1, 2, false, TActorBenchmark::EPoolType::United);
- }
-
Y_UNIT_TEST(SendActivateReceive2Pool1ThreadAlloc) {
TActorBenchmark::RunBenchSendActivateReceive(2, 1, true, TActorBenchmark::EPoolType::Basic);
}
- Y_UNIT_TEST(SendActivateReceive2Pool1ThreadAllocUnited) {
- TActorBenchmark::RunBenchSendActivateReceive(2, 1, true, TActorBenchmark::EPoolType::United);
- }
-
Y_UNIT_TEST(SendActivateReceive2Pool1ThreadNoAlloc) {
TActorBenchmark::RunBenchSendActivateReceive(2, 1, false, TActorBenchmark::EPoolType::Basic);
}
- Y_UNIT_TEST(SendActivateReceive2Pool1ThreadNoAllocUnited) {
- TActorBenchmark::RunBenchSendActivateReceive(2, 1, false, TActorBenchmark::EPoolType::United);
- }
-
Y_UNIT_TEST(SendActivateReceive1Pool1Threads) { TActorBenchmark::RunBenchContentedThreads(1, TActorBenchmark::EPoolType::Basic); }
- Y_UNIT_TEST(SendActivateReceive1Pool1ThreadsUnited) { TActorBenchmark::RunBenchContentedThreads(1, TActorBenchmark::EPoolType::United); }
Y_UNIT_TEST(SendActivateReceive1Pool2Threads) { TActorBenchmark::RunBenchContentedThreads(2, TActorBenchmark::EPoolType::Basic); }
- Y_UNIT_TEST(SendActivateReceive1Pool2ThreadsUnited) { TActorBenchmark::RunBenchContentedThreads(2, TActorBenchmark::EPoolType::United); }
Y_UNIT_TEST(SendActivateReceive1Pool3Threads) { TActorBenchmark::RunBenchContentedThreads(3, TActorBenchmark::EPoolType::Basic); }
- Y_UNIT_TEST(SendActivateReceive1Pool3ThreadsUnited) { TActorBenchmark::RunBenchContentedThreads(3, TActorBenchmark::EPoolType::United); }
Y_UNIT_TEST(SendActivateReceive1Pool4Threads) { TActorBenchmark::RunBenchContentedThreads(4, TActorBenchmark::EPoolType::Basic); }
- Y_UNIT_TEST(SendActivateReceive1Pool4ThreadsUnited) { TActorBenchmark::RunBenchContentedThreads(4, TActorBenchmark::EPoolType::United); }
Y_UNIT_TEST(SendActivateReceive1Pool5Threads) { TActorBenchmark::RunBenchContentedThreads(5, TActorBenchmark::EPoolType::Basic); }
- Y_UNIT_TEST(SendActivateReceive1Pool5ThreadsUnited) { TActorBenchmark::RunBenchContentedThreads(5, TActorBenchmark::EPoolType::United); }
Y_UNIT_TEST(SendActivateReceive1Pool6Threads) { TActorBenchmark::RunBenchContentedThreads(6, TActorBenchmark::EPoolType::Basic); }
- Y_UNIT_TEST(SendActivateReceive1Pool6ThreadsUnited) { TActorBenchmark::RunBenchContentedThreads(6, TActorBenchmark::EPoolType::United); }
Y_UNIT_TEST(SendActivateReceive1Pool7Threads) { TActorBenchmark::RunBenchContentedThreads(7, TActorBenchmark::EPoolType::Basic); }
- Y_UNIT_TEST(SendActivateReceive1Pool7ThreadsUnited) { TActorBenchmark::RunBenchContentedThreads(7, TActorBenchmark::EPoolType::United); }
Y_UNIT_TEST(SendActivateReceive1Pool8Threads) { TActorBenchmark::RunBenchContentedThreads(8, TActorBenchmark::EPoolType::Basic); }
- Y_UNIT_TEST(SendActivateReceive1Pool8ThreadsUnited) { TActorBenchmark::RunBenchContentedThreads(8, TActorBenchmark::EPoolType::United); }
Y_UNIT_TEST(SendActivateReceiveCSV) {
std::vector<ui32> threadsList;
@@ -314,24 +248,6 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) {
Cerr << stats.ToString() << " neighbourActors: " << neighbour << " Tail" << Endl;
}
}
-
- Y_UNIT_TEST(SendActivateReceiveWithMailboxNeighboursUnited) {
- TVector<ui32> NeighbourActors = {0, 1, 2, 3, 4, 5, 6, 7, 8, 16, 32, 64, 128, 256};
- for (const auto& neighbour : NeighbourActors) {
- auto stats = TActorBenchmark::CountStats([neighbour] {
- return TActorBenchmark::BenchSendActivateReceiveWithMailboxNeighbours(neighbour, TActorBenchmark::EPoolType::United, ESendingType::Common);
- });
- Cerr << stats.ToString() << " neighbourActors: " << neighbour << Endl;
- stats = TActorBenchmark::CountStats([neighbour] {
- return TActorBenchmark::BenchSendActivateReceiveWithMailboxNeighbours(neighbour, TActorBenchmark::EPoolType::United, ESendingType::Lazy);
- });
- Cerr << stats.ToString() << " neighbourActors: " << neighbour << " Lazy" << Endl;
- stats = TActorBenchmark::CountStats([neighbour] {
- return TActorBenchmark::BenchSendActivateReceiveWithMailboxNeighbours(neighbour, TActorBenchmark::EPoolType::United, ESendingType::Tail);
- });
- Cerr << stats.ToString() << " neighbourActors: " << neighbour << " Tail" << Endl;
- }
- }
}
Y_UNIT_TEST_SUITE(TestDecorator) {
diff --git a/ydb/library/actors/core/actorsystem.h b/ydb/library/actors/core/actorsystem.h
index e8144d8fae..326c4a07ee 100644
--- a/ydb/library/actors/core/actorsystem.h
+++ b/ydb/library/actors/core/actorsystem.h
@@ -2,7 +2,6 @@
#include "defs.h"
-#include "balancer.h"
#include "config.h"
#include "event.h"
#include "executor_pool.h"
@@ -93,8 +92,6 @@ namespace NActors {
ui32 ExecutorsCount = 0;
TArrayHolder<TAutoPtr<IExecutorPool>> Executors;
- TAutoPtr<IBalancer> Balancer; // main implementation will be implicitly created if not set
-
TCpuManagerConfig CpuManager;
TAutoPtr<ISchedulerThread> Scheduler;
diff --git a/ydb/library/actors/core/balancer.cpp b/ydb/library/actors/core/balancer.cpp
deleted file mode 100644
index 517d376d1d..0000000000
--- a/ydb/library/actors/core/balancer.cpp
+++ /dev/null
@@ -1,311 +0,0 @@
-#include "balancer.h"
-
-#include "probes.h"
-
-#include <ydb/library/actors/util/cpu_load_log.h>
-#include <ydb/library/actors/util/datetime.h>
-#include <ydb/library/actors/util/intrinsics.h>
-
-#include <util/system/spinlock.h>
-
-#include <algorithm>
-
-namespace NActors {
- LWTRACE_USING(ACTORLIB_PROVIDER);
-
- // Describes balancing-related state of pool, the most notable is `Importance` to add new cpu
- struct TLevel {
- // Balancer will try to give more cpu to overloaded pools
- enum ELoadClass {
- Underloaded = 0,
- Moderate = 1,
- Overloaded = 2,
- };
-
- double ScaleFactor;
- ELoadClass LoadClass;
- ui64 Importance; // pool with lower importance is allowed to pass cpu to pool with higher, but the opposite is forbidden
-
- TLevel() {}
-
- TLevel(const TBalancingConfig& cfg, TPoolId poolId, ui64 currentCpus, double cpuIdle, ui64 addLatencyUs, ui64 worstLatencyUs) {
- ScaleFactor = double(currentCpus) / cfg.Cpus;
- if ((worstLatencyUs + addLatencyUs) < 2000 && cpuIdle > 1.0) { // Uderload criterion, based on estimated latency w/o 1 cpu
- LoadClass = Underloaded;
- } else if (worstLatencyUs > 2000 || cpuIdle < 0.2) { // Overload criterion, based on latency
- LoadClass = Overloaded;
- } else {
- LoadClass = Moderate;
- }
- Importance = MakeImportance(LoadClass, cfg.Priority, ScaleFactor, cpuIdle, poolId);
- }
-
- private:
- // Importance is simple ui64 value (from highest to lowest):
- // 2 Bits: LoadClass
- // 8 Bits: Priority
- // 10 Bits: -ScaleFactor (for max-min fairness with weights equal to TBalancingConfig::Cpus)
- // 10 Bits: -CpuIdle
- // 6 Bits: PoolId
- static ui64 MakeImportance(ELoadClass load, ui8 priority, double scaleFactor, double cpuIdle, TPoolId poolId) {
- ui64 idle = std::clamp<i64>(1024 - cpuIdle * 512, 0, 1023);
- ui64 scale = std::clamp<i64>(1024 - scaleFactor * 32, 0, 1023);
-
- Y_ABORT_UNLESS(ui64(load) < (1ull << 2ull));
- Y_ABORT_UNLESS(ui64(priority) < (1ull << 8ull));
- Y_ABORT_UNLESS(ui64(scale) < (1ull << 10ull));
- Y_ABORT_UNLESS(ui64(idle) < (1ull << 10ull));
- Y_ABORT_UNLESS(ui64(poolId) < (1ull << 6ull));
-
- static_assert(ui64(MaxPools) <= (1ull << 6ull));
-
- ui64 importance =
- (ui64(load) << ui64(6 + 10 + 10 + 8)) |
- (ui64(priority) << ui64(6 + 10 + 10)) |
- (ui64(scale) << ui64(6 + 10)) |
- (ui64(idle) << ui64(6)) |
- ui64(poolId);
- return importance;
- }
- };
-
- // Main balancer implemenation
- class TBalancer: public IBalancer {
- private:
- struct TCpu;
- struct TPool;
-
- bool Disabled = true;
- TSpinLock Lock;
- ui64 NextBalanceTs;
- TVector<TCpu> Cpus; // Indexed by CpuId, can have gaps
- TVector<TPool> Pools; // Indexed by PoolId, can have gaps
- TBalancerConfig Config;
-
- public:
-
- ui64 GetPeriodUs() override;
- // Setup
- TBalancer(const TBalancerConfig& config, const TVector<TUnitedExecutorPoolConfig>& unitedPools, ui64 ts);
- bool AddCpu(const TCpuAllocation& cpuAlloc, TCpuState* cpu) override;
- ~TBalancer();
-
- // Balancing
- bool TryLock(ui64 ts) override;
- void SetPoolStats(TPoolId pool, const TBalancerStats& stats) override;
- void Balance() override;
- void Unlock() override;
-
- private:
- void MoveCpu(TPool& from, TPool& to);
- };
-
- struct TBalancer::TPool {
- TBalancingConfig Config;
- TPoolId PoolId;
- TString PoolName;
-
- // Input data for balancing
- TBalancerStats Prev;
- TBalancerStats Next;
-
- // Derived stats
- double CpuLoad;
- double CpuIdle;
-
- // Classification
- // NOTE: We want to avoid passing cpu back and forth, so we must consider not only current level,
- // NOTE: but expected levels after movements also
- TLevel CurLevel; // Level with current amount of cpu
- TLevel AddLevel; // Level after one cpu acception
- TLevel SubLevel; // Level after one cpu donation
-
- // Balancing state
- ui64 CurrentCpus = 0; // Total number of cpus assigned for this pool (zero means pools is not balanced)
- ui64 PrevCpus = 0; // Cpus in last period
-
- explicit TPool(const TBalancingConfig& cfg = {})
- : Config(cfg)
- {}
-
- void Configure(const TBalancingConfig& cfg, const TString& poolName) {
- Config = cfg;
- // Enforce constraints
- if (Config.Cpus > 0) {
- Config.MinCpus = std::clamp<ui32>(Config.MinCpus, 1, Config.Cpus);
- Config.MaxCpus = Max<ui32>(Config.MaxCpus, Config.Cpus);
- } else {
- Y_ABORT_UNLESS(Config.Cpus == 0,
- "Unexpected negative Config.Cpus# %" PRIi64,
- (i64)Config.Cpus);
- Config.MinCpus = 0;
- Config.MaxCpus = 0;
- }
- PoolName = poolName;
- }
- };
-
- struct TBalancer::TCpu {
- TCpuState* State = nullptr; // Cpu state, nullptr means cpu is not used (gap)
- TCpuAllocation Alloc;
- TPoolId Current;
- TPoolId Assigned;
- };
-
- TBalancer::TBalancer(const TBalancerConfig& config, const TVector<TUnitedExecutorPoolConfig>& unitedPools, ui64 ts)
- : NextBalanceTs(ts)
- , Config(config)
- {
- for (TPoolId pool = 0; pool < MaxPools; pool++) {
- Pools.emplace_back();
- Pools.back().PoolId = pool;
- }
- for (const TUnitedExecutorPoolConfig& united : unitedPools) {
- Pools[united.PoolId].Configure(united.Balancing, united.PoolName);
- }
- }
-
- TBalancer::~TBalancer() {
- }
-
- bool TBalancer::AddCpu(const TCpuAllocation& cpuAlloc, TCpuState* state) {
- // Setup
- TCpuId cpuId = cpuAlloc.CpuId;
- if (Cpus.size() <= cpuId) {
- Cpus.resize(cpuId + 1);
- }
- TCpu& cpu = Cpus[cpuId];
- cpu.State = state;
- cpu.Alloc = cpuAlloc;
-
- // Fill every pool with cpus up to TBalancingConfig::Cpus
- TPoolId pool = 0;
- for (TPool& p : Pools) {
- if (p.CurrentCpus < p.Config.Cpus) {
- p.CurrentCpus++;
- break;
- }
- pool++;
- }
- if (pool != MaxPools) { // cpu under balancer control
- state->SwitchPool(pool);
- state->AssignPool(pool);
- Disabled = false;
- return true;
- }
- return false; // non-balanced cpu
- }
-
- bool TBalancer::TryLock(ui64 ts) {
- if (!Disabled && NextBalanceTs < ts && Lock.TryAcquire()) {
- NextBalanceTs = ts + Us2Ts(Config.PeriodUs);
- return true;
- }
- return false;
- }
-
- void TBalancer::SetPoolStats(TPoolId pool, const TBalancerStats& stats) {
- Y_ABORT_UNLESS(pool < MaxPools);
- TPool& p = Pools[pool];
- p.Prev = p.Next;
- p.Next = stats;
- }
-
- void TBalancer::Balance() {
- // Update every cpu state
- for (TCpu& cpu : Cpus) {
- if (cpu.State) {
- cpu.State->Load(cpu.Assigned, cpu.Current);
- if (cpu.Current < MaxPools && cpu.Current != cpu.Assigned) {
- return; // previous movement has not been applied yet, wait
- }
- }
- }
-
- // Process stats, classify and compute pool importance
- TStackVec<TPool*, MaxPools> order;
- for (TPool& pool : Pools) {
- if (pool.Config.Cpus == 0) {
- continue; // skip gaps (non-existent or non-united pools)
- }
- if (pool.Prev.Ts == 0 || pool.Prev.Ts >= pool.Next.Ts) {
- return; // invalid stats
- }
-
- // Compute derived stats
- pool.CpuLoad = (pool.Next.CpuUs - pool.Prev.CpuUs) / Ts2Us(pool.Next.Ts - pool.Prev.Ts);
- if (pool.Prev.IdleUs == ui64(-1) || pool.Next.IdleUs == ui64(-1)) {
- pool.CpuIdle = pool.CurrentCpus - pool.CpuLoad; // for tests
- } else {
- pool.CpuIdle = (pool.Next.IdleUs - pool.Prev.IdleUs) / Ts2Us(pool.Next.Ts - pool.Prev.Ts);
- }
-
- // Compute levels
- pool.CurLevel = TLevel(pool.Config, pool.PoolId, pool.CurrentCpus, pool.CpuIdle,
- pool.Next.ExpectedLatencyIncreaseUs, pool.Next.WorstActivationTimeUs);
- pool.AddLevel = TLevel(pool.Config, pool.PoolId, pool.CurrentCpus + 1, pool.CpuIdle,
- 0, pool.Next.WorstActivationTimeUs); // we expect taken cpu to became utilized
- pool.SubLevel = TLevel(pool.Config, pool.PoolId, pool.CurrentCpus - 1, pool.CpuIdle - 1,
- pool.Next.ExpectedLatencyIncreaseUs, pool.Next.WorstActivationTimeUs);
-
- // Prepare for balancing
- pool.PrevCpus = pool.CurrentCpus;
- order.push_back(&pool);
- }
-
- // Sort pools by importance
- std::sort(order.begin(), order.end(), [] (TPool* l, TPool* r) {return l->CurLevel.Importance < r->CurLevel.Importance; });
- for (TPool* pool : order) {
- LWPROBE(PoolStats, pool->PoolId, pool->PoolName, pool->CurrentCpus, pool->CurLevel.LoadClass, pool->Config.Priority, pool->CurLevel.ScaleFactor, pool->CpuIdle, pool->CpuLoad, pool->CurLevel.Importance, pool->AddLevel.Importance, pool->SubLevel.Importance);
- }
-
- // Move cpus from lower importance to higher importance pools
- for (auto toIter = order.rbegin(); toIter != order.rend(); ++toIter) {
- TPool& to = **toIter;
- if (to.CurLevel.LoadClass == TLevel::Overloaded && // if pool is overloaded
- to.CurrentCpus < to.Config.MaxCpus) // and constraints would not be violated
- {
- for (auto fromIter = order.begin(); (*fromIter)->CurLevel.Importance < to.CurLevel.Importance; ++fromIter) {
- TPool& from = **fromIter;
- if (from.CurrentCpus == from.PrevCpus && // if not balanced yet
- from.CurrentCpus > from.Config.MinCpus && // and constraints would not be violated
- from.SubLevel.Importance <= to.AddLevel.Importance) // and which of two pools is more important would not change after cpu movement
- {
- MoveCpu(from, to);
- from.CurrentCpus--;
- to.CurrentCpus++;
- break;
- }
- }
- }
- }
- }
-
- void TBalancer::MoveCpu(TBalancer::TPool& from, TBalancer::TPool& to) {
- for (auto ci = Cpus.rbegin(), ce = Cpus.rend(); ci != ce; ci++) {
- TCpu& cpu = *ci;
- if (!cpu.State) {
- continue;
- }
- if (cpu.Assigned == from.PoolId) {
- cpu.State->AssignPool(to.PoolId);
- cpu.Assigned = to.PoolId;
- LWPROBE(MoveCpu, from.PoolId, to.PoolId, from.PoolName, to.PoolName, cpu.Alloc.CpuId);
- return;
- }
- }
- Y_ABORT();
- }
-
- void TBalancer::Unlock() {
- Lock.Release();
- }
-
- ui64 TBalancer::GetPeriodUs() {
- return Config.PeriodUs;
- }
-
- IBalancer* MakeBalancer(const TBalancerConfig& config, const TVector<TUnitedExecutorPoolConfig>& unitedPools, ui64 ts) {
- return new TBalancer(config, unitedPools, ts);
- }
-}
diff --git a/ydb/library/actors/core/balancer.h b/ydb/library/actors/core/balancer.h
deleted file mode 100644
index e1f6f33bf3..0000000000
--- a/ydb/library/actors/core/balancer.h
+++ /dev/null
@@ -1,30 +0,0 @@
-#pragma once
-
-#include "defs.h"
-#include "config.h"
-#include "cpu_state.h"
-
-namespace NActors {
- // Per-pool statistics used by balancer
- struct TBalancerStats {
- ui64 Ts = 0; // Measurement timestamp
- ui64 CpuUs = 0; // Total cpu microseconds consumed by pool on all cpus since start
- ui64 IdleUs = ui64(-1); // Total cpu microseconds in spinning or waiting on futex
- ui64 WorstActivationTimeUs = 0;
- ui64 ExpectedLatencyIncreaseUs = 0;
- };
-
- // Pool cpu balancer
- struct IBalancer {
- virtual ~IBalancer() {}
- virtual bool AddCpu(const TCpuAllocation& cpuAlloc, TCpuState* cpu) = 0;
- virtual bool TryLock(ui64 ts) = 0;
- virtual void SetPoolStats(TPoolId pool, const TBalancerStats& stats) = 0;
- virtual void Balance() = 0;
- virtual void Unlock() = 0;
- virtual ui64 GetPeriodUs() = 0;
- // TODO: add method for reconfiguration on fly
- };
-
- IBalancer* MakeBalancer(const TBalancerConfig& config, const TVector<TUnitedExecutorPoolConfig>& unitedPools, ui64 ts);
-}
diff --git a/ydb/library/actors/core/balancer_ut.cpp b/ydb/library/actors/core/balancer_ut.cpp
deleted file mode 100644
index d8bf3da056..0000000000
--- a/ydb/library/actors/core/balancer_ut.cpp
+++ /dev/null
@@ -1,225 +0,0 @@
-#include "balancer.h"
-
-#include <ydb/library/actors/util/datetime.h>
-#include <library/cpp/lwtrace/all.h>
-#include <library/cpp/testing/unittest/registar.h>
-
-#include <util/stream/str.h>
-
-using namespace NActors;
-
-////////////////////////////////////////////////////////////////////////////////
-
-Y_UNIT_TEST_SUITE(PoolCpuBalancer) {
- struct TTest {
- TCpuManagerConfig Config;
- TCpuMask Available;
- THolder<IBalancer> Balancer;
- TVector<TCpuState> CpuStates;
- TVector<ui64> CpuUs;
- ui64 Now = 0;
-
- void SetCpuCount(size_t count) {
- Config.UnitedWorkers.CpuCount = count;
- for (TCpuId cpuId = 0; cpuId < count; cpuId++) {
- Available.Set(cpuId);
- }
- }
-
- void AddPool(ui32 minCpus, ui32 cpus, ui32 maxCpus, ui8 priority = 0) {
- TUnitedExecutorPoolConfig u;
- u.PoolId = TPoolId(Config.United.size());
- u.Balancing.Cpus = cpus;
- u.Balancing.MinCpus = minCpus;
- u.Balancing.MaxCpus = maxCpus;
- u.Balancing.Priority = priority;
- Config.United.push_back(u);
- }
-
- void Start() {
- TCpuAllocationConfig allocation(Available, Config);
- Balancer.Reset(MakeBalancer(Config.UnitedWorkers.Balancer, Config.United, 0));
- CpuStates.resize(allocation.Items.size()); // do not resize it later to avoid dangling pointers
- CpuUs.resize(CpuStates.size());
- for (const TCpuAllocation& cpuAlloc : allocation.Items) {
- bool added = Balancer->AddCpu(cpuAlloc, &CpuStates[cpuAlloc.CpuId]);
- UNIT_ASSERT(added);
- }
- }
-
- void Balance(ui64 deltaTs, const TVector<ui64>& cpuUs) {
- Now += deltaTs;
- ui64 ts = Now;
- if (Balancer->TryLock(ts)) {
- for (TPoolId pool = 0; pool < cpuUs.size(); pool++) {
- CpuUs[pool] += cpuUs[pool];
- TBalancerStats stats;
- stats.Ts = ts;
- stats.CpuUs = CpuUs[pool];
- Balancer->SetPoolStats(pool, stats);
- }
- Balancer->Balance();
- Balancer->Unlock();
- }
- }
-
- void ApplyMovements() {
- for (TCpuState& state : CpuStates) {
- TPoolId current;
- TPoolId assigned;
- state.Load(assigned, current);
- state.SwitchPool(assigned);
- }
- }
-
- static TString ToStr(const TVector<ui64>& values) {
- TStringStream ss;
- ss << "{";
- for (auto v : values) {
- ss << " " << v;
- }
- ss << " }";
- return ss.Str();
- }
-
- void AssertPoolsCurrentCpus(const TVector<ui64>& cpuRequired) {
- TVector<ui64> cpuCurrent;
- cpuCurrent.resize(cpuRequired.size());
- for (TCpuState& state : CpuStates) {
- TPoolId current;
- TPoolId assigned;
- state.Load(assigned, current);
- cpuCurrent[current]++;
- }
- for (TPoolId pool = 0; pool < cpuRequired.size(); pool++) {
- UNIT_ASSERT_C(cpuCurrent[pool] == cpuRequired[pool],
- "cpu distribution mismatch, required " << ToStr(cpuRequired) << " but got " << ToStr(cpuCurrent));
- }
- }
- };
-
- Y_UNIT_TEST(StartLwtrace) {
- NLWTrace::StartLwtraceFromEnv();
- }
-
- Y_UNIT_TEST(AllOverloaded) {
- TTest t;
- int cpus = 10;
- t.SetCpuCount(cpus);
- t.AddPool(1, 1, 10); // pool=0
- t.AddPool(1, 2, 10); // pool=1
- t.AddPool(1, 3, 10); // pool=2
- t.AddPool(1, 4, 10); // pool=2
- t.Start();
- ui64 dts = 1.01 * Us2Ts(t.Config.UnitedWorkers.Balancer.PeriodUs);
- ui64 totalCpuUs = cpus * Ts2Us(dts); // pretend every pool has consumed as whole actorsystem, overload
- for (int i = 0; i < cpus; i++) {
- t.Balance(dts, {totalCpuUs, totalCpuUs, totalCpuUs, totalCpuUs});
- t.ApplyMovements();
- }
- t.AssertPoolsCurrentCpus({1, 2, 3, 4});
- }
-
- Y_UNIT_TEST(OneOverloaded) {
- TTest t;
- int cpus = 10;
- t.SetCpuCount(cpus);
- t.AddPool(1, 1, 10); // pool=0
- t.AddPool(1, 2, 10); // pool=1
- t.AddPool(1, 3, 10); // pool=2
- t.AddPool(1, 4, 10); // pool=2
- t.Start();
- ui64 dts = 1.01 * Us2Ts(t.Config.UnitedWorkers.Balancer.PeriodUs);
- ui64 totalCpuUs = cpus * Ts2Us(dts);
- for (int i = 0; i < cpus; i++) {
- t.Balance(dts, {totalCpuUs, 0, 0, 0});
- t.ApplyMovements();
- }
- t.AssertPoolsCurrentCpus({7, 1, 1, 1});
- for (int i = 0; i < cpus; i++) {
- t.Balance(dts, {0, totalCpuUs, 0, 0});
- t.ApplyMovements();
- }
- t.AssertPoolsCurrentCpus({1, 7, 1, 1});
- for (int i = 0; i < cpus; i++) {
- t.Balance(dts, {0, 0, totalCpuUs, 0});
- t.ApplyMovements();
- }
- t.AssertPoolsCurrentCpus({1, 1, 7, 1});
- for (int i = 0; i < cpus; i++) {
- t.Balance(dts, {0, 0, 0, totalCpuUs});
- t.ApplyMovements();
- }
- t.AssertPoolsCurrentCpus({1, 1, 1, 7});
- }
-
- Y_UNIT_TEST(TwoOverloadedFairness) {
- TTest t;
- int cpus = 10;
- t.SetCpuCount(cpus);
- t.AddPool(1, 1, 10); // pool=0
- t.AddPool(1, 2, 10); // pool=1
- t.AddPool(1, 3, 10); // pool=2
- t.AddPool(1, 4, 10); // pool=2
- t.Start();
- ui64 dts = 1.01 * Us2Ts(t.Config.UnitedWorkers.Balancer.PeriodUs);
- ui64 totalCpuUs = cpus * Ts2Us(dts);
- for (int i = 0; i < cpus; i++) {
- t.Balance(dts, {totalCpuUs, totalCpuUs, 0, 0});
- t.ApplyMovements();
- }
- t.AssertPoolsCurrentCpus({3, 5, 1, 1});
- for (int i = 0; i < cpus; i++) {
- t.Balance(dts, {totalCpuUs, 0, totalCpuUs, 0});
- t.ApplyMovements();
- }
- t.AssertPoolsCurrentCpus({2, 1, 6, 1});
- for (int i = 0; i < cpus; i++) {
- t.Balance(dts, {totalCpuUs, 0, 0, totalCpuUs});
- t.ApplyMovements();
- }
- t.AssertPoolsCurrentCpus({2, 1, 1, 6});
- for (int i = 0; i < cpus; i++) {
- t.Balance(dts, {0, totalCpuUs, totalCpuUs, 0});
- t.ApplyMovements();
- }
- t.AssertPoolsCurrentCpus({1, 3, 5, 1});
- for (int i = 0; i < cpus; i++) {
- t.Balance(dts, {0, totalCpuUs, 0, totalCpuUs});
- t.ApplyMovements();
- }
- t.AssertPoolsCurrentCpus({1, 3, 1, 5});
- for (int i = 0; i < cpus; i++) {
- t.Balance(dts, {0, 0, totalCpuUs, totalCpuUs});
- t.ApplyMovements();
- }
- t.AssertPoolsCurrentCpus({1, 1, 3, 5});
- }
-
- Y_UNIT_TEST(TwoOverloadedPriority) {
- TTest t;
- int cpus = 20;
- t.SetCpuCount(cpus);
- t.AddPool(1, 5, 20, 0); // pool=0
- t.AddPool(1, 5, 20, 1); // pool=1
- t.AddPool(1, 5, 20, 2); // pool=2
- t.AddPool(1, 5, 20, 3); // pool=3
- t.Start();
- ui64 dts = 1.01 * Us2Ts(t.Config.UnitedWorkers.Balancer.PeriodUs);
- ui64 mErlang = Ts2Us(dts) / 1000;
- for (int i = 0; i < cpus; i++) {
- t.Balance(dts, {20000 * mErlang, 2500 * mErlang, 4500 * mErlang, 9500 * mErlang});
- t.ApplyMovements();
- }
- t.AssertPoolsCurrentCpus({2, 3, 5, 10});
- t.Balance(dts, {20000 * mErlang, 2500 * mErlang, 4500 * mErlang, 8500 * mErlang});
- t.ApplyMovements();
- t.AssertPoolsCurrentCpus({3, 3, 5, 9});
- // NOTE: this operation require one move, but we do not make global analysis, so multiple steps (1->2 & 0->1) are required (can be optimized later)
- for (int i = 0; i < 3; i++) {
- t.Balance(dts, {20000 * mErlang, 2500 * mErlang, 5500 * mErlang, 8500 * mErlang});
- t.ApplyMovements();
- }
- t.AssertPoolsCurrentCpus({2, 3, 6, 9});
- }
-}
diff --git a/ydb/library/actors/core/config.h b/ydb/library/actors/core/config.h
index bcbc87653f..d70a917847 100644
--- a/ydb/library/actors/core/config.h
+++ b/ydb/library/actors/core/config.h
@@ -10,24 +10,6 @@
namespace NActors {
- struct TBalancingConfig {
- // Default cpu count (used during overload). Zero value disables this pool balancing
- // 1) Sum of `Cpus` on all pools cannot be changed without restart
- // (changing cpu mode between Shared and Assigned is not implemented yet)
- // 2) This sum must be equal to TUnitedWorkersConfig::CpuCount,
- // otherwise `CpuCount - SUM(Cpus)` cpus will be in Shared mode (i.e. actorsystem 2.0)
- ui32 Cpus = 0;
-
- ui32 MinCpus = 0; // Lower balancing bound, should be at least 1, and not greater than `Cpus`
- ui32 MaxCpus = 0; // Higher balancing bound, should be not lower than `Cpus`
- ui8 Priority = 0; // Priority of pool to obtain cpu due to balancing (higher is better)
- ui64 ToleratedLatencyUs = 0; // p100-latency threshold indicating that more cpus are required by pool
- };
-
- struct TBalancerConfig {
- ui64 PeriodUs = 15000000; // Time between balancer steps
- };
-
enum class EASProfile {
Default,
LowCpuConsumption,
@@ -62,40 +44,6 @@ namespace NActors {
TCpuMask Affinity; // Executor thread affinity
};
- struct TUnitedExecutorPoolConfig {
- static constexpr TDuration DEFAULT_TIME_PER_MAILBOX = TDuration::MilliSeconds(10);
- static constexpr ui32 DEFAULT_EVENTS_PER_MAILBOX = 100;
-
- ui32 PoolId = 0;
- TString PoolName;
-
- // Resource sharing
- ui32 Concurrency = 0; // Limits simultaneously running mailboxes count if set to non-zero value (do not set if Balancing.Cpus != 0)
- TPoolWeight Weight = 0; // Weight in fair cpu-local pool scheduler
- TCpuMask Allowed; // Allowed CPUs for workers to run this pool on (ignored if balancer works, i.e. actorsystem 1.5)
-
- // Single mailbox execution limits
- TDuration TimePerMailbox = DEFAULT_TIME_PER_MAILBOX;
- ui32 EventsPerMailbox = DEFAULT_EVENTS_PER_MAILBOX;
-
- // Long-term balancing
- TBalancingConfig Balancing;
- };
-
- struct TUnitedWorkersConfig {
- ui32 CpuCount = 0; // Total CPUs running united workers (i.e. TBasicExecutorPoolConfig::Threads analog); set to zero to disable united workers
- ui64 SpinThresholdUs = 100; // Limit for active spinning in case all pools became idle
- ui64 PoolLimitUs = 500; // Soft limit on pool execution
- ui64 EventLimitUs = 100; // Hard limit on last event execution exceeding pool limit
- ui64 LimitPrecisionUs = 100; // Maximum delay of timer on limit excess (delay needed to avoid settimer syscall on every pool switch)
- ui64 FastWorkerPriority = 10; // Real-time priority of workers not exceeding hard limits
- ui64 IdleWorkerPriority = 20; // Real-time priority of standby workers waiting for hard preemption on timers (should be greater than FastWorkerPriority)
- TCpuMask Allowed; // Allowed CPUs for workers to run on (every worker has affinity for exactly one cpu)
- bool NoRealtime = false; // For environments w/o permissions for RT-threads
- bool NoAffinity = false; // For environments w/o permissions for cpu affinity
- TBalancerConfig Balancer;
- };
-
struct TSelfPingInfo {
NMonitoring::TDynamicCounters::TCounterPtr AvgPingCounter;
NMonitoring::TDynamicCounters::TCounterPtr AvgPingCounterWithSmallWindow;
@@ -103,14 +51,12 @@ namespace NActors {
};
struct TCpuManagerConfig {
- TUnitedWorkersConfig UnitedWorkers;
TVector<TBasicExecutorPoolConfig> Basic;
TVector<TIOExecutorPoolConfig> IO;
- TVector<TUnitedExecutorPoolConfig> United;
TVector<TSelfPingInfo> PingInfoByPool;
ui32 GetExecutorsCount() const {
- return Basic.size() + IO.size() + United.size();
+ return Basic.size() + IO.size();
}
TString GetPoolName(ui32 poolId) const {
@@ -124,11 +70,6 @@ namespace NActors {
return p.PoolName;
}
}
- for (const auto& p : United) {
- if (p.PoolId == poolId) {
- return p.PoolName;
- }
- }
Y_ABORT("undefined pool id: %" PRIu32, (ui32)poolId);
}
@@ -143,11 +84,6 @@ namespace NActors {
return p.Threads;
}
}
- for (const auto& p : United) {
- if (p.PoolId == poolId) {
- return p.Concurrency ? p.Concurrency : UnitedWorkers.CpuCount;
- }
- }
return {};
}
@@ -220,41 +156,4 @@ namespace NActors {
}
};
- struct TCpuAllocationConfig {
- TVector<TCpuAllocation> Items;
-
- TCpuAllocationConfig(const TCpuMask& available, const TCpuManagerConfig& cfg) {
- for (const TUnitedExecutorPoolConfig& pool : cfg.United) {
- Y_ABORT_UNLESS(pool.PoolId < MaxPools, "wrong PoolId of united executor pool: %s(%d)",
- pool.PoolName.c_str(), (pool.PoolId));
- }
- ui32 allocated[MaxPools] = {0};
- for (TCpuId cpu = 0; cpu < available.Size() && Items.size() < cfg.UnitedWorkers.CpuCount; cpu++) {
- if (available.IsSet(cpu)) {
- TCpuAllocation item;
- item.CpuId = cpu;
- for (const TUnitedExecutorPoolConfig& pool : cfg.United) {
- if (cfg.UnitedWorkers.Allowed.IsEmpty() || cfg.UnitedWorkers.Allowed.IsSet(cpu)) {
- if (pool.Allowed.IsEmpty() || pool.Allowed.IsSet(cpu)) {
- item.AllowedPools.emplace_back(pool.PoolId, pool.Weight);
- allocated[pool.PoolId]++;
- }
- }
- }
- if (!item.AllowedPools.empty()) {
- Items.push_back(item);
- }
- }
- }
- for (const TUnitedExecutorPoolConfig& pool : cfg.United) {
- Y_ABORT_UNLESS(allocated[pool.PoolId] > 0, "unable to allocate cpu for united executor pool: %s(%d)",
- pool.PoolName.c_str(), (pool.PoolId));
- }
- }
-
- operator bool() const {
- return !Items.empty();
- }
- };
-
}
diff --git a/ydb/library/actors/core/cpu_manager.cpp b/ydb/library/actors/core/cpu_manager.cpp
index 24b3161e3c..1898904bb3 100644
--- a/ydb/library/actors/core/cpu_manager.cpp
+++ b/ydb/library/actors/core/cpu_manager.cpp
@@ -3,23 +3,16 @@
#include "executor_pool_basic.h"
#include "executor_pool_io.h"
-#include "executor_pool_united.h"
namespace NActors {
LWTRACE_USING(ACTORLIB_PROVIDER);
TCpuManager::TCpuManager(THolder<TActorSystemSetup>& setup)
: ExecutorPoolCount(setup->GetExecutorsCount())
- , Balancer(setup->Balancer)
, Config(setup->CpuManager)
{
if (setup->Executors) { // Explicit mode w/o united pools
Executors.Reset(setup->Executors.Release());
- for (ui32 excIdx = 0; excIdx != ExecutorPoolCount; ++excIdx) {
- IExecutorPool* pool = Executors[excIdx].Get();
- Y_ABORT_UNLESS(dynamic_cast<TUnitedExecutorPool*>(pool) == nullptr,
- "united executor pool is prohibited in explicit mode of NActors::TCpuManager");
- }
} else {
Setup();
}
@@ -28,14 +21,6 @@ namespace NActors {
void TCpuManager::Setup() {
TAffinity available;
available.Current();
- TCpuAllocationConfig allocation(available, Config);
-
- if (allocation) {
- if (!Balancer) {
- Balancer.Reset(MakeBalancer(Config.UnitedWorkers.Balancer, Config.United, GetCycleCountFast()));
- }
- UnitedWorkers.Reset(new TUnitedWorkers(Config.UnitedWorkers, Config.United, allocation, Balancer.Get()));
- }
ui64 ts = GetCycleCountFast();
Harmonizer.Reset(MakeHarmonizer(ts));
@@ -53,9 +38,6 @@ namespace NActors {
}
void TCpuManager::PrepareStart(TVector<NSchedulerQueue::TReader*>& scheduleReaders, TActorSystem* actorSystem) {
- if (UnitedWorkers) {
- UnitedWorkers->Prepare(actorSystem, scheduleReaders);
- }
for (ui32 excIdx = 0; excIdx != ExecutorPoolCount; ++excIdx) {
NSchedulerQueue::TReader* readers;
ui32 readersCount = 0;
@@ -67,9 +49,6 @@ namespace NActors {
}
void TCpuManager::Start() {
- if (UnitedWorkers) {
- UnitedWorkers->Start();
- }
for (ui32 excIdx = 0; excIdx != ExecutorPoolCount; ++excIdx) {
Executors[excIdx]->Start();
}
@@ -79,18 +58,12 @@ namespace NActors {
for (ui32 excIdx = 0; excIdx != ExecutorPoolCount; ++excIdx) {
Executors[excIdx]->PrepareStop();
}
- if (UnitedWorkers) {
- UnitedWorkers->PrepareStop();
- }
}
void TCpuManager::Shutdown() {
for (ui32 excIdx = 0; excIdx != ExecutorPoolCount; ++excIdx) {
Executors[excIdx]->Shutdown();
}
- if (UnitedWorkers) {
- UnitedWorkers->Shutdown();
- }
for (ui32 round = 0, done = 0; done < ExecutorPoolCount && round < 3; ++round) {
done = 0;
for (ui32 excIdx = 0; excIdx != ExecutorPoolCount; ++excIdx) {
@@ -112,7 +85,6 @@ namespace NActors {
}
}
Executors.Destroy();
- UnitedWorkers.Destroy();
}
IExecutorPool* TCpuManager::CreateExecutorPool(ui32 poolId) {
@@ -126,12 +98,6 @@ namespace NActors {
return new TIOExecutorPool(cfg);
}
}
- for (TUnitedExecutorPoolConfig& cfg : Config.United) {
- if (cfg.PoolId == poolId) {
- IExecutorPool* result = new TUnitedExecutorPool(cfg, UnitedWorkers.Get());
- return result;
- }
- }
Y_ABORT("missing PoolId: %d", int(poolId));
}
diff --git a/ydb/library/actors/core/cpu_manager.h b/ydb/library/actors/core/cpu_manager.h
index 26ba97aa39..29e86170c9 100644
--- a/ydb/library/actors/core/cpu_manager.h
+++ b/ydb/library/actors/core/cpu_manager.h
@@ -2,8 +2,6 @@
#include "harmonizer.h"
#include "executor_pool.h"
-#include "executor_pool_united_workers.h"
-#include "balancer.h"
namespace NActors {
struct TActorSystemSetup;
@@ -11,8 +9,6 @@ namespace NActors {
class TCpuManager : public TNonCopyable {
const ui32 ExecutorPoolCount;
TArrayHolder<TAutoPtr<IExecutorPool>> Executors;
- THolder<TUnitedWorkers> UnitedWorkers;
- THolder<IBalancer> Balancer;
THolder<IHarmonizer> Harmonizer;
TCpuManagerConfig Config;
diff --git a/ydb/library/actors/core/executor_pool_united.cpp b/ydb/library/actors/core/executor_pool_united.cpp
deleted file mode 100644
index ef3f0830ea..0000000000
--- a/ydb/library/actors/core/executor_pool_united.cpp
+++ /dev/null
@@ -1,1455 +0,0 @@
-#include "executor_pool_united.h"
-#include "executor_pool_united_workers.h"
-
-#include "actor.h"
-#include "balancer.h"
-#include "cpu_state.h"
-#include "executor_thread.h"
-#include "probes.h"
-#include "mailbox.h"
-#include "scheduler_queue.h"
-#include <ydb/library/actors/util/affinity.h>
-#include <ydb/library/actors/util/cpu_load_log.h>
-#include <ydb/library/actors/util/datetime.h>
-#include <ydb/library/actors/util/futex.h>
-#include <ydb/library/actors/util/intrinsics.h>
-#include <ydb/library/actors/util/timerfd.h>
-
-#include <util/system/datetime.h>
-#include <util/system/hp_timer.h>
-
-#include <algorithm>
-
-namespace NActors {
- LWTRACE_USING(ACTORLIB_PROVIDER);
-
- struct TUnitedWorkers::TWorker: public TNonCopyable {
- TAutoPtr<TExecutorThread> Thread;
- volatile TThreadId ThreadId = UnknownThreadId;
- NSchedulerQueue::TQueueType SchedulerQueue;
- };
-
- struct TUnitedWorkers::TPool: public TNonCopyable {
- TAtomic Waiters = 0; // Number of idle cpus, waiting for activations in this pool
- char Padding[64 - sizeof(TAtomic)];
-
- TUnorderedCache<ui32, 512, 4> Activations; // MPMC-queue for mailbox activations
- TAtomic Active = 0; // Number of mailboxes ready for execution or currently executing
- TAtomic Tokens = 0; // Pending tokens (token is required for worker to start execution, guarantees concurrency limit and activation availability)
- volatile bool StopFlag = false;
-
- // Configuration
- TPoolId PoolId;
- TAtomicBase Concurrency; // Max concurrent workers running this pool
- IExecutorPool* ExecutorPool;
- TMailboxTable* MailboxTable;
- ui64 TimePerMailboxTs;
- ui32 EventsPerMailbox;
-
- // Cpus this pool is allowed to run on
- // Cpus are specified in wake order
- TStackVec<TCpu*, 15> WakeOrderCpus;
-
- ~TPool() {
- while (Activations.Pop(0)) {}
- }
-
- void Stop() {
- AtomicStore(&StopFlag, true);
- }
-
- bool IsUnited() const {
- return WakeOrderCpus.size();
- }
-
- // Add activation of newly scheduled mailbox. Returns generated token (unless concurrency is exceeded)
- bool PushActivation(ui32 activation, ui64 revolvingCounter) {
- Activations.Push(activation, revolvingCounter);
- TAtomicBase active = AtomicIncrement(Active);
- if (active <= Concurrency) { // token generated
- AtomicIncrement(Tokens);
- return true;
- }
- return false;
- }
-
- template <bool Relaxed>
- static bool TryAcquireTokenImpl(TAtomic* tokens) {
- while (true) {
- TAtomicBase value;
- if constexpr (Relaxed) {
- value = RelaxedLoad(tokens);
- } else {
- value = AtomicLoad(tokens);
- }
- if (value > 0) {
- if (AtomicCas(tokens, value - 1, value)) {
- return true; // token acquired
- }
- } else {
- return false; // no more tokens
- }
- }
- }
-
- // Try acquire pending token. Must be done before execution
- bool TryAcquireToken() {
- return TryAcquireTokenImpl<false>(&Tokens);
- }
-
- // Try acquire pending token. Must be done before execution
- bool TryAcquireTokenRelaxed() {
- return TryAcquireTokenImpl<true>(&Tokens);
- }
-
- // Get activation. Requires acquired token.
- void BeginExecution(ui32& activation, ui64 revolvingCounter) {
- while (!RelaxedLoad(&StopFlag)) {
- if (activation = Activations.Pop(++revolvingCounter)) {
- return;
- }
- SpinLockPause();
- }
- activation = 0; // should stop
- }
-
- // End currently active execution and start new one if token is available.
- // Reuses token if it's not destroyed.
- // Returned `true` means successful switch, `activation` is filled.
- // Returned `false` means execution has ended, no need to call StopExecution()
- bool NextExecution(ui32& activation, ui64 revolvingCounter) {
- if (AtomicDecrement(Active) >= Concurrency) { // reuse just released token
- BeginExecution(activation, revolvingCounter);
- return true;
- } else if (TryAcquireToken()) { // another token acquired
- BeginExecution(activation, revolvingCounter);
- return true;
- }
- return false; // no more tokens available
- }
-
- // Stop active execution. Returns released token (unless it is destroyed)
- bool StopExecution() {
- TAtomicBase active = AtomicDecrement(Active);
- if (active >= Concurrency) { // token released
- AtomicIncrement(Tokens);
- return true;
- }
- return false; // token destroyed
- }
-
- // Switch worker context into this pool
- void Switch(TWorkerContext& wctx, ui64 softDeadlineTs, TExecutorThreadStats& stats) {
- wctx.Switch(ExecutorPool, MailboxTable, TimePerMailboxTs, EventsPerMailbox, softDeadlineTs, &stats);
- }
- };
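// Editor's aside (illustrative standalone sketch, not part of this patch): the
// Active/Tokens accounting in TPool above boils down to "generate a token only
// while the number of queued-or-running activations does not exceed Concurrency".
// All names below are invented for illustration.
#include <atomic>
#include <cstdint>

struct TTokenBudget {
    std::atomic<int64_t> Active{0};   // activations queued or currently executing
    std::atomic<int64_t> Tokens{0};   // permissions for workers to start executing
    const int64_t Concurrency;        // max workers running this pool at once

    explicit TTokenBudget(int64_t concurrency) : Concurrency(concurrency) {}

    // Producer: enqueue one activation; returns true if a token was generated
    // and a worker should be woken (mirrors PushActivation).
    bool OnPush() {
        if (Active.fetch_add(1) + 1 <= Concurrency) {
            Tokens.fetch_add(1);
            return true;
        }
        return false;
    }

    // Consumer: must win a token before popping an activation (mirrors TryAcquireToken).
    bool TryAcquireToken() {
        int64_t value = Tokens.load();
        while (value > 0) {
            if (Tokens.compare_exchange_weak(value, value - 1)) {
                return true;
            }
        }
        return false;
    }

    // Consumer: finished one activation; returns true if the freed token survives
    // and should wake another worker (mirrors StopExecution).
    bool OnStop() {
        if (Active.fetch_sub(1) - 1 >= Concurrency) {
            Tokens.fetch_add(1);
            return true;
        }
        return false;
    }
};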
-
- class TPoolScheduler {
- class TSchedulable {
- // The lower PoolBits bits store the PoolId
- // All remaining higher bits store the virtual runtime in cycles
- using TValue = ui64;
- TValue Value;
-
- static constexpr ui64 PoolIdMask = ui64((1ull << PoolBits) - 1);
- static constexpr ui64 VRunTsMask = ~PoolIdMask;
-
- public:
- explicit TSchedulable(TPoolId poolId = MaxPools, ui64 vrunts = 0)
- : Value((poolId & PoolIdMask) | (vrunts & VRunTsMask))
- {}
-
- TPoolId GetPoolId() const {
- return Value & PoolIdMask;
- }
-
- ui64 GetVRunTs() const {
- // Do not truncate pool id
- // NOTE: it decreases accuracy, but improves performance
- return Value;
- }
-
- ui64 GetPreciseVRunTs() const {
- return Value & VRunTsMask;
- }
-
- void SetVRunTs(ui64 vrunts) {
- Value = (Value & PoolIdMask) | (vrunts & VRunTsMask);
- }
-
- void Account(ui64 base, ui64 ts) {
- // Add at least minimum amount to change Value
- SetVRunTs(base + Max(ts, PoolIdMask + 1));
- }
- };
-
- // For min-heap of Items
- struct TCmp {
- bool operator()(TSchedulable lhs, TSchedulable rhs) const {
- return lhs.GetVRunTs() > rhs.GetVRunTs();
- }
- };
-
- TPoolId Size = 0; // total number of pools on this cpu
- TPoolId Current = 0; // index of current pool in `Items`
-
- // At the beginning `Current` items are organized as a binary min-heap -- ready to be scheduled
- // The remaining `Size - Current` items are unordered (required to keep track of last vrunts)
- TSchedulable Items[MaxPools]; // virtual runtime in cycles for each pool
- ui64 MinVRunTs = 0; // virtual runtime used by waking pools (system's vrunts)
- ui64 Ts = 0; // real timestamp of current execution start (for accounting)
-
- // Maps PoolId to its inverse weight
- ui64 InvWeights[MaxPools];
- static constexpr ui64 VRunTsOverflow = ui64(1ull << 62ull) / MaxPoolWeight;
-
- public:
- void AddPool(TPoolId pool, TPoolWeight weight) {
- Items[Size] = TSchedulable(pool, MinVRunTs);
- Size++;
- InvWeights[pool] = MaxPoolWeight / std::clamp(weight ? weight : DefPoolWeight, MinPoolWeight, MaxPoolWeight);
- }
-
- // Iterate over pools in scheduling order
- // intended to be used in a loop of the form:
- // for (TPoolId pool = Begin(); pool != End(); pool = Next())
- TPoolId Begin() {
- // Wrap vruntime around to avoid overflow, if required
- if (Y_UNLIKELY(MinVRunTs >= VRunTsOverflow)) {
- for (TPoolId i = 0; i < Size; i++) {
- ui64 ts = Items[i].GetPreciseVRunTs();
- Items[i].SetVRunTs(ts >= VRunTsOverflow ? ts - VRunTsOverflow : 0);
- }
- MinVRunTs -= VRunTsOverflow;
- }
- Current = Size;
- std::make_heap(Items, Items + Current, TCmp());
- return Next();
- }
-
- constexpr TPoolId End() const {
- return MaxPools;
- }
-
- TPoolId Next() {
- if (Current > 0) {
- std::pop_heap(Items, Items + Current, TCmp());
- Current--;
- return CurrentPool();
- } else {
- return End();
- }
- }
-
- // Scheduling was successful, we are going to run CurrentPool()
- void Scheduled() {
- MinVRunTs = Max(MinVRunTs, Items[Current].GetPreciseVRunTs());
- // NOTE: Ts is propagated on Account() to avoid gaps
- }
-
- // Schedule specific pool that woke up cpu after idle
- void ScheduledAfterIdle(TPoolId pool, ui64 ts) {
- if (Y_UNLIKELY(ts < Ts)) { // anomaly: time goes backwards (e.g. rdtsc is reset to zero on cpu reset)
- Ts = ts; // just skip anomalous time slice
- return;
- }
- MinVRunTs += (ts - Ts) * (MaxPoolWeight / DefPoolWeight); // propagate system's vrunts to blur difference between pools
- Ts = ts; // propagate time w/o accounting to any pool
-
- // Set specified pool as current, it requires scan
- for (Current = 0; Current < Size && pool != Items[Current].GetPoolId(); Current++) {}
- Y_ABORT_UNLESS(Current < Size);
- }
-
- // Account currently running pool till now (ts)
- void Account(ui64 ts) {
- // Skip time slice for the first run and when time goes backwards (e.g. rdtsc is reset to zero on cpu reset)
- if (Y_LIKELY(Ts > 0 && Ts <= ts)) {
- TPoolId pool = CurrentPool();
- Y_ABORT_UNLESS(pool < MaxPools);
- Items[Current].Account(MinVRunTs, (ts - Ts) * InvWeights[pool]);
- }
- Ts = ts; // propagate time
- }
-
- TPoolId CurrentPool() const {
- return Items[Current].GetPoolId();
- }
- };
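// Editor's aside (standalone sketch, not part of the patch): TPoolScheduler's
// fairness rule reduces to "charge elapsed cycles multiplied by the inverse
// weight and always run the pool with the smallest virtual runtime". The helpers
// below restate only that arithmetic with invented names; the real class also
// packs the pool id into the low bits and handles overflow wrap-around.
#include <algorithm>
#include <cstdint>
#include <vector>

struct TMiniPool {
    uint64_t VRunTs = 0;    // accumulated weighted runtime, like Items[i]
    uint64_t InvWeight = 1; // MaxPoolWeight / weight, like InvWeights[pool]
};

// Pick the next pool to run: the smallest virtual runtime wins.
size_t PickNext(const std::vector<TMiniPool>& pools) {
    return std::min_element(pools.begin(), pools.end(),
            [](const TMiniPool& a, const TMiniPool& b) { return a.VRunTs < b.VRunTs; })
        - pools.begin();
}

// Charge the pool that just ran for elapsedTs cycles. A pool with twice the
// weight has half the InvWeight, accumulates runtime half as fast, and therefore
// wins the PickNext comparison about twice as often under contention.
void Account(TMiniPool& pool, uint64_t elapsedTs) {
    pool.VRunTs += elapsedTs * pool.InvWeight;
}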
-
- // Cyclic array of timers for idle workers to wait for hard preemption on
- struct TIdleQueue: public TNonCopyable {
- TArrayHolder<TTimerFd> Timers;
- size_t Size;
- TAtomic EnqueueCounter = 0;
- TAtomic DequeueCounter = 0;
-
- explicit TIdleQueue(size_t size)
- : Timers(new TTimerFd[size])
- , Size(size)
- {}
-
- void Stop() {
- for (size_t i = 0; i < Size; i++) {
- Timers[i].Wake();
- }
- }
-
- // Returns the timer a new idle-worker should wait for
- TTimerFd* Enqueue() {
- return &Timers[AtomicGetAndIncrement(EnqueueCounter) % Size];
- }
-
- // Returns the timer that hard preemption should trigger to wake an idle-worker
- TTimerFd* Dequeue() {
- return &Timers[AtomicGetAndIncrement(DequeueCounter) % Size];
- }
- };
-
- // Base class for cpu-local managers that help workers on a single cpu cooperate
- struct TCpuLocalManager: public TThrRefBase {
- TUnitedWorkers* United;
-
- explicit TCpuLocalManager(TUnitedWorkers* united)
- : United(united)
- {}
-
- virtual TWorkerId WorkerCount() const = 0;
- virtual void AddWorker(TWorkerId workerId) = 0;
- virtual void Stop() = 0;
- };
-
- // Represents a cpu with a single associated worker that is able to execute any pool.
- // It always executes the pool assigned by the balancer and switches pools only if the assigned pool has changed
- struct TAssignedCpu: public TCpuLocalManager {
- bool Started = false;
-
- TAssignedCpu(TUnitedWorkers* united)
- : TCpuLocalManager(united)
- {}
-
- TWorkerId WorkerCount() const override {
- return 1;
- }
-
- void AddWorker(TWorkerId workerId) override {
- Y_UNUSED(workerId);
- }
-
- ui32 GetReadyActivation(TWorkerContext& wctx, ui64 revolvingCounter) {
- ui32 activation;
- if (Y_UNLIKELY(!Started)) {
- Started = true;
- } else if (Y_UNLIKELY(United->IsPoolReassigned(wctx))) {
- United->StopExecution(wctx.PoolId); // stop current execution and switch pool if reassigned
- } else if (United->NextExecution(wctx.PoolId, activation, revolvingCounter)) {
- return activation; // another activation from currently executing pool (or 0 if stopped)
- }
-
- // Switch to another pool, it blocks until token is acquired
- if (Y_UNLIKELY(!SwitchPool(wctx))) {
- return 0; // stopped
- }
- United->SwitchPool(wctx, 0);
- United->BeginExecution(wctx.PoolId, activation, revolvingCounter);
- return activation;
- }
-
- void Stop() override {
- }
-
- private:
- // Sets next pool to run, and acquires token, blocks if there are no tokens
- bool SwitchPool(TWorkerContext& wctx) {
- if (Y_UNLIKELY(United->IsStopped())) {
- return false;
- }
-
- // Run balancer (if it's time to)
- United->Balance();
-
- // Select pool to execute
- wctx.PoolId = United->AssignedPool(wctx);
- Y_ABORT_UNLESS(wctx.PoolId != CpuShared);
- if (United->TryAcquireToken(wctx.PoolId)) {
- return true;
- }
-
- // No more work -- wait for activations (spinning, then blocked)
- wctx.PoolId = United->Idle(wctx.PoolId, wctx);
-
- // Wakeup or stop occurred
- if (Y_UNLIKELY(wctx.PoolId == CpuStopped)) {
- return false;
- }
- return true; // United->Idle() has already acquired token
- }
- };
-
- // Lock-free data structure that helps workers on a single cpu discover their state and perform hard preemption
- struct TSharedCpu: public TCpuLocalManager {
- // Current lease
- volatile TLease::TValue CurrentLease;
- char Padding1[64 - sizeof(TLease)];
-
- // Slow pools
- // the highest bit: 1=wait-for-slow-workers mode 0=else
- // any lower bit (poolId is bit position): 1=pool-is-slow 0=pool-is-fast
- volatile TPoolsMask SlowPoolsMask = 0;
- char Padding2[64 - sizeof(TPoolsMask)];
-
- // Must be accessed under never expiring lease to avoid races
- TPoolScheduler PoolSched;
- TWorkerId FastWorker = MaxWorkers;
- TTimerFd* PreemptionTimer = nullptr;
- ui64 HardPreemptionTs = 0;
- bool Started = false;
-
- TIdleQueue IdleQueue;
-
- struct TConfig {
- const TCpuId CpuId;
- const TWorkerId Workers;
- ui64 SoftLimitTs;
- ui64 HardLimitTs;
- ui64 EventLimitTs;
- ui64 LimitPrecisionTs;
- const int IdleWorkerPriority;
- const int FastWorkerPriority;
- const bool NoRealtime;
- const bool NoAffinity;
- const TCpuAllocation CpuAlloc;
-
- TConfig(const TCpuAllocation& allocation, const TUnitedWorkersConfig& united)
- : CpuId(allocation.CpuId)
- , Workers(allocation.AllowedPools.size() + 1)
- , SoftLimitTs(Us2Ts(united.PoolLimitUs))
- , HardLimitTs(Us2Ts(united.PoolLimitUs + united.EventLimitUs))
- , EventLimitTs(Us2Ts(united.EventLimitUs))
- , LimitPrecisionTs(Us2Ts(united.LimitPrecisionUs))
- , IdleWorkerPriority(std::clamp<ui64>(united.IdleWorkerPriority ? united.IdleWorkerPriority : 20, 1, 99))
- , FastWorkerPriority(std::clamp<ui64>(united.FastWorkerPriority ? united.FastWorkerPriority : 10, 1, IdleWorkerPriority - 1))
- , NoRealtime(united.NoRealtime)
- , NoAffinity(united.NoAffinity)
- , CpuAlloc(allocation)
- {}
- };
-
- TConfig Config;
- TVector<TWorkerId> Workers;
-
- TSharedCpu(const TConfig& cfg, TUnitedWorkers* united)
- : TCpuLocalManager(united)
- , IdleQueue(cfg.Workers)
- , Config(cfg)
- {
- for (const auto& pa : Config.CpuAlloc.AllowedPools) {
- PoolSched.AddPool(pa.PoolId, pa.Weight);
- }
- }
-
- TWorkerId WorkerCount() const override {
- return Config.Workers;
- }
-
- void AddWorker(TWorkerId workerId) override {
- if (Workers.empty()) {
- // Grant lease to the first worker
- AtomicStore(&CurrentLease, TLease(workerId, NeverExpire).Value);
- }
- Workers.push_back(workerId);
- }
-
- ui32 GetReadyActivation(TWorkerContext& wctx, ui64 revolvingCounter) {
- ui32 activation;
- if (!wctx.Lease.IsNeverExpiring()) {
- if (wctx.SoftDeadlineTs < GetCycleCountFast()) { // stop if lease has expired or is near to be expired
- United->StopExecution(wctx.PoolId);
- } else if (United->NextExecution(wctx.PoolId, activation, revolvingCounter)) {
- return activation; // another activation from currently executing pool (or 0 if stopped)
- }
- }
-
- // Switch to another pool, it blocks until token is acquired
- if (Y_UNLIKELY(!SwitchPool(wctx))) {
- return 0; // stopped
- }
- United->BeginExecution(wctx.PoolId, activation, revolvingCounter);
- return activation;
- }
-
- void Stop() override {
- IdleQueue.Stop();
- }
-
- private:
- enum EPriority {
- IdlePriority, // highest (real-time, Config.IdleWorkerPriority)
- FastPriority, // normal (real-time, Config.FastWorkerPriority)
- SlowPriority, // lowest (not real-time)
- };
-
- enum EWorkerAction {
- // Fast-worker
- ExecuteFast,
- WaitForSlow,
-
- // Slow-worker
- BecameIdle,
- WakeFast,
-
- // Idle-worker
- BecameFast,
- Standby,
-
- // Common
- Stopped,
- };
-
- // Thread-safe; should be called from worker
- // Blocks for idle-workers; sets lease and next pool to run
- bool SwitchPool(TWorkerContext& wctx) {
- TTimerFd* idleTimer = nullptr;
- while (true) {
- if (DisablePreemptionAndTryExtend(wctx.Lease)) { // if fast-worker
- if (Y_UNLIKELY(!Started)) {
- SetPriority(0, FastPriority);
- Started = true;
- }
- while (true) {
- switch (FastWorkerAction(wctx)) {
- case ExecuteFast:
- United->SwitchPool(wctx, wctx.Lease.GetPreciseExpireTs() - Config.EventLimitTs);
- EnablePreemptionAndGrant(wctx.Lease);
- return true;
- case WaitForSlow:
- FastWorkerSleep(GetCycleCountFast() + Config.SoftLimitTs);
- break;
- case Stopped: return false;
- default: Y_ABORT();
- }
- }
- } else if (wctx.Lease.IsNeverExpiring()) { // if idle-worker
- switch (IdleWorkerAction(idleTimer, wctx.Lease.GetWorkerId())) {
- case BecameFast:
- SetPriority(0, FastPriority);
- break; // try acquire new lease
- case Standby:
- if (!idleTimer) {
- idleTimer = IdleQueue.Enqueue();
- }
- SetPriority(0, IdlePriority);
- idleTimer->Wait();
- break;
- case Stopped: return false;
- default: Y_ABORT();
- }
- } else { // lease has expired and hard preemption occurred, so we are executing in a slow-worker
- wctx.IncrementPreemptedEvents();
- switch (SlowWorkerAction(wctx.PoolId)) {
- case WakeFast:
- WakeFastWorker();
- [[fallthrough]]; // no break; pass through
- case BecameIdle:
- wctx.Lease = wctx.Lease.NeverExpire();
- wctx.PoolId = MaxPools;
- idleTimer = nullptr;
- break;
- case Stopped: return false;
- default: Y_ABORT();
- }
- }
- }
- }
-
- enum ETryRunPool {
- RunFastPool,
- RunSlowPool,
- NoTokens,
- };
-
- ETryRunPool TryRun(TPoolId pool) {
- while (true) {
- // updates WaitPoolsFlag in SlowPoolsMask according to scheduled pool slowness
- TPoolsMask slow = AtomicLoad(&SlowPoolsMask);
- if ((1ull << pool) & slow) { // we are about to execute slow pool (fast-worker will just wait, token is NOT required)
- if (slow & WaitPoolsFlag) {
- return RunSlowPool; // wait flag is already set
- } else {
- if (AtomicCas(&SlowPoolsMask, slow | WaitPoolsFlag, slow)) { // try set wait flag
- return RunSlowPool; // wait flag has been successfully set
- }
- }
- } else { // we are about to execute fast pool, token required
- if (slow & WaitPoolsFlag) { // reset wait flag if required
- if (AtomicCas(&SlowPoolsMask, slow & ~WaitPoolsFlag, slow)) { // try reset wait flag
- return United->TryAcquireToken(pool) ? RunFastPool : NoTokens; // wait flag has been successfully reset
- }
- } else {
- return United->TryAcquireToken(pool) ? RunFastPool : NoTokens; // wait flag is already reset
- }
- }
- }
- }
-
- EWorkerAction FastWorkerAction(TWorkerContext& wctx) {
- if (Y_UNLIKELY(United->IsStopped())) {
- return Stopped;
- }
-
- // Account current pool
- ui64 ts = GetCycleCountFast();
- PoolSched.Account(ts);
-
- // Select next pool to execute
- for (wctx.PoolId = PoolSched.Begin(); wctx.PoolId != PoolSched.End(); wctx.PoolId = PoolSched.Next()) {
- switch (TryRun(wctx.PoolId)) {
- case RunFastPool:
- PoolSched.Scheduled();
- wctx.Lease = PostponePreemption(wctx.Lease.GetWorkerId(), ts);
- return ExecuteFast;
- case RunSlowPool:
- PoolSched.Scheduled();
- ResetPreemption(wctx.Lease.GetWorkerId(), ts); // there is no point in preemption during wait
- return WaitForSlow;
- case NoTokens: // concurrency limit reached, or no more work in pool
- break; // just try next pool (if any)
- }
- }
-
- // No more work, no slow-workers -- wait for activations (active, then blocked)
- wctx.PoolId = United->Idle(CpuShared, wctx);
-
- // Wakeup or stop occurred
- if (Y_UNLIKELY(wctx.PoolId == CpuStopped)) {
- return Stopped;
- }
- ts = GetCycleCountFast();
- PoolSched.ScheduledAfterIdle(wctx.PoolId, ts);
- wctx.Lease = PostponePreemption(wctx.Lease.GetWorkerId(), ts);
- return ExecuteFast; // United->Idle() has already acquired token
- }
-
- EWorkerAction IdleWorkerAction(TTimerFd* idleTimer, TWorkerId workerId) {
- if (Y_UNLIKELY(United->IsStopped())) {
- return Stopped;
- }
- if (!idleTimer) { // either the worker just started or became idle -- hard preemption is not required
- return Standby;
- }
-
- TLease lease = TLease(AtomicLoad(&CurrentLease));
- ui64 ts = GetCycleCountFast();
- if (lease.GetExpireTs() < ts) { // current lease has expired
- if (TryBeginHardPreemption(lease)) {
- SetPoolIsSlowFlag(PoolSched.CurrentPool());
- TWorkerId preempted = lease.GetWorkerId();
- SetPriority(United->GetWorkerThreadId(preempted), SlowPriority);
- LWPROBE(HardPreemption, Config.CpuId, PoolSched.CurrentPool(), preempted, workerId);
- EndHardPreemption(workerId);
- return BecameFast;
- } else {
- // Lease has just been changed, so preemption is not needed right now and no retry is required
- return Standby;
- }
- } else {
- // Lease has not expired yet (maybe never expiring lease)
- return Standby;
- }
- }
-
- EWorkerAction SlowWorkerAction(TPoolId pool) {
- if (Y_UNLIKELY(United->IsStopped())) {
- return Stopped;
- }
- while (true) {
- TPoolsMask slow = AtomicLoad(&SlowPoolsMask);
- if (slow & (1ull << pool)) {
- if (slow == ((1ull << pool) | WaitPoolsFlag)) { // the last slow pool is going to become fast
- if (AtomicCas(&SlowPoolsMask, 0, slow)) { // reset both pool-is-slow flag and WaitPoolsFlag
- return WakeFast;
- }
- } else { // there are (a) several slow-workers or (b) one slow-worker without a waiting fast-worker
- if (AtomicCas(&SlowPoolsMask, slow & ~(1ull << pool), slow)) { // reset pool-is-slow flag
- return BecameIdle;
- }
- }
- } else {
- // SlowWorkerAction has been called between TryBeginHardPreemption and SetPoolIsSlowFlag
- // the flag for this pool is not set yet, but we can be sure the pool is slow:
- // - because SlowWorkerAction has been called;
- // - which means the lease has expired and hard preemption has occurred.
- // So just wait for the other worker to call SetPoolIsSlowFlag
- LWPROBE(SlowWorkerActionRace, Config.CpuId, pool, slow);
- }
- }
- }
-
- void SetPoolIsSlowFlag(TPoolId pool) {
- while (true) {
- TPoolsMask slow = AtomicLoad(&SlowPoolsMask);
- if ((slow & (1ull << pool)) == 0) { // if pool is fast
- if (AtomicCas(&SlowPoolsMask, slow | (1ull << pool), slow)) { // set pool-is-slow flag
- return;
- }
- } else {
- Y_ABORT("two slow-workers executing the same pool on the same core");
- return; // pool is already slow
- }
- }
- }
-
- bool TryBeginHardPreemption(TLease lease) {
- return AtomicCas(&CurrentLease, HardPreemptionLease, lease);
- }
-
- void EndHardPreemption(TWorkerId to) {
- ATOMIC_COMPILER_BARRIER();
- if (!AtomicCas(&CurrentLease, TLease(to, NeverExpire), HardPreemptionLease)) {
- Y_ABORT("hard preemption failed");
- }
- }
-
- bool DisablePreemptionAndTryExtend(TLease lease) {
- return AtomicCas(&CurrentLease, lease.NeverExpire(), lease);
- }
-
- void EnablePreemptionAndGrant(TLease lease) {
- ATOMIC_COMPILER_BARRIER();
- if (!AtomicCas(&CurrentLease, lease, lease.NeverExpire())) {
- Y_ABORT("lease grant failed");
- }
- }
-
- void FastWorkerSleep(ui64 deadlineTs) {
- while (true) {
- TPoolsMask slow = AtomicLoad(&SlowPoolsMask);
- if ((slow & WaitPoolsFlag) == 0) {
- return; // woken by WakeFast action
- }
- ui64 ts = GetCycleCountFast();
- if (deadlineTs <= ts) {
- if (AtomicCas(&SlowPoolsMask, slow & ~WaitPoolsFlag, slow)) { // try reset wait flag
- return; // wait flag has been successfully reset after timeout
- }
- } else { // should wait
- ui64 timeoutNs = Ts2Ns(deadlineTs - ts);
-#ifdef _linux_
- timespec timeout;
- timeout.tv_sec = timeoutNs / 1'000'000'000;
- timeout.tv_nsec = timeoutNs % 1'000'000'000;
- SysFutex(FastWorkerFutex(), FUTEX_WAIT_PRIVATE, FastWorkerFutexValue(slow), &timeout, nullptr, 0);
-#else
- NanoSleep(timeoutNs); // non-linux wake is not supported, cpu will go idle on slow -> fast switch
-#endif
- }
- }
- }
-
- void WakeFastWorker() {
-#ifdef _linux_
- SysFutex(FastWorkerFutex(), FUTEX_WAKE_PRIVATE, 1, nullptr, nullptr, 0);
-#endif
- }
-
-#ifdef _linux_
- ui32* FastWorkerFutex() {
- // Actually we wait only on the highest bit, but the futex value is 4 bytes on all platforms
- static_assert(sizeof(TPoolsMask) >= 4, "cannot be used as futex value on linux");
- return (ui32*)&SlowPoolsMask + 1; // higher 32 bits (little endian assumed)
- }
-
- ui32 FastWorkerFutexValue(TPoolsMask slow) {
- return ui32(slow >> 32); // higher 32 bits
- }
-#endif
-
- void SetPriority(TThreadId tid, EPriority priority) {
- if (Config.NoRealtime) {
- return;
- }
-#ifdef _linux_
- int policy;
- struct sched_param param;
- switch (priority) {
- case IdlePriority:
- policy = SCHED_FIFO;
- param.sched_priority = Config.IdleWorkerPriority;
- break;
- case FastPriority:
- policy = SCHED_FIFO;
- param.sched_priority = Config.FastWorkerPriority;
- break;
- case SlowPriority:
- policy = SCHED_OTHER;
- param.sched_priority = 0;
- break;
- }
- int ret = sched_setscheduler(tid, policy, &param);
- switch (ret) {
- case 0: return;
- case EINVAL:
- Y_ABORT("sched_setscheduler(%" PRIu64 ", %d, %d) -> EINVAL", tid, policy, param.sched_priority);
- case EPERM:
- // Requirements:
- // * CAP_SYS_NICE capability to run real-time processes and set cpu affinity.
- // Either run under root or set application capabilities:
- // sudo setcap cap_sys_nice=eip BINARY
- // * Non-zero rt-runtime (in case cgroups are used).
- // Either (a) disable global limit on RT processes bandwidth:
- // sudo sysctl -w kernel.sched_rt_runtime_us=-1
- // Or (b) set non-zero rt-runtime for your cgroup:
- // echo -1 > /sys/fs/cgroup/cpu/[cgroup]/cpu.rt_runtime_us
- // (also set the same value for every parent cgroup)
- // https://www.kernel.org/doc/Documentation/scheduler/sched-rt-group.txt
- Y_ABORT("sched_setscheduler(%" PRIu64 ", %d, %d) -> EPERM", tid, policy, param.sched_priority);
- case ESRCH:
- Y_ABORT("sched_setscheduler(%" PRIu64 ", %d, %d) -> ESRCH", tid, policy, param.sched_priority);
- default:
- Y_ABORT("sched_setscheduler(%" PRIu64 ", %d, %d) -> %d", tid, policy, param.sched_priority, ret);
- }
-#else
- Y_UNUSED(tid);
- Y_UNUSED(priority);
-#endif
- }
-
- void ResetPreemption(TWorkerId fastWorkerId, ui64 ts) {
- if (Y_UNLIKELY(!PreemptionTimer)) {
- return;
- }
- if (FastWorker == fastWorkerId && HardPreemptionTs > 0) {
- PreemptionTimer->Reset();
- LWPROBE(ResetPreemptionTimer, Config.CpuId, FastWorker, PreemptionTimer->Fd, Ts2Ms(ts), Ts2Ms(HardPreemptionTs));
- HardPreemptionTs = 0;
- }
- }
-
- TLease PostponePreemption(TWorkerId fastWorkerId, ui64 ts) {
- // Select new timer after hard preemption
- if (FastWorker != fastWorkerId) {
- FastWorker = fastWorkerId;
- PreemptionTimer = IdleQueue.Dequeue();
- HardPreemptionTs = 0;
- }
-
- ui64 hardPreemptionTs = ts + Config.HardLimitTs;
- if (hardPreemptionTs > HardPreemptionTs) {
- // Reset timer (at most once in TickIntervalTs, sacrifice precision)
- HardPreemptionTs = hardPreemptionTs + Config.LimitPrecisionTs;
- PreemptionTimer->Set(HardPreemptionTs);
- LWPROBE(SetPreemptionTimer, Config.CpuId, FastWorker, PreemptionTimer->Fd, Ts2Ms(ts), Ts2Ms(HardPreemptionTs));
- }
-
- return TLease(fastWorkerId, hardPreemptionTs);
- }
- };
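// Editor's aside (simplified standalone sketch, not part of the patch): the
// fast-worker/idle-worker handshake above is driven by CAS transitions on the
// single CurrentLease word. The field packing, bit widths and function names
// below are invented; the real TLease layout differs.
#include <atomic>
#include <cstdint>

using TLeaseValue = uint64_t;
constexpr uint64_t WorkerBits = 8;
constexpr uint64_t NeverExpireTs = (uint64_t(1) << (64 - WorkerBits)) - 1;
constexpr TLeaseValue HardPreemptionMark = ~TLeaseValue(0); // sentinel while preemption is in flight

constexpr TLeaseValue MakeLease(uint64_t worker, uint64_t expireTs) {
    return (expireTs << WorkerBits) | worker;
}

// Fast worker: freeze its own lease (make it never-expiring) so nobody can
// hard-preempt it while it is choosing the next pool.
bool DisablePreemption(std::atomic<TLeaseValue>& lease, uint64_t worker, uint64_t expireTs) {
    TLeaseValue expected = MakeLease(worker, expireTs);
    return lease.compare_exchange_strong(expected, MakeLease(worker, NeverExpireTs));
}

// Idle worker: if the published lease has expired, try to claim the cpu by
// installing the sentinel; exactly one contender can win this CAS.
bool TryHardPreempt(std::atomic<TLeaseValue>& lease, TLeaseValue observed, uint64_t nowTs) {
    if ((observed >> WorkerBits) >= nowTs) {
        return false; // lease still valid or frozen, nothing to preempt
    }
    return lease.compare_exchange_strong(observed, HardPreemptionMark);
}

// Winner of TryHardPreempt: publish itself as the new never-expiring fast worker.
void FinishHardPreemption(std::atomic<TLeaseValue>& lease, uint64_t newWorker) {
    TLeaseValue expected = HardPreemptionMark;
    lease.compare_exchange_strong(expected, MakeLease(newWorker, NeverExpireTs));
}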
-
- // Proxy for starting and switching TUnitedExecutorPool-s on a single cpu via GetReadyActivation()
- // (does not implement any other method in IExecutorPool)
- class TCpuExecutorPool: public IExecutorPool {
- const TString Name;
-
- public:
- explicit TCpuExecutorPool(const TString& name)
- : IExecutorPool(MaxPools)
- , Name(name)
- {}
-
- TString GetName() const override {
- return Name;
- }
-
- void SetRealTimeMode() const override {
- // derived classes control rt-priority - do nothing
- }
-
- // Should never be called
- void ReclaimMailbox(TMailboxType::EType, ui32, TWorkerId, ui64) override { Y_ABORT(); }
- TMailboxHeader *ResolveMailbox(ui32) override { Y_ABORT(); }
- void Schedule(TInstant, TAutoPtr<IEventHandle>, ISchedulerCookie*, TWorkerId) override { Y_ABORT(); }
- void Schedule(TMonotonic, TAutoPtr<IEventHandle>, ISchedulerCookie*, TWorkerId) override { Y_ABORT(); }
- void Schedule(TDuration, TAutoPtr<IEventHandle>, ISchedulerCookie*, TWorkerId) override { Y_ABORT(); }
- bool Send(TAutoPtr<IEventHandle>&) override { Y_ABORT(); }
- bool SpecificSend(TAutoPtr<IEventHandle>&) override { Y_ABORT(); }
- void ScheduleActivation(ui32) override { Y_ABORT(); }
- void SpecificScheduleActivation(ui32) override { Y_ABORT(); }
- void ScheduleActivationEx(ui32, ui64) override { Y_ABORT(); }
- TActorId Register(IActor*, TMailboxType::EType, ui64, const TActorId&) override { Y_ABORT(); }
- TActorId Register(IActor*, TMailboxHeader*, ui32, const TActorId&) override { Y_ABORT(); }
- void Prepare(TActorSystem*, NSchedulerQueue::TReader**, ui32*) override { Y_ABORT(); }
- void Start() override { Y_ABORT(); }
- void PrepareStop() override { Y_ABORT(); }
- void Shutdown() override { Y_ABORT(); }
- bool Cleanup() override { Y_ABORT(); }
- };
-
- // Proxy executor pool working with cpu-local scheduler (aka actorsystem 2.0)
- class TSharedCpuExecutorPool: public TCpuExecutorPool {
- TSharedCpu* Local;
- TIntrusivePtr<TAffinity> SingleCpuAffinity; // no migration support yet
- public:
- explicit TSharedCpuExecutorPool(TSharedCpu* local, const TUnitedWorkersConfig& config)
- : TCpuExecutorPool("u-" + ToString(local->Config.CpuId))
- , Local(local)
- , SingleCpuAffinity(config.NoAffinity ? nullptr : new TAffinity(TCpuMask(local->Config.CpuId)))
- {}
-
- TAffinity* Affinity() const override {
- return SingleCpuAffinity.Get();
- }
-
- ui32 GetReadyActivation(TWorkerContext& wctx, ui64 revolvingCounter) override {
- return Local->GetReadyActivation(wctx, revolvingCounter);
- }
- };
-
- // Proxy executor pool working with balancer and assigned pools (aka actorsystem 1.5)
- class TAssignedCpuExecutorPool: public TCpuExecutorPool {
- TAssignedCpu* Local;
- TIntrusivePtr<TAffinity> CpuAffinity;
- public:
- explicit TAssignedCpuExecutorPool(TAssignedCpu* local, const TUnitedWorkersConfig& config)
- : TCpuExecutorPool("United")
- , Local(local)
- , CpuAffinity(config.NoAffinity ? nullptr : new TAffinity(config.Allowed))
- {}
-
- TAffinity* Affinity() const override {
- return CpuAffinity.Get();
- }
-
- ui32 GetReadyActivation(TWorkerContext& wctx, ui64 revolvingCounter) override {
- return Local->GetReadyActivation(wctx, revolvingCounter);
- }
- };
-
- // Representation of a single cpu and its state visible to other cpus and pools
- struct TUnitedWorkers::TCpu: public TNonCopyable {
- struct TScopedWaiters {
- TCpu& Cpu;
- TPool* AssignedPool; // nullptr if CpuShared
-
- // Subscribe on wakeups from allowed pools
- TScopedWaiters(TCpu& cpu, TPool* assignedPool) : Cpu(cpu), AssignedPool(assignedPool) {
- if (!AssignedPool) {
- for (TPool* pool : Cpu.AllowedPools) {
- AtomicIncrement(pool->Waiters);
- }
- } else {
- AtomicIncrement(AssignedPool->Waiters);
- }
- }
-
- // Unsubscribe from pools we've subscribed on
- ~TScopedWaiters() {
- if (!AssignedPool) {
- for (TPool* pool : Cpu.AllowedPools) {
- AtomicDecrement(pool->Waiters);
- }
- } else {
- AtomicDecrement(AssignedPool->Waiters);
- }
- }
- };
-
- // Current cpu state important for other cpus and balancer
- TCpuState State;
-
- // Thread-safe per pool stats
- // NOTE: It's guaranteed that a cpu never executes two instances of the same pool
- TVector<TExecutorThreadStats> PoolStats;
- TCpuLoadLog<1024> LoadLog;
-
-
- // Configuration
- TCpuId CpuId;
- THolder<TCpuLocalManager> LocalManager;
- THolder<TCpuExecutorPool> ExecutorPool;
-
- // Pools allowed to run on this cpu
- TStackVec<TPool*, 15> AllowedPools;
-
- void Stop() {
- if (LocalManager) {
- State.Stop();
- LocalManager->Stop();
- }
- }
-
- bool StartSpinning(TUnitedWorkers* united, TPool* assignedPool, TPoolId& result) {
- // Mark cpu as idle
- if (Y_UNLIKELY(!State.StartSpinning())) {
- result = CpuStopped;
- return true;
- }
-
- // Avoid using multiple atomic seq_cst loads in a loop, use a barrier once and relaxed ops
- AtomicBarrier();
-
- // Check there are no pending tokens (they can be released before the Waiters increment)
- if (!assignedPool) {
- for (TPool* pool : AllowedPools) {
- if (pool->TryAcquireTokenRelaxed()) {
- result = WakeWithTokenAcquired(united, pool->PoolId);
- return true; // token acquired or stop
- }
- }
- } else {
- if (assignedPool->TryAcquireTokenRelaxed()) {
- result = WakeWithTokenAcquired(united, assignedPool->PoolId);
- return true; // token acquired or stop
- }
- }
-
- // At this point we can be sure wakeup won't be lost
- // So we can actively spin or block w/o checking for pending tokens
- return false;
- }
-
- bool ActiveWait(ui64 spinThresholdTs, TPoolId& result) {
- ui64 ts = GetCycleCountFast();
- LoadLog.RegisterBusyPeriod(ts);
- ui64 deadline = ts + spinThresholdTs;
- while (GetCycleCountFast() < deadline) {
- for (ui32 i = 0; i < 12; ++i) {
- TPoolId current = State.CurrentPool();
- if (current == CpuSpinning) {
- SpinLockPause();
- } else {
- result = current;
- LoadLog.RegisterIdlePeriod(GetCycleCountFast());
- return true; // wakeup
- }
- }
- }
- return false; // spin threshold exceeded, no wakeups
- }
-
- bool StartBlocking(TPoolId& result) {
- // Switch into blocked state
- if (State.StartBlocking()) {
- result = State.CurrentPool();
- return true;
- } else {
- return false;
- }
- }
-
- bool BlockedWait(TPoolId& result, ui64 timeoutNs) {
- return State.Block(timeoutNs, result);
- }
-
- void SwitchPool(TPoolId pool) {
- return State.SwitchPool(pool);
- }
-
- private:
- TPoolId WakeWithTokenAcquired(TUnitedWorkers* united, TPoolId token) {
- switch (State.WakeWithTokenAcquired(token)) {
- case TCpuState::Woken: // we've got token and successfully woken up this cpu
- // NOTE: the sending thread may also wake up another worker, which won't be able to acquire a token and will go idle (that's ok)
- return token;
- case TCpuState::NotIdle: { // wakeup event has also occurred
- TPoolId wakeup = State.CurrentPool();
- if (wakeup != token) { // token and wakeup for different pools
- united->TryWake(wakeup); // rewake another cpu to avoid losing wakeup
- }
- return token;
- }
- case TCpuState::Forbidden:
- Y_ABORT();
- case TCpuState::Stopped:
- return CpuStopped;
- }
- }
- };
-
- TUnitedWorkers::TUnitedWorkers(
- const TUnitedWorkersConfig& config,
- const TVector<TUnitedExecutorPoolConfig>& unitedPools,
- const TCpuAllocationConfig& allocation,
- IBalancer* balancer)
- : Balancer(balancer)
- , Config(config)
- , Allocation(allocation)
- {
- // Find max pool id and initialize pools
- PoolCount = 0;
- for (const TCpuAllocation& cpuAlloc : allocation.Items) {
- for (const auto& pa : cpuAlloc.AllowedPools) {
- PoolCount = Max<size_t>(PoolCount, pa.PoolId + 1);
- }
- }
- Pools.Reset(new TPool[PoolCount]);
-
- // Find max cpu id and initialize cpus
- CpuCount = 0;
- for (const TCpuAllocation& cpuAlloc : allocation.Items) {
- CpuCount = Max<size_t>(CpuCount, cpuAlloc.CpuId + 1);
- }
- Cpus.Reset(new TCpu[CpuCount]);
-
- // Setup allocated cpus
- // NOTE: leave gaps for not allocated cpus (default-initialized)
- WorkerCount = 0;
- for (const TCpuAllocation& cpuAlloc : allocation.Items) {
- TCpu& cpu = Cpus[cpuAlloc.CpuId];
- cpu.CpuId = cpuAlloc.CpuId;
- cpu.PoolStats.resize(PoolCount); // NOTE: also may have gaps
- for (const auto& pa : cpuAlloc.AllowedPools) {
- cpu.AllowedPools.emplace_back(&Pools[pa.PoolId]);
- }
-
- // Setup balancing and cpu-local manager
- if (!Balancer->AddCpu(cpuAlloc, &cpu.State)) {
- cpu.State.SwitchPool(0); // set initial state to non-idle to avoid losing wakeups on start
- cpu.State.AssignPool(CpuShared);
- TSharedCpu* local = new TSharedCpu(TSharedCpu::TConfig(cpuAlloc, Config), this);
- cpu.LocalManager.Reset(local);
- cpu.ExecutorPool.Reset(new TSharedCpuExecutorPool(local, Config));
- } else {
- TAssignedCpu* local = new TAssignedCpu(this);
- cpu.LocalManager.Reset(local);
- cpu.ExecutorPool.Reset(new TAssignedCpuExecutorPool(local, Config));
- }
- WorkerCount += cpu.LocalManager->WorkerCount();
- }
-
- // Initialize workers
- Workers.Reset(new TWorker[WorkerCount]);
-
- // Setup pools
- // NOTE: leave gaps for not united pools (default-initialized)
- for (const TUnitedExecutorPoolConfig& cfg : unitedPools) {
- TPool& pool = Pools[cfg.PoolId];
- Y_ABORT_UNLESS(cfg.PoolId < MaxPools);
- pool.PoolId = cfg.PoolId;
- pool.Concurrency = cfg.Concurrency ? cfg.Concurrency : Config.CpuCount;
- pool.ExecutorPool = nullptr; // should be set later using SetupPool()
- pool.MailboxTable = nullptr; // should be set later using SetupPool()
- pool.TimePerMailboxTs = DurationToCycles(cfg.TimePerMailbox);
- pool.EventsPerMailbox = cfg.EventsPerMailbox;
-
- // Reinitialize per cpu pool stats with right MaxActivityType
- for (const TCpuAllocation& cpuAlloc : allocation.Items) {
- TCpu& cpu = Cpus[cpuAlloc.CpuId];
- cpu.PoolStats[cfg.PoolId] = TExecutorThreadStats();
- }
-
- // Setup WakeOrderCpus: left to right exclusive cpus, then left to right shared cpus.
- // Waking exclusive cpus first reduces load on shared cpus and improves latency isolation, which is
- // the point of using exclusive cpus. But note that the number of actively spinning idle cpus may increase,
- // so cpu consumption under light load is higher.
- for (const TCpuAllocation& cpuAlloc : allocation.Items) {
- TCpu& cpu = Cpus[cpuAlloc.CpuId];
- if (cpu.AllowedPools.size() == 1 && cpu.AllowedPools[0] == &pool) {
- pool.WakeOrderCpus.emplace_back(&cpu);
- }
- }
- for (const TCpuAllocation& cpuAlloc : allocation.Items) {
- TCpu& cpu = Cpus[cpuAlloc.CpuId];
- if (cpu.AllowedPools.size() > 1 && cpuAlloc.HasPool(pool.PoolId)) {
- pool.WakeOrderCpus.emplace_back(&cpu);
- }
- }
- }
- }
-
- TUnitedWorkers::~TUnitedWorkers() {
- }
-
- void TUnitedWorkers::Prepare(TActorSystem* actorSystem, TVector<NSchedulerQueue::TReader*>& scheduleReaders) {
- // Setup allocated cpus
- // NOTE: leave gaps for not allocated cpus (default-initialized)
- TWorkerId workers = 0;
- for (TCpuId cpuId = 0; cpuId < CpuCount; cpuId++) {
- TCpu& cpu = Cpus[cpuId];
-
- // Setup cpu-local workers
- if (cpu.LocalManager) {
- for (i16 i = 0; i < cpu.LocalManager->WorkerCount(); i++) {
- TWorkerId workerId = workers++;
- cpu.LocalManager->AddWorker(workerId);
-
- // Setup worker
- Y_ABORT_UNLESS(workerId < WorkerCount);
- Workers[workerId].Thread.Reset(new TExecutorThread(
- workerId,
- cpu.CpuId,
- actorSystem,
- cpu.ExecutorPool.Get(), // use cpu-local manager as proxy executor for all workers on cpu
- nullptr, // MailboxTable is pool-specific, will be set on pool switch
- cpu.ExecutorPool->GetName()));
- // NOTE: TWorker::ThreadId will be initialized after in Start()
-
- scheduleReaders.push_back(&Workers[workerId].SchedulerQueue.Reader);
- }
- }
- }
- }
-
- void TUnitedWorkers::Start() {
- for (TWorkerId workerId = 0; workerId < WorkerCount; workerId++) {
- Workers[workerId].Thread->Start();
- }
- for (TWorkerId workerId = 0; workerId < WorkerCount; workerId++) {
- AtomicStore(&Workers[workerId].ThreadId, Workers[workerId].Thread->GetThreadId());
- }
- }
-
- inline TThreadId TUnitedWorkers::GetWorkerThreadId(TWorkerId workerId) const {
- volatile TThreadId* threadId = &Workers[workerId].ThreadId;
-#ifdef _linux_
- while (AtomicLoad(threadId) == UnknownThreadId) {
- NanoSleep(1000);
- }
-#endif
- return AtomicLoad(threadId);
- }
-
- inline NSchedulerQueue::TWriter* TUnitedWorkers::GetScheduleWriter(TWorkerId workerId) const {
- return &Workers[workerId].SchedulerQueue.Writer;
- }
-
- void TUnitedWorkers::SetupPool(TPoolId pool, IExecutorPool* executorPool, TMailboxTable* mailboxTable) {
- Pools[pool].ExecutorPool = executorPool;
- Pools[pool].MailboxTable = mailboxTable;
- }
-
- void TUnitedWorkers::PrepareStop() {
- AtomicStore(&StopFlag, true);
- for (TPoolId pool = 0; pool < PoolCount; pool++) {
- Pools[pool].Stop();
- }
- for (TCpuId cpuId = 0; cpuId < CpuCount; cpuId++) {
- Cpus[cpuId].Stop();
- }
- }
-
- void TUnitedWorkers::Shutdown() {
- for (TWorkerId workerId = 0; workerId < WorkerCount; workerId++) {
- Workers[workerId].Thread->Join();
- }
- }
-
- inline void TUnitedWorkers::PushActivation(TPoolId pool, ui32 activation, ui64 revolvingCounter) {
- if (Pools[pool].PushActivation(activation, revolvingCounter)) { // token generated
- TryWake(pool);
- }
- }
-
- inline bool TUnitedWorkers::TryAcquireToken(TPoolId pool) {
- return Pools[pool].TryAcquireToken();
- }
-
- inline void TUnitedWorkers::TryWake(TPoolId pool) {
- // Avoid using multiple atomic seq_cst loads in a loop, use a barrier once
- AtomicBarrier();
-
- // Scan every allowed cpu in pool's wakeup order and try to wake the first idle cpu
- if (RelaxedLoad(&Pools[pool].Waiters) > 0) {
- for (TCpu* cpu : Pools[pool].WakeOrderCpus) {
- if (cpu->State.WakeWithoutToken(pool) == TCpuState::Woken) {
- return; // successful wake up
- }
- }
- }
-
- // Cpu has not been woken up
- }
-
- inline void TUnitedWorkers::BeginExecution(TPoolId pool, ui32& activation, ui64 revolvingCounter) {
- Pools[pool].BeginExecution(activation, revolvingCounter);
- }
-
- inline bool TUnitedWorkers::NextExecution(TPoolId pool, ui32& activation, ui64 revolvingCounter) {
- return Pools[pool].NextExecution(activation, revolvingCounter);
- }
-
- inline void TUnitedWorkers::StopExecution(TPoolId pool) {
- if (Pools[pool].StopExecution()) { // pending token
- TryWake(pool);
- }
- }
-
- inline void TUnitedWorkers::Balance() {
- ui64 ts = GetCycleCountFast();
- if (Balancer->TryLock(ts)) {
- for (TPoolId pool = 0; pool < PoolCount; pool++) {
- if (Pools[pool].IsUnited()) {
- ui64 ElapsedTs = 0;
- ui64 ParkedTs = 0;
- TStackVec<TCpuLoadLog<1024>*, 128> logs;
- ui64 worstActivationTimeUs = 0;
- for (TCpu* cpu : Pools[pool].WakeOrderCpus) {
- TExecutorThreadStats& cpuStats = cpu->PoolStats[pool];
- ElapsedTs += cpuStats.ElapsedTicks;
- ParkedTs += cpuStats.ParkedTicks;
- worstActivationTimeUs = Max(worstActivationTimeUs, cpuStats.WorstActivationTimeUs);
- AtomicStore<decltype(cpuStats.WorstActivationTimeUs)>(&cpuStats.WorstActivationTimeUs, 0ul);
- logs.push_back(&cpu->LoadLog);
- }
- ui64 minPeriodTs = Min(ui64(Us2Ts(Balancer->GetPeriodUs())), ui64((1024ull-2ull)*64ull*128ull*1024ull));
- ui64 estimatedTs = MinusOneCpuEstimator.MaxLatencyIncreaseWithOneLessCpu(
- &logs[0], logs.size(), ts, minPeriodTs);
- TBalancerStats stats;
- stats.Ts = ts;
- stats.CpuUs = Ts2Us(ElapsedTs);
- stats.IdleUs = Ts2Us(ParkedTs);
- stats.ExpectedLatencyIncreaseUs = Ts2Us(estimatedTs);
- stats.WorstActivationTimeUs = worstActivationTimeUs;
- Balancer->SetPoolStats(pool, stats);
- }
- }
- Balancer->Balance();
- Balancer->Unlock();
- }
- }
-
- inline TPoolId TUnitedWorkers::AssignedPool(TWorkerContext& wctx) {
- return Cpus[wctx.CpuId].State.AssignedPool();
- }
-
- inline bool TUnitedWorkers::IsPoolReassigned(TWorkerContext& wctx) {
- return Cpus[wctx.CpuId].State.IsPoolReassigned(wctx.PoolId);
- }
-
- inline void TUnitedWorkers::SwitchPool(TWorkerContext& wctx, ui64 softDeadlineTs) {
- Pools[wctx.PoolId].Switch(wctx, softDeadlineTs, Cpus[wctx.CpuId].PoolStats[wctx.PoolId]);
- Cpus[wctx.CpuId].SwitchPool(wctx.PoolId);
- }
-
- TPoolId TUnitedWorkers::Idle(TPoolId assigned, TWorkerContext& wctx) {
- wctx.SwitchToIdle();
-
- TPoolId result;
- TTimeTracker timeTracker;
- TCpu& cpu = Cpus[wctx.CpuId];
- TPool* assignedPool = assigned == CpuShared ? nullptr : &Pools[assigned];
- TCpu::TScopedWaiters scopedWaiters(cpu, assignedPool);
- while (true) {
- if (cpu.StartSpinning(this, assignedPool, result)) {
- break; // token already acquired (or stop)
- }
- result = WaitSequence(cpu, wctx, timeTracker);
- if (Y_UNLIKELY(result == CpuStopped) || TryAcquireToken(result)) {
- break; // token acquired (or stop)
- }
- }
-
- wctx.AddElapsedCycles(ActorSystemIndex, timeTracker.Elapsed());
- return result;
- }
-
- TPoolId TUnitedWorkers::WaitSequence(TCpu& cpu, TWorkerContext& wctx, TTimeTracker& timeTracker) {
- TPoolId result;
- if (cpu.ActiveWait(Us2Ts(Config.SpinThresholdUs), result)) {
- wctx.AddElapsedCycles(ActorSystemIndex, timeTracker.Elapsed());
- return result;
- }
- if (cpu.StartBlocking(result)) {
- wctx.AddElapsedCycles(ActorSystemIndex, timeTracker.Elapsed());
- return result;
- }
- wctx.AddElapsedCycles(ActorSystemIndex, timeTracker.Elapsed());
- cpu.LoadLog.RegisterBusyPeriod(GetCycleCountFast());
- bool wakeup;
- do {
- wakeup = cpu.BlockedWait(result, Config.Balancer.PeriodUs * 1000);
- wctx.AddParkedCycles(timeTracker.Elapsed());
- } while (!wakeup);
- cpu.LoadLog.RegisterIdlePeriod(GetCycleCountFast());
- return result;
- }
-
- void TUnitedWorkers::GetCurrentStats(TPoolId pool, TVector<TExecutorThreadStats>& statsCopy) const {
- size_t idx = 1;
- statsCopy.resize(idx + Pools[pool].WakeOrderCpus.size());
- for (TCpu* cpu : Pools[pool].WakeOrderCpus) {
- TExecutorThreadStats& s = statsCopy[idx++];
- s = TExecutorThreadStats();
- s.Aggregate(cpu->PoolStats[pool]);
- }
- }
-
- TUnitedExecutorPool::TUnitedExecutorPool(const TUnitedExecutorPoolConfig& cfg, TUnitedWorkers* united)
- : TExecutorPoolBaseMailboxed(cfg.PoolId)
- , United(united)
- , PoolName(cfg.PoolName)
- {
- United->SetupPool(TPoolId(cfg.PoolId), this, MailboxTable.Get());
- }
-
- void TUnitedExecutorPool::Prepare(TActorSystem* actorSystem, NSchedulerQueue::TReader** scheduleReaders, ui32* scheduleSz) {
- ActorSystem = actorSystem;
-
- // Schedule readers are initialized through TUnitedWorkers::Prepare
- *scheduleReaders = nullptr;
- *scheduleSz = 0;
- }
-
- void TUnitedExecutorPool::Start() {
- // workers are actually started in TUnitedWorkers::Start()
- }
-
- void TUnitedExecutorPool::PrepareStop() {
- }
-
- void TUnitedExecutorPool::Shutdown() {
- // workers are actually joined in TUnitedWorkers::Shutdown()
- }
-
- TAffinity* TUnitedExecutorPool::Affinity() const {
- Y_ABORT(); // should never be called, TCpuExecutorPool is used instead
- }
-
- ui32 TUnitedExecutorPool::GetThreads() const {
- return 0;
- }
-
- ui32 TUnitedExecutorPool::GetReadyActivation(TWorkerContext&, ui64) {
- Y_ABORT(); // should never be called, TCpu*ExecutorPool is used instead
- }
-
- inline void TUnitedExecutorPool::ScheduleActivation(ui32 activation) {
- TUnitedExecutorPool::ScheduleActivationEx(activation, AtomicIncrement(ActivationsRevolvingCounter));
- }
-
- inline void TUnitedExecutorPool::SpecificScheduleActivation(ui32 activation) {
- TUnitedExecutorPool::ScheduleActivation(activation);
- }
-
- inline void TUnitedExecutorPool::ScheduleActivationEx(ui32 activation, ui64 revolvingCounter) {
- United->PushActivation(PoolId, activation, revolvingCounter);
- }
-
- void TUnitedExecutorPool::Schedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie, TWorkerId workerId) {
- TUnitedExecutorPool::Schedule(deadline - ActorSystem->Timestamp(), ev, cookie, workerId);
- }
-
- void TUnitedExecutorPool::Schedule(TMonotonic deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie, TWorkerId workerId) {
- Y_DEBUG_ABORT_UNLESS(workerId < United->GetWorkerCount());
- const auto current = ActorSystem->Monotonic();
- if (deadline < current) {
- deadline = current;
- }
- United->GetScheduleWriter(workerId)->Push(deadline.MicroSeconds(), ev.Release(), cookie);
- }
-
- void TUnitedExecutorPool::Schedule(TDuration delta, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie, TWorkerId workerId) {
- Y_DEBUG_ABORT_UNLESS(workerId < United->GetWorkerCount());
- const auto deadline = ActorSystem->Monotonic() + delta;
- United->GetScheduleWriter(workerId)->Push(deadline.MicroSeconds(), ev.Release(), cookie);
- }
-
- void TUnitedExecutorPool::GetCurrentStats(TExecutorPoolStats& poolStats, TVector<TExecutorThreadStats>& statsCopy) const {
- Y_UNUSED(poolStats);
- if (statsCopy.empty()) {
- statsCopy.resize(1);
- }
- statsCopy[0] = TExecutorThreadStats();
- statsCopy[0].Aggregate(Stats);
- United->GetCurrentStats(PoolId, statsCopy);
- }
-}
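Editor's note: the WakeOrderCpus setup inside the removed constructor (exclusive cpus first, then shared ones) is easy to lose in the bulk deletion above. The following standalone sketch restates only that ordering rule; TCpuDesc and BuildWakeOrder are invented names, not part of the codebase.

#include <algorithm>
#include <cstdint>
#include <vector>

struct TCpuDesc {
    uint32_t CpuId;
    std::vector<uint32_t> AllowedPools; // pool ids this cpu may run
};

// Build the wake order for one pool: cpus dedicated to this pool go first,
// cpus shared with other pools follow, both in configuration order. Waking
// dedicated cpus first keeps load off the shared ones and preserves latency
// isolation, at the cost of more actively spinning idle cpus under light load.
std::vector<uint32_t> BuildWakeOrder(const std::vector<TCpuDesc>& cpus, uint32_t poolId) {
    std::vector<uint32_t> order;
    for (const TCpuDesc& cpu : cpus) {
        if (cpu.AllowedPools.size() == 1 && cpu.AllowedPools[0] == poolId) {
            order.push_back(cpu.CpuId); // exclusive cpu
        }
    }
    for (const TCpuDesc& cpu : cpus) {
        bool allowed = std::find(cpu.AllowedPools.begin(), cpu.AllowedPools.end(), poolId)
                       != cpu.AllowedPools.end();
        if (cpu.AllowedPools.size() > 1 && allowed) {
            order.push_back(cpu.CpuId); // shared cpu
        }
    }
    return order;
}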
diff --git a/ydb/library/actors/core/executor_pool_united.h b/ydb/library/actors/core/executor_pool_united.h
deleted file mode 100644
index cd38162769..0000000000
--- a/ydb/library/actors/core/executor_pool_united.h
+++ /dev/null
@@ -1,48 +0,0 @@
-#pragma once
-
-#include "actorsystem.h"
-#include "balancer.h"
-#include "scheduler_queue.h"
-#include "executor_pool_base.h"
-
-#include <ydb/library/actors/util/unordered_cache.h>
-
-#include <library/cpp/monlib/dynamic_counters/counters.h>
-#include <ydb/library/actors/util/cpu_load_log.h>
-#include <ydb/library/actors/util/unordered_cache.h>
-#include <library/cpp/containers/stack_vector/stack_vec.h>
-
-#include <util/generic/noncopyable.h>
-
-namespace NActors {
- class TMailboxTable;
-
- class TUnitedExecutorPool: public TExecutorPoolBaseMailboxed {
- TUnitedWorkers* United;
- const TString PoolName;
- TAtomic ActivationsRevolvingCounter = 0;
- public:
- TUnitedExecutorPool(const TUnitedExecutorPoolConfig& cfg, TUnitedWorkers* united);
-
- void Prepare(TActorSystem* actorSystem, NSchedulerQueue::TReader** scheduleReaders, ui32* scheduleSz) override;
- void Start() override;
- void PrepareStop() override;
- void Shutdown() override;
-
- TAffinity* Affinity() const override;
- ui32 GetThreads() const override;
- ui32 GetReadyActivation(TWorkerContext& wctx, ui64 revolvingReadCounter) override;
- void ScheduleActivation(ui32 activation) override;
- void SpecificScheduleActivation(ui32 activation) override;
- void ScheduleActivationEx(ui32 activation, ui64 revolvingWriteCounter) override;
- void Schedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie, TWorkerId workerId) override;
- void Schedule(TMonotonic deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie, TWorkerId workerId) override;
- void Schedule(TDuration delta, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie, TWorkerId workerId) override;
-
- void GetCurrentStats(TExecutorPoolStats& poolStats, TVector<TExecutorThreadStats>& statsCopy) const override;
-
- TString GetName() const override {
- return PoolName;
- }
- };
-}
diff --git a/ydb/library/actors/core/executor_pool_united_ut.cpp b/ydb/library/actors/core/executor_pool_united_ut.cpp
deleted file mode 100644
index 16c3a49711..0000000000
--- a/ydb/library/actors/core/executor_pool_united_ut.cpp
+++ /dev/null
@@ -1,341 +0,0 @@
-#include "actorsystem.h"
-#include "executor_pool_basic.h"
-#include "hfunc.h"
-#include "scheduler_basic.h"
-
-#include <ydb/library/actors/util/should_continue.h>
-
-#include <library/cpp/testing/unittest/registar.h>
-#include <ydb/library/actors/protos/unittests.pb.h>
-
-using namespace NActors;
-
-////////////////////////////////////////////////////////////////////////////////
-
-struct TEvMsg : public NActors::TEventBase<TEvMsg, 10347> {
- DEFINE_SIMPLE_LOCAL_EVENT(TEvMsg, "ExecutorPoolTest: Msg");
-};
-
-////////////////////////////////////////////////////////////////////////////////
-
-inline ui64 DoTimedWork(ui64 workUs) {
- ui64 startUs = ThreadCPUTime();
- ui64 endUs = startUs + workUs;
- ui64 nowUs = startUs;
- do {
- ui64 endTs = GetCycleCountFast() + Us2Ts(endUs - nowUs);
- while (GetCycleCountFast() <= endTs) {}
- nowUs = ThreadCPUTime();
- } while (nowUs <= endUs);
- return nowUs - startUs;
-}
-
-class TTestSenderActor : public IActorCallback {
-private:
- using EActivityType = IActor::EActivityType ;
- using EActorActivity = IActor::EActorActivity;
-
-private:
- TAtomic Counter;
- TActorId Receiver;
-
- std::function<void(void)> Action;
-
-public:
- TTestSenderActor(std::function<void(void)> action = [](){},
- EActivityType activityType = EActorActivity::OTHER)
- : IActorCallback(static_cast<TReceiveFunc>(&TTestSenderActor::Execute), activityType)
- , Action(action)
- {}
-
- void Start(TActorId receiver, size_t count) {
- AtomicSet(Counter, count);
- Receiver = receiver;
- }
-
- void Stop() {
- while (true) {
- if (GetCounter() == 0) {
- break;
- }
-
- Sleep(TDuration::MilliSeconds(1));
- }
- }
-
- size_t GetCounter() const {
- return AtomicGet(Counter);
- }
-
-private:
- STFUNC(Execute) {
- switch (ev->GetTypeRewrite()) {
- hFunc(TEvMsg, Handle);
- }
- }
-
- void Handle(TEvMsg::TPtr &ev) {
- Y_UNUSED(ev);
- Action();
- TAtomicBase count = AtomicDecrement(Counter);
- Y_ABORT_UNLESS(count != Max<TAtomicBase>());
- if (count) {
- Send(Receiver, new TEvMsg());
- }
- }
-};
-
-// Single cpu balancer that switches pool on every activation; not thread-safe
-struct TRoundRobinBalancer: public IBalancer {
- TCpuState* State;
- TMap<TPoolId, TPoolId> NextPool;
-
- bool AddCpu(const TCpuAllocation& cpuAlloc, TCpuState* cpu) override {
- State = cpu;
- TPoolId prev = cpuAlloc.AllowedPools.rbegin()->PoolId;
- for (auto& p : cpuAlloc.AllowedPools) {
- NextPool[prev] = p.PoolId;
- prev = p.PoolId;
- }
- return true;
- }
-
- bool TryLock(ui64) override { return true; }
- void SetPoolStats(TPoolId, const TBalancerStats&) override {}
- void Unlock() override {}
-
- void Balance() override {
- TPoolId assigned;
- TPoolId current;
- State->Load(assigned, current);
- State->AssignPool(NextPool[assigned]);
- }
-
- ui64 GetPeriodUs() override {
- return 1000;
- }
-};
-
-void AddUnitedPool(THolder<TActorSystemSetup>& setup, ui32 concurrency = 0) {
- TUnitedExecutorPoolConfig united;
- united.PoolId = setup->GetExecutorsCount();
- united.Concurrency = concurrency;
- setup->CpuManager.United.emplace_back(std::move(united));
-}
-
-THolder<TActorSystemSetup> GetActorSystemSetup(ui32 cpuCount) {
- auto setup = MakeHolder<NActors::TActorSystemSetup>();
- setup->NodeId = 1;
- setup->CpuManager.UnitedWorkers.CpuCount = cpuCount;
- setup->CpuManager.UnitedWorkers.NoRealtime = true; // unavailable in test environment
- setup->Scheduler = new TBasicSchedulerThread(NActors::TSchedulerConfig(512, 0));
- return setup;
-}
-
-Y_UNIT_TEST_SUITE(UnitedExecutorPool) {
-
-#ifdef _linux_
-
- Y_UNIT_TEST(OnePoolManyCpus) {
- const size_t msgCount = 1e4;
- auto setup = GetActorSystemSetup(4);
- AddUnitedPool(setup);
- TActorSystem actorSystem(setup);
- actorSystem.Start();
-
- auto begin = TInstant::Now();
-
- auto actor = new TTestSenderActor();
- auto actorId = actorSystem.Register(actor);
- actor->Start(actor->SelfId(), msgCount);
- actorSystem.Send(actorId, new TEvMsg());
-
- while (actor->GetCounter()) {
- auto now = TInstant::Now();
- UNIT_ASSERT_C(now - begin < TDuration::Seconds(5), "Counter is " << actor->GetCounter());
-
- Sleep(TDuration::MilliSeconds(1));
- }
-
- TVector<TExecutorThreadStats> stats;
- TExecutorPoolStats poolStats;
- actorSystem.GetPoolStats(0, poolStats, stats);
- // Sum all per-thread counters into the 0th element
- for (ui32 idx = 1; idx < stats.size(); ++idx) {
- stats[0].Aggregate(stats[idx]);
- }
-
- UNIT_ASSERT_VALUES_EQUAL(stats[0].SentEvents, msgCount - 1);
- UNIT_ASSERT_VALUES_EQUAL(stats[0].ReceivedEvents, msgCount);
- //UNIT_ASSERT_VALUES_EQUAL(stats[0].PreemptedEvents, 0); // depends on execution time and system load, so may be non-zero
- UNIT_ASSERT_VALUES_EQUAL(stats[0].NonDeliveredEvents, 0);
- UNIT_ASSERT_VALUES_EQUAL(stats[0].EmptyMailboxActivation, 0);
- //UNIT_ASSERT_VALUES_EQUAL(stats[0].CpuUs, 0); // depends on total duration of test, so undefined
- UNIT_ASSERT(stats[0].ElapsedTicks > 0);
- //UNIT_ASSERT(stats[0].ParkedTicks == 0); // per-pool parked time does not make sense for united pools
- UNIT_ASSERT_VALUES_EQUAL(stats[0].BlockedTicks, 0);
- UNIT_ASSERT(stats[0].ActivationTimeHistogram.TotalSamples >= msgCount / TBasicExecutorPoolConfig::DEFAULT_EVENTS_PER_MAILBOX);
- UNIT_ASSERT_VALUES_EQUAL(stats[0].EventDeliveryTimeHistogram.TotalSamples, msgCount);
- UNIT_ASSERT_VALUES_EQUAL(stats[0].EventProcessingCountHistogram.TotalSamples, msgCount);
- UNIT_ASSERT(stats[0].EventProcessingTimeHistogram.TotalSamples > 0);
- UNIT_ASSERT(stats[0].ElapsedTicksByActivity[NActors::TActorTypeOperator::GetOtherActivityIndex()] > 0);
- UNIT_ASSERT_VALUES_EQUAL(stats[0].ReceivedEventsByActivity[NActors::TActorTypeOperator::GetOtherActivityIndex()], msgCount);
- UNIT_ASSERT_VALUES_EQUAL(stats[0].ActorsAliveByActivity[NActors::TActorTypeOperator::GetOtherActivityIndex()], 1);
- UNIT_ASSERT_VALUES_EQUAL(stats[0].ScheduledEventsByActivity[NActors::TActorTypeOperator::GetOtherActivityIndex()], 0);
- UNIT_ASSERT_VALUES_EQUAL(stats[0].PoolActorRegistrations, 1);
- UNIT_ASSERT_VALUES_EQUAL(stats[0].PoolDestroyedActors, 0);
- UNIT_ASSERT_VALUES_EQUAL(stats[0].PoolAllocatedMailboxes, 4095); // one line
- UNIT_ASSERT(stats[0].MailboxPushedOutByTime + stats[0].MailboxPushedOutByEventCount + stats[0].MailboxPushedOutBySoftPreemption >= msgCount / TBasicExecutorPoolConfig::DEFAULT_EVENTS_PER_MAILBOX);
- }
-
- Y_UNIT_TEST(ManyPoolsOneSharedCpu) {
- const size_t msgCount = 1e4;
- const size_t pools = 4;
- auto setup = GetActorSystemSetup(1);
- for (size_t pool = 0; pool < pools; pool++) {
- AddUnitedPool(setup);
- }
- TActorSystem actorSystem(setup);
- actorSystem.Start();
-
- auto begin = TInstant::Now();
-
- TVector<TTestSenderActor*> actors;
- for (size_t pool = 0; pool < pools; pool++) {
- auto actor = new TTestSenderActor();
- auto actorId = actorSystem.Register(actor, TMailboxType::HTSwap, pool);
- actor->Start(actor->SelfId(), msgCount);
- actorSystem.Send(actorId, new TEvMsg());
- actors.push_back(actor);
- }
-
- while (true) {
- size_t left = 0;
- for (auto actor : actors) {
- left += actor->GetCounter();
- }
- if (left == 0) {
- break;
- }
- auto now = TInstant::Now();
- UNIT_ASSERT_C(now - begin < TDuration::Seconds(5), "left " << left);
- Sleep(TDuration::MilliSeconds(1));
- }
-
- for (size_t pool = 0; pool < pools; pool++) {
- TVector<TExecutorThreadStats> stats;
- TExecutorPoolStats poolStats;
- actorSystem.GetPoolStats(pool, poolStats, stats);
- // Sum all per-thread counters into the 0th element
- for (ui32 idx = 1; idx < stats.size(); ++idx) {
- stats[0].Aggregate(stats[idx]);
- }
-
- UNIT_ASSERT_VALUES_EQUAL(stats[0].ReceivedEvents, msgCount);
- UNIT_ASSERT_VALUES_EQUAL(stats[0].PoolActorRegistrations, 1);
- }
- }
-
- Y_UNIT_TEST(ManyPoolsOneAssignedCpu) {
- const size_t msgCount = 1e4;
- const size_t pools = 4;
- auto setup = GetActorSystemSetup(1);
- setup->Balancer.Reset(new TRoundRobinBalancer());
- for (size_t pool = 0; pool < pools; pool++) {
- AddUnitedPool(setup);
- }
- TActorSystem actorSystem(setup);
- actorSystem.Start();
-
- auto begin = TInstant::Now();
-
- TVector<TTestSenderActor*> actors;
- for (size_t pool = 0; pool < pools; pool++) {
- auto actor = new TTestSenderActor();
- auto actorId = actorSystem.Register(actor, TMailboxType::HTSwap, pool);
- actor->Start(actor->SelfId(), msgCount);
- actorSystem.Send(actorId, new TEvMsg());
- actors.push_back(actor);
- }
-
- while (true) {
- size_t left = 0;
- for (auto actor : actors) {
- left += actor->GetCounter();
- }
- if (left == 0) {
- break;
- }
- auto now = TInstant::Now();
- UNIT_ASSERT_C(now - begin < TDuration::Seconds(5), "left " << left);
- Sleep(TDuration::MilliSeconds(1));
- }
-
- for (size_t pool = 0; pool < pools; pool++) {
- TVector<TExecutorThreadStats> stats;
- TExecutorPoolStats poolStats;
- actorSystem.GetPoolStats(pool, poolStats, stats);
- // Sum all per-thread counters into the 0th element
- for (ui32 idx = 1; idx < stats.size(); ++idx) {
- stats[0].Aggregate(stats[idx]);
- }
-
- UNIT_ASSERT_VALUES_EQUAL(stats[0].ReceivedEvents, msgCount);
- UNIT_ASSERT_VALUES_EQUAL(stats[0].PoolActorRegistrations, 1);
- }
- }
-
- Y_UNIT_TEST(ManyPoolsOneCpuSlowEvents) {
- const size_t msgCount = 3;
- const size_t pools = 4;
- auto setup = GetActorSystemSetup(1);
- for (size_t pool = 0; pool < pools; pool++) {
- AddUnitedPool(setup);
- }
- TActorSystem actorSystem(setup);
- actorSystem.Start();
-
- auto begin = TInstant::Now();
-
- TVector<TTestSenderActor*> actors;
- for (size_t pool = 0; pool < pools; pool++) {
- auto actor = new TTestSenderActor([]() {
- DoTimedWork(100'000);
- });
- auto actorId = actorSystem.Register(actor, TMailboxType::HTSwap, pool);
- actor->Start(actor->SelfId(), msgCount);
- actorSystem.Send(actorId, new TEvMsg());
- actors.push_back(actor);
- }
-
- while (true) {
- size_t left = 0;
- for (auto actor : actors) {
- left += actor->GetCounter();
- }
- if (left == 0) {
- break;
- }
- auto now = TInstant::Now();
- UNIT_ASSERT_C(now - begin < TDuration::Seconds(15), "left " << left);
- Sleep(TDuration::MilliSeconds(1));
- }
-
- for (size_t pool = 0; pool < pools; pool++) {
- TVector<TExecutorThreadStats> stats;
- TExecutorPoolStats poolStats;
- actorSystem.GetPoolStats(pool, poolStats, stats);
- // Sum all per-thread counters into the 0th element
- for (ui32 idx = 1; idx < stats.size(); ++idx) {
- stats[0].Aggregate(stats[idx]);
- }
-
- UNIT_ASSERT_VALUES_EQUAL(stats[0].ReceivedEvents, msgCount);
- UNIT_ASSERT_VALUES_EQUAL(stats[0].PreemptedEvents, msgCount); // each 100 ms event is expected to be preempted
- UNIT_ASSERT_VALUES_EQUAL(stats[0].PoolActorRegistrations, 1);
- }
- }
-
-#endif
-
-}
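For context, every deleted test above checks pool statistics the same way, and the surviving basic-pool tests keep using this pattern. A minimal sketch, assuming an already started TActorSystem named actorSystem and a msgCount as in the deleted tests:

    // Fetch per-thread stats for pool 0 and fold them into element 0,
    // then check the aggregated counters (mirrors the removed test bodies).
    TVector<TExecutorThreadStats> stats;
    TExecutorPoolStats poolStats;
    actorSystem.GetPoolStats(0 /* poolId */, poolStats, stats);
    for (ui32 idx = 1; idx < stats.size(); ++idx) {
        stats[0].Aggregate(stats[idx]); // sum thread-local counters into stats[0]
    }
    UNIT_ASSERT_VALUES_EQUAL(stats[0].ReceivedEvents, msgCount);
    UNIT_ASSERT_VALUES_EQUAL(stats[0].PoolActorRegistrations, 1);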
diff --git a/ydb/library/actors/core/executor_pool_united_workers.h b/ydb/library/actors/core/executor_pool_united_workers.h
deleted file mode 100644
index 7b57e3d8f9..0000000000
--- a/ydb/library/actors/core/executor_pool_united_workers.h
+++ /dev/null
@@ -1,105 +0,0 @@
-#pragma once
-
-#include "defs.h"
-#include "balancer.h"
-#include "scheduler_queue.h"
-
-#include <ydb/library/actors/actor_type/indexes.h>
-#include <ydb/library/actors/util/cpu_load_log.h>
-#include <ydb/library/actors/util/datetime.h>
-#include <util/generic/noncopyable.h>
-
-namespace NActors {
- class TActorSystem;
- class TMailboxTable;
-
- class TUnitedWorkers: public TNonCopyable {
- struct TWorker;
- struct TPool;
- struct TCpu;
-
- i16 WorkerCount;
- TArrayHolder<TWorker> Workers; // indexed by WorkerId
- size_t PoolCount;
- TArrayHolder<TPool> Pools; // indexed by PoolId, so may include unused (non-united) pools
- size_t CpuCount;
- TArrayHolder<TCpu> Cpus; // indexed by CpuId, so may include unallocated CPUs
-
- IBalancer* Balancer; // external pool cpu balancer
-
- TUnitedWorkersConfig Config;
- TCpuAllocationConfig Allocation;
-
- volatile bool StopFlag = false;
- TMinusOneCpuEstimator<1024> MinusOneCpuEstimator;
- const ui32 ActorSystemIndex = NActors::TActorTypeOperator::GetActorSystemIndex();
- public:
- TUnitedWorkers(
- const TUnitedWorkersConfig& config,
- const TVector<TUnitedExecutorPoolConfig>& unitedPools,
- const TCpuAllocationConfig& allocation,
- IBalancer* balancer);
- ~TUnitedWorkers();
- void Prepare(TActorSystem* actorSystem, TVector<NSchedulerQueue::TReader*>& scheduleReaders);
- void Start();
- void PrepareStop();
- void Shutdown();
-
- bool IsStopped() const {
- return RelaxedLoad(&StopFlag);
- }
-
- TWorkerId GetWorkerCount() const {
- return WorkerCount;
- }
-
- // Returns thread id of a worker
- TThreadId GetWorkerThreadId(TWorkerId workerId) const;
-
- // Returns per worker schedule writers
- NSchedulerQueue::TWriter* GetScheduleWriter(TWorkerId workerId) const;
-
- // Sets executor for specified pool
- void SetupPool(TPoolId pool, IExecutorPool* executorPool, TMailboxTable* mailboxTable);
-
- // Add activation of newly scheduled mailbox and wake cpu to execute it if required
- void PushActivation(TPoolId pool, ui32 activation, ui64 revolvingCounter);
-
- // Try to acquire a pending token. Must be done before execution
- bool TryAcquireToken(TPoolId pool);
-
- // Try to wake idle cpu waiting for tokens on specified pool
- void TryWake(TPoolId pool);
-
- // Get activation from pool; requires pool's token
- void BeginExecution(TPoolId pool, ui32& activation, ui64 revolvingCounter);
-
- // Stop currently active execution and start new one if token is available
- // NOTE: Reuses token if it's not destroyed
- bool NextExecution(TPoolId pool, ui32& activation, ui64 revolvingCounter);
-
- // Stop active execution
- void StopExecution(TPoolId pool);
-
- // Runs balancer to assign pools to cpus
- void Balance();
-
- // Returns pool to be executed by worker or `CpuShared`
- TPoolId AssignedPool(TWorkerContext& wctx);
-
- // Checks if balancer has assigned another pool for worker's cpu
- bool IsPoolReassigned(TWorkerContext& wctx);
-
- // Switch worker context into specified pool
- void SwitchPool(TWorkerContext& wctx, ui64 softDeadlineTs);
-
- // Wait for tokens from any pool allowed on specified cpu
- TPoolId Idle(TPoolId assigned, TWorkerContext& wctx);
-
- // Fill stats for specified pool
- void GetCurrentStats(TPoolId pool, TVector<TExecutorThreadStats>& statsCopy) const;
-
- private:
- TPoolId WaitSequence(TCpu& cpu, TWorkerContext& wctx, TTimeTracker& timeTracker);
- };
-}
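The removed header only declares the interface; the implementation lived in the likewise-removed executor_pool_united.cpp. A rough sketch of the execution cycle the method comments imply, purely for illustration (the variable names and surrounding glue are hypothetical, not the removed implementation):

    // Hypothetical worker-side usage of the removed TUnitedWorkers interface,
    // inferred from the declarations above.
    TPoolId pool = workers.AssignedPool(wctx);        // pool chosen by the balancer for this cpu
    workers.SwitchPool(wctx, /* softDeadlineTs */ 0); // enter that pool's worker context
    ui32 activation = 0;
    ui64 revolving = 0;
    if (workers.TryAcquireToken(pool)) {              // a token must be held before execution
        workers.BeginExecution(pool, activation, revolving);
        do {
            // ... process the mailbox identified by `activation` ...
        } while (workers.NextExecution(pool, activation, revolving)); // reuses the token when possible
        workers.StopExecution(pool);
    } else {
        pool = workers.Idle(pool, wctx);              // wait for a token from any pool allowed on this cpu
    }

With the united pools gone, every TExecutorThread belongs to exactly one pool, which is what allows the unconditional pool-id assertion in the executor_thread.cpp hunk below.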
diff --git a/ydb/library/actors/core/executor_thread.cpp b/ydb/library/actors/core/executor_thread.cpp
index fb1f968888..29d75ef0c0 100644
--- a/ydb/library/actors/core/executor_thread.cpp
+++ b/ydb/library/actors/core/executor_thread.cpp
@@ -43,7 +43,6 @@ namespace NActors {
, ExecutorPool(executorPool)
, Ctx(workerId, cpuId)
, ThreadName(threadName)
- , IsUnitedWorker(true)
, TimePerMailbox(timePerMailbox)
, EventsPerMailbox(eventsPerMailbox)
{
@@ -67,7 +66,6 @@ namespace NActors {
, AvailableExecutorPools(executorPools)
, Ctx(workerId, 0)
, ThreadName(threadName)
- , IsUnitedWorker(false)
, TimePerMailbox(timePerMailbox)
, EventsPerMailbox(eventsPerMailbox)
, SoftProcessingDurationTs(softProcessingDurationTs)
@@ -79,7 +77,7 @@ namespace NActors {
{ }
void TExecutorThread::UnregisterActor(TMailboxHeader* mailbox, TActorId actorId) {
- Y_DEBUG_ABORT_UNLESS(IsUnitedWorker || actorId.PoolID() == ExecutorPool->PoolId && ExecutorPool->ResolveMailbox(actorId.Hint()) == mailbox);
+ Y_DEBUG_ABORT_UNLESS(actorId.PoolID() == ExecutorPool->PoolId && ExecutorPool->ResolveMailbox(actorId.Hint()) == mailbox);
IActor* actor = mailbox->DetachActor(actorId.LocalId());
Ctx.DecrementActorsAliveByActivity(actor->GetActivityType());
DyingActors.push_back(THolder(actor));
diff --git a/ydb/library/actors/core/executor_thread.h b/ydb/library/actors/core/executor_thread.h
index 83e323b4a8..79d4001b21 100644
--- a/ydb/library/actors/core/executor_thread.h
+++ b/ydb/library/actors/core/executor_thread.h
@@ -99,7 +99,6 @@ namespace NActors {
ui64 RevolvingWriteCounter = 0;
const TString ThreadName;
volatile TThreadId ThreadId = UnknownThreadId;
- bool IsUnitedWorker = false;
TDuration TimePerMailbox;
ui32 EventsPerMailbox;
diff --git a/ydb/library/actors/core/ut/CMakeLists.darwin-arm64.txt b/ydb/library/actors/core/ut/CMakeLists.darwin-arm64.txt
index 72ca15e72e..98e9cdcae1 100644
--- a/ydb/library/actors/core/ut/CMakeLists.darwin-arm64.txt
+++ b/ydb/library/actors/core/ut/CMakeLists.darwin-arm64.txt
@@ -33,11 +33,9 @@ target_sources(ydb-library-actors-core-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/actors/core/actorsystem_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/library/actors/core/performance_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/library/actors/core/ask_ut.cpp
- ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/balancer_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/library/actors/core/event_pb_payload_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/library/actors/core/event_pb_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/library/actors/core/executor_pool_basic_ut.cpp
- ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/executor_pool_united_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/library/actors/core/log_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/library/actors/core/mon_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/library/actors/core/scheduler_actor_ut.cpp
diff --git a/ydb/library/actors/core/ut/CMakeLists.darwin-x86_64.txt b/ydb/library/actors/core/ut/CMakeLists.darwin-x86_64.txt
index 784612191d..d20fbe3e95 100644
--- a/ydb/library/actors/core/ut/CMakeLists.darwin-x86_64.txt
+++ b/ydb/library/actors/core/ut/CMakeLists.darwin-x86_64.txt
@@ -34,11 +34,9 @@ target_sources(ydb-library-actors-core-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/actors/core/actorsystem_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/library/actors/core/performance_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/library/actors/core/ask_ut.cpp
- ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/balancer_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/library/actors/core/event_pb_payload_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/library/actors/core/event_pb_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/library/actors/core/executor_pool_basic_ut.cpp
- ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/executor_pool_united_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/library/actors/core/log_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/library/actors/core/mon_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/library/actors/core/scheduler_actor_ut.cpp
diff --git a/ydb/library/actors/core/ut/CMakeLists.linux-aarch64.txt b/ydb/library/actors/core/ut/CMakeLists.linux-aarch64.txt
index 260aa2596f..e47e5555b1 100644
--- a/ydb/library/actors/core/ut/CMakeLists.linux-aarch64.txt
+++ b/ydb/library/actors/core/ut/CMakeLists.linux-aarch64.txt
@@ -37,11 +37,9 @@ target_sources(ydb-library-actors-core-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/actors/core/actorsystem_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/library/actors/core/performance_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/library/actors/core/ask_ut.cpp
- ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/balancer_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/library/actors/core/event_pb_payload_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/library/actors/core/event_pb_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/library/actors/core/executor_pool_basic_ut.cpp
- ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/executor_pool_united_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/library/actors/core/log_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/library/actors/core/mon_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/library/actors/core/scheduler_actor_ut.cpp
diff --git a/ydb/library/actors/core/ut/CMakeLists.linux-x86_64.txt b/ydb/library/actors/core/ut/CMakeLists.linux-x86_64.txt
index 4adb4cbe7e..4610ee7de3 100644
--- a/ydb/library/actors/core/ut/CMakeLists.linux-x86_64.txt
+++ b/ydb/library/actors/core/ut/CMakeLists.linux-x86_64.txt
@@ -38,11 +38,9 @@ target_sources(ydb-library-actors-core-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/actors/core/actorsystem_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/library/actors/core/performance_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/library/actors/core/ask_ut.cpp
- ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/balancer_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/library/actors/core/event_pb_payload_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/library/actors/core/event_pb_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/library/actors/core/executor_pool_basic_ut.cpp
- ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/executor_pool_united_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/library/actors/core/log_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/library/actors/core/mon_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/library/actors/core/scheduler_actor_ut.cpp
diff --git a/ydb/library/actors/core/ut/CMakeLists.windows-x86_64.txt b/ydb/library/actors/core/ut/CMakeLists.windows-x86_64.txt
index 56c30fa8ef..c3834bf41d 100644
--- a/ydb/library/actors/core/ut/CMakeLists.windows-x86_64.txt
+++ b/ydb/library/actors/core/ut/CMakeLists.windows-x86_64.txt
@@ -27,11 +27,9 @@ target_sources(ydb-library-actors-core-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/actors/core/actorsystem_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/library/actors/core/performance_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/library/actors/core/ask_ut.cpp
- ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/balancer_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/library/actors/core/event_pb_payload_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/library/actors/core/event_pb_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/library/actors/core/executor_pool_basic_ut.cpp
- ${CMAKE_SOURCE_DIR}/ydb/library/actors/core/executor_pool_united_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/library/actors/core/log_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/library/actors/core/mon_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/library/actors/core/scheduler_actor_ut.cpp
diff --git a/ydb/library/actors/core/ut/ya.make b/ydb/library/actors/core/ut/ya.make
index 1af7984cce..e1a225e528 100644
--- a/ydb/library/actors/core/ut/ya.make
+++ b/ydb/library/actors/core/ut/ya.make
@@ -30,11 +30,9 @@ SRCS(
actorsystem_ut.cpp
performance_ut.cpp
ask_ut.cpp
- balancer_ut.cpp
event_pb_payload_ut.cpp
event_pb_ut.cpp
executor_pool_basic_ut.cpp
- executor_pool_united_ut.cpp
log_ut.cpp
mon_ut.cpp
scheduler_actor_ut.cpp
diff --git a/ydb/library/actors/core/ya.make b/ydb/library/actors/core/ya.make
index 9f6a4b7b2b..f1d9abc3f8 100644
--- a/ydb/library/actors/core/ya.make
+++ b/ydb/library/actors/core/ya.make
@@ -27,8 +27,6 @@ SRCS(
ask.cpp
ask.h
av_bootstrapped.cpp
- balancer.h
- balancer.cpp
buffer.cpp
buffer.h
callstack.cpp
@@ -53,8 +51,6 @@ SRCS(
executor_pool_basic.h
executor_pool_io.cpp
executor_pool_io.h
- executor_pool_united.cpp
- executor_pool_united.h
executor_thread.cpp
executor_thread.h
harmonizer.cpp