diff options
author | kruall <kruall@ydb.tech> | 2024-01-24 12:32:57 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-01-24 12:32:57 +0300 |
commit | ba1fea3e79ac91decac8e328880e85156152e2c3 (patch) | |
tree | f71653ddc4df23727e77583aa9d8b302f9c9a066 | |
parent | 902255aefe691f65f89ad22d6df31ed970b45ab7 (diff) | |
download | ydb-ba1fea3e79ac91decac8e328880e85156152e2c3.tar.gz |
Add shared logic to executor pool (#1236)
-rw-r--r-- | ydb/library/actors/core/actor_benchmark_helper.h | 7 | ||||
-rw-r--r-- | ydb/library/actors/core/actor_ut.cpp | 99 | ||||
-rw-r--r-- | ydb/library/actors/core/cpu_manager.cpp | 40 | ||||
-rw-r--r-- | ydb/library/actors/core/executor_pool_basic.cpp | 112 | ||||
-rw-r--r-- | ydb/library/actors/core/executor_pool_basic.h | 11 | ||||
-rw-r--r-- | ydb/library/actors/core/executor_pool_shared.cpp | 8 | ||||
-rw-r--r-- | ydb/library/actors/core/executor_thread.cpp | 4 | ||||
-rw-r--r-- | ydb/library/actors/core/executor_thread_ctx.h | 14 | ||||
-rw-r--r-- | ydb/library/actors/core/worker_context.h | 4 |
9 files changed, 242 insertions, 57 deletions
diff --git a/ydb/library/actors/core/actor_benchmark_helper.h b/ydb/library/actors/core/actor_benchmark_helper.h index 074295840e..e1b80e9475 100644 --- a/ydb/library/actors/core/actor_benchmark_helper.h +++ b/ydb/library/actors/core/actor_benchmark_helper.h @@ -138,6 +138,9 @@ struct TActorBenchmark { , InFlight(params.InFlight) {} + ~TSendReceiveActor() { + } + void StoreCounters(std::vector<NThreading::TPadded<std::atomic<ui64>>> &dest) { for (ui32 idx = 0; idx < dest.size(); ++idx) { dest[idx].store(SharedCounters->Counters[idx]); @@ -258,14 +261,14 @@ struct TActorBenchmark { ui32 ReceiveTurn = 0; }; - static void AddBasicPool(THolder<TActorSystemSetup>& setup, ui32 threads, bool activateEveryEvent, i16 sharedExecutorsCount) { + static void AddBasicPool(THolder<TActorSystemSetup>& setup, ui32 threads, bool activateEveryEvent, bool hasSharedThread) { TBasicExecutorPoolConfig basic; basic.PoolId = setup->GetExecutorsCount(); basic.PoolName = TStringBuilder() << "b" << basic.PoolId; basic.Threads = threads; basic.SpinThreshold = TSettings::DefaultSpinThreshold; basic.TimePerMailbox = TDuration::Hours(1); - basic.SharedExecutorsCount = sharedExecutorsCount; + basic.HasSharedThread = hasSharedThread; basic.SoftProcessingDurationTs = Us2Ts(100); if (activateEveryEvent) { basic.EventsPerMailbox = 1; diff --git a/ydb/library/actors/core/actor_ut.cpp b/ydb/library/actors/core/actor_ut.cpp index 0b35ae0f8f..0c69d7ce29 100644 --- a/ydb/library/actors/core/actor_ut.cpp +++ b/ydb/library/actors/core/actor_ut.cpp @@ -24,11 +24,9 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) { using TSettings = TActorBenchmark::TSettings; using TSendReceiveActorParams = TActorBenchmark::TSendReceiveActorParams; - Y_UNIT_TEST(WithSharedExecutors) { - return; + Y_UNIT_TEST(WithOnlyOneSharedExecutors) { THolder<TActorSystemSetup> setup = TActorBenchmark::GetActorSystemSetup(); - TActorBenchmark::AddBasicPool(setup, 2, 1, 0); - TActorBenchmark::AddBasicPool(setup, 2, 1, 1); + TActorBenchmark::AddBasicPool(setup, 1, 1, true); TActorSystem actorSystem(setup); actorSystem.Start(); @@ -37,12 +35,51 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) { TAtomic actorsAlive = 0; THPTimer Timer; - ui64 eventsPerPair = TSettings::TotalEventsAmountPerThread * 4 / 60; + ui64 eventsPerPair = 1000; Timer.Reset(); - for (ui32 i = 0; i < 50; ++i) { - ui32 followerPoolId = 0; - ui32 leaderPoolId = 0; + ui32 followerPoolId = 0; + ui32 leaderPoolId = 0; + TActorId followerId = actorSystem.Register( + new TActorBenchmark::TSendReceiveActor( + TSendReceiveActorParams{.OtherEvents=eventsPerPair / 2, .Allocation=true} + ), + TMailboxType::HTSwap, + followerPoolId + ); + THolder<IActor> leader{ + new TTestEndDecorator(THolder(new TActorBenchmark::TSendReceiveActor( + TSendReceiveActorParams{.OwnEvents=eventsPerPair / 2, .Receivers={followerId}, .Allocation=true} + )), + &pad, + &actorsAlive) + }; + actorSystem.Register(leader.Release(), TMailboxType::HTSwap, leaderPoolId); + + pad.Park(); + auto elapsedTime = Timer.Passed() / (TSettings::TotalEventsAmountPerThread * 4); + actorSystem.Stop(); + + Cerr << "Completed " << 1e9 * elapsedTime << Endl; + } + + Y_UNIT_TEST(WithOnlyOneSharedAndOneCommonExecutors) { + THolder<TActorSystemSetup> setup = TActorBenchmark::GetActorSystemSetup(); + TActorBenchmark::AddBasicPool(setup, 2, true, true); + + TActorSystem actorSystem(setup); + actorSystem.Start(); + + TThreadParkPad pad; + TAtomic actorsAlive = 0; + THPTimer Timer; + + ui64 eventsPerPair = 1000; + + Timer.Reset(); + ui32 followerPoolId = 0; + ui32 leaderPoolId = 0; + for (ui32 idx = 0; idx < 50; ++idx) { TActorId followerId = actorSystem.Register( new TActorBenchmark::TSendReceiveActor( TSendReceiveActorParams{.OtherEvents=eventsPerPair / 2, .Allocation=true} @@ -59,6 +96,50 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) { }; actorSystem.Register(leader.Release(), TMailboxType::HTSwap, leaderPoolId); } + + pad.Park(); + auto elapsedTime = Timer.Passed() / (TSettings::TotalEventsAmountPerThread * 4); + actorSystem.Stop(); + + Cerr << "Completed " << 1e9 * elapsedTime << Endl; + } + + Y_UNIT_TEST(WithSharedExecutors) { + THolder<TActorSystemSetup> setup = TActorBenchmark::GetActorSystemSetup(); + TActorBenchmark::AddBasicPool(setup, 2, 1, false); + TActorBenchmark::AddBasicPool(setup, 2, 1, true); + + TActorSystem actorSystem(setup); + actorSystem.Start(); + + TThreadParkPad pad; + TAtomic actorsAlive = 0; + THPTimer Timer; + + ui64 eventsPerPair = TSettings::TotalEventsAmountPerThread * 4 / 60; + + Timer.Reset(); + for (ui32 i = 0; i < 50; ++i) { + ui32 followerPoolId = 0; + ui32 leaderPoolId = 0; + TActorId followerId = actorSystem.Register( + new TActorBenchmark::TSendReceiveActor( + TSendReceiveActorParams{.OtherEvents=eventsPerPair / 2, .Allocation=true} + ), + TMailboxType::HTSwap, + followerPoolId + ); + THolder<IActor> leader{ + new TTestEndDecorator( + THolder(new TActorBenchmark::TSendReceiveActor( + TSendReceiveActorParams{.OwnEvents=eventsPerPair / 2, .Receivers={followerId}, .Allocation=true} + )), + &pad, + &actorsAlive + ) + }; + actorSystem.Register(leader.Release(), TMailboxType::HTSwap, leaderPoolId); + } for (ui32 i = 0; i < 10; ++i) { ui32 followerPoolId = 1; ui32 leaderPoolId = 1; @@ -82,7 +163,7 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) { } pad.Park(); - auto elapsedTime = Timer.Passed() / (TSettings::TotalEventsAmountPerThread * 4); + auto elapsedTime = Timer.Passed() / (4 * TSettings::TotalEventsAmountPerThread); actorSystem.Stop(); Cerr << "Completed " << 1e9 * elapsedTime << Endl; diff --git a/ydb/library/actors/core/cpu_manager.cpp b/ydb/library/actors/core/cpu_manager.cpp index 7c0c2276b3..039625fd29 100644 --- a/ydb/library/actors/core/cpu_manager.cpp +++ b/ydb/library/actors/core/cpu_manager.cpp @@ -47,8 +47,15 @@ namespace NActors { } void TCpuManager::PrepareStart(TVector<NSchedulerQueue::TReader*>& scheduleReaders, TActorSystem* actorSystem) { + NSchedulerQueue::TReader* readers; + ui32 readersCount = 0; + if (Shared) { + Shared->Prepare(actorSystem, &readers, &readersCount); + for (ui32 i = 0; i != readersCount; ++i, ++readers) { + scheduleReaders.push_back(readers); + } + } for (ui32 excIdx = 0; excIdx != ExecutorPoolCount; ++excIdx) { - NSchedulerQueue::TReader* readers; ui32 readersCount = 0; Executors[excIdx]->Prepare(actorSystem, &readers, &readersCount); for (ui32 i = 0; i != readersCount; ++i, ++readers) { @@ -58,6 +65,9 @@ namespace NActors { } void TCpuManager::Start() { + if (Shared) { + Shared->Start(); + } for (ui32 excIdx = 0; excIdx != ExecutorPoolCount; ++excIdx) { Executors[excIdx]->Start(); } @@ -67,6 +77,9 @@ namespace NActors { for (ui32 excIdx = 0; excIdx != ExecutorPoolCount; ++excIdx) { Executors[excIdx]->PrepareStop(); } + if (Shared) { + Shared->PrepareStop(); + } } void TCpuManager::Shutdown() { @@ -81,6 +94,10 @@ namespace NActors { } } } + if (Shared) { + Shared->Shutdown(); + Shared->Cleanup(); + } } void TCpuManager::Cleanup() { @@ -93,13 +110,32 @@ namespace NActors { } } } + if (Shared) { + Shared->Cleanup(); + } Executors.Destroy(); + if (Shared) { + Shared.reset(); + } } IExecutorPool* TCpuManager::CreateExecutorPool(ui32 poolId) { for (TBasicExecutorPoolConfig& cfg : Config.Basic) { if (cfg.PoolId == poolId) { - return new TBasicExecutorPool(cfg, Harmonizer.get()); + if (cfg.HasSharedThread) { + cfg.Threads -= 1; + if (cfg.MaxThreadCount) { + cfg.MaxThreadCount -= 1; + } + auto *sharedPool = static_cast<TSharedExecutorPool*>(Shared.get()); + auto *pool = new TBasicExecutorPool(cfg, Harmonizer.get()); + if (pool) { + pool->AddSharedThread(sharedPool->GetSharedThread(poolId)); + } + return pool; + } else { + return new TBasicExecutorPool(cfg, Harmonizer.get()); + } } } for (TIOExecutorPoolConfig& cfg : Config.IO) { diff --git a/ydb/library/actors/core/executor_pool_basic.cpp b/ydb/library/actors/core/executor_pool_basic.cpp index e7fd15fc82..4c6db5d609 100644 --- a/ydb/library/actors/core/executor_pool_basic.cpp +++ b/ydb/library/actors/core/executor_pool_basic.cpp @@ -4,6 +4,7 @@ #include "executor_thread_ctx.h" #include "probes.h" #include "mailbox.h" +#include "thread_context.h" #include <atomic> #include <ydb/library/actors/util/affinity.h> #include <ydb/library/actors/util/datetime.h> @@ -82,10 +83,10 @@ namespace NActors { DefaultThreadCount = limit; } - MaxThreadCount = Min(Max(MaxThreadCount, DefaultThreadCount), limit); + MaxThreadCount = Min(Max<i16>(MaxThreadCount, DefaultThreadCount), limit); if (MinThreadCount) { - MinThreadCount = Max((i16)1, Min(MinThreadCount, DefaultThreadCount)); + MinThreadCount = Max<i16>(0, Min(MinThreadCount, DefaultThreadCount)); } else { MinThreadCount = DefaultThreadCount; } @@ -170,12 +171,15 @@ namespace NActors { if (workerId >= 0) { Threads[workerId].UnsetWork(); + } else { + Y_ABORT_UNLESS(wctx.SharedThread); + wctx.SharedThread->UnsetWork(); } TAtomic x = AtomicGet(Semaphore); TSemaphore semaphore = TSemaphore::GetSemaphore(x); while (!StopFlag.load(std::memory_order_acquire)) { - if (!semaphore.OldSemaphore || semaphore.CurrentSleepThreadCount < 0) { + if (!semaphore.OldSemaphore || workerId >= 0 && semaphore.CurrentSleepThreadCount < 0) { if (workerId < 0 || !wctx.IsNeededToWaitNextActivation) { TlsThreadContext->Timers.HPNow = GetCycleCountFast(); wctx.AddElapsedCycles(ActorSystemIndex, TlsThreadContext->Timers.HPNow - TlsThreadContext->Timers.HPStart); @@ -194,6 +198,9 @@ namespace NActors { if (const ui32 activation = Activations.Pop(++revolvingCounter)) { if (workerId >= 0) { Threads[workerId].SetWork(); + } else { + Y_ABORT_UNLESS(wctx.SharedThread); + wctx.SharedThread->SetWork(); } AtomicDecrement(Semaphore); TlsThreadContext->Timers.HPNow = GetCycleCountFast(); @@ -233,9 +240,6 @@ namespace NActors { ui32 TBasicExecutorPool::GetReadyActivation(TWorkerContext& wctx, ui64 revolvingCounter) { if constexpr (NFeatures::IsLocalQueues()) { - if (SharedExecutorsCount) { - return GetReadyActivationCommon(wctx, revolvingCounter); - } return GetReadyActivationLocalQueue(wctx, revolvingCounter); } else { return GetReadyActivationCommon(wctx, revolvingCounter); @@ -251,20 +255,38 @@ namespace NActors { } return; } - if (++i >= MaxThreadCount - SharedExecutorsCount) { + if (++i >= PoolThreads) { i = 0; } } } + bool TBasicExecutorPool::WakeUpLoopShared() { + for (ui32 idx = 0; idx < MaxSharedThreadsForPool; ++idx) { + TSharedExecutorThreadCtx *thread = SharedThreads[idx].load(std::memory_order_acquire); + if (!thread) { + return false; + } + if (thread->WakeUp()) { + return true; + } + } + return false; + } + void TBasicExecutorPool::ScheduleActivationExCommon(ui32 activation, ui64 revolvingCounter, TAtomic x) { TSemaphore semaphore = TSemaphore::GetSemaphore(x); Activations.Push(activation, revolvingCounter); bool needToWakeUp = false; + if (WakeUpLoopShared()) { + x = AtomicIncrement(Semaphore); + return; + } + do { - needToWakeUp = semaphore.CurrentSleepThreadCount > SharedExecutorsCount; + needToWakeUp = semaphore.CurrentSleepThreadCount > 0; i64 oldX = semaphore.ConvertToI64(); semaphore.OldSemaphore++; if (needToWakeUp) { @@ -329,7 +351,7 @@ namespace NActors { poolStats.WrongWakenedThreadCount = RelaxedLoad(&WrongWakenedThreadCount); poolStats.CurrentThreadCount = RelaxedLoad(&ThreadCount); poolStats.DefaultThreadCount = DefaultThreadCount; - poolStats.MaxThreadCount = MaxThreadCount; + poolStats.MaxThreadCount = PoolThreads; poolStats.SpinningTimeUs = Ts2Us(SpinningTimeUs); poolStats.SpinThresholdUs = Ts2Us(SpinThresholdCycles); if (Harmonizer) { @@ -371,18 +393,16 @@ namespace NActors { ScheduleWriters.Reset(new NSchedulerQueue::TWriter[PoolThreads]); for (i16 i = 0; i != PoolThreads; ++i) { - if (i < MaxThreadCount - SharedExecutorsCount) { - Threads[i].Thread.Reset( - new TExecutorThread( - i, - 0, // CpuId is not used in BASIC pool - actorSystem, - this, - MailboxTable.Get(), - PoolName, - TimePerMailbox, - EventsPerMailbox)); - } + Threads[i].Thread.Reset( + new TExecutorThread( + i, + 0, // CpuId is not used in BASIC pool + actorSystem, + this, + MailboxTable.Get(), + PoolName, + TimePerMailbox, + EventsPerMailbox)); ScheduleWriters[i].Init(ScheduleReaders[i]); } @@ -454,11 +474,11 @@ namespace NActors { } i16 TBasicExecutorPool::GetThreadCount() const { - return AtomicGet(ThreadCount); + return AtomicGet(ThreadCount) + SharedThreadsCount.load(std::memory_order_acquire); } void TBasicExecutorPool::SetThreadCount(i16 threads) { - threads = Max(i16(1), Min(PoolThreads, threads)); + threads = Max<i16>(0, Min(PoolThreads, threads)); with_lock (ChangeThreadsLock) { i16 prevCount = GetThreadCount(); AtomicSet(ThreadCount, threads); @@ -581,14 +601,10 @@ namespace NActors { bool TSharedExecutorThreadCtx::Wait(ui64 spinThresholdCycles, std::atomic<bool> *stopFlag) { i64 requestsForWakeUp = RequestsForWakeUp.fetch_sub(1, std::memory_order_acq_rel); if (requestsForWakeUp) { - if (requestsForWakeUp > 1) { - requestsForWakeUp--; - RequestsForWakeUp.compare_exchange_weak(requestsForWakeUp, 0, std::memory_order_acq_rel); - } return false; } - EThreadState state = ExchangeState<EThreadState>(EThreadState::Spin); - Y_ABORT_UNLESS(state == EThreadState::None, "WaitingFlag# %d", int(state)); + TWaitState state = ExchangeState<TWaitState>(EThreadState::Spin); + Y_ABORT_UNLESS(state.Flag == EThreadState::None, "WaitingFlag# %d", int(state.Flag)); if (spinThresholdCycles > 0) { // spin configured period Spin(spinThresholdCycles, stopFlag); @@ -625,22 +641,33 @@ namespace NActors { } bool TSharedExecutorThreadCtx::WakeUp() { - i64 requestsForWakeUp = RequestsForWakeUp.fetch_add(1, std::memory_order_acq_rel); - if (requestsForWakeUp >= 0) { + i64 requestsForWakeUp = RequestsForWakeUp.load(std::memory_order_acquire); + if (requestsForWakeUp > 0) { return false; } + for (;;) { + if (RequestsForWakeUp.compare_exchange_strong(requestsForWakeUp, requestsForWakeUp + 1, std::memory_order_acquire)) { + if (requestsForWakeUp == -1) { + break; + } + return false; + } + if (requestsForWakeUp > 0) { + return false; + } + } for (;;) { - EThreadState state = GetState<EThreadState>(); - switch (state) { + TWaitState state = GetState<TWaitState>(); + switch (state.Flag) { case EThreadState::None: case EThreadState::Work: // TODO(kruall): check race continue; case EThreadState::Spin: case EThreadState::Sleep: - if (ReplaceState<EThreadState>(state, EThreadState::None)) { - if (state == EThreadState::Sleep) { + if (ReplaceState<TWaitState>(state, {EThreadState::None})) { + if (state.Flag == EThreadState::Sleep) { ui64 beforeUnpark = GetCycleCountFast(); StartWakingTs = beforeUnpark; WaitingPad.Unpark(); @@ -658,4 +685,19 @@ namespace NActors { return false; } + TSharedExecutorThreadCtx* TBasicExecutorPool::ReleaseSharedThread() { + ui64 count = SharedThreadsCount.fetch_sub(1, std::memory_order_acq_rel); + Y_ABORT_UNLESS(count); + auto *thread = SharedThreads[count - 1].exchange(nullptr); + Y_ABORT_UNLESS(thread); + return thread; + } + + void TBasicExecutorPool::AddSharedThread(TSharedExecutorThreadCtx* thread) { + ui64 count = SharedThreadsCount.fetch_add(1, std::memory_order_acq_rel); + Y_ABORT_UNLESS(count < MaxSharedThreadsForPool); + thread = SharedThreads[count].exchange(thread); + Y_ABORT_UNLESS(!thread); + } + } diff --git a/ydb/library/actors/core/executor_pool_basic.h b/ydb/library/actors/core/executor_pool_basic.h index 576c3b0b29..6872c08451 100644 --- a/ydb/library/actors/core/executor_pool_basic.h +++ b/ydb/library/actors/core/executor_pool_basic.h @@ -136,7 +136,6 @@ namespace NActors { TArrayHolder<TWaitingStats<double>> MovingWaitingStats; std::atomic<ui16> LocalQueueSize; - TArrayHolder<NSchedulerQueue::TReader> ScheduleReaders; TArrayHolder<NSchedulerQueue::TWriter> ScheduleWriters; @@ -160,11 +159,15 @@ namespace NActors { i16 MaxThreadCount; i16 DefaultThreadCount; IHarmonizer *Harmonizer; - i16 SharedExecutorsCount = 0; ui64 SoftProcessingDurationTs = 0; const i16 Priority = 0; const ui32 ActorSystemIndex = NActors::TActorTypeOperator::GetActorSystemIndex(); + + static constexpr ui64 MaxSharedThreadsForPool = 2; + NThreading::TPadded<std::atomic_uint64_t> SharedThreadsCount = 0; + NThreading::TPadded<std::atomic<TSharedExecutorThreadCtx*>> SharedThreads[MaxSharedThreadsForPool] = {nullptr, nullptr}; + public: struct TSemaphore { i64 OldSemaphore = 0; // 34 bits @@ -251,9 +254,13 @@ namespace NActors { void CalcSpinPerThread(ui64 wakingUpConsumption); void ClearWaitingStats() const; + TSharedExecutorThreadCtx* ReleaseSharedThread(); + void AddSharedThread(TSharedExecutorThreadCtx* thread); + private: void AskToGoToSleep(bool *needToWait, bool *needToBlock); void WakeUpLoop(i16 currentThreadCount); + bool WakeUpLoopShared(); }; } diff --git a/ydb/library/actors/core/executor_pool_shared.cpp b/ydb/library/actors/core/executor_pool_shared.cpp index e9d40fdc71..f9b769d404 100644 --- a/ydb/library/actors/core/executor_pool_shared.cpp +++ b/ydb/library/actors/core/executor_pool_shared.cpp @@ -108,8 +108,8 @@ void TSharedExecutorPool::ReturnHalfThread(i16 pool) { Y_ABORT_UNLESS(borrowingPool); BorrowedThreadByPool[PoolByBorrowedThread[threadIdx]] = -1; PoolByBorrowedThread[threadIdx] = -1; - - // TODO(kruall): add change in executor pool basic + // TODO(kruall): Check on race + borrowingPool->ReleaseSharedThread(); } void TSharedExecutorPool::GiveHalfThread(i16 from, i16 to) { @@ -118,8 +118,8 @@ void TSharedExecutorPool::GiveHalfThread(i16 from, i16 to) { Threads[threadIdx].ExecutorPools[1].store(borrowingPool, std::memory_order_release); BorrowedThreadByPool[to] = threadIdx; PoolByBorrowedThread[threadIdx] = to; - - // TODO(kruall): add change in executor pool basic + // TODO(kruall): Check on race + borrowingPool->AddSharedThread(&Threads[threadIdx]); } void TSharedExecutorPool::GetSharedStats(i16 poolId, std::vector<TExecutorThreadStats>& statsCopy) { diff --git a/ydb/library/actors/core/executor_thread.cpp b/ydb/library/actors/core/executor_thread.cpp index d20e6482d2..7262b2b4cf 100644 --- a/ydb/library/actors/core/executor_thread.cpp +++ b/ydb/library/actors/core/executor_thread.cpp @@ -95,7 +95,9 @@ namespace NActors { ui32 eventsPerMailbox) : TGenericExecutorThread(workerId, actorSystem, threadCtx->ExecutorPools[0].load(), poolCount, threadName, softProcessingDurationTs, timePerMailbox, eventsPerMailbox) , ThreadCtx(threadCtx) - {} + { + Ctx.SharedThread = threadCtx; + } TGenericExecutorThread::~TGenericExecutorThread() { } diff --git a/ydb/library/actors/core/executor_thread_ctx.h b/ydb/library/actors/core/executor_thread_ctx.h index 3378571c91..7af5a085dd 100644 --- a/ydb/library/actors/core/executor_thread_ctx.h +++ b/ydb/library/actors/core/executor_thread_ctx.h @@ -32,6 +32,10 @@ namespace NActors { public: ui64 StartWakingTs = 0; + ui64 GetStateInt() { + return WaitingFlag.load(); + } + protected: template <typename TWaitState> TWaitState GetState() { @@ -164,7 +168,7 @@ namespace NActors { {} explicit operator ui64() { - return static_cast<ui64>(Flag) | ui64(NextPool << 3); + return static_cast<ui64>(Flag) | (static_cast<ui64>(NextPool) << 3); } explicit operator EThreadState() { @@ -176,6 +180,14 @@ namespace NActors { std::atomic<i64> RequestsForWakeUp = 0; ui32 NextPool = 0; + void SetWork() { + this->ExchangeState(TWaitState{EThreadState::Work}); + } + + void UnsetWork() { + this->ExchangeState(TWaitState{EThreadState::None}); + } + void AfterWakeUp(TWaitState state) { NextPool = state.NextPool; } diff --git a/ydb/library/actors/core/worker_context.h b/ydb/library/actors/core/worker_context.h index 67e731c9ed..bdec5145a1 100644 --- a/ydb/library/actors/core/worker_context.h +++ b/ydb/library/actors/core/worker_context.h @@ -17,7 +17,7 @@ #include <library/cpp/lwtrace/shuttle.h> namespace NActors { - struct TExecutorThreadCtx; + struct TSharedExecutorThreadCtx; struct TWorkerContext { TWorkerId WorkerId; @@ -35,6 +35,8 @@ namespace NActors { bool IsNeededToWaitNextActivation = true; i64 HPStart = 0; ui32 ExecutedEvents = 0; + TSharedExecutorThreadCtx *SharedThread = nullptr; + TWorkerContext(TWorkerId workerId, TCpuId cpuId) : WorkerId(workerId) |