diff options
author | Devtools Arcadia <arcadia-devtools@yandex-team.ru> | 2022-02-07 18:08:42 +0300 |
---|---|---|
committer | Devtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net> | 2022-02-07 18:08:42 +0300 |
commit | 1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch) | |
tree | e26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/threading/queue/mpsc_intrusive_unordered.cpp | |
download | ydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz |
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/threading/queue/mpsc_intrusive_unordered.cpp')
-rw-r--r-- | library/cpp/threading/queue/mpsc_intrusive_unordered.cpp | 79 |
1 files changed, 79 insertions, 0 deletions
diff --git a/library/cpp/threading/queue/mpsc_intrusive_unordered.cpp b/library/cpp/threading/queue/mpsc_intrusive_unordered.cpp new file mode 100644 index 0000000000..3bb1a04f7e --- /dev/null +++ b/library/cpp/threading/queue/mpsc_intrusive_unordered.cpp @@ -0,0 +1,79 @@ +#include "mpsc_intrusive_unordered.h" +#include <util/system/atomic.h> + +namespace NThreading { + void TMPSCIntrusiveUnordered::Push(TIntrusiveNode* node) noexcept { + auto head = AtomicGet(HeadForCaS); + for (ui32 i = NUMBER_OF_TRIES_FOR_CAS; i-- > 0;) { + // no ABA here, because Next is exactly head + // it does not matter how many travels head was made/ + node->Next = head; + auto prev = AtomicGetAndCas(&HeadForCaS, node, head); + if (head == prev) { + return; + } + head = prev; + } + // boring of trying to do cas, let's just swap + + // no need for atomic here, because the next is atomic swap + node->Next = 0; + + head = AtomicSwap(&HeadForSwap, node); + if (head != nullptr) { + AtomicSet(node->Next, head); + } else { + // consumer must know if no other thread may access the memory, + // setting Next to node is a way to notify consumer + AtomicSet(node->Next, node); + } + } + + TIntrusiveNode* TMPSCIntrusiveUnordered::PopMany() noexcept { + if (NotReadyChain == nullptr) { + auto head = AtomicSwap(&HeadForSwap, nullptr); + NotReadyChain = head; + } + + if (NotReadyChain != nullptr) { + auto next = AtomicGet(NotReadyChain->Next); + if (next != nullptr) { + auto ready = NotReadyChain; + TIntrusiveNode* cut; + do { + cut = NotReadyChain; + NotReadyChain = next; + next = AtomicGet(NotReadyChain->Next); + if (next == NotReadyChain) { + cut = NotReadyChain; + NotReadyChain = nullptr; + break; + } + } while (next != nullptr); + cut->Next = nullptr; + return ready; + } + } + + if (AtomicGet(HeadForCaS) != nullptr) { + return AtomicSwap(&HeadForCaS, nullptr); + } + return nullptr; + } + + TIntrusiveNode* TMPSCIntrusiveUnordered::Pop() noexcept { + if (PopOneQueue != nullptr) { + auto head = PopOneQueue; + PopOneQueue = PopOneQueue->Next; + return head; + } + + PopOneQueue = PopMany(); + if (PopOneQueue != nullptr) { + auto head = PopOneQueue; + PopOneQueue = PopOneQueue->Next; + return head; + } + return nullptr; + } +} |