aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoralexvru <alexvru@ydb.tech>2023-09-07 22:53:22 +0300
committeralexvru <alexvru@ydb.tech>2023-09-07 23:17:01 +0300
commitc7138e84c19043351e6f4bd80a9e287a04a7a839 (patch)
tree243f25eda7b916acb507d689f10942c6f6f3c3f7
parent40f9322d2bfaf54f78bce83bae3af79f73234488 (diff)
downloadydb-c7138e84c19043351e6f4bd80a9e287a04a7a839.tar.gz
Introduce perfect activation queue
-rw-r--r--library/cpp/actors/core/CMakeLists.darwin-x86_64.txt1
-rw-r--r--library/cpp/actors/core/CMakeLists.linux-aarch64.txt1
-rw-r--r--library/cpp/actors/core/CMakeLists.linux-x86_64.txt1
-rw-r--r--library/cpp/actors/core/CMakeLists.windows-x86_64.txt1
-rw-r--r--library/cpp/actors/core/executor_pool_base.cpp6
-rw-r--r--library/cpp/actors/core/executor_pool_base.h4
-rw-r--r--library/cpp/actors/core/executor_pool_basic.cpp8
-rw-r--r--library/cpp/actors/core/executor_pool_io.cpp16
-rw-r--r--library/cpp/actors/core/executor_pool_io.h2
-rw-r--r--library/cpp/actors/core/executor_pool_united.cpp12
-rw-r--r--library/cpp/actors/core/mailbox.cpp36
-rw-r--r--library/cpp/actors/core/mailbox.h3
-rw-r--r--library/cpp/actors/core/perfect_activation_queue.cpp95
-rw-r--r--library/cpp/actors/core/perfect_activation_queue.h29
-rw-r--r--library/cpp/actors/core/ya.make2
15 files changed, 174 insertions, 43 deletions
diff --git a/library/cpp/actors/core/CMakeLists.darwin-x86_64.txt b/library/cpp/actors/core/CMakeLists.darwin-x86_64.txt
index c5c1629513..392a1b5ea7 100644
--- a/library/cpp/actors/core/CMakeLists.darwin-x86_64.txt
+++ b/library/cpp/actors/core/CMakeLists.darwin-x86_64.txt
@@ -77,6 +77,7 @@ target_sources(cpp-actors-core PRIVATE
${CMAKE_SOURCE_DIR}/library/cpp/actors/core/monotonic.cpp
${CMAKE_SOURCE_DIR}/library/cpp/actors/core/monotonic_provider.cpp
${CMAKE_SOURCE_DIR}/library/cpp/actors/core/worker_context.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/actors/core/perfect_activation_queue.cpp
${CMAKE_SOURCE_DIR}/library/cpp/actors/core/probes.cpp
${CMAKE_SOURCE_DIR}/library/cpp/actors/core/process_stats.cpp
${CMAKE_SOURCE_DIR}/library/cpp/actors/core/scheduler_actor.cpp
diff --git a/library/cpp/actors/core/CMakeLists.linux-aarch64.txt b/library/cpp/actors/core/CMakeLists.linux-aarch64.txt
index 23f4e3ba64..cf4e5c9440 100644
--- a/library/cpp/actors/core/CMakeLists.linux-aarch64.txt
+++ b/library/cpp/actors/core/CMakeLists.linux-aarch64.txt
@@ -78,6 +78,7 @@ target_sources(cpp-actors-core PRIVATE
${CMAKE_SOURCE_DIR}/library/cpp/actors/core/monotonic.cpp
${CMAKE_SOURCE_DIR}/library/cpp/actors/core/monotonic_provider.cpp
${CMAKE_SOURCE_DIR}/library/cpp/actors/core/worker_context.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/actors/core/perfect_activation_queue.cpp
${CMAKE_SOURCE_DIR}/library/cpp/actors/core/probes.cpp
${CMAKE_SOURCE_DIR}/library/cpp/actors/core/process_stats.cpp
${CMAKE_SOURCE_DIR}/library/cpp/actors/core/scheduler_actor.cpp
diff --git a/library/cpp/actors/core/CMakeLists.linux-x86_64.txt b/library/cpp/actors/core/CMakeLists.linux-x86_64.txt
index 23f4e3ba64..cf4e5c9440 100644
--- a/library/cpp/actors/core/CMakeLists.linux-x86_64.txt
+++ b/library/cpp/actors/core/CMakeLists.linux-x86_64.txt
@@ -78,6 +78,7 @@ target_sources(cpp-actors-core PRIVATE
${CMAKE_SOURCE_DIR}/library/cpp/actors/core/monotonic.cpp
${CMAKE_SOURCE_DIR}/library/cpp/actors/core/monotonic_provider.cpp
${CMAKE_SOURCE_DIR}/library/cpp/actors/core/worker_context.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/actors/core/perfect_activation_queue.cpp
${CMAKE_SOURCE_DIR}/library/cpp/actors/core/probes.cpp
${CMAKE_SOURCE_DIR}/library/cpp/actors/core/process_stats.cpp
${CMAKE_SOURCE_DIR}/library/cpp/actors/core/scheduler_actor.cpp
diff --git a/library/cpp/actors/core/CMakeLists.windows-x86_64.txt b/library/cpp/actors/core/CMakeLists.windows-x86_64.txt
index c5c1629513..392a1b5ea7 100644
--- a/library/cpp/actors/core/CMakeLists.windows-x86_64.txt
+++ b/library/cpp/actors/core/CMakeLists.windows-x86_64.txt
@@ -77,6 +77,7 @@ target_sources(cpp-actors-core PRIVATE
${CMAKE_SOURCE_DIR}/library/cpp/actors/core/monotonic.cpp
${CMAKE_SOURCE_DIR}/library/cpp/actors/core/monotonic_provider.cpp
${CMAKE_SOURCE_DIR}/library/cpp/actors/core/worker_context.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/actors/core/perfect_activation_queue.cpp
${CMAKE_SOURCE_DIR}/library/cpp/actors/core/probes.cpp
${CMAKE_SOURCE_DIR}/library/cpp/actors/core/process_stats.cpp
${CMAKE_SOURCE_DIR}/library/cpp/actors/core/scheduler_actor.cpp
diff --git a/library/cpp/actors/core/executor_pool_base.cpp b/library/cpp/actors/core/executor_pool_base.cpp
index 10362a7a2b..c8e3c0aa71 100644
--- a/library/cpp/actors/core/executor_pool_base.cpp
+++ b/library/cpp/actors/core/executor_pool_base.cpp
@@ -71,7 +71,7 @@ namespace NActors {
{}
TExecutorPoolBase::~TExecutorPoolBase() {
- while (Activations.Pop(0))
+ while (Activations.Pop())
;
}
@@ -105,7 +105,7 @@ namespace NActors {
}
void TExecutorPoolBase::ScheduleActivation(ui32 activation) {
- ScheduleActivationEx(activation, AtomicIncrement(ActivationsRevolvingCounter));
+ ScheduleActivationEx(activation, 0);
}
Y_FORCE_INLINE bool IsAllowedToCapture(IExecutorPool *self) {
@@ -121,7 +121,7 @@ namespace NActors {
TlsThreadContext->CapturedType = TlsThreadContext->SendingType;
}
if (activation) {
- ScheduleActivationEx(activation, AtomicIncrement(ActivationsRevolvingCounter));
+ ScheduleActivationEx(activation, 0);
}
}
diff --git a/library/cpp/actors/core/executor_pool_base.h b/library/cpp/actors/core/executor_pool_base.h
index e94ffdbad9..6a321e32f8 100644
--- a/library/cpp/actors/core/executor_pool_base.h
+++ b/library/cpp/actors/core/executor_pool_base.h
@@ -3,6 +3,7 @@
#include "executor_pool.h"
#include "executor_thread.h"
#include "scheduler_queue.h"
+#include "perfect_activation_queue.h"
#include <library/cpp/actors/util/affinity.h>
#include <library/cpp/actors/util/unordered_cache.h>
#include <library/cpp/actors/util/threadparkpad.h>
@@ -45,8 +46,7 @@ namespace NActors {
const i16 PoolThreads;
TIntrusivePtr<TAffinity> ThreadsAffinity;
TAtomic Semaphore = 0;
- TUnorderedCache<ui32, 512, 4> Activations;
- TAtomic ActivationsRevolvingCounter = 0;
+ TPerfectActivationQueue Activations;
volatile bool StopFlag = false;
public:
TExecutorPoolBase(ui32 poolId, ui32 threads, TAffinity* affinity);
diff --git a/library/cpp/actors/core/executor_pool_basic.cpp b/library/cpp/actors/core/executor_pool_basic.cpp
index b27f98487f..a607435a1a 100644
--- a/library/cpp/actors/core/executor_pool_basic.cpp
+++ b/library/cpp/actors/core/executor_pool_basic.cpp
@@ -230,7 +230,7 @@ namespace NActors {
} while (true);
}
- ui32 TBasicExecutorPool::GetReadyActivation(TWorkerContext& wctx, ui64 revolvingCounter) {
+ ui32 TBasicExecutorPool::GetReadyActivation(TWorkerContext& wctx, ui64 /*revolvingCounter*/) {
TWorkerId workerId = wctx.WorkerId;
Y_VERIFY_DEBUG(workerId < PoolThreads);
@@ -264,7 +264,7 @@ namespace NActors {
}
}
} else {
- if (const ui32 activation = Activations.Pop(++revolvingCounter)) {
+ if (const ui32 activation = Activations.Pop()) {
if (workerId >= 0) {
AtomicSet(Threads[workerId].WaitingFlag, TThreadCtx::WS_RUNNING);
}
@@ -320,8 +320,8 @@ namespace NActors {
}
}
- void TBasicExecutorPool::ScheduleActivationEx(ui32 activation, ui64 revolvingCounter) {
- Activations.Push(activation, revolvingCounter);
+ void TBasicExecutorPool::ScheduleActivationEx(ui32 activation, ui64 /*revolvingCounter*/) {
+ Activations.Push(activation);
bool needToWakeUp = false;
TAtomic x = AtomicGet(Semaphore);
diff --git a/library/cpp/actors/core/executor_pool_io.cpp b/library/cpp/actors/core/executor_pool_io.cpp
index 9e1d06c9e2..52cc25bed2 100644
--- a/library/cpp/actors/core/executor_pool_io.cpp
+++ b/library/cpp/actors/core/executor_pool_io.cpp
@@ -22,11 +22,11 @@ namespace NActors {
TIOExecutorPool::~TIOExecutorPool() {
Threads.Destroy();
- while (ThreadQueue.Pop(0))
+ while (ThreadQueue.Pop())
;
}
- ui32 TIOExecutorPool::GetReadyActivation(TWorkerContext& wctx, ui64 revolvingCounter) {
+ ui32 TIOExecutorPool::GetReadyActivation(TWorkerContext& wctx, ui64 /*revolvingCounter*/) {
i16 workerId = wctx.WorkerId;
Y_VERIFY_DEBUG(workerId < PoolThreads);
@@ -38,7 +38,7 @@ namespace NActors {
const TAtomic x = AtomicDecrement(Semaphore);
if (x < 0) {
TThreadCtx& threadCtx = Threads[workerId];
- ThreadQueue.Push(workerId + 1, revolvingCounter);
+ ThreadQueue.Push(workerId + 1);
hpnow = GetCycleCountFast();
elapsed += hpnow - hpstart;
if (threadCtx.Pad.Park())
@@ -48,7 +48,7 @@ namespace NActors {
}
while (!RelaxedLoad(&StopFlag)) {
- if (const ui32 activation = Activations.Pop(++revolvingCounter)) {
+ if (const ui32 activation = Activations.Pop()) {
hpnow = GetCycleCountFast();
elapsed += hpnow - hpstart;
wctx.AddElapsedCycles(ActorSystemIndex, elapsed);
@@ -86,12 +86,12 @@ namespace NActors {
ScheduleQueue->Writer.Push(deadline.MicroSeconds(), ev.Release(), cookie);
}
- void TIOExecutorPool::ScheduleActivationEx(ui32 activation, ui64 revolvingWriteCounter) {
- Activations.Push(activation, revolvingWriteCounter);
+ void TIOExecutorPool::ScheduleActivationEx(ui32 activation, ui64 /*revolvingWriteCounter*/) {
+ Activations.Push(activation);
const TAtomic x = AtomicIncrement(Semaphore);
if (x <= 0) {
- for (;; ++revolvingWriteCounter) {
- if (const ui32 x = ThreadQueue.Pop(revolvingWriteCounter)) {
+ for (;;) {
+ if (const ui32 x = ThreadQueue.Pop()) {
const ui32 threadIdx = x - 1;
Threads[threadIdx].Pad.Unpark();
return;
diff --git a/library/cpp/actors/core/executor_pool_io.h b/library/cpp/actors/core/executor_pool_io.h
index f3f1a11819..dc4e872df3 100644
--- a/library/cpp/actors/core/executor_pool_io.h
+++ b/library/cpp/actors/core/executor_pool_io.h
@@ -18,7 +18,7 @@ namespace NActors {
};
TArrayHolder<TThreadCtx> Threads;
- TUnorderedCache<ui32, 512, 4> ThreadQueue;
+ TPerfectActivationQueue ThreadQueue;
THolder<NSchedulerQueue::TQueueType> ScheduleQueue;
TTicketLock ScheduleLock;
diff --git a/library/cpp/actors/core/executor_pool_united.cpp b/library/cpp/actors/core/executor_pool_united.cpp
index 2e689dec59..cdd37a4fbd 100644
--- a/library/cpp/actors/core/executor_pool_united.cpp
+++ b/library/cpp/actors/core/executor_pool_united.cpp
@@ -33,7 +33,7 @@ namespace NActors {
TAtomic Waiters = 0; // Number of idle cpus, waiting for activations in this pool
char Padding[64 - sizeof(TAtomic)];
- TUnorderedCache<ui32, 512, 4> Activations; // MPMC-queue for mailbox activations
+ TPerfectActivationQueue Activations; // MPMC-queue for mailbox activations
TAtomic Active = 0; // Number of mailboxes ready for execution or currently executing
TAtomic Tokens = 0; // Pending tokens (token is required for worker to start execution, guarantees concurrency limit and activation availability)
volatile bool StopFlag = false;
@@ -51,7 +51,7 @@ namespace NActors {
TStackVec<TCpu*, 15> WakeOrderCpus;
~TPool() {
- while (Activations.Pop(0)) {}
+ while (Activations.Pop()) {}
}
void Stop() {
@@ -63,8 +63,8 @@ namespace NActors {
}
// Add activation of newly scheduled mailbox. Returns generated token (unless concurrency is exceeded)
- bool PushActivation(ui32 activation, ui64 revolvingCounter) {
- Activations.Push(activation, revolvingCounter);
+ bool PushActivation(ui32 activation, ui64 /*revolvingCounter*/) {
+ Activations.Push(activation);
TAtomicBase active = AtomicIncrement(Active);
if (active <= Concurrency) { // token generated
AtomicIncrement(Tokens);
@@ -103,9 +103,9 @@ namespace NActors {
}
// Get activation. Requires acquired token.
- void BeginExecution(ui32& activation, ui64 revolvingCounter) {
+ void BeginExecution(ui32& activation, ui64 /*revolvingCounter*/) {
while (!RelaxedLoad(&StopFlag)) {
- if (activation = Activations.Pop(++revolvingCounter)) {
+ if (activation = Activations.Pop()) {
return;
}
SpinLockPause();
diff --git a/library/cpp/actors/core/mailbox.cpp b/library/cpp/actors/core/mailbox.cpp
index b28fdc0771..b0d9a9802d 100644
--- a/library/cpp/actors/core/mailbox.cpp
+++ b/library/cpp/actors/core/mailbox.cpp
@@ -93,15 +93,15 @@ namespace NActors {
}
}
- while (MailboxCacheSimple.Pop(0))
+ while (MailboxCacheSimple.Pop())
;
- while (MailboxCacheRevolving.Pop(0))
+ while (MailboxCacheRevolving.Pop())
;
- while (MailboxCacheHTSwap.Pop(0))
+ while (MailboxCacheHTSwap.Pop())
;
- while (MailboxCacheReadAsFilled.Pop(0))
+ while (MailboxCacheReadAsFilled.Pop())
;
- while (MailboxCacheTinyReadAsFilled.Pop(0))
+ while (MailboxCacheTinyReadAsFilled.Pop())
;
}
@@ -276,11 +276,11 @@ namespace NActors {
return x;
}
- ui32 TMailboxTable::TryAllocateMailbox(TMailboxType::EType type, ui64 revolvingCounter) {
+ ui32 TMailboxTable::TryAllocateMailbox(TMailboxType::EType type, ui64 /*revolvingCounter*/) {
switch (type) {
case TMailboxType::Simple:
do {
- if (ui32 ret = MailboxCacheSimple.Pop(revolvingCounter)) {
+ if (ui32 ret = MailboxCacheSimple.Pop()) {
AtomicDecrement(CachedSimpleMailboxes);
return ret;
}
@@ -288,7 +288,7 @@ namespace NActors {
return 0;
case TMailboxType::Revolving:
do {
- if (ui32 ret = MailboxCacheRevolving.Pop(revolvingCounter)) {
+ if (ui32 ret = MailboxCacheRevolving.Pop()) {
AtomicDecrement(CachedRevolvingMailboxes);
return ret;
}
@@ -296,7 +296,7 @@ namespace NActors {
return 0;
case TMailboxType::HTSwap:
do {
- if (ui32 ret = MailboxCacheHTSwap.Pop(revolvingCounter)) {
+ if (ui32 ret = MailboxCacheHTSwap.Pop()) {
AtomicDecrement(CachedHTSwapMailboxes);
return ret;
}
@@ -304,7 +304,7 @@ namespace NActors {
return 0;
case TMailboxType::ReadAsFilled:
do {
- if (ui32 ret = MailboxCacheReadAsFilled.Pop(revolvingCounter)) {
+ if (ui32 ret = MailboxCacheReadAsFilled.Pop()) {
AtomicDecrement(CachedReadAsFilledMailboxes);
return ret;
}
@@ -312,7 +312,7 @@ namespace NActors {
return 0;
case TMailboxType::TinyReadAsFilled:
do {
- if (ui32 ret = MailboxCacheTinyReadAsFilled.Pop(revolvingCounter)) {
+ if (ui32 ret = MailboxCacheTinyReadAsFilled.Pop()) {
AtomicDecrement(CachedTinyReadAsFilledMailboxes);
return ret;
}
@@ -329,27 +329,27 @@ namespace NActors {
template
bool TMailboxTable::GenericSendTo<&IExecutorPool::SpecificScheduleActivation>(TAutoPtr<IEventHandle>& ev, IExecutorPool* executorPool);
- void TMailboxTable::ReclaimMailbox(TMailboxType::EType type, ui32 hint, ui64 revolvingCounter) {
+ void TMailboxTable::ReclaimMailbox(TMailboxType::EType type, ui32 hint, ui64 /*revolvingCounter*/) {
if (hint != 0) {
switch (type) {
case TMailboxType::Simple:
- MailboxCacheSimple.Push(hint, revolvingCounter);
+ MailboxCacheSimple.Push(hint);
AtomicIncrement(CachedSimpleMailboxes);
break;
case TMailboxType::Revolving:
- MailboxCacheRevolving.Push(hint, revolvingCounter);
+ MailboxCacheRevolving.Push(hint);
AtomicIncrement(CachedRevolvingMailboxes);
break;
case TMailboxType::HTSwap:
- MailboxCacheHTSwap.Push(hint, revolvingCounter);
+ MailboxCacheHTSwap.Push(hint);
AtomicIncrement(CachedHTSwapMailboxes);
break;
case TMailboxType::ReadAsFilled:
- MailboxCacheReadAsFilled.Push(hint, revolvingCounter);
+ MailboxCacheReadAsFilled.Push(hint);
AtomicIncrement(CachedReadAsFilledMailboxes);
break;
case TMailboxType::TinyReadAsFilled:
- MailboxCacheTinyReadAsFilled.Push(hint, revolvingCounter);
+ MailboxCacheTinyReadAsFilled.Push(hint);
AtomicIncrement(CachedTinyReadAsFilledMailboxes);
break;
default:
@@ -571,7 +571,7 @@ namespace NActors {
ui32 bufIndex;
for (bufIndex = 0; index != endIndex && bufIndex != bufSize; ++bufIndex, ++index)
buf[bufIndex] = lineIndexMask | index;
- cache->PushBulk(buf, bufIndex, index);
+ cache->PushBulk(buf, bufIndex);
AtomicAdd(*counter, bufIndex);
}
diff --git a/library/cpp/actors/core/mailbox.h b/library/cpp/actors/core/mailbox.h
index 0f1c3abc10..27a668aa0c 100644
--- a/library/cpp/actors/core/mailbox.h
+++ b/library/cpp/actors/core/mailbox.h
@@ -5,6 +5,7 @@
#include "executor_pool.h"
#include "mailbox_queue_simple.h"
#include "mailbox_queue_revolving.h"
+#include "perfect_activation_queue.h"
#include <library/cpp/actors/util/unordered_cache.h>
#include <library/cpp/threading/queue/mpsc_htswap.h>
#include <library/cpp/threading/queue/mpsc_read_as_filled.h>
@@ -300,7 +301,7 @@ namespace NActors {
TAtomic LastAllocatedLine;
TAtomic AllocatedMailboxCount;
- typedef TUnorderedCache<ui32, 512, 4> TMailboxCache;
+ using TMailboxCache = TPerfectActivationQueue;
TMailboxCache MailboxCacheSimple;
TAtomic CachedSimpleMailboxes;
TMailboxCache MailboxCacheRevolving;
diff --git a/library/cpp/actors/core/perfect_activation_queue.cpp b/library/cpp/actors/core/perfect_activation_queue.cpp
new file mode 100644
index 0000000000..d1975b5a47
--- /dev/null
+++ b/library/cpp/actors/core/perfect_activation_queue.cpp
@@ -0,0 +1,95 @@
+#include "perfect_activation_queue.h"
+
+#include <util/string/builder.h>
+
+namespace NActors {
+
+ static constexpr ui32 SizeBits = 20;
+ static constexpr ui32 SizeItems = 1 << SizeBits;
+ static constexpr ui32 SizeMask = SizeItems - 1;
+
+ // index conversion: CCCCCCCCCBBBBBAAA is converted to CCCCCCCCCAAABBBBB
+ static constexpr ui32 StripeSizeBitsA = 4; // makes a cacheline of 64 bytes
+ static constexpr ui32 StripeSizeBitsB = 4;
+
+ static void *AllocateItems(size_t size, size_t alignment) {
+#if defined(_win_)
+ return _aligned_malloc(size, alignment);
+#else
+ return aligned_alloc(alignment, size);
+#endif
+ }
+
+ TPerfectActivationQueue::TPerfectActivationQueue()
+ : QueueData(static_cast<TItem*>(AllocateItems(SizeItems * sizeof(TItem), 4096)))
+ {
+ Y_VERIFY(QueueData);
+ memset(QueueData, 0, SizeItems * sizeof(TItem));
+ }
+
+ TPerfectActivationQueue::~TPerfectActivationQueue() {
+ free(QueueData);
+ }
+
+ void TPerfectActivationQueue::Push(ui32 value) {
+ Y_VERIFY(value && value != Max<ui32>());
+ for (;;) {
+ const ui64 writeIndex = WriteIndex++;
+ TItem *ptr = QueueData + ConvertIndex(writeIndex & SizeMask);
+ const TItem cell = __atomic_exchange_n(ptr, value, __ATOMIC_SEQ_CST);
+
+ if (!cell) { // cell was free
+ return;
+ } else if (cell == Max<ui32>()) { // cell was forbidden
+ __atomic_store_n(ptr, 0, __ATOMIC_SEQ_CST);
+ } else {
+ Y_FAIL();
+ }
+ }
+ }
+
+ ui32 TPerfectActivationQueue::Pop() {
+ for (;;) {
+ const ui64 readIndex = ReadIndex++;
+ TItem *ptr = QueueData + ConvertIndex(readIndex & SizeMask);
+ TItem cell = __atomic_exchange_n(ptr, 0, __ATOMIC_SEQ_CST);
+ if (cell) {
+ Y_VERIFY(cell < Max<ui32>());
+ return cell;
+ }
+
+ // value is either not yet written, or this is a free cell -- forbid this cell for now
+ for (ui32 i = 0; i < 10; ++i) {
+ _mm_pause();
+ }
+
+ cell = __atomic_exchange_n(ptr, Max<ui32>(), __ATOMIC_SEQ_CST);
+ if (cell) {
+ Y_VERIFY(cell < Max<ui32>());
+ __atomic_store_n(ptr, 0, __ATOMIC_SEQ_CST);
+ } else if (ReadIndex < WriteIndex) {
+ continue; // give it another try, it was just a race
+ }
+
+ return cell;
+ }
+ }
+
+ void TPerfectActivationQueue::PushBulk(ui32 *values, size_t count) {
+ while (count--) {
+ Push(*values++);
+ }
+ }
+
+ ui32 TPerfectActivationQueue::ConvertIndex(ui32 index) {
+ constexpr ui32 maskA = (1 << StripeSizeBitsA) - 1;
+ const ui32 a = index & maskA;
+
+ constexpr ui32 maskB = (1 << StripeSizeBitsB) - 1;
+ const ui32 b = index >> StripeSizeBitsA & maskB;
+
+ const ui32 other = index >> StripeSizeBitsA + StripeSizeBitsB;
+ return other << StripeSizeBitsA + StripeSizeBitsB | a << StripeSizeBitsB | b;
+ }
+
+}
diff --git a/library/cpp/actors/core/perfect_activation_queue.h b/library/cpp/actors/core/perfect_activation_queue.h
new file mode 100644
index 0000000000..79e08c27d6
--- /dev/null
+++ b/library/cpp/actors/core/perfect_activation_queue.h
@@ -0,0 +1,29 @@
+#pragma once
+
+#include "defs.h"
+
+namespace NActors {
+
+ class TPerfectActivationQueue {
+ using TItem = ui32;
+
+ alignas(64) std::atomic_uint64_t ReadIndex = 0;
+ alignas(64) std::atomic_uint64_t WriteIndex = 0;
+
+ TItem *QueueData;
+
+ public:
+ static constexpr ui32 Concurrency = 1;
+
+ public:
+ TPerfectActivationQueue();
+ ~TPerfectActivationQueue();
+ void Push(ui32 value);
+ ui32 Pop();
+ void PushBulk(ui32 *values, size_t count);
+
+ private:
+ static ui32 ConvertIndex(ui32 index);
+ };
+
+} // NActors
diff --git a/library/cpp/actors/core/ya.make b/library/cpp/actors/core/ya.make
index 649effd22f..0eb021d871 100644
--- a/library/cpp/actors/core/ya.make
+++ b/library/cpp/actors/core/ya.make
@@ -85,6 +85,8 @@ SRCS(
monotonic_provider.h
worker_context.cpp
worker_context.h
+ perfect_activation_queue.cpp
+ perfect_activation_queue.h
probes.cpp
probes.h
process_stats.cpp