diff options
author | agri <agri@yandex-team.ru> | 2022-02-10 16:48:12 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:48:12 +0300 |
commit | 2909866fbc652492b7d7cab3023cb19489dc4fd8 (patch) | |
tree | b222e5ac2e2e98872661c51ccceee5da0d291e13 /library/cpp/threading/queue/mpsc_intrusive_unordered.cpp | |
parent | d3530b2692e400bd4d29bd4f07cafaee139164e7 (diff) | |
download | ydb-2909866fbc652492b7d7cab3023cb19489dc4fd8.tar.gz |
Restoring authorship annotation for <agri@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/threading/queue/mpsc_intrusive_unordered.cpp')
-rw-r--r-- | library/cpp/threading/queue/mpsc_intrusive_unordered.cpp | 158 |
1 files changed, 79 insertions, 79 deletions
diff --git a/library/cpp/threading/queue/mpsc_intrusive_unordered.cpp b/library/cpp/threading/queue/mpsc_intrusive_unordered.cpp index a6a2fcef39..3bb1a04f7e 100644 --- a/library/cpp/threading/queue/mpsc_intrusive_unordered.cpp +++ b/library/cpp/threading/queue/mpsc_intrusive_unordered.cpp @@ -1,79 +1,79 @@ -#include "mpsc_intrusive_unordered.h" -#include <util/system/atomic.h> - -namespace NThreading { - void TMPSCIntrusiveUnordered::Push(TIntrusiveNode* node) noexcept { - auto head = AtomicGet(HeadForCaS); - for (ui32 i = NUMBER_OF_TRIES_FOR_CAS; i-- > 0;) { - // no ABA here, because Next is exactly head - // it does not matter how many travels head was made/ - node->Next = head; - auto prev = AtomicGetAndCas(&HeadForCaS, node, head); - if (head == prev) { - return; - } - head = prev; - } - // boring of trying to do cas, let's just swap - - // no need for atomic here, because the next is atomic swap - node->Next = 0; - - head = AtomicSwap(&HeadForSwap, node); - if (head != nullptr) { - AtomicSet(node->Next, head); - } else { - // consumer must know if no other thread may access the memory, - // setting Next to node is a way to notify consumer - AtomicSet(node->Next, node); - } - } - - TIntrusiveNode* TMPSCIntrusiveUnordered::PopMany() noexcept { - if (NotReadyChain == nullptr) { - auto head = AtomicSwap(&HeadForSwap, nullptr); - NotReadyChain = head; - } - - if (NotReadyChain != nullptr) { - auto next = AtomicGet(NotReadyChain->Next); - if (next != nullptr) { - auto ready = NotReadyChain; - TIntrusiveNode* cut; - do { - cut = NotReadyChain; - NotReadyChain = next; - next = AtomicGet(NotReadyChain->Next); - if (next == NotReadyChain) { - cut = NotReadyChain; - NotReadyChain = nullptr; - break; - } - } while (next != nullptr); - cut->Next = nullptr; - return ready; - } - } - - if (AtomicGet(HeadForCaS) != nullptr) { - return AtomicSwap(&HeadForCaS, nullptr); - } - return nullptr; - } - - TIntrusiveNode* TMPSCIntrusiveUnordered::Pop() noexcept { - if (PopOneQueue != nullptr) { - auto head = PopOneQueue; - PopOneQueue = PopOneQueue->Next; - return head; - } - - PopOneQueue = PopMany(); - if (PopOneQueue != nullptr) { - auto head = PopOneQueue; - PopOneQueue = PopOneQueue->Next; - return head; - } - return nullptr; - } -} +#include "mpsc_intrusive_unordered.h" +#include <util/system/atomic.h> + +namespace NThreading { + void TMPSCIntrusiveUnordered::Push(TIntrusiveNode* node) noexcept { + auto head = AtomicGet(HeadForCaS); + for (ui32 i = NUMBER_OF_TRIES_FOR_CAS; i-- > 0;) { + // no ABA here, because Next is exactly head + // it does not matter how many travels head was made/ + node->Next = head; + auto prev = AtomicGetAndCas(&HeadForCaS, node, head); + if (head == prev) { + return; + } + head = prev; + } + // boring of trying to do cas, let's just swap + + // no need for atomic here, because the next is atomic swap + node->Next = 0; + + head = AtomicSwap(&HeadForSwap, node); + if (head != nullptr) { + AtomicSet(node->Next, head); + } else { + // consumer must know if no other thread may access the memory, + // setting Next to node is a way to notify consumer + AtomicSet(node->Next, node); + } + } + + TIntrusiveNode* TMPSCIntrusiveUnordered::PopMany() noexcept { + if (NotReadyChain == nullptr) { + auto head = AtomicSwap(&HeadForSwap, nullptr); + NotReadyChain = head; + } + + if (NotReadyChain != nullptr) { + auto next = AtomicGet(NotReadyChain->Next); + if (next != nullptr) { + auto ready = NotReadyChain; + TIntrusiveNode* cut; + do { + cut = NotReadyChain; + NotReadyChain = next; + next = AtomicGet(NotReadyChain->Next); + if (next == NotReadyChain) { + cut = NotReadyChain; + NotReadyChain = nullptr; + break; + } + } while (next != nullptr); + cut->Next = nullptr; + return ready; + } + } + + if (AtomicGet(HeadForCaS) != nullptr) { + return AtomicSwap(&HeadForCaS, nullptr); + } + return nullptr; + } + + TIntrusiveNode* TMPSCIntrusiveUnordered::Pop() noexcept { + if (PopOneQueue != nullptr) { + auto head = PopOneQueue; + PopOneQueue = PopOneQueue->Next; + return head; + } + + PopOneQueue = PopMany(); + if (PopOneQueue != nullptr) { + auto head = PopOneQueue; + PopOneQueue = PopOneQueue->Next; + return head; + } + return nullptr; + } +} |