diff options
author | kruall <kruall@ydb.tech> | 2024-01-11 14:15:52 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-01-11 14:15:52 +0300 |
commit | 57a192d4f253ebd3c1bcf84d61912f81ba3b99f0 (patch) | |
tree | cb47b151434145758b692d3b8004c0a9990f1853 | |
parent | e4b78b4a312ad6000d440e1601d184087c129da8 (diff) | |
download | ydb-57a192d4f253ebd3c1bcf84d61912f81ba3b99f0.tar.gz |
Add shared executor pool, KIKIMR-18440 (#933)
* Move some methods
* add shared executor pool
* Add config and improve wake ups
---------
Co-authored-by: Aleksandr Kriukov <kruall@ydb.ru>
-rw-r--r-- | ydb/library/actors/core/config.h | 11 | ||||
-rw-r--r-- | ydb/library/actors/core/cpu_manager.cpp | 13 | ||||
-rw-r--r-- | ydb/library/actors/core/cpu_manager.h | 4 | ||||
-rw-r--r-- | ydb/library/actors/core/executor_pool.h | 22 | ||||
-rw-r--r-- | ydb/library/actors/core/executor_pool_basic.cpp | 43 | ||||
-rw-r--r-- | ydb/library/actors/core/executor_pool_basic_ut.cpp | 44 | ||||
-rw-r--r-- | ydb/library/actors/core/executor_pool_shared.cpp | 153 | ||||
-rw-r--r-- | ydb/library/actors/core/executor_pool_shared.h | 56 | ||||
-rw-r--r-- | ydb/library/actors/core/executor_thread.cpp | 40 | ||||
-rw-r--r-- | ydb/library/actors/core/executor_thread.h | 10 | ||||
-rw-r--r-- | ydb/library/actors/core/executor_thread_ctx.h | 10 | ||||
-rw-r--r-- | ydb/library/actors/core/ya.make | 2 |
12 files changed, 339 insertions, 69 deletions
diff --git a/ydb/library/actors/core/config.h b/ydb/library/actors/core/config.h index d70a917847..4889206b0a 100644 --- a/ydb/library/actors/core/config.h +++ b/ydb/library/actors/core/config.h @@ -35,6 +35,16 @@ namespace NActors { i16 SharedExecutorsCount = 0; i16 SoftProcessingDurationTs = 0; EASProfile ActorSystemProfile = EASProfile::Default; + bool HasSharedThread = false; + }; + + struct TSharedExecutorPoolConfig { + ui32 Threads = 1; + ui64 SpinThreshold = 100; + TCpuMask Affinity; // Executor thread affinity + TDuration TimePerMailbox = TBasicExecutorPoolConfig::DEFAULT_TIME_PER_MAILBOX; + ui32 EventsPerMailbox = TBasicExecutorPoolConfig::DEFAULT_EVENTS_PER_MAILBOX; + i16 SoftProcessingDurationTs = 0; }; struct TIOExecutorPoolConfig { @@ -54,6 +64,7 @@ namespace NActors { TVector<TBasicExecutorPoolConfig> Basic; TVector<TIOExecutorPoolConfig> IO; TVector<TSelfPingInfo> PingInfoByPool; + TSharedExecutorPoolConfig Shared; ui32 GetExecutorsCount() const { return Basic.size() + IO.size(); diff --git a/ydb/library/actors/core/cpu_manager.cpp b/ydb/library/actors/core/cpu_manager.cpp index 1898904bb3..7c0c2276b3 100644 --- a/ydb/library/actors/core/cpu_manager.cpp +++ b/ydb/library/actors/core/cpu_manager.cpp @@ -22,8 +22,17 @@ namespace NActors { TAffinity available; available.Current(); + std::vector<i16> poolsWithSharedThreads; + for (TBasicExecutorPoolConfig& cfg : Config.Basic) { + if (cfg.HasSharedThread) { + poolsWithSharedThreads.push_back(cfg.PoolId); + } + } + Shared.reset(new TSharedExecutorPool(Config.Shared, ExecutorPoolCount, poolsWithSharedThreads)); + // auto sharedPool = static_cast<TSharedExecutorPool*>(Shared.get()); + ui64 ts = GetCycleCountFast(); - Harmonizer.Reset(MakeHarmonizer(ts)); + Harmonizer.reset(MakeHarmonizer(ts)); Executors.Reset(new TAutoPtr<IExecutorPool>[ExecutorPoolCount]); @@ -90,7 +99,7 @@ namespace NActors { IExecutorPool* TCpuManager::CreateExecutorPool(ui32 poolId) { for (TBasicExecutorPoolConfig& cfg : Config.Basic) { if (cfg.PoolId == poolId) { - return new TBasicExecutorPool(cfg, Harmonizer.Get()); + return new TBasicExecutorPool(cfg, Harmonizer.get()); } } for (TIOExecutorPoolConfig& cfg : Config.IO) { diff --git a/ydb/library/actors/core/cpu_manager.h b/ydb/library/actors/core/cpu_manager.h index 29e86170c9..0fc5f1dbf2 100644 --- a/ydb/library/actors/core/cpu_manager.h +++ b/ydb/library/actors/core/cpu_manager.h @@ -2,6 +2,7 @@ #include "harmonizer.h" #include "executor_pool.h" +#include "executor_pool_shared.h" namespace NActors { struct TActorSystemSetup; @@ -9,7 +10,8 @@ namespace NActors { class TCpuManager : public TNonCopyable { const ui32 ExecutorPoolCount; TArrayHolder<TAutoPtr<IExecutorPool>> Executors; - THolder<IHarmonizer> Harmonizer; + std::unique_ptr<IHarmonizer> Harmonizer; + std::unique_ptr<TSharedExecutorPool> Shared; TCpuManagerConfig Config; public: diff --git a/ydb/library/actors/core/executor_pool.h b/ydb/library/actors/core/executor_pool.h index 5498a6403e..854689346a 100644 --- a/ydb/library/actors/core/executor_pool.h +++ b/ydb/library/actors/core/executor_pool.h @@ -23,7 +23,20 @@ namespace NActors { } }; - class IExecutorPool : TNonCopyable { + struct IActorThreadPool : TNonCopyable { + + virtual ~IActorThreadPool() { + } + + // lifecycle stuff + virtual void Prepare(TActorSystem* actorSystem, NSchedulerQueue::TReader** scheduleReaders, ui32* scheduleSz) = 0; + virtual void Start() = 0; + virtual void PrepareStop() = 0; + virtual void Shutdown() = 0; + virtual bool Cleanup() = 0; + }; + + class IExecutorPool : public IActorThreadPool { public: const ui32 PoolId; @@ -87,13 +100,6 @@ namespace NActors { virtual TActorId Register(IActor* actor, TMailboxType::EType mailboxType, ui64 revolvingCounter, const TActorId& parentId) = 0; virtual TActorId Register(IActor* actor, TMailboxHeader* mailbox, ui32 hint, const TActorId& parentId) = 0; - // lifecycle stuff - virtual void Prepare(TActorSystem* actorSystem, NSchedulerQueue::TReader** scheduleReaders, ui32* scheduleSz) = 0; - virtual void Start() = 0; - virtual void PrepareStop() = 0; - virtual void Shutdown() = 0; - virtual bool Cleanup() = 0; - virtual void GetCurrentStats(TExecutorPoolStats& poolStats, TVector<TExecutorThreadStats>& statsCopy) const { // TODO: make pure virtual and override everywhere Y_UNUSED(poolStats); diff --git a/ydb/library/actors/core/executor_pool_basic.cpp b/ydb/library/actors/core/executor_pool_basic.cpp index e646b27e45..e7fd15fc82 100644 --- a/ydb/library/actors/core/executor_pool_basic.cpp +++ b/ydb/library/actors/core/executor_pool_basic.cpp @@ -4,6 +4,7 @@ #include "executor_thread_ctx.h" #include "probes.h" #include "mailbox.h" +#include <atomic> #include <ydb/library/actors/util/affinity.h> #include <ydb/library/actors/util/datetime.h> @@ -578,6 +579,14 @@ namespace NActors { } bool TSharedExecutorThreadCtx::Wait(ui64 spinThresholdCycles, std::atomic<bool> *stopFlag) { + i64 requestsForWakeUp = RequestsForWakeUp.fetch_sub(1, std::memory_order_acq_rel); + if (requestsForWakeUp) { + if (requestsForWakeUp > 1) { + requestsForWakeUp--; + RequestsForWakeUp.compare_exchange_weak(requestsForWakeUp, 0, std::memory_order_acq_rel); + } + return false; + } EThreadState state = ExchangeState<EThreadState>(EThreadState::Spin); Y_ABORT_UNLESS(state == EThreadState::None, "WaitingFlag# %d", int(state)); if (spinThresholdCycles > 0) { @@ -615,4 +624,38 @@ namespace NActors { return false; } + bool TSharedExecutorThreadCtx::WakeUp() { + i64 requestsForWakeUp = RequestsForWakeUp.fetch_add(1, std::memory_order_acq_rel); + if (requestsForWakeUp >= 0) { + return false; + } + + for (;;) { + EThreadState state = GetState<EThreadState>(); + switch (state) { + case EThreadState::None: + case EThreadState::Work: + // TODO(kruall): check race + continue; + case EThreadState::Spin: + case EThreadState::Sleep: + if (ReplaceState<EThreadState>(state, EThreadState::None)) { + if (state == EThreadState::Sleep) { + ui64 beforeUnpark = GetCycleCountFast(); + StartWakingTs = beforeUnpark; + WaitingPad.Unpark(); + if (TlsThreadContext && TlsThreadContext->WaitingStats) { + TlsThreadContext->WaitingStats->AddWakingUp(GetCycleCountFast() - beforeUnpark); + } + } + return true; + } + break; + default: + Y_ABORT(); + } + } + return false; + } + } diff --git a/ydb/library/actors/core/executor_pool_basic_ut.cpp b/ydb/library/actors/core/executor_pool_basic_ut.cpp index 754625c42b..76a868741e 100644 --- a/ydb/library/actors/core/executor_pool_basic_ut.cpp +++ b/ydb/library/actors/core/executor_pool_basic_ut.cpp @@ -432,7 +432,7 @@ Y_UNIT_TEST_SUITE(BasicExecutorPool) { Y_UNIT_TEST(CheckStats) { const size_t size = 4; - const size_t msgCount = 1e4; + const size_t msgCount = 5e3; TBasicExecutorPool* executorPool = new TBasicExecutorPool(0, size, 50); auto setup = GetActorSystemSetup(executorPool); @@ -441,17 +441,23 @@ Y_UNIT_TEST_SUITE(BasicExecutorPool) { auto begin = TInstant::Now(); - auto actor = new TTestSenderActor(); - auto actorId = actorSystem.Register(actor); - actor->Start(actor->SelfId(), msgCount); - actorSystem.Send(actorId, new TEvMsg()); + auto load = [&]() { + auto actor = new TTestSenderActor(); + auto actorId = actorSystem.Register(actor); + actor->Start(actor->SelfId(), msgCount); + actorSystem.Send(actorId, new TEvMsg()); - while (actor->GetCounter()) { - auto now = TInstant::Now(); - UNIT_ASSERT_C(now - begin < TDuration::Seconds(5), "Counter is " << actor->GetCounter()); + while (actor->GetCounter()) { + auto now = TInstant::Now(); + UNIT_ASSERT_C(now - begin < TDuration::Seconds(5), "Counter is " << actor->GetCounter()); - Sleep(TDuration::MilliSeconds(1)); - } + Sleep(TDuration::MilliSeconds(1)); + } + }; + + load(); + Sleep(TDuration::MilliSeconds(10)); + load(); TVector<TExecutorThreadStats> stats; TExecutorPoolStats poolStats; @@ -461,8 +467,8 @@ Y_UNIT_TEST_SUITE(BasicExecutorPool) { stats[0].Aggregate(stats[idx]); } - UNIT_ASSERT_VALUES_EQUAL(stats[0].SentEvents, msgCount - 1); - UNIT_ASSERT_VALUES_EQUAL(stats[0].ReceivedEvents, msgCount); + UNIT_ASSERT_VALUES_EQUAL(stats[0].SentEvents, 2 * msgCount - 2); + UNIT_ASSERT_VALUES_EQUAL(stats[0].ReceivedEvents, 2 * msgCount); UNIT_ASSERT_VALUES_EQUAL(stats[0].PreemptedEvents, 0); UNIT_ASSERT_VALUES_EQUAL(stats[0].NonDeliveredEvents, 0); UNIT_ASSERT_VALUES_EQUAL(stats[0].EmptyMailboxActivation, 0); @@ -470,18 +476,18 @@ Y_UNIT_TEST_SUITE(BasicExecutorPool) { UNIT_ASSERT(stats[0].ElapsedTicks > 0); UNIT_ASSERT(stats[0].ParkedTicks > 0); UNIT_ASSERT_VALUES_EQUAL(stats[0].BlockedTicks, 0); - UNIT_ASSERT(stats[0].ActivationTimeHistogram.TotalSamples >= msgCount / TBasicExecutorPoolConfig::DEFAULT_EVENTS_PER_MAILBOX); - UNIT_ASSERT_VALUES_EQUAL(stats[0].EventDeliveryTimeHistogram.TotalSamples, msgCount); - UNIT_ASSERT_VALUES_EQUAL(stats[0].EventProcessingCountHistogram.TotalSamples, msgCount); + UNIT_ASSERT(stats[0].ActivationTimeHistogram.TotalSamples >= 2 * msgCount / TBasicExecutorPoolConfig::DEFAULT_EVENTS_PER_MAILBOX); + UNIT_ASSERT_VALUES_EQUAL(stats[0].EventDeliveryTimeHistogram.TotalSamples, 2 * msgCount); + UNIT_ASSERT_VALUES_EQUAL(stats[0].EventProcessingCountHistogram.TotalSamples, 2 * msgCount); UNIT_ASSERT(stats[0].EventProcessingTimeHistogram.TotalSamples > 0); UNIT_ASSERT(stats[0].ElapsedTicksByActivity[NActors::TActorTypeOperator::GetOtherActivityIndex()] > 0); - UNIT_ASSERT_VALUES_EQUAL(stats[0].ReceivedEventsByActivity[NActors::TActorTypeOperator::GetOtherActivityIndex()], msgCount); - UNIT_ASSERT_VALUES_EQUAL(stats[0].ActorsAliveByActivity[NActors::TActorTypeOperator::GetOtherActivityIndex()], 1); + UNIT_ASSERT_VALUES_EQUAL(stats[0].ReceivedEventsByActivity[NActors::TActorTypeOperator::GetOtherActivityIndex()], 2 * msgCount); + UNIT_ASSERT_VALUES_EQUAL(stats[0].ActorsAliveByActivity[NActors::TActorTypeOperator::GetOtherActivityIndex()], 2); UNIT_ASSERT_VALUES_EQUAL(stats[0].ScheduledEventsByActivity[NActors::TActorTypeOperator::GetOtherActivityIndex()], 0); - UNIT_ASSERT_VALUES_EQUAL(stats[0].PoolActorRegistrations, 1); + UNIT_ASSERT_VALUES_EQUAL(stats[0].PoolActorRegistrations, 2); UNIT_ASSERT_VALUES_EQUAL(stats[0].PoolDestroyedActors, 0); UNIT_ASSERT_VALUES_EQUAL(stats[0].PoolAllocatedMailboxes, 4095); // one line - UNIT_ASSERT(stats[0].MailboxPushedOutByTime + stats[0].MailboxPushedOutByEventCount >= msgCount / TBasicExecutorPoolConfig::DEFAULT_EVENTS_PER_MAILBOX); + UNIT_ASSERT(stats[0].MailboxPushedOutByTime + stats[0].MailboxPushedOutByEventCount >= 2 * msgCount / TBasicExecutorPoolConfig::DEFAULT_EVENTS_PER_MAILBOX); UNIT_ASSERT_VALUES_EQUAL(stats[0].MailboxPushedOutBySoftPreemption, 0); } } diff --git a/ydb/library/actors/core/executor_pool_shared.cpp b/ydb/library/actors/core/executor_pool_shared.cpp new file mode 100644 index 0000000000..e9d40fdc71 --- /dev/null +++ b/ydb/library/actors/core/executor_pool_shared.cpp @@ -0,0 +1,153 @@ +#include "executor_pool_shared.h" + +#include "actorsystem.h" +#include "executor_pool_basic.h" +#include "executor_thread.h" +#include "executor_thread_ctx.h" + +#include <atomic> +#include <ydb/library/actors/util/affinity.h> + + +namespace NActors { + +TSharedExecutorPool::TSharedExecutorPool(const TSharedExecutorPoolConfig &config, i16 poolCount, std::vector<i16> poolsWithThreads) + : ThreadByPool(poolCount, -1) + , PoolByThread(poolsWithThreads.size()) + , BorrowedThreadByPool(poolCount, -1) + , PoolByBorrowedThread(poolsWithThreads.size(), -1) + , Pools(poolCount) + , PoolCount(poolCount) + , SharedThreadCount(poolsWithThreads.size()) + , Threads(new TSharedExecutorThreadCtx[SharedThreadCount]) + , Timers(new TTimers[SharedThreadCount]) + , TimePerMailbox(config.TimePerMailbox) + , EventsPerMailbox(config.EventsPerMailbox) + , SoftProcessingDurationTs(config.SoftProcessingDurationTs) +{ + for (ui32 poolIdx = 0, threadIdx = 0; poolIdx < poolsWithThreads.size(); ++poolIdx, ++threadIdx) { + Y_ABORT_UNLESS(poolsWithThreads[poolIdx] < poolCount); + ThreadByPool[poolsWithThreads[poolIdx]] = threadIdx; + PoolByThread[threadIdx] = poolsWithThreads[poolIdx]; + } +} + +void TSharedExecutorPool::Prepare(TActorSystem* actorSystem, NSchedulerQueue::TReader** scheduleReaders, ui32* scheduleSz) { + // ActorSystem = actorSystem; + + ScheduleReaders.reset(new NSchedulerQueue::TReader[SharedThreadCount]); + ScheduleWriters.reset(new NSchedulerQueue::TWriter[SharedThreadCount]); + + std::vector<IExecutorPool*> poolsBasic = actorSystem->GetBasicExecutorPools(); + std::vector<IExecutorPool*> poolByThread(SharedThreadCount); + for (IExecutorPool* pool : poolsBasic) { + Pools[pool->PoolId] = dynamic_cast<TBasicExecutorPool*>(pool); + i16 threadIdx = ThreadByPool[pool->PoolId]; + if (threadIdx >= 0) { + poolByThread[threadIdx] = pool; + } + } + + for (i16 i = 0; i != SharedThreadCount; ++i) { + // !TODO + Threads[i].ExecutorPools[0].store(dynamic_cast<TBasicExecutorPool*>(poolByThread[i]), std::memory_order_release); + Threads[i].Thread.Reset( + new TSharedExecutorThread( + -1, + actorSystem, + &Threads[i], + PoolCount, + "SharedThread", + SoftProcessingDurationTs, + TimePerMailbox, + EventsPerMailbox)); + ScheduleWriters[i].Init(ScheduleReaders[i]); + } + + *scheduleReaders = ScheduleReaders.get(); + *scheduleSz = SharedThreadCount; +} + +void TSharedExecutorPool::Start() { + //ThreadUtilization = 0; + //AtomicAdd(MaxUtilizationCounter, -(i64)GetCycleCountFast()); + + for (i16 i = 0; i != SharedThreadCount; ++i) { + Threads[i].Thread->Start(); + } +} + +void TSharedExecutorPool::PrepareStop() { + for (i16 i = 0; i != SharedThreadCount; ++i) { + Threads[i].Thread->StopFlag = true; + Threads[i].Interrupt(); + } +} + +void TSharedExecutorPool::Shutdown() { + for (i16 i = 0; i != SharedThreadCount; ++i) { + Threads[i].Thread->Join(); + } +} + +bool TSharedExecutorPool::Cleanup() { + return true; +} + +TSharedExecutorThreadCtx* TSharedExecutorPool::GetSharedThread(i16 pool) { + i16 threadIdx = ThreadByPool[pool]; + if (threadIdx < 0 || threadIdx >= PoolCount) { + return nullptr; + } + return &Threads[threadIdx]; +} + +void TSharedExecutorPool::ReturnHalfThread(i16 pool) { + i16 threadIdx = ThreadByPool[pool]; + TBasicExecutorPool* borrowingPool = Threads[threadIdx].ExecutorPools[1].exchange(nullptr, std::memory_order_acq_rel); + Y_ABORT_UNLESS(borrowingPool); + BorrowedThreadByPool[PoolByBorrowedThread[threadIdx]] = -1; + PoolByBorrowedThread[threadIdx] = -1; + + // TODO(kruall): add change in executor pool basic +} + +void TSharedExecutorPool::GiveHalfThread(i16 from, i16 to) { + i16 threadIdx = ThreadByPool[from]; + TBasicExecutorPool* borrowingPool = Pools[to]; + Threads[threadIdx].ExecutorPools[1].store(borrowingPool, std::memory_order_release); + BorrowedThreadByPool[to] = threadIdx; + PoolByBorrowedThread[threadIdx] = to; + + // TODO(kruall): add change in executor pool basic +} + +void TSharedExecutorPool::GetSharedStats(i16 poolId, std::vector<TExecutorThreadStats>& statsCopy) { + statsCopy.resize(SharedThreadCount + 1); + for (i16 i = 0; i < SharedThreadCount; ++i) { + Threads[i].Thread->GetSharedStats(poolId, statsCopy[i + 1]); + } +} + +TCpuConsumption TSharedExecutorPool::GetThreadCpuConsumption(i16 poolId, i16 threadIdx) { + if (threadIdx >= SharedThreadCount) { + return {0.0, 0.0}; + } + TExecutorThreadStats stats; + Threads[threadIdx].Thread->GetSharedStats(poolId, stats); + return {Ts2Us(stats.SafeElapsedTicks), static_cast<double>(stats.CpuUs), stats.NotEnoughCpuExecutions}; +} + +std::vector<TCpuConsumption> TSharedExecutorPool::GetThreadsCpuConsumption(i16 poolId) { + std::vector<TCpuConsumption> result; + for (i16 threadIdx = 0; threadIdx < SharedThreadCount; ++threadIdx) { + result.push_back(GetThreadCpuConsumption(poolId, threadIdx)); + } + return result; +} + +i16 TSharedExecutorPool::GetSharedThreadCount() const { + return SharedThreadCount; +} + +}
\ No newline at end of file diff --git a/ydb/library/actors/core/executor_pool_shared.h b/ydb/library/actors/core/executor_pool_shared.h new file mode 100644 index 0000000000..2f0eb60973 --- /dev/null +++ b/ydb/library/actors/core/executor_pool_shared.h @@ -0,0 +1,56 @@ +#pragma once + +#include "config.h" +#include "executor_pool.h" +#include "executor_thread_ctx.h" + + +namespace NActors { + + struct TExecutorThreadCtx; + class TBasicExecutorPool; + + class TSharedExecutorPool: public IActorThreadPool { + public: + TSharedExecutorPool(const TSharedExecutorPoolConfig &config, i16 poolCount, std::vector<i16> poolsWithThreads); + + // IThreadPool + void Prepare(TActorSystem* actorSystem, NSchedulerQueue::TReader** scheduleReaders, ui32* scheduleSz) override; + void Start() override; + void PrepareStop() override; + void Shutdown() override; + bool Cleanup() override; + + TSharedExecutorThreadCtx *GetSharedThread(i16 poolId); + void GetSharedStats(i16 pool, std::vector<TExecutorThreadStats>& statsCopy); + TCpuConsumption GetThreadCpuConsumption(i16 poolId, i16 threadIdx); + std::vector<TCpuConsumption> GetThreadsCpuConsumption(i16 poolId); + + void ReturnHalfThread(i16 pool); + void GiveHalfThread(i16 from, i16 to); + + i16 GetSharedThreadCount() const; + + + private: + std::vector<i16> ThreadByPool; + std::vector<i16> PoolByThread; + std::vector<i16> BorrowedThreadByPool; + std::vector<i16> PoolByBorrowedThread; + + std::vector<TBasicExecutorPool*> Pools; + + i16 PoolCount; + i16 SharedThreadCount; + std::unique_ptr<TSharedExecutorThreadCtx[]> Threads; + std::unique_ptr<TTimers[]> Timers; + + std::unique_ptr<NSchedulerQueue::TReader[]> ScheduleReaders; + std::unique_ptr<NSchedulerQueue::TWriter[]> ScheduleWriters; + + TDuration TimePerMailbox; + ui32 EventsPerMailbox; + ui64 SoftProcessingDurationTs; + }; + +}
\ No newline at end of file diff --git a/ydb/library/actors/core/executor_thread.cpp b/ydb/library/actors/core/executor_thread.cpp index 0b24398ac6..d20e6482d2 100644 --- a/ydb/library/actors/core/executor_thread.cpp +++ b/ydb/library/actors/core/executor_thread.cpp @@ -10,6 +10,7 @@ #include "executor_thread_ctx.h" #include "probes.h" +#include <atomic> #include <ydb/library/actors/prof/tag.h> #include <ydb/library/actors/util/affinity.h> #include <ydb/library/actors/util/datetime.h> @@ -92,7 +93,7 @@ namespace NActors { ui64 softProcessingDurationTs, TDuration timePerMailbox, ui32 eventsPerMailbox) - : TGenericExecutorThread(workerId, actorSystem, threadCtx->OwnerExecutorPool, poolCount, threadName, softProcessingDurationTs, timePerMailbox, eventsPerMailbox) + : TGenericExecutorThread(workerId, actorSystem, threadCtx->ExecutorPools[0].load(), poolCount, threadName, softProcessingDurationTs, timePerMailbox, eventsPerMailbox) , ThreadCtx(threadCtx) {} @@ -502,10 +503,6 @@ namespace NActors { return nullptr; } - void TSharedExecutorThread::UpdatePools() { - NeedToReloadPools = EState::NeedToReloadPools; - } - TGenericExecutorThread::TProcessingResult TSharedExecutorThread::ProcessSharedExecutorPool(TExecutorPoolBaseMailboxed *pool) { Ctx.Switch( pool, @@ -515,7 +512,7 @@ namespace NActors { GetCycleCountFast() + SoftProcessingDurationTs, &SharedStats[pool->PoolId]); Y_ABORT_UNLESS(Ctx.Stats->ElapsedTicksByActivity.size()); - Ctx.WorkerId = (pool == ThreadCtx->OwnerExecutorPool ? -1 : -2); + Ctx.WorkerId = (pool == ThreadCtx->ExecutorPools[0].load(std::memory_order_relaxed) ? -1 : -2); Y_ABORT_UNLESS(Ctx.Stats->ElapsedTicksByActivity.size()); return ProcessExecutorPool(pool); } @@ -536,33 +533,22 @@ namespace NActors { ::SetCurrentThreadName(ThreadName); } - if (!IsSharedThread) { - ProcessExecutorPool(ExecutorPool); - return nullptr; - } - - TExecutorPoolBaseMailboxed *ownerPool = dynamic_cast<TExecutorPoolBaseMailboxed*>(ThreadCtx->OwnerExecutorPool); - TExecutorPoolBaseMailboxed *otherPool = nullptr; - - - std::vector<TExecutorPoolBaseMailboxed*> pools; do { - if (NeedToReloadPools.load() == EState::NeedToReloadPools) { - // otherPool = dynamic_cast<TExecutorPoolBaseMailboxed*>(ThreadCtx->OtherExecutorPool.load()); - NeedToReloadPools = EState::Running; - } bool wasWorking = true; - while (wasWorking && NeedToReloadPools.load() == EState::Running && !StopFlag.load(std::memory_order_relaxed)) { + while (wasWorking && !StopFlag.load(std::memory_order_relaxed)) { wasWorking = false; - TProcessingResult result = ProcessSharedExecutorPool(ownerPool); - wasWorking |= result.WasWorking; - if (otherPool) { - result = ProcessSharedExecutorPool(otherPool); - wasWorking = result.WasWorking; + + for (ui32 poolIdx = 0; poolIdx < MaxPoolsForSharedThreads; ++poolIdx) { + TExecutorPoolBaseMailboxed *pool = dynamic_cast<TExecutorPoolBaseMailboxed*>(ThreadCtx->ExecutorPools[poolIdx].load(std::memory_order_acquire)); + if (!pool) { + break; + } + TProcessingResult result = ProcessSharedExecutorPool(pool); + wasWorking |= result.WasWorking; } } - if (!wasWorking && NeedToReloadPools.load() == EState::Running && !StopFlag.load(std::memory_order_relaxed)) { + if (!wasWorking && !StopFlag.load(std::memory_order_relaxed)) { TlsThreadContext->Timers.Reset(); ThreadCtx->Wait(0, &StopFlag); } diff --git a/ydb/library/actors/core/executor_thread.h b/ydb/library/actors/core/executor_thread.h index 261a05ab2a..b9a339648b 100644 --- a/ydb/library/actors/core/executor_thread.h +++ b/ydb/library/actors/core/executor_thread.h @@ -119,7 +119,6 @@ namespace NActors { : TGenericExecutorThread(workerId, cpuId, actorSystem, executorPool, mailboxTable, threadName, timePerMailbox, eventsPerMailbox) {} - TExecutorThread(TWorkerId workerId, TActorSystem* actorSystem, IExecutorPool* executorPool, @@ -138,11 +137,6 @@ namespace NActors { }; class TSharedExecutorThread: public TGenericExecutorThread { - enum class EState : ui64 { - Running = 0, - NeedToReloadPools, - }; - public: TSharedExecutorThread(TWorkerId workerId, TActorSystem* actorSystem, @@ -156,15 +150,11 @@ namespace NActors { virtual ~TSharedExecutorThread() {} - void UpdatePools(); - private: TProcessingResult ProcessSharedExecutorPool(TExecutorPoolBaseMailboxed *pool); void* ThreadProc(); - std::atomic<EState> NeedToReloadPools = EState::NeedToReloadPools; - TSharedExecutorThreadCtx *ThreadCtx; }; diff --git a/ydb/library/actors/core/executor_thread_ctx.h b/ydb/library/actors/core/executor_thread_ctx.h index e9bcb500f6..3378571c91 100644 --- a/ydb/library/actors/core/executor_thread_ctx.h +++ b/ydb/library/actors/core/executor_thread_ctx.h @@ -142,7 +142,7 @@ namespace NActors { }; - constexpr ui32 MaxPoolsForSharedThreads = 4; + constexpr ui32 MaxPoolsForSharedThreads = 2; struct TSharedExecutorThreadCtx : public TGenericExecutorThreadCtx { using TBase = TGenericExecutorThreadCtx; @@ -172,8 +172,8 @@ namespace NActors { } }; - TBasicExecutorPool *OwnerExecutorPool = nullptr; std::atomic<TBasicExecutorPool*> ExecutorPools[MaxPoolsForSharedThreads]; + std::atomic<i64> RequestsForWakeUp = 0; ui32 NextPool = 0; void AfterWakeUp(TWaitState state) { @@ -190,6 +190,12 @@ namespace NActors { bool Wait(ui64 spinThresholdCycles, std::atomic<bool> *stopFlag); // in executor_pool_basic.cpp + bool WakeUp(); + + void Interrupt() { + WaitingPad.Interrupt(); + } + TSharedExecutorThreadCtx() = default; }; diff --git a/ydb/library/actors/core/ya.make b/ydb/library/actors/core/ya.make index f1d9abc3f8..a86b8e3297 100644 --- a/ydb/library/actors/core/ya.make +++ b/ydb/library/actors/core/ya.make @@ -51,6 +51,8 @@ SRCS( executor_pool_basic.h executor_pool_io.cpp executor_pool_io.h + executor_pool_shared.cpp + executor_pool_shared.h executor_thread.cpp executor_thread.h harmonizer.cpp |