aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorkruall <kruall@ydb.tech>2024-01-11 14:15:52 +0300
committerGitHub <noreply@github.com>2024-01-11 14:15:52 +0300
commit57a192d4f253ebd3c1bcf84d61912f81ba3b99f0 (patch)
treecb47b151434145758b692d3b8004c0a9990f1853
parente4b78b4a312ad6000d440e1601d184087c129da8 (diff)
downloadydb-57a192d4f253ebd3c1bcf84d61912f81ba3b99f0.tar.gz
Add shared executor pool, KIKIMR-18440 (#933)
* Move some methods * add shared executor pool * Add config and improve wake ups --------- Co-authored-by: Aleksandr Kriukov <kruall@ydb.ru>
-rw-r--r--ydb/library/actors/core/config.h11
-rw-r--r--ydb/library/actors/core/cpu_manager.cpp13
-rw-r--r--ydb/library/actors/core/cpu_manager.h4
-rw-r--r--ydb/library/actors/core/executor_pool.h22
-rw-r--r--ydb/library/actors/core/executor_pool_basic.cpp43
-rw-r--r--ydb/library/actors/core/executor_pool_basic_ut.cpp44
-rw-r--r--ydb/library/actors/core/executor_pool_shared.cpp153
-rw-r--r--ydb/library/actors/core/executor_pool_shared.h56
-rw-r--r--ydb/library/actors/core/executor_thread.cpp40
-rw-r--r--ydb/library/actors/core/executor_thread.h10
-rw-r--r--ydb/library/actors/core/executor_thread_ctx.h10
-rw-r--r--ydb/library/actors/core/ya.make2
12 files changed, 339 insertions, 69 deletions
diff --git a/ydb/library/actors/core/config.h b/ydb/library/actors/core/config.h
index d70a917847..4889206b0a 100644
--- a/ydb/library/actors/core/config.h
+++ b/ydb/library/actors/core/config.h
@@ -35,6 +35,16 @@ namespace NActors {
i16 SharedExecutorsCount = 0;
i16 SoftProcessingDurationTs = 0;
EASProfile ActorSystemProfile = EASProfile::Default;
+ bool HasSharedThread = false;
+ };
+
+ struct TSharedExecutorPoolConfig {
+ ui32 Threads = 1;
+ ui64 SpinThreshold = 100;
+ TCpuMask Affinity; // Executor thread affinity
+ TDuration TimePerMailbox = TBasicExecutorPoolConfig::DEFAULT_TIME_PER_MAILBOX;
+ ui32 EventsPerMailbox = TBasicExecutorPoolConfig::DEFAULT_EVENTS_PER_MAILBOX;
+ i16 SoftProcessingDurationTs = 0;
};
struct TIOExecutorPoolConfig {
@@ -54,6 +64,7 @@ namespace NActors {
TVector<TBasicExecutorPoolConfig> Basic;
TVector<TIOExecutorPoolConfig> IO;
TVector<TSelfPingInfo> PingInfoByPool;
+ TSharedExecutorPoolConfig Shared;
ui32 GetExecutorsCount() const {
return Basic.size() + IO.size();
diff --git a/ydb/library/actors/core/cpu_manager.cpp b/ydb/library/actors/core/cpu_manager.cpp
index 1898904bb3..7c0c2276b3 100644
--- a/ydb/library/actors/core/cpu_manager.cpp
+++ b/ydb/library/actors/core/cpu_manager.cpp
@@ -22,8 +22,17 @@ namespace NActors {
TAffinity available;
available.Current();
+ std::vector<i16> poolsWithSharedThreads;
+ for (TBasicExecutorPoolConfig& cfg : Config.Basic) {
+ if (cfg.HasSharedThread) {
+ poolsWithSharedThreads.push_back(cfg.PoolId);
+ }
+ }
+ Shared.reset(new TSharedExecutorPool(Config.Shared, ExecutorPoolCount, poolsWithSharedThreads));
+ // auto sharedPool = static_cast<TSharedExecutorPool*>(Shared.get());
+
ui64 ts = GetCycleCountFast();
- Harmonizer.Reset(MakeHarmonizer(ts));
+ Harmonizer.reset(MakeHarmonizer(ts));
Executors.Reset(new TAutoPtr<IExecutorPool>[ExecutorPoolCount]);
@@ -90,7 +99,7 @@ namespace NActors {
IExecutorPool* TCpuManager::CreateExecutorPool(ui32 poolId) {
for (TBasicExecutorPoolConfig& cfg : Config.Basic) {
if (cfg.PoolId == poolId) {
- return new TBasicExecutorPool(cfg, Harmonizer.Get());
+ return new TBasicExecutorPool(cfg, Harmonizer.get());
}
}
for (TIOExecutorPoolConfig& cfg : Config.IO) {
diff --git a/ydb/library/actors/core/cpu_manager.h b/ydb/library/actors/core/cpu_manager.h
index 29e86170c9..0fc5f1dbf2 100644
--- a/ydb/library/actors/core/cpu_manager.h
+++ b/ydb/library/actors/core/cpu_manager.h
@@ -2,6 +2,7 @@
#include "harmonizer.h"
#include "executor_pool.h"
+#include "executor_pool_shared.h"
namespace NActors {
struct TActorSystemSetup;
@@ -9,7 +10,8 @@ namespace NActors {
class TCpuManager : public TNonCopyable {
const ui32 ExecutorPoolCount;
TArrayHolder<TAutoPtr<IExecutorPool>> Executors;
- THolder<IHarmonizer> Harmonizer;
+ std::unique_ptr<IHarmonizer> Harmonizer;
+ std::unique_ptr<TSharedExecutorPool> Shared;
TCpuManagerConfig Config;
public:
diff --git a/ydb/library/actors/core/executor_pool.h b/ydb/library/actors/core/executor_pool.h
index 5498a6403e..854689346a 100644
--- a/ydb/library/actors/core/executor_pool.h
+++ b/ydb/library/actors/core/executor_pool.h
@@ -23,7 +23,20 @@ namespace NActors {
}
};
- class IExecutorPool : TNonCopyable {
+ struct IActorThreadPool : TNonCopyable {
+
+ virtual ~IActorThreadPool() {
+ }
+
+ // lifecycle stuff
+ virtual void Prepare(TActorSystem* actorSystem, NSchedulerQueue::TReader** scheduleReaders, ui32* scheduleSz) = 0;
+ virtual void Start() = 0;
+ virtual void PrepareStop() = 0;
+ virtual void Shutdown() = 0;
+ virtual bool Cleanup() = 0;
+ };
+
+ class IExecutorPool : public IActorThreadPool {
public:
const ui32 PoolId;
@@ -87,13 +100,6 @@ namespace NActors {
virtual TActorId Register(IActor* actor, TMailboxType::EType mailboxType, ui64 revolvingCounter, const TActorId& parentId) = 0;
virtual TActorId Register(IActor* actor, TMailboxHeader* mailbox, ui32 hint, const TActorId& parentId) = 0;
- // lifecycle stuff
- virtual void Prepare(TActorSystem* actorSystem, NSchedulerQueue::TReader** scheduleReaders, ui32* scheduleSz) = 0;
- virtual void Start() = 0;
- virtual void PrepareStop() = 0;
- virtual void Shutdown() = 0;
- virtual bool Cleanup() = 0;
-
virtual void GetCurrentStats(TExecutorPoolStats& poolStats, TVector<TExecutorThreadStats>& statsCopy) const {
// TODO: make pure virtual and override everywhere
Y_UNUSED(poolStats);
diff --git a/ydb/library/actors/core/executor_pool_basic.cpp b/ydb/library/actors/core/executor_pool_basic.cpp
index e646b27e45..e7fd15fc82 100644
--- a/ydb/library/actors/core/executor_pool_basic.cpp
+++ b/ydb/library/actors/core/executor_pool_basic.cpp
@@ -4,6 +4,7 @@
#include "executor_thread_ctx.h"
#include "probes.h"
#include "mailbox.h"
+#include <atomic>
#include <ydb/library/actors/util/affinity.h>
#include <ydb/library/actors/util/datetime.h>
@@ -578,6 +579,14 @@ namespace NActors {
}
bool TSharedExecutorThreadCtx::Wait(ui64 spinThresholdCycles, std::atomic<bool> *stopFlag) {
+ i64 requestsForWakeUp = RequestsForWakeUp.fetch_sub(1, std::memory_order_acq_rel);
+ if (requestsForWakeUp) {
+ if (requestsForWakeUp > 1) {
+ requestsForWakeUp--;
+ RequestsForWakeUp.compare_exchange_weak(requestsForWakeUp, 0, std::memory_order_acq_rel);
+ }
+ return false;
+ }
EThreadState state = ExchangeState<EThreadState>(EThreadState::Spin);
Y_ABORT_UNLESS(state == EThreadState::None, "WaitingFlag# %d", int(state));
if (spinThresholdCycles > 0) {
@@ -615,4 +624,38 @@ namespace NActors {
return false;
}
+ bool TSharedExecutorThreadCtx::WakeUp() {
+ i64 requestsForWakeUp = RequestsForWakeUp.fetch_add(1, std::memory_order_acq_rel);
+ if (requestsForWakeUp >= 0) {
+ return false;
+ }
+
+ for (;;) {
+ EThreadState state = GetState<EThreadState>();
+ switch (state) {
+ case EThreadState::None:
+ case EThreadState::Work:
+ // TODO(kruall): check race
+ continue;
+ case EThreadState::Spin:
+ case EThreadState::Sleep:
+ if (ReplaceState<EThreadState>(state, EThreadState::None)) {
+ if (state == EThreadState::Sleep) {
+ ui64 beforeUnpark = GetCycleCountFast();
+ StartWakingTs = beforeUnpark;
+ WaitingPad.Unpark();
+ if (TlsThreadContext && TlsThreadContext->WaitingStats) {
+ TlsThreadContext->WaitingStats->AddWakingUp(GetCycleCountFast() - beforeUnpark);
+ }
+ }
+ return true;
+ }
+ break;
+ default:
+ Y_ABORT();
+ }
+ }
+ return false;
+ }
+
}
diff --git a/ydb/library/actors/core/executor_pool_basic_ut.cpp b/ydb/library/actors/core/executor_pool_basic_ut.cpp
index 754625c42b..76a868741e 100644
--- a/ydb/library/actors/core/executor_pool_basic_ut.cpp
+++ b/ydb/library/actors/core/executor_pool_basic_ut.cpp
@@ -432,7 +432,7 @@ Y_UNIT_TEST_SUITE(BasicExecutorPool) {
Y_UNIT_TEST(CheckStats) {
const size_t size = 4;
- const size_t msgCount = 1e4;
+ const size_t msgCount = 5e3;
TBasicExecutorPool* executorPool = new TBasicExecutorPool(0, size, 50);
auto setup = GetActorSystemSetup(executorPool);
@@ -441,17 +441,23 @@ Y_UNIT_TEST_SUITE(BasicExecutorPool) {
auto begin = TInstant::Now();
- auto actor = new TTestSenderActor();
- auto actorId = actorSystem.Register(actor);
- actor->Start(actor->SelfId(), msgCount);
- actorSystem.Send(actorId, new TEvMsg());
+ auto load = [&]() {
+ auto actor = new TTestSenderActor();
+ auto actorId = actorSystem.Register(actor);
+ actor->Start(actor->SelfId(), msgCount);
+ actorSystem.Send(actorId, new TEvMsg());
- while (actor->GetCounter()) {
- auto now = TInstant::Now();
- UNIT_ASSERT_C(now - begin < TDuration::Seconds(5), "Counter is " << actor->GetCounter());
+ while (actor->GetCounter()) {
+ auto now = TInstant::Now();
+ UNIT_ASSERT_C(now - begin < TDuration::Seconds(5), "Counter is " << actor->GetCounter());
- Sleep(TDuration::MilliSeconds(1));
- }
+ Sleep(TDuration::MilliSeconds(1));
+ }
+ };
+
+ load();
+ Sleep(TDuration::MilliSeconds(10));
+ load();
TVector<TExecutorThreadStats> stats;
TExecutorPoolStats poolStats;
@@ -461,8 +467,8 @@ Y_UNIT_TEST_SUITE(BasicExecutorPool) {
stats[0].Aggregate(stats[idx]);
}
- UNIT_ASSERT_VALUES_EQUAL(stats[0].SentEvents, msgCount - 1);
- UNIT_ASSERT_VALUES_EQUAL(stats[0].ReceivedEvents, msgCount);
+ UNIT_ASSERT_VALUES_EQUAL(stats[0].SentEvents, 2 * msgCount - 2);
+ UNIT_ASSERT_VALUES_EQUAL(stats[0].ReceivedEvents, 2 * msgCount);
UNIT_ASSERT_VALUES_EQUAL(stats[0].PreemptedEvents, 0);
UNIT_ASSERT_VALUES_EQUAL(stats[0].NonDeliveredEvents, 0);
UNIT_ASSERT_VALUES_EQUAL(stats[0].EmptyMailboxActivation, 0);
@@ -470,18 +476,18 @@ Y_UNIT_TEST_SUITE(BasicExecutorPool) {
UNIT_ASSERT(stats[0].ElapsedTicks > 0);
UNIT_ASSERT(stats[0].ParkedTicks > 0);
UNIT_ASSERT_VALUES_EQUAL(stats[0].BlockedTicks, 0);
- UNIT_ASSERT(stats[0].ActivationTimeHistogram.TotalSamples >= msgCount / TBasicExecutorPoolConfig::DEFAULT_EVENTS_PER_MAILBOX);
- UNIT_ASSERT_VALUES_EQUAL(stats[0].EventDeliveryTimeHistogram.TotalSamples, msgCount);
- UNIT_ASSERT_VALUES_EQUAL(stats[0].EventProcessingCountHistogram.TotalSamples, msgCount);
+ UNIT_ASSERT(stats[0].ActivationTimeHistogram.TotalSamples >= 2 * msgCount / TBasicExecutorPoolConfig::DEFAULT_EVENTS_PER_MAILBOX);
+ UNIT_ASSERT_VALUES_EQUAL(stats[0].EventDeliveryTimeHistogram.TotalSamples, 2 * msgCount);
+ UNIT_ASSERT_VALUES_EQUAL(stats[0].EventProcessingCountHistogram.TotalSamples, 2 * msgCount);
UNIT_ASSERT(stats[0].EventProcessingTimeHistogram.TotalSamples > 0);
UNIT_ASSERT(stats[0].ElapsedTicksByActivity[NActors::TActorTypeOperator::GetOtherActivityIndex()] > 0);
- UNIT_ASSERT_VALUES_EQUAL(stats[0].ReceivedEventsByActivity[NActors::TActorTypeOperator::GetOtherActivityIndex()], msgCount);
- UNIT_ASSERT_VALUES_EQUAL(stats[0].ActorsAliveByActivity[NActors::TActorTypeOperator::GetOtherActivityIndex()], 1);
+ UNIT_ASSERT_VALUES_EQUAL(stats[0].ReceivedEventsByActivity[NActors::TActorTypeOperator::GetOtherActivityIndex()], 2 * msgCount);
+ UNIT_ASSERT_VALUES_EQUAL(stats[0].ActorsAliveByActivity[NActors::TActorTypeOperator::GetOtherActivityIndex()], 2);
UNIT_ASSERT_VALUES_EQUAL(stats[0].ScheduledEventsByActivity[NActors::TActorTypeOperator::GetOtherActivityIndex()], 0);
- UNIT_ASSERT_VALUES_EQUAL(stats[0].PoolActorRegistrations, 1);
+ UNIT_ASSERT_VALUES_EQUAL(stats[0].PoolActorRegistrations, 2);
UNIT_ASSERT_VALUES_EQUAL(stats[0].PoolDestroyedActors, 0);
UNIT_ASSERT_VALUES_EQUAL(stats[0].PoolAllocatedMailboxes, 4095); // one line
- UNIT_ASSERT(stats[0].MailboxPushedOutByTime + stats[0].MailboxPushedOutByEventCount >= msgCount / TBasicExecutorPoolConfig::DEFAULT_EVENTS_PER_MAILBOX);
+ UNIT_ASSERT(stats[0].MailboxPushedOutByTime + stats[0].MailboxPushedOutByEventCount >= 2 * msgCount / TBasicExecutorPoolConfig::DEFAULT_EVENTS_PER_MAILBOX);
UNIT_ASSERT_VALUES_EQUAL(stats[0].MailboxPushedOutBySoftPreemption, 0);
}
}
diff --git a/ydb/library/actors/core/executor_pool_shared.cpp b/ydb/library/actors/core/executor_pool_shared.cpp
new file mode 100644
index 0000000000..e9d40fdc71
--- /dev/null
+++ b/ydb/library/actors/core/executor_pool_shared.cpp
@@ -0,0 +1,153 @@
+#include "executor_pool_shared.h"
+
+#include "actorsystem.h"
+#include "executor_pool_basic.h"
+#include "executor_thread.h"
+#include "executor_thread_ctx.h"
+
+#include <atomic>
+#include <ydb/library/actors/util/affinity.h>
+
+
+namespace NActors {
+
+TSharedExecutorPool::TSharedExecutorPool(const TSharedExecutorPoolConfig &config, i16 poolCount, std::vector<i16> poolsWithThreads)
+ : ThreadByPool(poolCount, -1)
+ , PoolByThread(poolsWithThreads.size())
+ , BorrowedThreadByPool(poolCount, -1)
+ , PoolByBorrowedThread(poolsWithThreads.size(), -1)
+ , Pools(poolCount)
+ , PoolCount(poolCount)
+ , SharedThreadCount(poolsWithThreads.size())
+ , Threads(new TSharedExecutorThreadCtx[SharedThreadCount])
+ , Timers(new TTimers[SharedThreadCount])
+ , TimePerMailbox(config.TimePerMailbox)
+ , EventsPerMailbox(config.EventsPerMailbox)
+ , SoftProcessingDurationTs(config.SoftProcessingDurationTs)
+{
+ for (ui32 poolIdx = 0, threadIdx = 0; poolIdx < poolsWithThreads.size(); ++poolIdx, ++threadIdx) {
+ Y_ABORT_UNLESS(poolsWithThreads[poolIdx] < poolCount);
+ ThreadByPool[poolsWithThreads[poolIdx]] = threadIdx;
+ PoolByThread[threadIdx] = poolsWithThreads[poolIdx];
+ }
+}
+
+void TSharedExecutorPool::Prepare(TActorSystem* actorSystem, NSchedulerQueue::TReader** scheduleReaders, ui32* scheduleSz) {
+ // ActorSystem = actorSystem;
+
+ ScheduleReaders.reset(new NSchedulerQueue::TReader[SharedThreadCount]);
+ ScheduleWriters.reset(new NSchedulerQueue::TWriter[SharedThreadCount]);
+
+ std::vector<IExecutorPool*> poolsBasic = actorSystem->GetBasicExecutorPools();
+ std::vector<IExecutorPool*> poolByThread(SharedThreadCount);
+ for (IExecutorPool* pool : poolsBasic) {
+ Pools[pool->PoolId] = dynamic_cast<TBasicExecutorPool*>(pool);
+ i16 threadIdx = ThreadByPool[pool->PoolId];
+ if (threadIdx >= 0) {
+ poolByThread[threadIdx] = pool;
+ }
+ }
+
+ for (i16 i = 0; i != SharedThreadCount; ++i) {
+ // !TODO
+ Threads[i].ExecutorPools[0].store(dynamic_cast<TBasicExecutorPool*>(poolByThread[i]), std::memory_order_release);
+ Threads[i].Thread.Reset(
+ new TSharedExecutorThread(
+ -1,
+ actorSystem,
+ &Threads[i],
+ PoolCount,
+ "SharedThread",
+ SoftProcessingDurationTs,
+ TimePerMailbox,
+ EventsPerMailbox));
+ ScheduleWriters[i].Init(ScheduleReaders[i]);
+ }
+
+ *scheduleReaders = ScheduleReaders.get();
+ *scheduleSz = SharedThreadCount;
+}
+
+void TSharedExecutorPool::Start() {
+ //ThreadUtilization = 0;
+ //AtomicAdd(MaxUtilizationCounter, -(i64)GetCycleCountFast());
+
+ for (i16 i = 0; i != SharedThreadCount; ++i) {
+ Threads[i].Thread->Start();
+ }
+}
+
+void TSharedExecutorPool::PrepareStop() {
+ for (i16 i = 0; i != SharedThreadCount; ++i) {
+ Threads[i].Thread->StopFlag = true;
+ Threads[i].Interrupt();
+ }
+}
+
+void TSharedExecutorPool::Shutdown() {
+ for (i16 i = 0; i != SharedThreadCount; ++i) {
+ Threads[i].Thread->Join();
+ }
+}
+
+bool TSharedExecutorPool::Cleanup() {
+ return true;
+}
+
+TSharedExecutorThreadCtx* TSharedExecutorPool::GetSharedThread(i16 pool) {
+ i16 threadIdx = ThreadByPool[pool];
+ if (threadIdx < 0 || threadIdx >= PoolCount) {
+ return nullptr;
+ }
+ return &Threads[threadIdx];
+}
+
+void TSharedExecutorPool::ReturnHalfThread(i16 pool) {
+ i16 threadIdx = ThreadByPool[pool];
+ TBasicExecutorPool* borrowingPool = Threads[threadIdx].ExecutorPools[1].exchange(nullptr, std::memory_order_acq_rel);
+ Y_ABORT_UNLESS(borrowingPool);
+ BorrowedThreadByPool[PoolByBorrowedThread[threadIdx]] = -1;
+ PoolByBorrowedThread[threadIdx] = -1;
+
+ // TODO(kruall): add change in executor pool basic
+}
+
+void TSharedExecutorPool::GiveHalfThread(i16 from, i16 to) {
+ i16 threadIdx = ThreadByPool[from];
+ TBasicExecutorPool* borrowingPool = Pools[to];
+ Threads[threadIdx].ExecutorPools[1].store(borrowingPool, std::memory_order_release);
+ BorrowedThreadByPool[to] = threadIdx;
+ PoolByBorrowedThread[threadIdx] = to;
+
+ // TODO(kruall): add change in executor pool basic
+}
+
+void TSharedExecutorPool::GetSharedStats(i16 poolId, std::vector<TExecutorThreadStats>& statsCopy) {
+ statsCopy.resize(SharedThreadCount + 1);
+ for (i16 i = 0; i < SharedThreadCount; ++i) {
+ Threads[i].Thread->GetSharedStats(poolId, statsCopy[i + 1]);
+ }
+}
+
+TCpuConsumption TSharedExecutorPool::GetThreadCpuConsumption(i16 poolId, i16 threadIdx) {
+ if (threadIdx >= SharedThreadCount) {
+ return {0.0, 0.0};
+ }
+ TExecutorThreadStats stats;
+ Threads[threadIdx].Thread->GetSharedStats(poolId, stats);
+ return {Ts2Us(stats.SafeElapsedTicks), static_cast<double>(stats.CpuUs), stats.NotEnoughCpuExecutions};
+}
+
+std::vector<TCpuConsumption> TSharedExecutorPool::GetThreadsCpuConsumption(i16 poolId) {
+ std::vector<TCpuConsumption> result;
+ for (i16 threadIdx = 0; threadIdx < SharedThreadCount; ++threadIdx) {
+ result.push_back(GetThreadCpuConsumption(poolId, threadIdx));
+ }
+ return result;
+}
+
+i16 TSharedExecutorPool::GetSharedThreadCount() const {
+ return SharedThreadCount;
+}
+
+} \ No newline at end of file
diff --git a/ydb/library/actors/core/executor_pool_shared.h b/ydb/library/actors/core/executor_pool_shared.h
new file mode 100644
index 0000000000..2f0eb60973
--- /dev/null
+++ b/ydb/library/actors/core/executor_pool_shared.h
@@ -0,0 +1,56 @@
+#pragma once
+
+#include "config.h"
+#include "executor_pool.h"
+#include "executor_thread_ctx.h"
+
+
+namespace NActors {
+
+ struct TExecutorThreadCtx;
+ class TBasicExecutorPool;
+
+ class TSharedExecutorPool: public IActorThreadPool {
+ public:
+ TSharedExecutorPool(const TSharedExecutorPoolConfig &config, i16 poolCount, std::vector<i16> poolsWithThreads);
+
+ // IThreadPool
+ void Prepare(TActorSystem* actorSystem, NSchedulerQueue::TReader** scheduleReaders, ui32* scheduleSz) override;
+ void Start() override;
+ void PrepareStop() override;
+ void Shutdown() override;
+ bool Cleanup() override;
+
+ TSharedExecutorThreadCtx *GetSharedThread(i16 poolId);
+ void GetSharedStats(i16 pool, std::vector<TExecutorThreadStats>& statsCopy);
+ TCpuConsumption GetThreadCpuConsumption(i16 poolId, i16 threadIdx);
+ std::vector<TCpuConsumption> GetThreadsCpuConsumption(i16 poolId);
+
+ void ReturnHalfThread(i16 pool);
+ void GiveHalfThread(i16 from, i16 to);
+
+ i16 GetSharedThreadCount() const;
+
+
+ private:
+ std::vector<i16> ThreadByPool;
+ std::vector<i16> PoolByThread;
+ std::vector<i16> BorrowedThreadByPool;
+ std::vector<i16> PoolByBorrowedThread;
+
+ std::vector<TBasicExecutorPool*> Pools;
+
+ i16 PoolCount;
+ i16 SharedThreadCount;
+ std::unique_ptr<TSharedExecutorThreadCtx[]> Threads;
+ std::unique_ptr<TTimers[]> Timers;
+
+ std::unique_ptr<NSchedulerQueue::TReader[]> ScheduleReaders;
+ std::unique_ptr<NSchedulerQueue::TWriter[]> ScheduleWriters;
+
+ TDuration TimePerMailbox;
+ ui32 EventsPerMailbox;
+ ui64 SoftProcessingDurationTs;
+ };
+
+} \ No newline at end of file
diff --git a/ydb/library/actors/core/executor_thread.cpp b/ydb/library/actors/core/executor_thread.cpp
index 0b24398ac6..d20e6482d2 100644
--- a/ydb/library/actors/core/executor_thread.cpp
+++ b/ydb/library/actors/core/executor_thread.cpp
@@ -10,6 +10,7 @@
#include "executor_thread_ctx.h"
#include "probes.h"
+#include <atomic>
#include <ydb/library/actors/prof/tag.h>
#include <ydb/library/actors/util/affinity.h>
#include <ydb/library/actors/util/datetime.h>
@@ -92,7 +93,7 @@ namespace NActors {
ui64 softProcessingDurationTs,
TDuration timePerMailbox,
ui32 eventsPerMailbox)
- : TGenericExecutorThread(workerId, actorSystem, threadCtx->OwnerExecutorPool, poolCount, threadName, softProcessingDurationTs, timePerMailbox, eventsPerMailbox)
+ : TGenericExecutorThread(workerId, actorSystem, threadCtx->ExecutorPools[0].load(), poolCount, threadName, softProcessingDurationTs, timePerMailbox, eventsPerMailbox)
, ThreadCtx(threadCtx)
{}
@@ -502,10 +503,6 @@ namespace NActors {
return nullptr;
}
- void TSharedExecutorThread::UpdatePools() {
- NeedToReloadPools = EState::NeedToReloadPools;
- }
-
TGenericExecutorThread::TProcessingResult TSharedExecutorThread::ProcessSharedExecutorPool(TExecutorPoolBaseMailboxed *pool) {
Ctx.Switch(
pool,
@@ -515,7 +512,7 @@ namespace NActors {
GetCycleCountFast() + SoftProcessingDurationTs,
&SharedStats[pool->PoolId]);
Y_ABORT_UNLESS(Ctx.Stats->ElapsedTicksByActivity.size());
- Ctx.WorkerId = (pool == ThreadCtx->OwnerExecutorPool ? -1 : -2);
+ Ctx.WorkerId = (pool == ThreadCtx->ExecutorPools[0].load(std::memory_order_relaxed) ? -1 : -2);
Y_ABORT_UNLESS(Ctx.Stats->ElapsedTicksByActivity.size());
return ProcessExecutorPool(pool);
}
@@ -536,33 +533,22 @@ namespace NActors {
::SetCurrentThreadName(ThreadName);
}
- if (!IsSharedThread) {
- ProcessExecutorPool(ExecutorPool);
- return nullptr;
- }
-
- TExecutorPoolBaseMailboxed *ownerPool = dynamic_cast<TExecutorPoolBaseMailboxed*>(ThreadCtx->OwnerExecutorPool);
- TExecutorPoolBaseMailboxed *otherPool = nullptr;
-
-
- std::vector<TExecutorPoolBaseMailboxed*> pools;
do {
- if (NeedToReloadPools.load() == EState::NeedToReloadPools) {
- // otherPool = dynamic_cast<TExecutorPoolBaseMailboxed*>(ThreadCtx->OtherExecutorPool.load());
- NeedToReloadPools = EState::Running;
- }
bool wasWorking = true;
- while (wasWorking && NeedToReloadPools.load() == EState::Running && !StopFlag.load(std::memory_order_relaxed)) {
+ while (wasWorking && !StopFlag.load(std::memory_order_relaxed)) {
wasWorking = false;
- TProcessingResult result = ProcessSharedExecutorPool(ownerPool);
- wasWorking |= result.WasWorking;
- if (otherPool) {
- result = ProcessSharedExecutorPool(otherPool);
- wasWorking = result.WasWorking;
+
+ for (ui32 poolIdx = 0; poolIdx < MaxPoolsForSharedThreads; ++poolIdx) {
+ TExecutorPoolBaseMailboxed *pool = dynamic_cast<TExecutorPoolBaseMailboxed*>(ThreadCtx->ExecutorPools[poolIdx].load(std::memory_order_acquire));
+ if (!pool) {
+ break;
+ }
+ TProcessingResult result = ProcessSharedExecutorPool(pool);
+ wasWorking |= result.WasWorking;
}
}
- if (!wasWorking && NeedToReloadPools.load() == EState::Running && !StopFlag.load(std::memory_order_relaxed)) {
+ if (!wasWorking && !StopFlag.load(std::memory_order_relaxed)) {
TlsThreadContext->Timers.Reset();
ThreadCtx->Wait(0, &StopFlag);
}
diff --git a/ydb/library/actors/core/executor_thread.h b/ydb/library/actors/core/executor_thread.h
index 261a05ab2a..b9a339648b 100644
--- a/ydb/library/actors/core/executor_thread.h
+++ b/ydb/library/actors/core/executor_thread.h
@@ -119,7 +119,6 @@ namespace NActors {
: TGenericExecutorThread(workerId, cpuId, actorSystem, executorPool, mailboxTable, threadName, timePerMailbox, eventsPerMailbox)
{}
-
TExecutorThread(TWorkerId workerId,
TActorSystem* actorSystem,
IExecutorPool* executorPool,
@@ -138,11 +137,6 @@ namespace NActors {
};
class TSharedExecutorThread: public TGenericExecutorThread {
- enum class EState : ui64 {
- Running = 0,
- NeedToReloadPools,
- };
-
public:
TSharedExecutorThread(TWorkerId workerId,
TActorSystem* actorSystem,
@@ -156,15 +150,11 @@ namespace NActors {
virtual ~TSharedExecutorThread()
{}
- void UpdatePools();
-
private:
TProcessingResult ProcessSharedExecutorPool(TExecutorPoolBaseMailboxed *pool);
void* ThreadProc();
- std::atomic<EState> NeedToReloadPools = EState::NeedToReloadPools;
-
TSharedExecutorThreadCtx *ThreadCtx;
};
diff --git a/ydb/library/actors/core/executor_thread_ctx.h b/ydb/library/actors/core/executor_thread_ctx.h
index e9bcb500f6..3378571c91 100644
--- a/ydb/library/actors/core/executor_thread_ctx.h
+++ b/ydb/library/actors/core/executor_thread_ctx.h
@@ -142,7 +142,7 @@ namespace NActors {
};
- constexpr ui32 MaxPoolsForSharedThreads = 4;
+ constexpr ui32 MaxPoolsForSharedThreads = 2;
struct TSharedExecutorThreadCtx : public TGenericExecutorThreadCtx {
using TBase = TGenericExecutorThreadCtx;
@@ -172,8 +172,8 @@ namespace NActors {
}
};
- TBasicExecutorPool *OwnerExecutorPool = nullptr;
std::atomic<TBasicExecutorPool*> ExecutorPools[MaxPoolsForSharedThreads];
+ std::atomic<i64> RequestsForWakeUp = 0;
ui32 NextPool = 0;
void AfterWakeUp(TWaitState state) {
@@ -190,6 +190,12 @@ namespace NActors {
bool Wait(ui64 spinThresholdCycles, std::atomic<bool> *stopFlag); // in executor_pool_basic.cpp
+ bool WakeUp();
+
+ void Interrupt() {
+ WaitingPad.Interrupt();
+ }
+
TSharedExecutorThreadCtx() = default;
};
diff --git a/ydb/library/actors/core/ya.make b/ydb/library/actors/core/ya.make
index f1d9abc3f8..a86b8e3297 100644
--- a/ydb/library/actors/core/ya.make
+++ b/ydb/library/actors/core/ya.make
@@ -51,6 +51,8 @@ SRCS(
executor_pool_basic.h
executor_pool_io.cpp
executor_pool_io.h
+ executor_pool_shared.cpp
+ executor_pool_shared.h
executor_thread.cpp
executor_thread.h
harmonizer.cpp