aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/threading/queue/mpsc_intrusive_unordered.cpp
diff options
context:
space:
mode:
authorDevtools Arcadia <arcadia-devtools@yandex-team.ru>2022-02-07 18:08:42 +0300
committerDevtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net>2022-02-07 18:08:42 +0300
commit1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch)
treee26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/threading/queue/mpsc_intrusive_unordered.cpp
downloadydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/threading/queue/mpsc_intrusive_unordered.cpp')
-rw-r--r--library/cpp/threading/queue/mpsc_intrusive_unordered.cpp79
1 files changed, 79 insertions, 0 deletions
diff --git a/library/cpp/threading/queue/mpsc_intrusive_unordered.cpp b/library/cpp/threading/queue/mpsc_intrusive_unordered.cpp
new file mode 100644
index 0000000000..3bb1a04f7e
--- /dev/null
+++ b/library/cpp/threading/queue/mpsc_intrusive_unordered.cpp
@@ -0,0 +1,79 @@
+#include "mpsc_intrusive_unordered.h"
+#include <util/system/atomic.h>
+
+namespace NThreading {
+ void TMPSCIntrusiveUnordered::Push(TIntrusiveNode* node) noexcept {
+ auto head = AtomicGet(HeadForCaS);
+ for (ui32 i = NUMBER_OF_TRIES_FOR_CAS; i-- > 0;) {
+ // no ABA here, because Next is exactly head
+ // it does not matter how many travels head was made/
+ node->Next = head;
+ auto prev = AtomicGetAndCas(&HeadForCaS, node, head);
+ if (head == prev) {
+ return;
+ }
+ head = prev;
+ }
+ // boring of trying to do cas, let's just swap
+
+ // no need for atomic here, because the next is atomic swap
+ node->Next = 0;
+
+ head = AtomicSwap(&HeadForSwap, node);
+ if (head != nullptr) {
+ AtomicSet(node->Next, head);
+ } else {
+ // consumer must know if no other thread may access the memory,
+ // setting Next to node is a way to notify consumer
+ AtomicSet(node->Next, node);
+ }
+ }
+
+ TIntrusiveNode* TMPSCIntrusiveUnordered::PopMany() noexcept {
+ if (NotReadyChain == nullptr) {
+ auto head = AtomicSwap(&HeadForSwap, nullptr);
+ NotReadyChain = head;
+ }
+
+ if (NotReadyChain != nullptr) {
+ auto next = AtomicGet(NotReadyChain->Next);
+ if (next != nullptr) {
+ auto ready = NotReadyChain;
+ TIntrusiveNode* cut;
+ do {
+ cut = NotReadyChain;
+ NotReadyChain = next;
+ next = AtomicGet(NotReadyChain->Next);
+ if (next == NotReadyChain) {
+ cut = NotReadyChain;
+ NotReadyChain = nullptr;
+ break;
+ }
+ } while (next != nullptr);
+ cut->Next = nullptr;
+ return ready;
+ }
+ }
+
+ if (AtomicGet(HeadForCaS) != nullptr) {
+ return AtomicSwap(&HeadForCaS, nullptr);
+ }
+ return nullptr;
+ }
+
+ TIntrusiveNode* TMPSCIntrusiveUnordered::Pop() noexcept {
+ if (PopOneQueue != nullptr) {
+ auto head = PopOneQueue;
+ PopOneQueue = PopOneQueue->Next;
+ return head;
+ }
+
+ PopOneQueue = PopMany();
+ if (PopOneQueue != nullptr) {
+ auto head = PopOneQueue;
+ PopOneQueue = PopOneQueue->Next;
+ return head;
+ }
+ return nullptr;
+ }
+}