diff options
author | Devtools Arcadia <arcadia-devtools@yandex-team.ru> | 2022-02-07 18:08:42 +0300 |
---|---|---|
committer | Devtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net> | 2022-02-07 18:08:42 +0300 |
commit | 1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch) | |
tree | e26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/threading/queue/mpmc_unordered_ring.cpp | |
download | ydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz |
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/threading/queue/mpmc_unordered_ring.cpp')
-rw-r--r-- | library/cpp/threading/queue/mpmc_unordered_ring.cpp | 74 |
1 files changed, 74 insertions, 0 deletions
diff --git a/library/cpp/threading/queue/mpmc_unordered_ring.cpp b/library/cpp/threading/queue/mpmc_unordered_ring.cpp new file mode 100644 index 0000000000..160547f594 --- /dev/null +++ b/library/cpp/threading/queue/mpmc_unordered_ring.cpp @@ -0,0 +1,74 @@ +#include "mpmc_unordered_ring.h" + +namespace NThreading { + TMPMCUnorderedRing::TMPMCUnorderedRing(size_t size) { + Y_VERIFY(size > 0); + RingSize = size; + RingBuffer.Reset(new void*[size]); + memset(&RingBuffer[0], 0, sizeof(void*) * size); + } + + bool TMPMCUnorderedRing::Push(void* msg, ui16 retryCount) noexcept { + if (retryCount == 0) { + StubbornPush(msg); + return true; + } + for (ui16 itry = retryCount; itry-- > 0;) { + if (WeakPush(msg)) { + return true; + } + } + return false; + } + + bool TMPMCUnorderedRing::WeakPush(void* msg) noexcept { + auto pawl = AtomicIncrement(WritePawl); + if (pawl - AtomicGet(ReadFront) >= RingSize) { + // Queue is full + AtomicDecrement(WritePawl); + return false; + } + + auto writeSlot = AtomicGetAndIncrement(WriteFront); + if (AtomicCas(&RingBuffer[writeSlot % RingSize], msg, nullptr)) { + return true; + } + // slot is occupied for some reason, retry + return false; + } + + void* TMPMCUnorderedRing::Pop() noexcept { + ui64 readSlot; + + for (ui16 itry = MAX_POP_TRIES; itry-- > 0;) { + auto pawl = AtomicIncrement(ReadPawl); + if (pawl > AtomicGet(WriteFront)) { + // Queue is empty + AtomicDecrement(ReadPawl); + return nullptr; + } + + readSlot = AtomicGetAndIncrement(ReadFront); + + auto msg = AtomicSwap(&RingBuffer[readSlot % RingSize], nullptr); + if (msg != nullptr) { + return msg; + } + } + + /* got no message in the slot, let's try to rollback readfront */ + AtomicCas(&ReadFront, readSlot - 1, readSlot); + return nullptr; + } + + void* TMPMCUnorderedRing::UnsafeScanningPop(ui64* last) noexcept { + for (; *last < RingSize;) { + auto msg = AtomicSwap(&RingBuffer[*last], nullptr); + ++*last; + if (msg != nullptr) { + return msg; + } + } + return nullptr; + } +} |