author     kruall <kruall@ydb.tech>      2024-01-24 12:32:57 +0300
committer  GitHub <noreply@github.com>   2024-01-24 12:32:57 +0300
commit     ba1fea3e79ac91decac8e328880e85156152e2c3 (patch)
tree       f71653ddc4df23727e77583aa9d8b302f9c9a066
parent     902255aefe691f65f89ad22d6df31ed970b45ab7 (diff)
download   ydb-ba1fea3e79ac91decac8e328880e85156152e2c3.tar.gz
Add shared logic to executor pool (#1236)
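
In short: the per-pool SharedExecutorsCount setting becomes a boolean
HasSharedThread flag; TSharedExecutorPool is wired into the TCpuManager
lifecycle (Prepare, Start, PrepareStop, Shutdown, Cleanup); and
TBasicExecutorPool gains AddSharedThread, ReleaseSharedThread, and
WakeUpLoopShared to track and wake up to MaxSharedThreadsForPool (2)
shared threads.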
-rw-r--r--  ydb/library/actors/core/actor_benchmark_helper.h  |   7
-rw-r--r--  ydb/library/actors/core/actor_ut.cpp              |  99
-rw-r--r--  ydb/library/actors/core/cpu_manager.cpp           |  40
-rw-r--r--  ydb/library/actors/core/executor_pool_basic.cpp   | 112
-rw-r--r--  ydb/library/actors/core/executor_pool_basic.h     |  11
-rw-r--r--  ydb/library/actors/core/executor_pool_shared.cpp  |   8
-rw-r--r--  ydb/library/actors/core/executor_thread.cpp       |   4
-rw-r--r--  ydb/library/actors/core/executor_thread_ctx.h     |  14
-rw-r--r--  ydb/library/actors/core/worker_context.h          |   4
9 files changed, 242 insertions(+), 57 deletions(-)
diff --git a/ydb/library/actors/core/actor_benchmark_helper.h b/ydb/library/actors/core/actor_benchmark_helper.h
index 074295840e..e1b80e9475 100644
--- a/ydb/library/actors/core/actor_benchmark_helper.h
+++ b/ydb/library/actors/core/actor_benchmark_helper.h
@@ -138,6 +138,9 @@ struct TActorBenchmark {
, InFlight(params.InFlight)
{}
+ ~TSendReceiveActor() {
+ }
+
void StoreCounters(std::vector<NThreading::TPadded<std::atomic<ui64>>> &dest) {
for (ui32 idx = 0; idx < dest.size(); ++idx) {
dest[idx].store(SharedCounters->Counters[idx]);
@@ -258,14 +261,14 @@ struct TActorBenchmark {
ui32 ReceiveTurn = 0;
};
- static void AddBasicPool(THolder<TActorSystemSetup>& setup, ui32 threads, bool activateEveryEvent, i16 sharedExecutorsCount) {
+ static void AddBasicPool(THolder<TActorSystemSetup>& setup, ui32 threads, bool activateEveryEvent, bool hasSharedThread) {
TBasicExecutorPoolConfig basic;
basic.PoolId = setup->GetExecutorsCount();
basic.PoolName = TStringBuilder() << "b" << basic.PoolId;
basic.Threads = threads;
basic.SpinThreshold = TSettings::DefaultSpinThreshold;
basic.TimePerMailbox = TDuration::Hours(1);
- basic.SharedExecutorsCount = sharedExecutorsCount;
+ basic.HasSharedThread = hasSharedThread;
basic.SoftProcessingDurationTs = Us2Ts(100);
if (activateEveryEvent) {
basic.EventsPerMailbox = 1;
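
The helper's last parameter changes from a count to a flag; the updated call
sites in actor_ut.cpp below show the difference. A minimal before/after
sketch:

    // before: the 4th argument counted shared executors in the pool
    TActorBenchmark::AddBasicPool(setup, /*threads*/ 2, /*activateEveryEvent*/ true, /*sharedExecutorsCount*/ 0);
    // after: it requests one shared thread for the pool
    TActorBenchmark::AddBasicPool(setup, /*threads*/ 2, /*activateEveryEvent*/ true, /*hasSharedThread*/ true);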
diff --git a/ydb/library/actors/core/actor_ut.cpp b/ydb/library/actors/core/actor_ut.cpp
index 0b35ae0f8f..0c69d7ce29 100644
--- a/ydb/library/actors/core/actor_ut.cpp
+++ b/ydb/library/actors/core/actor_ut.cpp
@@ -24,11 +24,9 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) {
using TSettings = TActorBenchmark::TSettings;
using TSendReceiveActorParams = TActorBenchmark::TSendReceiveActorParams;
- Y_UNIT_TEST(WithSharedExecutors) {
- return;
+ Y_UNIT_TEST(WithOnlyOneSharedExecutors) {
THolder<TActorSystemSetup> setup = TActorBenchmark::GetActorSystemSetup();
- TActorBenchmark::AddBasicPool(setup, 2, 1, 0);
- TActorBenchmark::AddBasicPool(setup, 2, 1, 1);
+ TActorBenchmark::AddBasicPool(setup, 1, 1, true);
TActorSystem actorSystem(setup);
actorSystem.Start();
@@ -37,12 +35,51 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) {
TAtomic actorsAlive = 0;
THPTimer Timer;
- ui64 eventsPerPair = TSettings::TotalEventsAmountPerThread * 4 / 60;
+ ui64 eventsPerPair = 1000;
Timer.Reset();
- for (ui32 i = 0; i < 50; ++i) {
- ui32 followerPoolId = 0;
- ui32 leaderPoolId = 0;
+ ui32 followerPoolId = 0;
+ ui32 leaderPoolId = 0;
+ TActorId followerId = actorSystem.Register(
+ new TActorBenchmark::TSendReceiveActor(
+ TSendReceiveActorParams{.OtherEvents=eventsPerPair / 2, .Allocation=true}
+ ),
+ TMailboxType::HTSwap,
+ followerPoolId
+ );
+ THolder<IActor> leader{
+ new TTestEndDecorator(THolder(new TActorBenchmark::TSendReceiveActor(
+ TSendReceiveActorParams{.OwnEvents=eventsPerPair / 2, .Receivers={followerId}, .Allocation=true}
+ )),
+ &pad,
+ &actorsAlive)
+ };
+ actorSystem.Register(leader.Release(), TMailboxType::HTSwap, leaderPoolId);
+
+ pad.Park();
+ auto elapsedTime = Timer.Passed() / (TSettings::TotalEventsAmountPerThread * 4);
+ actorSystem.Stop();
+
+ Cerr << "Completed " << 1e9 * elapsedTime << Endl;
+ }
+
+ Y_UNIT_TEST(WithOnlyOneSharedAndOneCommonExecutors) {
+ THolder<TActorSystemSetup> setup = TActorBenchmark::GetActorSystemSetup();
+ TActorBenchmark::AddBasicPool(setup, 2, true, true);
+
+ TActorSystem actorSystem(setup);
+ actorSystem.Start();
+
+ TThreadParkPad pad;
+ TAtomic actorsAlive = 0;
+ THPTimer Timer;
+
+ ui64 eventsPerPair = 1000;
+
+ Timer.Reset();
+ ui32 followerPoolId = 0;
+ ui32 leaderPoolId = 0;
+ for (ui32 idx = 0; idx < 50; ++idx) {
TActorId followerId = actorSystem.Register(
new TActorBenchmark::TSendReceiveActor(
TSendReceiveActorParams{.OtherEvents=eventsPerPair / 2, .Allocation=true}
@@ -59,6 +96,50 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) {
};
actorSystem.Register(leader.Release(), TMailboxType::HTSwap, leaderPoolId);
}
+
+ pad.Park();
+ auto elapsedTime = Timer.Passed() / (TSettings::TotalEventsAmountPerThread * 4);
+ actorSystem.Stop();
+
+ Cerr << "Completed " << 1e9 * elapsedTime << Endl;
+ }
+
+ Y_UNIT_TEST(WithSharedExecutors) {
+ THolder<TActorSystemSetup> setup = TActorBenchmark::GetActorSystemSetup();
+ TActorBenchmark::AddBasicPool(setup, 2, 1, false);
+ TActorBenchmark::AddBasicPool(setup, 2, 1, true);
+
+ TActorSystem actorSystem(setup);
+ actorSystem.Start();
+
+ TThreadParkPad pad;
+ TAtomic actorsAlive = 0;
+ THPTimer Timer;
+
+ ui64 eventsPerPair = TSettings::TotalEventsAmountPerThread * 4 / 60;
+
+ Timer.Reset();
+ for (ui32 i = 0; i < 50; ++i) {
+ ui32 followerPoolId = 0;
+ ui32 leaderPoolId = 0;
+ TActorId followerId = actorSystem.Register(
+ new TActorBenchmark::TSendReceiveActor(
+ TSendReceiveActorParams{.OtherEvents=eventsPerPair / 2, .Allocation=true}
+ ),
+ TMailboxType::HTSwap,
+ followerPoolId
+ );
+ THolder<IActor> leader{
+ new TTestEndDecorator(
+ THolder(new TActorBenchmark::TSendReceiveActor(
+ TSendReceiveActorParams{.OwnEvents=eventsPerPair / 2, .Receivers={followerId}, .Allocation=true}
+ )),
+ &pad,
+ &actorsAlive
+ )
+ };
+ actorSystem.Register(leader.Release(), TMailboxType::HTSwap, leaderPoolId);
+ }
for (ui32 i = 0; i < 10; ++i) {
ui32 followerPoolId = 1;
ui32 leaderPoolId = 1;
@@ -82,7 +163,7 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) {
}
pad.Park();
- auto elapsedTime = Timer.Passed() / (TSettings::TotalEventsAmountPerThread * 4);
+ auto elapsedTime = Timer.Passed() / (4 * TSettings::TotalEventsAmountPerThread);
actorSystem.Stop();
Cerr << "Completed " << 1e9 * elapsedTime << Endl;
diff --git a/ydb/library/actors/core/cpu_manager.cpp b/ydb/library/actors/core/cpu_manager.cpp
index 7c0c2276b3..039625fd29 100644
--- a/ydb/library/actors/core/cpu_manager.cpp
+++ b/ydb/library/actors/core/cpu_manager.cpp
@@ -47,8 +47,15 @@ namespace NActors {
}
void TCpuManager::PrepareStart(TVector<NSchedulerQueue::TReader*>& scheduleReaders, TActorSystem* actorSystem) {
+ NSchedulerQueue::TReader* readers;
+ ui32 readersCount = 0;
+ if (Shared) {
+ Shared->Prepare(actorSystem, &readers, &readersCount);
+ for (ui32 i = 0; i != readersCount; ++i, ++readers) {
+ scheduleReaders.push_back(readers);
+ }
+ }
for (ui32 excIdx = 0; excIdx != ExecutorPoolCount; ++excIdx) {
- NSchedulerQueue::TReader* readers;
ui32 readersCount = 0;
Executors[excIdx]->Prepare(actorSystem, &readers, &readersCount);
for (ui32 i = 0; i != readersCount; ++i, ++readers) {
@@ -58,6 +65,9 @@ namespace NActors {
}
void TCpuManager::Start() {
+ if (Shared) {
+ Shared->Start();
+ }
for (ui32 excIdx = 0; excIdx != ExecutorPoolCount; ++excIdx) {
Executors[excIdx]->Start();
}
@@ -67,6 +77,9 @@ namespace NActors {
for (ui32 excIdx = 0; excIdx != ExecutorPoolCount; ++excIdx) {
Executors[excIdx]->PrepareStop();
}
+ if (Shared) {
+ Shared->PrepareStop();
+ }
}
void TCpuManager::Shutdown() {
@@ -81,6 +94,10 @@ namespace NActors {
}
}
}
+ if (Shared) {
+ Shared->Shutdown();
+ Shared->Cleanup();
+ }
}
void TCpuManager::Cleanup() {
@@ -93,13 +110,32 @@ namespace NActors {
}
}
}
+ if (Shared) {
+ Shared->Cleanup();
+ }
Executors.Destroy();
+ if (Shared) {
+ Shared.reset();
+ }
}
IExecutorPool* TCpuManager::CreateExecutorPool(ui32 poolId) {
for (TBasicExecutorPoolConfig& cfg : Config.Basic) {
if (cfg.PoolId == poolId) {
- return new TBasicExecutorPool(cfg, Harmonizer.get());
+ if (cfg.HasSharedThread) {
+ cfg.Threads -= 1;
+ if (cfg.MaxThreadCount) {
+ cfg.MaxThreadCount -= 1;
+ }
+ auto *sharedPool = static_cast<TSharedExecutorPool*>(Shared.get());
+ auto *pool = new TBasicExecutorPool(cfg, Harmonizer.get());
+ if (pool) {
+ pool->AddSharedThread(sharedPool->GetSharedThread(poolId));
+ }
+ return pool;
+ } else {
+ return new TBasicExecutorPool(cfg, Harmonizer.get());
+ }
}
}
for (TIOExecutorPoolConfig& cfg : Config.IO) {
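
With HasSharedThread set, CreateExecutorPool shifts one thread of capacity
from the basic pool to the shared pool. A sketch of the resulting accounting,
assuming cfg.Threads = 3:

    // cfg.Threads = 3, cfg.HasSharedThread = true
    //   dedicated threads owned by TBasicExecutorPool: cfg.Threads - 1 = 2
    //   plus one TSharedExecutorThreadCtx fetched via
    //   TSharedExecutorPool::GetSharedThread(poolId) and attached with
    //   pool->AddSharedThread(...)
    // GetThreadCount() later reports ThreadCount + SharedThreadsCount,
    // so the pool still presents 3 threads overall.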
diff --git a/ydb/library/actors/core/executor_pool_basic.cpp b/ydb/library/actors/core/executor_pool_basic.cpp
index e7fd15fc82..4c6db5d609 100644
--- a/ydb/library/actors/core/executor_pool_basic.cpp
+++ b/ydb/library/actors/core/executor_pool_basic.cpp
@@ -4,6 +4,7 @@
#include "executor_thread_ctx.h"
#include "probes.h"
#include "mailbox.h"
+#include "thread_context.h"
#include <atomic>
#include <ydb/library/actors/util/affinity.h>
#include <ydb/library/actors/util/datetime.h>
@@ -82,10 +83,10 @@ namespace NActors {
DefaultThreadCount = limit;
}
- MaxThreadCount = Min(Max(MaxThreadCount, DefaultThreadCount), limit);
+ MaxThreadCount = Min(Max<i16>(MaxThreadCount, DefaultThreadCount), limit);
if (MinThreadCount) {
- MinThreadCount = Max((i16)1, Min(MinThreadCount, DefaultThreadCount));
+ MinThreadCount = Max<i16>(0, Min(MinThreadCount, DefaultThreadCount));
} else {
MinThreadCount = DefaultThreadCount;
}
@@ -170,12 +171,15 @@ namespace NActors {
if (workerId >= 0) {
Threads[workerId].UnsetWork();
+ } else {
+ Y_ABORT_UNLESS(wctx.SharedThread);
+ wctx.SharedThread->UnsetWork();
}
TAtomic x = AtomicGet(Semaphore);
TSemaphore semaphore = TSemaphore::GetSemaphore(x);
while (!StopFlag.load(std::memory_order_acquire)) {
- if (!semaphore.OldSemaphore || semaphore.CurrentSleepThreadCount < 0) {
+ if (!semaphore.OldSemaphore || workerId >= 0 && semaphore.CurrentSleepThreadCount < 0) {
if (workerId < 0 || !wctx.IsNeededToWaitNextActivation) {
TlsThreadContext->Timers.HPNow = GetCycleCountFast();
wctx.AddElapsedCycles(ActorSystemIndex, TlsThreadContext->Timers.HPNow - TlsThreadContext->Timers.HPStart);
@@ -194,6 +198,9 @@ namespace NActors {
if (const ui32 activation = Activations.Pop(++revolvingCounter)) {
if (workerId >= 0) {
Threads[workerId].SetWork();
+ } else {
+ Y_ABORT_UNLESS(wctx.SharedThread);
+ wctx.SharedThread->SetWork();
}
AtomicDecrement(Semaphore);
TlsThreadContext->Timers.HPNow = GetCycleCountFast();
@@ -233,9 +240,6 @@ namespace NActors {
ui32 TBasicExecutorPool::GetReadyActivation(TWorkerContext& wctx, ui64 revolvingCounter) {
if constexpr (NFeatures::IsLocalQueues()) {
- if (SharedExecutorsCount) {
- return GetReadyActivationCommon(wctx, revolvingCounter);
- }
return GetReadyActivationLocalQueue(wctx, revolvingCounter);
} else {
return GetReadyActivationCommon(wctx, revolvingCounter);
@@ -251,20 +255,38 @@ namespace NActors {
}
return;
}
- if (++i >= MaxThreadCount - SharedExecutorsCount) {
+ if (++i >= PoolThreads) {
i = 0;
}
}
}
+ bool TBasicExecutorPool::WakeUpLoopShared() {
+ for (ui32 idx = 0; idx < MaxSharedThreadsForPool; ++idx) {
+ TSharedExecutorThreadCtx *thread = SharedThreads[idx].load(std::memory_order_acquire);
+ if (!thread) {
+ return false;
+ }
+ if (thread->WakeUp()) {
+ return true;
+ }
+ }
+ return false;
+ }
+
void TBasicExecutorPool::ScheduleActivationExCommon(ui32 activation, ui64 revolvingCounter, TAtomic x) {
TSemaphore semaphore = TSemaphore::GetSemaphore(x);
Activations.Push(activation, revolvingCounter);
bool needToWakeUp = false;
+ if (WakeUpLoopShared()) {
+ x = AtomicIncrement(Semaphore);
+ return;
+ }
+
do {
- needToWakeUp = semaphore.CurrentSleepThreadCount > SharedExecutorsCount;
+ needToWakeUp = semaphore.CurrentSleepThreadCount > 0;
i64 oldX = semaphore.ConvertToI64();
semaphore.OldSemaphore++;
if (needToWakeUp) {
@@ -329,7 +351,7 @@ namespace NActors {
poolStats.WrongWakenedThreadCount = RelaxedLoad(&WrongWakenedThreadCount);
poolStats.CurrentThreadCount = RelaxedLoad(&ThreadCount);
poolStats.DefaultThreadCount = DefaultThreadCount;
- poolStats.MaxThreadCount = MaxThreadCount;
+ poolStats.MaxThreadCount = PoolThreads;
poolStats.SpinningTimeUs = Ts2Us(SpinningTimeUs);
poolStats.SpinThresholdUs = Ts2Us(SpinThresholdCycles);
if (Harmonizer) {
@@ -371,18 +393,16 @@ namespace NActors {
ScheduleWriters.Reset(new NSchedulerQueue::TWriter[PoolThreads]);
for (i16 i = 0; i != PoolThreads; ++i) {
- if (i < MaxThreadCount - SharedExecutorsCount) {
- Threads[i].Thread.Reset(
- new TExecutorThread(
- i,
- 0, // CpuId is not used in BASIC pool
- actorSystem,
- this,
- MailboxTable.Get(),
- PoolName,
- TimePerMailbox,
- EventsPerMailbox));
- }
+ Threads[i].Thread.Reset(
+ new TExecutorThread(
+ i,
+ 0, // CpuId is not used in BASIC pool
+ actorSystem,
+ this,
+ MailboxTable.Get(),
+ PoolName,
+ TimePerMailbox,
+ EventsPerMailbox));
ScheduleWriters[i].Init(ScheduleReaders[i]);
}
@@ -454,11 +474,11 @@ namespace NActors {
}
i16 TBasicExecutorPool::GetThreadCount() const {
- return AtomicGet(ThreadCount);
+ return AtomicGet(ThreadCount) + SharedThreadsCount.load(std::memory_order_acquire);
}
void TBasicExecutorPool::SetThreadCount(i16 threads) {
- threads = Max(i16(1), Min(PoolThreads, threads));
+ threads = Max<i16>(0, Min(PoolThreads, threads));
with_lock (ChangeThreadsLock) {
i16 prevCount = GetThreadCount();
AtomicSet(ThreadCount, threads);
@@ -581,14 +601,10 @@ namespace NActors {
bool TSharedExecutorThreadCtx::Wait(ui64 spinThresholdCycles, std::atomic<bool> *stopFlag) {
i64 requestsForWakeUp = RequestsForWakeUp.fetch_sub(1, std::memory_order_acq_rel);
if (requestsForWakeUp) {
- if (requestsForWakeUp > 1) {
- requestsForWakeUp--;
- RequestsForWakeUp.compare_exchange_weak(requestsForWakeUp, 0, std::memory_order_acq_rel);
- }
return false;
}
- EThreadState state = ExchangeState<EThreadState>(EThreadState::Spin);
- Y_ABORT_UNLESS(state == EThreadState::None, "WaitingFlag# %d", int(state));
+ TWaitState state = ExchangeState<TWaitState>(EThreadState::Spin);
+ Y_ABORT_UNLESS(state.Flag == EThreadState::None, "WaitingFlag# %d", int(state.Flag));
if (spinThresholdCycles > 0) {
// spin configured period
Spin(spinThresholdCycles, stopFlag);
@@ -625,22 +641,33 @@ namespace NActors {
}
bool TSharedExecutorThreadCtx::WakeUp() {
- i64 requestsForWakeUp = RequestsForWakeUp.fetch_add(1, std::memory_order_acq_rel);
- if (requestsForWakeUp >= 0) {
+ i64 requestsForWakeUp = RequestsForWakeUp.load(std::memory_order_acquire);
+ if (requestsForWakeUp > 0) {
return false;
}
+ for (;;) {
+ if (RequestsForWakeUp.compare_exchange_strong(requestsForWakeUp, requestsForWakeUp + 1, std::memory_order_acquire)) {
+ if (requestsForWakeUp == -1) {
+ break;
+ }
+ return false;
+ }
+ if (requestsForWakeUp > 0) {
+ return false;
+ }
+ }
for (;;) {
- EThreadState state = GetState<EThreadState>();
- switch (state) {
+ TWaitState state = GetState<TWaitState>();
+ switch (state.Flag) {
case EThreadState::None:
case EThreadState::Work:
// TODO(kruall): check race
continue;
case EThreadState::Spin:
case EThreadState::Sleep:
- if (ReplaceState<EThreadState>(state, EThreadState::None)) {
- if (state == EThreadState::Sleep) {
+ if (ReplaceState<TWaitState>(state, {EThreadState::None})) {
+ if (state.Flag == EThreadState::Sleep) {
ui64 beforeUnpark = GetCycleCountFast();
StartWakingTs = beforeUnpark;
WaitingPad.Unpark();
@@ -658,4 +685,19 @@ namespace NActors {
return false;
}
+ TSharedExecutorThreadCtx* TBasicExecutorPool::ReleaseSharedThread() {
+ ui64 count = SharedThreadsCount.fetch_sub(1, std::memory_order_acq_rel);
+ Y_ABORT_UNLESS(count);
+ auto *thread = SharedThreads[count - 1].exchange(nullptr);
+ Y_ABORT_UNLESS(thread);
+ return thread;
+ }
+
+ void TBasicExecutorPool::AddSharedThread(TSharedExecutorThreadCtx* thread) {
+ ui64 count = SharedThreadsCount.fetch_add(1, std::memory_order_acq_rel);
+ Y_ABORT_UNLESS(count < MaxSharedThreadsForPool);
+ thread = SharedThreads[count].exchange(thread);
+ Y_ABORT_UNLESS(!thread);
+ }
+
}
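
The rewritten Wait/WakeUp pair implements a banked wake-up counter: 0 means
the shared thread is running, -1 means it is waiting, and positive values
bank wake-ups that arrived while it ran; only the -1 -> 0 transition actually
unparks. A minimal self-contained sketch of that protocol (names simplified;
the spin/park machinery and TWaitState handling of the real code are elided):

    #include <atomic>

    std::atomic<long long> Requests{0};   // stands in for RequestsForWakeUp

    bool NeedToPark() {                   // caller parks when this returns true
        long long prev = Requests.fetch_sub(1, std::memory_order_acq_rel);
        return prev == 0;                 // prev > 0: a banked wake-up, skip parking
    }

    bool NeedToUnpark() {                 // caller unparks the thread on true
        long long cur = Requests.load(std::memory_order_acquire);
        if (cur > 0)
            return false;                 // a wake-up is already banked
        while (!Requests.compare_exchange_strong(cur, cur + 1,
                                                 std::memory_order_acquire)) {
            if (cur > 0)                  // another waker banked one first
                return false;
        }
        return cur == -1;                 // only the -1 -> 0 transition unparks
    }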
diff --git a/ydb/library/actors/core/executor_pool_basic.h b/ydb/library/actors/core/executor_pool_basic.h
index 576c3b0b29..6872c08451 100644
--- a/ydb/library/actors/core/executor_pool_basic.h
+++ b/ydb/library/actors/core/executor_pool_basic.h
@@ -136,7 +136,6 @@ namespace NActors {
TArrayHolder<TWaitingStats<double>> MovingWaitingStats;
std::atomic<ui16> LocalQueueSize;
-
TArrayHolder<NSchedulerQueue::TReader> ScheduleReaders;
TArrayHolder<NSchedulerQueue::TWriter> ScheduleWriters;
@@ -160,11 +159,15 @@ namespace NActors {
i16 MaxThreadCount;
i16 DefaultThreadCount;
IHarmonizer *Harmonizer;
- i16 SharedExecutorsCount = 0;
ui64 SoftProcessingDurationTs = 0;
const i16 Priority = 0;
const ui32 ActorSystemIndex = NActors::TActorTypeOperator::GetActorSystemIndex();
+
+ static constexpr ui64 MaxSharedThreadsForPool = 2;
+ NThreading::TPadded<std::atomic_uint64_t> SharedThreadsCount = 0;
+ NThreading::TPadded<std::atomic<TSharedExecutorThreadCtx*>> SharedThreads[MaxSharedThreadsForPool] = {nullptr, nullptr};
+
public:
struct TSemaphore {
i64 OldSemaphore = 0; // 34 bits
@@ -251,9 +254,13 @@ namespace NActors {
void CalcSpinPerThread(ui64 wakingUpConsumption);
void ClearWaitingStats() const;
+ TSharedExecutorThreadCtx* ReleaseSharedThread();
+ void AddSharedThread(TSharedExecutorThreadCtx* thread);
+
private:
void AskToGoToSleep(bool *needToWait, bool *needToBlock);
void WakeUpLoop(i16 currentThreadCount);
+ bool WakeUpLoopShared();
};
}
diff --git a/ydb/library/actors/core/executor_pool_shared.cpp b/ydb/library/actors/core/executor_pool_shared.cpp
index e9d40fdc71..f9b769d404 100644
--- a/ydb/library/actors/core/executor_pool_shared.cpp
+++ b/ydb/library/actors/core/executor_pool_shared.cpp
@@ -108,8 +108,8 @@ void TSharedExecutorPool::ReturnHalfThread(i16 pool) {
Y_ABORT_UNLESS(borrowingPool);
BorrowedThreadByPool[PoolByBorrowedThread[threadIdx]] = -1;
PoolByBorrowedThread[threadIdx] = -1;
-
- // TODO(kruall): add change in executor pool basic
+ // TODO(kruall): Check on race
+ borrowingPool->ReleaseSharedThread();
}
void TSharedExecutorPool::GiveHalfThread(i16 from, i16 to) {
@@ -118,8 +118,8 @@ void TSharedExecutorPool::GiveHalfThread(i16 from, i16 to) {
Threads[threadIdx].ExecutorPools[1].store(borrowingPool, std::memory_order_release);
BorrowedThreadByPool[to] = threadIdx;
PoolByBorrowedThread[threadIdx] = to;
-
- // TODO(kruall): add change in executor pool basic
+ // TODO(kruall): Check on race
+ borrowingPool->AddSharedThread(&Threads[threadIdx]);
}
void TSharedExecutorPool::GetSharedStats(i16 poolId, std::vector<TExecutorThreadStats>& statsCopy) {
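
These two calls close the loop that the removed TODOs pointed at: borrowing
and returning a half thread now updates both the shared pool's bookkeeping
(BorrowedThreadByPool, PoolByBorrowedThread) and the borrowing
TBasicExecutorPool's SharedThreads slots, so WakeUpLoopShared sees borrowed
threads as well. Both sites keep a TODO to audit the ordering for races.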
diff --git a/ydb/library/actors/core/executor_thread.cpp b/ydb/library/actors/core/executor_thread.cpp
index d20e6482d2..7262b2b4cf 100644
--- a/ydb/library/actors/core/executor_thread.cpp
+++ b/ydb/library/actors/core/executor_thread.cpp
@@ -95,7 +95,9 @@ namespace NActors {
ui32 eventsPerMailbox)
: TGenericExecutorThread(workerId, actorSystem, threadCtx->ExecutorPools[0].load(), poolCount, threadName, softProcessingDurationTs, timePerMailbox, eventsPerMailbox)
, ThreadCtx(threadCtx)
- {}
+ {
+ Ctx.SharedThread = threadCtx;
+ }
TGenericExecutorThread::~TGenericExecutorThread()
{ }
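
Storing the shared context in Ctx.SharedThread here is what makes the
workerId < 0 branches in TBasicExecutorPool::GetReadyActivationCommon work:
shared threads have no slot in the pool's Threads array, so SetWork/UnsetWork
must go through the TSharedExecutorThreadCtx pointer that the worker_context.h
change below adds to TWorkerContext.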
diff --git a/ydb/library/actors/core/executor_thread_ctx.h b/ydb/library/actors/core/executor_thread_ctx.h
index 3378571c91..7af5a085dd 100644
--- a/ydb/library/actors/core/executor_thread_ctx.h
+++ b/ydb/library/actors/core/executor_thread_ctx.h
@@ -32,6 +32,10 @@ namespace NActors {
public:
ui64 StartWakingTs = 0;
+ ui64 GetStateInt() {
+ return WaitingFlag.load();
+ }
+
protected:
template <typename TWaitState>
TWaitState GetState() {
@@ -164,7 +168,7 @@ namespace NActors {
{}
explicit operator ui64() {
- return static_cast<ui64>(Flag) | ui64(NextPool << 3);
+ return static_cast<ui64>(Flag) | (static_cast<ui64>(NextPool) << 3);
}
explicit operator EThreadState() {
@@ -176,6 +180,14 @@ namespace NActors {
std::atomic<i64> RequestsForWakeUp = 0;
ui32 NextPool = 0;
+ void SetWork() {
+ this->ExchangeState(TWaitState{EThreadState::Work});
+ }
+
+ void UnsetWork() {
+ this->ExchangeState(TWaitState{EThreadState::None});
+ }
+
void AfterWakeUp(TWaitState state) {
NextPool = state.NextPool;
}
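
The operator ui64 change above also fixes a narrowing bug: NextPool is a
ui32, so the old expression shifted it in 32-bit arithmetic before the
widening conversion, silently discarding its top three bits. A sketch of the
difference:

    ui32 NextPool = 0x20000001;
    ui64 oldPacked = ui64(NextPool << 3);               // 0x8: bit 29 shifted out in 32-bit math
    ui64 newPacked = static_cast<ui64>(NextPool) << 3;  // 0x100000008: widened first, nothing lost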
diff --git a/ydb/library/actors/core/worker_context.h b/ydb/library/actors/core/worker_context.h
index 67e731c9ed..bdec5145a1 100644
--- a/ydb/library/actors/core/worker_context.h
+++ b/ydb/library/actors/core/worker_context.h
@@ -17,7 +17,7 @@
#include <library/cpp/lwtrace/shuttle.h>
namespace NActors {
- struct TExecutorThreadCtx;
+ struct TSharedExecutorThreadCtx;
struct TWorkerContext {
TWorkerId WorkerId;
@@ -35,6 +35,8 @@ namespace NActors {
bool IsNeededToWaitNextActivation = true;
i64 HPStart = 0;
ui32 ExecutedEvents = 0;
+ TSharedExecutorThreadCtx *SharedThread = nullptr;
+
TWorkerContext(TWorkerId workerId, TCpuId cpuId)
: WorkerId(workerId)