aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/threading/queue/mpsc_intrusive_unordered.cpp
diff options
context:
space:
mode:
authoragri <agri@yandex-team.ru>2022-02-10 16:48:12 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:48:12 +0300
commit2909866fbc652492b7d7cab3023cb19489dc4fd8 (patch)
treeb222e5ac2e2e98872661c51ccceee5da0d291e13 /library/cpp/threading/queue/mpsc_intrusive_unordered.cpp
parentd3530b2692e400bd4d29bd4f07cafaee139164e7 (diff)
downloadydb-2909866fbc652492b7d7cab3023cb19489dc4fd8.tar.gz
Restoring authorship annotation for <agri@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/threading/queue/mpsc_intrusive_unordered.cpp')
-rw-r--r--library/cpp/threading/queue/mpsc_intrusive_unordered.cpp158
1 files changed, 79 insertions, 79 deletions
diff --git a/library/cpp/threading/queue/mpsc_intrusive_unordered.cpp b/library/cpp/threading/queue/mpsc_intrusive_unordered.cpp
index a6a2fcef39..3bb1a04f7e 100644
--- a/library/cpp/threading/queue/mpsc_intrusive_unordered.cpp
+++ b/library/cpp/threading/queue/mpsc_intrusive_unordered.cpp
@@ -1,79 +1,79 @@
-#include "mpsc_intrusive_unordered.h"
-#include <util/system/atomic.h>
-
-namespace NThreading {
- void TMPSCIntrusiveUnordered::Push(TIntrusiveNode* node) noexcept {
- auto head = AtomicGet(HeadForCaS);
- for (ui32 i = NUMBER_OF_TRIES_FOR_CAS; i-- > 0;) {
- // no ABA here, because Next is exactly head
- // it does not matter how many travels head was made/
- node->Next = head;
- auto prev = AtomicGetAndCas(&HeadForCaS, node, head);
- if (head == prev) {
- return;
- }
- head = prev;
- }
- // boring of trying to do cas, let's just swap
-
- // no need for atomic here, because the next is atomic swap
- node->Next = 0;
-
- head = AtomicSwap(&HeadForSwap, node);
- if (head != nullptr) {
- AtomicSet(node->Next, head);
- } else {
- // consumer must know if no other thread may access the memory,
- // setting Next to node is a way to notify consumer
- AtomicSet(node->Next, node);
- }
- }
-
- TIntrusiveNode* TMPSCIntrusiveUnordered::PopMany() noexcept {
- if (NotReadyChain == nullptr) {
- auto head = AtomicSwap(&HeadForSwap, nullptr);
- NotReadyChain = head;
- }
-
- if (NotReadyChain != nullptr) {
- auto next = AtomicGet(NotReadyChain->Next);
- if (next != nullptr) {
- auto ready = NotReadyChain;
- TIntrusiveNode* cut;
- do {
- cut = NotReadyChain;
- NotReadyChain = next;
- next = AtomicGet(NotReadyChain->Next);
- if (next == NotReadyChain) {
- cut = NotReadyChain;
- NotReadyChain = nullptr;
- break;
- }
- } while (next != nullptr);
- cut->Next = nullptr;
- return ready;
- }
- }
-
- if (AtomicGet(HeadForCaS) != nullptr) {
- return AtomicSwap(&HeadForCaS, nullptr);
- }
- return nullptr;
- }
-
- TIntrusiveNode* TMPSCIntrusiveUnordered::Pop() noexcept {
- if (PopOneQueue != nullptr) {
- auto head = PopOneQueue;
- PopOneQueue = PopOneQueue->Next;
- return head;
- }
-
- PopOneQueue = PopMany();
- if (PopOneQueue != nullptr) {
- auto head = PopOneQueue;
- PopOneQueue = PopOneQueue->Next;
- return head;
- }
- return nullptr;
- }
-}
+#include "mpsc_intrusive_unordered.h"
+#include <util/system/atomic.h>
+
+namespace NThreading {
+ void TMPSCIntrusiveUnordered::Push(TIntrusiveNode* node) noexcept {
+ auto head = AtomicGet(HeadForCaS);
+ for (ui32 i = NUMBER_OF_TRIES_FOR_CAS; i-- > 0;) {
+ // no ABA here, because Next is exactly head
+ // it does not matter how many travels head was made/
+ node->Next = head;
+ auto prev = AtomicGetAndCas(&HeadForCaS, node, head);
+ if (head == prev) {
+ return;
+ }
+ head = prev;
+ }
+ // boring of trying to do cas, let's just swap
+
+ // no need for atomic here, because the next is atomic swap
+ node->Next = 0;
+
+ head = AtomicSwap(&HeadForSwap, node);
+ if (head != nullptr) {
+ AtomicSet(node->Next, head);
+ } else {
+ // consumer must know if no other thread may access the memory,
+ // setting Next to node is a way to notify consumer
+ AtomicSet(node->Next, node);
+ }
+ }
+
+ TIntrusiveNode* TMPSCIntrusiveUnordered::PopMany() noexcept {
+ if (NotReadyChain == nullptr) {
+ auto head = AtomicSwap(&HeadForSwap, nullptr);
+ NotReadyChain = head;
+ }
+
+ if (NotReadyChain != nullptr) {
+ auto next = AtomicGet(NotReadyChain->Next);
+ if (next != nullptr) {
+ auto ready = NotReadyChain;
+ TIntrusiveNode* cut;
+ do {
+ cut = NotReadyChain;
+ NotReadyChain = next;
+ next = AtomicGet(NotReadyChain->Next);
+ if (next == NotReadyChain) {
+ cut = NotReadyChain;
+ NotReadyChain = nullptr;
+ break;
+ }
+ } while (next != nullptr);
+ cut->Next = nullptr;
+ return ready;
+ }
+ }
+
+ if (AtomicGet(HeadForCaS) != nullptr) {
+ return AtomicSwap(&HeadForCaS, nullptr);
+ }
+ return nullptr;
+ }
+
+ TIntrusiveNode* TMPSCIntrusiveUnordered::Pop() noexcept {
+ if (PopOneQueue != nullptr) {
+ auto head = PopOneQueue;
+ PopOneQueue = PopOneQueue->Next;
+ return head;
+ }
+
+ PopOneQueue = PopMany();
+ if (PopOneQueue != nullptr) {
+ auto head = PopOneQueue;
+ PopOneQueue = PopOneQueue->Next;
+ return head;
+ }
+ return nullptr;
+ }
+}