diff options
author | alexvru <alexvru@ydb.tech> | 2023-09-07 22:53:22 +0300 |
---|---|---|
committer | alexvru <alexvru@ydb.tech> | 2023-09-07 23:17:01 +0300 |
commit | c7138e84c19043351e6f4bd80a9e287a04a7a839 (patch) | |
tree | 243f25eda7b916acb507d689f10942c6f6f3c3f7 | |
parent | 40f9322d2bfaf54f78bce83bae3af79f73234488 (diff) | |
download | ydb-c7138e84c19043351e6f4bd80a9e287a04a7a839.tar.gz |
Introduce perfect activation queue
-rw-r--r-- | library/cpp/actors/core/CMakeLists.darwin-x86_64.txt | 1 | ||||
-rw-r--r-- | library/cpp/actors/core/CMakeLists.linux-aarch64.txt | 1 | ||||
-rw-r--r-- | library/cpp/actors/core/CMakeLists.linux-x86_64.txt | 1 | ||||
-rw-r--r-- | library/cpp/actors/core/CMakeLists.windows-x86_64.txt | 1 | ||||
-rw-r--r-- | library/cpp/actors/core/executor_pool_base.cpp | 6 | ||||
-rw-r--r-- | library/cpp/actors/core/executor_pool_base.h | 4 | ||||
-rw-r--r-- | library/cpp/actors/core/executor_pool_basic.cpp | 8 | ||||
-rw-r--r-- | library/cpp/actors/core/executor_pool_io.cpp | 16 | ||||
-rw-r--r-- | library/cpp/actors/core/executor_pool_io.h | 2 | ||||
-rw-r--r-- | library/cpp/actors/core/executor_pool_united.cpp | 12 | ||||
-rw-r--r-- | library/cpp/actors/core/mailbox.cpp | 36 | ||||
-rw-r--r-- | library/cpp/actors/core/mailbox.h | 3 | ||||
-rw-r--r-- | library/cpp/actors/core/perfect_activation_queue.cpp | 95 | ||||
-rw-r--r-- | library/cpp/actors/core/perfect_activation_queue.h | 29 | ||||
-rw-r--r-- | library/cpp/actors/core/ya.make | 2 |
15 files changed, 174 insertions, 43 deletions
diff --git a/library/cpp/actors/core/CMakeLists.darwin-x86_64.txt b/library/cpp/actors/core/CMakeLists.darwin-x86_64.txt index c5c1629513..392a1b5ea7 100644 --- a/library/cpp/actors/core/CMakeLists.darwin-x86_64.txt +++ b/library/cpp/actors/core/CMakeLists.darwin-x86_64.txt @@ -77,6 +77,7 @@ target_sources(cpp-actors-core PRIVATE ${CMAKE_SOURCE_DIR}/library/cpp/actors/core/monotonic.cpp ${CMAKE_SOURCE_DIR}/library/cpp/actors/core/monotonic_provider.cpp ${CMAKE_SOURCE_DIR}/library/cpp/actors/core/worker_context.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/actors/core/perfect_activation_queue.cpp ${CMAKE_SOURCE_DIR}/library/cpp/actors/core/probes.cpp ${CMAKE_SOURCE_DIR}/library/cpp/actors/core/process_stats.cpp ${CMAKE_SOURCE_DIR}/library/cpp/actors/core/scheduler_actor.cpp diff --git a/library/cpp/actors/core/CMakeLists.linux-aarch64.txt b/library/cpp/actors/core/CMakeLists.linux-aarch64.txt index 23f4e3ba64..cf4e5c9440 100644 --- a/library/cpp/actors/core/CMakeLists.linux-aarch64.txt +++ b/library/cpp/actors/core/CMakeLists.linux-aarch64.txt @@ -78,6 +78,7 @@ target_sources(cpp-actors-core PRIVATE ${CMAKE_SOURCE_DIR}/library/cpp/actors/core/monotonic.cpp ${CMAKE_SOURCE_DIR}/library/cpp/actors/core/monotonic_provider.cpp ${CMAKE_SOURCE_DIR}/library/cpp/actors/core/worker_context.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/actors/core/perfect_activation_queue.cpp ${CMAKE_SOURCE_DIR}/library/cpp/actors/core/probes.cpp ${CMAKE_SOURCE_DIR}/library/cpp/actors/core/process_stats.cpp ${CMAKE_SOURCE_DIR}/library/cpp/actors/core/scheduler_actor.cpp diff --git a/library/cpp/actors/core/CMakeLists.linux-x86_64.txt b/library/cpp/actors/core/CMakeLists.linux-x86_64.txt index 23f4e3ba64..cf4e5c9440 100644 --- a/library/cpp/actors/core/CMakeLists.linux-x86_64.txt +++ b/library/cpp/actors/core/CMakeLists.linux-x86_64.txt @@ -78,6 +78,7 @@ target_sources(cpp-actors-core PRIVATE ${CMAKE_SOURCE_DIR}/library/cpp/actors/core/monotonic.cpp ${CMAKE_SOURCE_DIR}/library/cpp/actors/core/monotonic_provider.cpp ${CMAKE_SOURCE_DIR}/library/cpp/actors/core/worker_context.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/actors/core/perfect_activation_queue.cpp ${CMAKE_SOURCE_DIR}/library/cpp/actors/core/probes.cpp ${CMAKE_SOURCE_DIR}/library/cpp/actors/core/process_stats.cpp ${CMAKE_SOURCE_DIR}/library/cpp/actors/core/scheduler_actor.cpp diff --git a/library/cpp/actors/core/CMakeLists.windows-x86_64.txt b/library/cpp/actors/core/CMakeLists.windows-x86_64.txt index c5c1629513..392a1b5ea7 100644 --- a/library/cpp/actors/core/CMakeLists.windows-x86_64.txt +++ b/library/cpp/actors/core/CMakeLists.windows-x86_64.txt @@ -77,6 +77,7 @@ target_sources(cpp-actors-core PRIVATE ${CMAKE_SOURCE_DIR}/library/cpp/actors/core/monotonic.cpp ${CMAKE_SOURCE_DIR}/library/cpp/actors/core/monotonic_provider.cpp ${CMAKE_SOURCE_DIR}/library/cpp/actors/core/worker_context.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/actors/core/perfect_activation_queue.cpp ${CMAKE_SOURCE_DIR}/library/cpp/actors/core/probes.cpp ${CMAKE_SOURCE_DIR}/library/cpp/actors/core/process_stats.cpp ${CMAKE_SOURCE_DIR}/library/cpp/actors/core/scheduler_actor.cpp diff --git a/library/cpp/actors/core/executor_pool_base.cpp b/library/cpp/actors/core/executor_pool_base.cpp index 10362a7a2b..c8e3c0aa71 100644 --- a/library/cpp/actors/core/executor_pool_base.cpp +++ b/library/cpp/actors/core/executor_pool_base.cpp @@ -71,7 +71,7 @@ namespace NActors { {} TExecutorPoolBase::~TExecutorPoolBase() { - while (Activations.Pop(0)) + while (Activations.Pop()) ; } @@ -105,7 +105,7 @@ namespace NActors { } void TExecutorPoolBase::ScheduleActivation(ui32 activation) { - ScheduleActivationEx(activation, AtomicIncrement(ActivationsRevolvingCounter)); + ScheduleActivationEx(activation, 0); } Y_FORCE_INLINE bool IsAllowedToCapture(IExecutorPool *self) { @@ -121,7 +121,7 @@ namespace NActors { TlsThreadContext->CapturedType = TlsThreadContext->SendingType; } if (activation) { - ScheduleActivationEx(activation, AtomicIncrement(ActivationsRevolvingCounter)); + ScheduleActivationEx(activation, 0); } } diff --git a/library/cpp/actors/core/executor_pool_base.h b/library/cpp/actors/core/executor_pool_base.h index e94ffdbad9..6a321e32f8 100644 --- a/library/cpp/actors/core/executor_pool_base.h +++ b/library/cpp/actors/core/executor_pool_base.h @@ -3,6 +3,7 @@ #include "executor_pool.h" #include "executor_thread.h" #include "scheduler_queue.h" +#include "perfect_activation_queue.h" #include <library/cpp/actors/util/affinity.h> #include <library/cpp/actors/util/unordered_cache.h> #include <library/cpp/actors/util/threadparkpad.h> @@ -45,8 +46,7 @@ namespace NActors { const i16 PoolThreads; TIntrusivePtr<TAffinity> ThreadsAffinity; TAtomic Semaphore = 0; - TUnorderedCache<ui32, 512, 4> Activations; - TAtomic ActivationsRevolvingCounter = 0; + TPerfectActivationQueue Activations; volatile bool StopFlag = false; public: TExecutorPoolBase(ui32 poolId, ui32 threads, TAffinity* affinity); diff --git a/library/cpp/actors/core/executor_pool_basic.cpp b/library/cpp/actors/core/executor_pool_basic.cpp index b27f98487f..a607435a1a 100644 --- a/library/cpp/actors/core/executor_pool_basic.cpp +++ b/library/cpp/actors/core/executor_pool_basic.cpp @@ -230,7 +230,7 @@ namespace NActors { } while (true); } - ui32 TBasicExecutorPool::GetReadyActivation(TWorkerContext& wctx, ui64 revolvingCounter) { + ui32 TBasicExecutorPool::GetReadyActivation(TWorkerContext& wctx, ui64 /*revolvingCounter*/) { TWorkerId workerId = wctx.WorkerId; Y_VERIFY_DEBUG(workerId < PoolThreads); @@ -264,7 +264,7 @@ namespace NActors { } } } else { - if (const ui32 activation = Activations.Pop(++revolvingCounter)) { + if (const ui32 activation = Activations.Pop()) { if (workerId >= 0) { AtomicSet(Threads[workerId].WaitingFlag, TThreadCtx::WS_RUNNING); } @@ -320,8 +320,8 @@ namespace NActors { } } - void TBasicExecutorPool::ScheduleActivationEx(ui32 activation, ui64 revolvingCounter) { - Activations.Push(activation, revolvingCounter); + void TBasicExecutorPool::ScheduleActivationEx(ui32 activation, ui64 /*revolvingCounter*/) { + Activations.Push(activation); bool needToWakeUp = false; TAtomic x = AtomicGet(Semaphore); diff --git a/library/cpp/actors/core/executor_pool_io.cpp b/library/cpp/actors/core/executor_pool_io.cpp index 9e1d06c9e2..52cc25bed2 100644 --- a/library/cpp/actors/core/executor_pool_io.cpp +++ b/library/cpp/actors/core/executor_pool_io.cpp @@ -22,11 +22,11 @@ namespace NActors { TIOExecutorPool::~TIOExecutorPool() { Threads.Destroy(); - while (ThreadQueue.Pop(0)) + while (ThreadQueue.Pop()) ; } - ui32 TIOExecutorPool::GetReadyActivation(TWorkerContext& wctx, ui64 revolvingCounter) { + ui32 TIOExecutorPool::GetReadyActivation(TWorkerContext& wctx, ui64 /*revolvingCounter*/) { i16 workerId = wctx.WorkerId; Y_VERIFY_DEBUG(workerId < PoolThreads); @@ -38,7 +38,7 @@ namespace NActors { const TAtomic x = AtomicDecrement(Semaphore); if (x < 0) { TThreadCtx& threadCtx = Threads[workerId]; - ThreadQueue.Push(workerId + 1, revolvingCounter); + ThreadQueue.Push(workerId + 1); hpnow = GetCycleCountFast(); elapsed += hpnow - hpstart; if (threadCtx.Pad.Park()) @@ -48,7 +48,7 @@ namespace NActors { } while (!RelaxedLoad(&StopFlag)) { - if (const ui32 activation = Activations.Pop(++revolvingCounter)) { + if (const ui32 activation = Activations.Pop()) { hpnow = GetCycleCountFast(); elapsed += hpnow - hpstart; wctx.AddElapsedCycles(ActorSystemIndex, elapsed); @@ -86,12 +86,12 @@ namespace NActors { ScheduleQueue->Writer.Push(deadline.MicroSeconds(), ev.Release(), cookie); } - void TIOExecutorPool::ScheduleActivationEx(ui32 activation, ui64 revolvingWriteCounter) { - Activations.Push(activation, revolvingWriteCounter); + void TIOExecutorPool::ScheduleActivationEx(ui32 activation, ui64 /*revolvingWriteCounter*/) { + Activations.Push(activation); const TAtomic x = AtomicIncrement(Semaphore); if (x <= 0) { - for (;; ++revolvingWriteCounter) { - if (const ui32 x = ThreadQueue.Pop(revolvingWriteCounter)) { + for (;;) { + if (const ui32 x = ThreadQueue.Pop()) { const ui32 threadIdx = x - 1; Threads[threadIdx].Pad.Unpark(); return; diff --git a/library/cpp/actors/core/executor_pool_io.h b/library/cpp/actors/core/executor_pool_io.h index f3f1a11819..dc4e872df3 100644 --- a/library/cpp/actors/core/executor_pool_io.h +++ b/library/cpp/actors/core/executor_pool_io.h @@ -18,7 +18,7 @@ namespace NActors { }; TArrayHolder<TThreadCtx> Threads; - TUnorderedCache<ui32, 512, 4> ThreadQueue; + TPerfectActivationQueue ThreadQueue; THolder<NSchedulerQueue::TQueueType> ScheduleQueue; TTicketLock ScheduleLock; diff --git a/library/cpp/actors/core/executor_pool_united.cpp b/library/cpp/actors/core/executor_pool_united.cpp index 2e689dec59..cdd37a4fbd 100644 --- a/library/cpp/actors/core/executor_pool_united.cpp +++ b/library/cpp/actors/core/executor_pool_united.cpp @@ -33,7 +33,7 @@ namespace NActors { TAtomic Waiters = 0; // Number of idle cpus, waiting for activations in this pool char Padding[64 - sizeof(TAtomic)]; - TUnorderedCache<ui32, 512, 4> Activations; // MPMC-queue for mailbox activations + TPerfectActivationQueue Activations; // MPMC-queue for mailbox activations TAtomic Active = 0; // Number of mailboxes ready for execution or currently executing TAtomic Tokens = 0; // Pending tokens (token is required for worker to start execution, guarantees concurrency limit and activation availability) volatile bool StopFlag = false; @@ -51,7 +51,7 @@ namespace NActors { TStackVec<TCpu*, 15> WakeOrderCpus; ~TPool() { - while (Activations.Pop(0)) {} + while (Activations.Pop()) {} } void Stop() { @@ -63,8 +63,8 @@ namespace NActors { } // Add activation of newly scheduled mailbox. Returns generated token (unless concurrency is exceeded) - bool PushActivation(ui32 activation, ui64 revolvingCounter) { - Activations.Push(activation, revolvingCounter); + bool PushActivation(ui32 activation, ui64 /*revolvingCounter*/) { + Activations.Push(activation); TAtomicBase active = AtomicIncrement(Active); if (active <= Concurrency) { // token generated AtomicIncrement(Tokens); @@ -103,9 +103,9 @@ namespace NActors { } // Get activation. Requires acquired token. - void BeginExecution(ui32& activation, ui64 revolvingCounter) { + void BeginExecution(ui32& activation, ui64 /*revolvingCounter*/) { while (!RelaxedLoad(&StopFlag)) { - if (activation = Activations.Pop(++revolvingCounter)) { + if (activation = Activations.Pop()) { return; } SpinLockPause(); diff --git a/library/cpp/actors/core/mailbox.cpp b/library/cpp/actors/core/mailbox.cpp index b28fdc0771..b0d9a9802d 100644 --- a/library/cpp/actors/core/mailbox.cpp +++ b/library/cpp/actors/core/mailbox.cpp @@ -93,15 +93,15 @@ namespace NActors { } } - while (MailboxCacheSimple.Pop(0)) + while (MailboxCacheSimple.Pop()) ; - while (MailboxCacheRevolving.Pop(0)) + while (MailboxCacheRevolving.Pop()) ; - while (MailboxCacheHTSwap.Pop(0)) + while (MailboxCacheHTSwap.Pop()) ; - while (MailboxCacheReadAsFilled.Pop(0)) + while (MailboxCacheReadAsFilled.Pop()) ; - while (MailboxCacheTinyReadAsFilled.Pop(0)) + while (MailboxCacheTinyReadAsFilled.Pop()) ; } @@ -276,11 +276,11 @@ namespace NActors { return x; } - ui32 TMailboxTable::TryAllocateMailbox(TMailboxType::EType type, ui64 revolvingCounter) { + ui32 TMailboxTable::TryAllocateMailbox(TMailboxType::EType type, ui64 /*revolvingCounter*/) { switch (type) { case TMailboxType::Simple: do { - if (ui32 ret = MailboxCacheSimple.Pop(revolvingCounter)) { + if (ui32 ret = MailboxCacheSimple.Pop()) { AtomicDecrement(CachedSimpleMailboxes); return ret; } @@ -288,7 +288,7 @@ namespace NActors { return 0; case TMailboxType::Revolving: do { - if (ui32 ret = MailboxCacheRevolving.Pop(revolvingCounter)) { + if (ui32 ret = MailboxCacheRevolving.Pop()) { AtomicDecrement(CachedRevolvingMailboxes); return ret; } @@ -296,7 +296,7 @@ namespace NActors { return 0; case TMailboxType::HTSwap: do { - if (ui32 ret = MailboxCacheHTSwap.Pop(revolvingCounter)) { + if (ui32 ret = MailboxCacheHTSwap.Pop()) { AtomicDecrement(CachedHTSwapMailboxes); return ret; } @@ -304,7 +304,7 @@ namespace NActors { return 0; case TMailboxType::ReadAsFilled: do { - if (ui32 ret = MailboxCacheReadAsFilled.Pop(revolvingCounter)) { + if (ui32 ret = MailboxCacheReadAsFilled.Pop()) { AtomicDecrement(CachedReadAsFilledMailboxes); return ret; } @@ -312,7 +312,7 @@ namespace NActors { return 0; case TMailboxType::TinyReadAsFilled: do { - if (ui32 ret = MailboxCacheTinyReadAsFilled.Pop(revolvingCounter)) { + if (ui32 ret = MailboxCacheTinyReadAsFilled.Pop()) { AtomicDecrement(CachedTinyReadAsFilledMailboxes); return ret; } @@ -329,27 +329,27 @@ namespace NActors { template bool TMailboxTable::GenericSendTo<&IExecutorPool::SpecificScheduleActivation>(TAutoPtr<IEventHandle>& ev, IExecutorPool* executorPool); - void TMailboxTable::ReclaimMailbox(TMailboxType::EType type, ui32 hint, ui64 revolvingCounter) { + void TMailboxTable::ReclaimMailbox(TMailboxType::EType type, ui32 hint, ui64 /*revolvingCounter*/) { if (hint != 0) { switch (type) { case TMailboxType::Simple: - MailboxCacheSimple.Push(hint, revolvingCounter); + MailboxCacheSimple.Push(hint); AtomicIncrement(CachedSimpleMailboxes); break; case TMailboxType::Revolving: - MailboxCacheRevolving.Push(hint, revolvingCounter); + MailboxCacheRevolving.Push(hint); AtomicIncrement(CachedRevolvingMailboxes); break; case TMailboxType::HTSwap: - MailboxCacheHTSwap.Push(hint, revolvingCounter); + MailboxCacheHTSwap.Push(hint); AtomicIncrement(CachedHTSwapMailboxes); break; case TMailboxType::ReadAsFilled: - MailboxCacheReadAsFilled.Push(hint, revolvingCounter); + MailboxCacheReadAsFilled.Push(hint); AtomicIncrement(CachedReadAsFilledMailboxes); break; case TMailboxType::TinyReadAsFilled: - MailboxCacheTinyReadAsFilled.Push(hint, revolvingCounter); + MailboxCacheTinyReadAsFilled.Push(hint); AtomicIncrement(CachedTinyReadAsFilledMailboxes); break; default: @@ -571,7 +571,7 @@ namespace NActors { ui32 bufIndex; for (bufIndex = 0; index != endIndex && bufIndex != bufSize; ++bufIndex, ++index) buf[bufIndex] = lineIndexMask | index; - cache->PushBulk(buf, bufIndex, index); + cache->PushBulk(buf, bufIndex); AtomicAdd(*counter, bufIndex); } diff --git a/library/cpp/actors/core/mailbox.h b/library/cpp/actors/core/mailbox.h index 0f1c3abc10..27a668aa0c 100644 --- a/library/cpp/actors/core/mailbox.h +++ b/library/cpp/actors/core/mailbox.h @@ -5,6 +5,7 @@ #include "executor_pool.h" #include "mailbox_queue_simple.h" #include "mailbox_queue_revolving.h" +#include "perfect_activation_queue.h" #include <library/cpp/actors/util/unordered_cache.h> #include <library/cpp/threading/queue/mpsc_htswap.h> #include <library/cpp/threading/queue/mpsc_read_as_filled.h> @@ -300,7 +301,7 @@ namespace NActors { TAtomic LastAllocatedLine; TAtomic AllocatedMailboxCount; - typedef TUnorderedCache<ui32, 512, 4> TMailboxCache; + using TMailboxCache = TPerfectActivationQueue; TMailboxCache MailboxCacheSimple; TAtomic CachedSimpleMailboxes; TMailboxCache MailboxCacheRevolving; diff --git a/library/cpp/actors/core/perfect_activation_queue.cpp b/library/cpp/actors/core/perfect_activation_queue.cpp new file mode 100644 index 0000000000..d1975b5a47 --- /dev/null +++ b/library/cpp/actors/core/perfect_activation_queue.cpp @@ -0,0 +1,95 @@ +#include "perfect_activation_queue.h" + +#include <util/string/builder.h> + +namespace NActors { + + static constexpr ui32 SizeBits = 20; + static constexpr ui32 SizeItems = 1 << SizeBits; + static constexpr ui32 SizeMask = SizeItems - 1; + + // index conversion: CCCCCCCCCBBBBBAAA is converted to CCCCCCCCCAAABBBBB + static constexpr ui32 StripeSizeBitsA = 4; // makes a cacheline of 64 bytes + static constexpr ui32 StripeSizeBitsB = 4; + + static void *AllocateItems(size_t size, size_t alignment) { +#if defined(_win_) + return _aligned_malloc(size, alignment); +#else + return aligned_alloc(alignment, size); +#endif + } + + TPerfectActivationQueue::TPerfectActivationQueue() + : QueueData(static_cast<TItem*>(AllocateItems(SizeItems * sizeof(TItem), 4096))) + { + Y_VERIFY(QueueData); + memset(QueueData, 0, SizeItems * sizeof(TItem)); + } + + TPerfectActivationQueue::~TPerfectActivationQueue() { + free(QueueData); + } + + void TPerfectActivationQueue::Push(ui32 value) { + Y_VERIFY(value && value != Max<ui32>()); + for (;;) { + const ui64 writeIndex = WriteIndex++; + TItem *ptr = QueueData + ConvertIndex(writeIndex & SizeMask); + const TItem cell = __atomic_exchange_n(ptr, value, __ATOMIC_SEQ_CST); + + if (!cell) { // cell was free + return; + } else if (cell == Max<ui32>()) { // cell was forbidden + __atomic_store_n(ptr, 0, __ATOMIC_SEQ_CST); + } else { + Y_FAIL(); + } + } + } + + ui32 TPerfectActivationQueue::Pop() { + for (;;) { + const ui64 readIndex = ReadIndex++; + TItem *ptr = QueueData + ConvertIndex(readIndex & SizeMask); + TItem cell = __atomic_exchange_n(ptr, 0, __ATOMIC_SEQ_CST); + if (cell) { + Y_VERIFY(cell < Max<ui32>()); + return cell; + } + + // value is either not yet written, or this is a free cell -- forbid this cell for now + for (ui32 i = 0; i < 10; ++i) { + _mm_pause(); + } + + cell = __atomic_exchange_n(ptr, Max<ui32>(), __ATOMIC_SEQ_CST); + if (cell) { + Y_VERIFY(cell < Max<ui32>()); + __atomic_store_n(ptr, 0, __ATOMIC_SEQ_CST); + } else if (ReadIndex < WriteIndex) { + continue; // give it another try, it was just a race + } + + return cell; + } + } + + void TPerfectActivationQueue::PushBulk(ui32 *values, size_t count) { + while (count--) { + Push(*values++); + } + } + + ui32 TPerfectActivationQueue::ConvertIndex(ui32 index) { + constexpr ui32 maskA = (1 << StripeSizeBitsA) - 1; + const ui32 a = index & maskA; + + constexpr ui32 maskB = (1 << StripeSizeBitsB) - 1; + const ui32 b = index >> StripeSizeBitsA & maskB; + + const ui32 other = index >> StripeSizeBitsA + StripeSizeBitsB; + return other << StripeSizeBitsA + StripeSizeBitsB | a << StripeSizeBitsB | b; + } + +} diff --git a/library/cpp/actors/core/perfect_activation_queue.h b/library/cpp/actors/core/perfect_activation_queue.h new file mode 100644 index 0000000000..79e08c27d6 --- /dev/null +++ b/library/cpp/actors/core/perfect_activation_queue.h @@ -0,0 +1,29 @@ +#pragma once + +#include "defs.h" + +namespace NActors { + + class TPerfectActivationQueue { + using TItem = ui32; + + alignas(64) std::atomic_uint64_t ReadIndex = 0; + alignas(64) std::atomic_uint64_t WriteIndex = 0; + + TItem *QueueData; + + public: + static constexpr ui32 Concurrency = 1; + + public: + TPerfectActivationQueue(); + ~TPerfectActivationQueue(); + void Push(ui32 value); + ui32 Pop(); + void PushBulk(ui32 *values, size_t count); + + private: + static ui32 ConvertIndex(ui32 index); + }; + +} // NActors diff --git a/library/cpp/actors/core/ya.make b/library/cpp/actors/core/ya.make index 649effd22f..0eb021d871 100644 --- a/library/cpp/actors/core/ya.make +++ b/library/cpp/actors/core/ya.make @@ -85,6 +85,8 @@ SRCS( monotonic_provider.h worker_context.cpp worker_context.h + perfect_activation_queue.cpp + perfect_activation_queue.h probes.cpp probes.h process_stats.cpp |