diff options
author | agri <agri@yandex-team.ru> | 2022-02-10 16:48:12 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:48:12 +0300 |
commit | d3530b2692e400bd4d29bd4f07cafaee139164e7 (patch) | |
tree | b7ae636a74490e649a2ed0fdd5361f1bec83b9f9 /library/cpp/threading/queue/mpmc_unordered_ring.cpp | |
parent | 0f4c5d1e8c0672bf0a1f2f2d8acac5ba24772435 (diff) | |
download | ydb-d3530b2692e400bd4d29bd4f07cafaee139164e7.tar.gz |
Restoring authorship annotation for <agri@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/threading/queue/mpmc_unordered_ring.cpp')
-rw-r--r-- | library/cpp/threading/queue/mpmc_unordered_ring.cpp | 148 |
1 files changed, 74 insertions, 74 deletions
diff --git a/library/cpp/threading/queue/mpmc_unordered_ring.cpp b/library/cpp/threading/queue/mpmc_unordered_ring.cpp index 160547f594..df48182210 100644 --- a/library/cpp/threading/queue/mpmc_unordered_ring.cpp +++ b/library/cpp/threading/queue/mpmc_unordered_ring.cpp @@ -1,74 +1,74 @@ -#include "mpmc_unordered_ring.h" - -namespace NThreading { - TMPMCUnorderedRing::TMPMCUnorderedRing(size_t size) { - Y_VERIFY(size > 0); - RingSize = size; - RingBuffer.Reset(new void*[size]); - memset(&RingBuffer[0], 0, sizeof(void*) * size); - } - - bool TMPMCUnorderedRing::Push(void* msg, ui16 retryCount) noexcept { - if (retryCount == 0) { - StubbornPush(msg); - return true; - } - for (ui16 itry = retryCount; itry-- > 0;) { - if (WeakPush(msg)) { - return true; - } - } - return false; - } - - bool TMPMCUnorderedRing::WeakPush(void* msg) noexcept { - auto pawl = AtomicIncrement(WritePawl); - if (pawl - AtomicGet(ReadFront) >= RingSize) { - // Queue is full - AtomicDecrement(WritePawl); - return false; - } - - auto writeSlot = AtomicGetAndIncrement(WriteFront); - if (AtomicCas(&RingBuffer[writeSlot % RingSize], msg, nullptr)) { - return true; - } - // slot is occupied for some reason, retry - return false; - } - - void* TMPMCUnorderedRing::Pop() noexcept { - ui64 readSlot; - - for (ui16 itry = MAX_POP_TRIES; itry-- > 0;) { - auto pawl = AtomicIncrement(ReadPawl); - if (pawl > AtomicGet(WriteFront)) { - // Queue is empty - AtomicDecrement(ReadPawl); - return nullptr; - } - - readSlot = AtomicGetAndIncrement(ReadFront); - - auto msg = AtomicSwap(&RingBuffer[readSlot % RingSize], nullptr); - if (msg != nullptr) { - return msg; - } - } - - /* got no message in the slot, let's try to rollback readfront */ - AtomicCas(&ReadFront, readSlot - 1, readSlot); - return nullptr; - } - - void* TMPMCUnorderedRing::UnsafeScanningPop(ui64* last) noexcept { - for (; *last < RingSize;) { - auto msg = AtomicSwap(&RingBuffer[*last], nullptr); - ++*last; - if (msg != nullptr) { - return msg; - } - } - return nullptr; - } -} +#include "mpmc_unordered_ring.h" + +namespace NThreading { + TMPMCUnorderedRing::TMPMCUnorderedRing(size_t size) { + Y_VERIFY(size > 0); + RingSize = size; + RingBuffer.Reset(new void*[size]); + memset(&RingBuffer[0], 0, sizeof(void*) * size); + } + + bool TMPMCUnorderedRing::Push(void* msg, ui16 retryCount) noexcept { + if (retryCount == 0) { + StubbornPush(msg); + return true; + } + for (ui16 itry = retryCount; itry-- > 0;) { + if (WeakPush(msg)) { + return true; + } + } + return false; + } + + bool TMPMCUnorderedRing::WeakPush(void* msg) noexcept { + auto pawl = AtomicIncrement(WritePawl); + if (pawl - AtomicGet(ReadFront) >= RingSize) { + // Queue is full + AtomicDecrement(WritePawl); + return false; + } + + auto writeSlot = AtomicGetAndIncrement(WriteFront); + if (AtomicCas(&RingBuffer[writeSlot % RingSize], msg, nullptr)) { + return true; + } + // slot is occupied for some reason, retry + return false; + } + + void* TMPMCUnorderedRing::Pop() noexcept { + ui64 readSlot; + + for (ui16 itry = MAX_POP_TRIES; itry-- > 0;) { + auto pawl = AtomicIncrement(ReadPawl); + if (pawl > AtomicGet(WriteFront)) { + // Queue is empty + AtomicDecrement(ReadPawl); + return nullptr; + } + + readSlot = AtomicGetAndIncrement(ReadFront); + + auto msg = AtomicSwap(&RingBuffer[readSlot % RingSize], nullptr); + if (msg != nullptr) { + return msg; + } + } + + /* got no message in the slot, let's try to rollback readfront */ + AtomicCas(&ReadFront, readSlot - 1, readSlot); + return nullptr; + } + + void* TMPMCUnorderedRing::UnsafeScanningPop(ui64* last) noexcept { + for (; *last < RingSize;) { + auto msg = AtomicSwap(&RingBuffer[*last], nullptr); + ++*last; + if (msg != nullptr) { + return msg; + } + } + return nullptr; + } +} |