path: root/util/thread/lfqueue.h
author: Anton Samokhvalov <pg83@yandex.ru> 2022-02-10 16:45:17 +0300
committer: Daniil Cherednik <dcherednik@yandex-team.ru> 2022-02-10 16:45:17 +0300
commit: d3a398281c6fd1d3672036cb2d63f842d2cb28c5 (patch)
tree: dd4bd3ca0f36b817e96812825ffaf10d645803f2 /util/thread/lfqueue.h
parent: 72cb13b4aff9bc9cf22e49251bc8fd143f82538f (diff)
download: ydb-d3a398281c6fd1d3672036cb2d63f842d2cb28c5.tar.gz
Restoring authorship annotation for Anton Samokhvalov <pg83@yandex.ru>. Commit 2 of 2.
Diffstat (limited to 'util/thread/lfqueue.h')
-rw-r--r--    util/thread/lfqueue.h    98
1 file changed, 49 insertions, 49 deletions
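
The file being patched implements TLockFreeQueue, the lock-free queue from util/thread; the hunks below only restore whitespace-level authorship annotations, so the public interface is unchanged. For orientation while reading the diff, here is a minimal usage sketch. It assumes the Enqueue(const T&), Dequeue(T*) and IsEmpty() members visible in the hunks; TDefaultLFCounter is spelled out because the default template argument does not appear in this diff.

#include <util/thread/lfqueue.h>

int main() {
    // TDefaultLFCounter is the no-op counter defined in this header; per the
    // comment in the code, inheriting from it lets the queue spend zero bytes
    // on counting when no counter is needed.
    TLockFreeQueue<int, TDefaultLFCounter> queue;

    queue.Enqueue(1);
    queue.Enqueue(2);

    int value = 0;
    while (queue.Dequeue(&value)) {
        // consume value; Dequeue returns false once the queue is drained
    }

    return queue.IsEmpty() ? 0 : 1;
}

A custom TCounter with IncCount/DecCount members can be supplied instead of TDefaultLFCounter to track, for example, the number of queued items; Dequeue calls DecCount on the removed element, as the hunk at @@ -288,7 shows.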
diff --git a/util/thread/lfqueue.h b/util/thread/lfqueue.h
index 1d2a776bbb..ab523631e4 100644
--- a/util/thread/lfqueue.h
+++ b/util/thread/lfqueue.h
@@ -5,15 +5,15 @@
#include <util/generic/ptr.h>
#include <util/system/atomic.h>
#include <util/system/yassert.h>
-#include "lfstack.h"
+#include "lfstack.h"
-struct TDefaultLFCounter {
- template <class T>
- void IncCount(const T& data) {
+struct TDefaultLFCounter {
+ template <class T>
+ void IncCount(const T& data) {
(void)data;
}
- template <class T>
- void DecCount(const T& data) {
+ template <class T>
+ void DecCount(const T& data) {
(void)data;
}
};
@@ -25,7 +25,7 @@ struct TDefaultLFCounter {
// it is TCounter class responsibility to check validity of passed object
template <class T, class TCounter>
class TLockFreeQueue: public TNonCopyable {
- struct TListNode {
+ struct TListNode {
template <typename U>
TListNode(U&& u, TListNode* next)
: Next(next)
@@ -39,30 +39,30 @@ class TLockFreeQueue: public TNonCopyable {
{
}
- TListNode* volatile Next;
+ TListNode* volatile Next;
T Data;
};
// using inheritance to be able to use 0 bytes for TCounter when we don't need one
- struct TRootNode: public TCounter {
- TListNode* volatile PushQueue;
- TListNode* volatile PopQueue;
- TListNode* volatile ToDelete;
- TRootNode* volatile NextFree;
+ struct TRootNode: public TCounter {
+ TListNode* volatile PushQueue;
+ TListNode* volatile PopQueue;
+ TListNode* volatile ToDelete;
+ TRootNode* volatile NextFree;
- TRootNode()
+ TRootNode()
: PushQueue(nullptr)
, PopQueue(nullptr)
, ToDelete(nullptr)
, NextFree(nullptr)
{
}
- void CopyCounter(TRootNode* x) {
+ void CopyCounter(TRootNode* x) {
*(TCounter*)this = *(TCounter*)x;
}
};
- static void EraseList(TListNode* n) {
+ static void EraseList(TListNode* n) {
while (n) {
TListNode* keepNext = AtomicGet(n->Next);
delete n;
@@ -75,12 +75,12 @@ class TLockFreeQueue: public TNonCopyable {
alignas(64) volatile TAtomic FreeingTaskCounter;
alignas(64) TRootNode* volatile FreePtr;
- void TryToFreeAsyncMemory() {
+ void TryToFreeAsyncMemory() {
TAtomic keepCounter = AtomicAdd(FreeingTaskCounter, 0);
TRootNode* current = AtomicGet(FreePtr);
if (current == nullptr)
return;
- if (AtomicAdd(FreememCounter, 0) == 1) {
+ if (AtomicAdd(FreememCounter, 0) == 1) {
// we are the last thread, try to cleanup
// check if another thread has already cleaned up
if (keepCounter != AtomicAdd(FreeingTaskCounter, 0)) {
@@ -98,24 +98,24 @@ class TLockFreeQueue: public TNonCopyable {
}
}
}
- void AsyncRef() {
- AtomicAdd(FreememCounter, 1);
+ void AsyncRef() {
+ AtomicAdd(FreememCounter, 1);
}
- void AsyncUnref() {
+ void AsyncUnref() {
TryToFreeAsyncMemory();
- AtomicAdd(FreememCounter, -1);
+ AtomicAdd(FreememCounter, -1);
}
- void AsyncDel(TRootNode* toDelete, TListNode* lst) {
+ void AsyncDel(TRootNode* toDelete, TListNode* lst) {
AtomicSet(toDelete->ToDelete, lst);
- for (;;) {
+ for (;;) {
AtomicSet(toDelete->NextFree, AtomicGet(FreePtr));
if (AtomicCas(&FreePtr, toDelete, AtomicGet(toDelete->NextFree)))
break;
}
}
- void AsyncUnref(TRootNode* toDelete, TListNode* lst) {
+ void AsyncUnref(TRootNode* toDelete, TListNode* lst) {
TryToFreeAsyncMemory();
- if (AtomicAdd(FreememCounter, -1) == 0) {
+ if (AtomicAdd(FreememCounter, -1) == 0) {
// no other operations in progress, can safely reclaim memory
EraseList(lst);
delete toDelete;
@@ -125,27 +125,27 @@ class TLockFreeQueue: public TNonCopyable {
}
}
- struct TListInvertor {
- TListNode* Copy;
- TListNode* Tail;
- TListNode* PrevFirst;
+ struct TListInvertor {
+ TListNode* Copy;
+ TListNode* Tail;
+ TListNode* PrevFirst;
- TListInvertor()
+ TListInvertor()
: Copy(nullptr)
, Tail(nullptr)
, PrevFirst(nullptr)
{
}
- ~TListInvertor() {
+ ~TListInvertor() {
EraseList(Copy);
}
- void CopyWasUsed() {
+ void CopyWasUsed() {
Copy = nullptr;
Tail = nullptr;
PrevFirst = nullptr;
}
- void DoCopy(TListNode* ptr) {
- TListNode* newFirst = ptr;
+ void DoCopy(TListNode* ptr) {
+ TListNode* newFirst = ptr;
TListNode* newCopy = nullptr;
TListNode* newTail = nullptr;
while (ptr) {
@@ -171,8 +171,8 @@ class TLockFreeQueue: public TNonCopyable {
}
};
- void EnqueueImpl(TListNode* head, TListNode* tail) {
- TRootNode* newRoot = new TRootNode;
+ void EnqueueImpl(TListNode* head, TListNode* tail) {
+ TRootNode* newRoot = new TRootNode;
AsyncRef();
AtomicSet(newRoot->PushQueue, head);
for (;;) {
@@ -225,14 +225,14 @@ class TLockFreeQueue: public TNonCopyable {
}
public:
- TLockFreeQueue()
+ TLockFreeQueue()
: JobQueue(new TRootNode)
, FreememCounter(0)
, FreeingTaskCounter(0)
, FreePtr(nullptr)
{
}
- ~TLockFreeQueue() {
+ ~TLockFreeQueue() {
AsyncRef();
AsyncUnref(); // should free FreeList
EraseList(JobQueue->PushQueue);
@@ -253,17 +253,17 @@ public:
EnqueueImpl(newNode, newNode);
}
template <typename TCollection>
- void EnqueueAll(const TCollection& data) {
+ void EnqueueAll(const TCollection& data) {
EnqueueAll(data.begin(), data.end());
}
template <typename TIter>
- void EnqueueAll(TIter dataBegin, TIter dataEnd) {
+ void EnqueueAll(TIter dataBegin, TIter dataEnd) {
if (dataBegin == dataEnd)
return;
TIter i = dataBegin;
TListNode* volatile node = new TListNode(*i);
- TListNode* volatile tail = node;
+ TListNode* volatile tail = node;
for (++i; i != dataEnd; ++i) {
TListNode* nextNode = node;
@@ -271,7 +271,7 @@ public:
}
EnqueueImpl(node, tail);
}
- bool Dequeue(T* data) {
+ bool Dequeue(T* data) {
TRootNode* newRoot = nullptr;
TListInvertor listInvertor;
AsyncRef();
@@ -288,7 +288,7 @@ public:
newRoot->CopyCounter(curRoot);
newRoot->DecCount(tail->Data);
Y_ASSERT(AtomicGet(curRoot->PopQueue) == tail);
- if (AtomicCas(&JobQueue, newRoot, curRoot)) {
+ if (AtomicCas(&JobQueue, newRoot, curRoot)) {
*data = std::move(tail->Data);
AtomicSet(tail->Next, nullptr);
AsyncUnref(curRoot, tail);
@@ -309,7 +309,7 @@ public:
AtomicSet(newRoot->PopQueue, listInvertor.Copy);
newRoot->CopyCounter(curRoot);
Y_ASSERT(AtomicGet(curRoot->PopQueue) == nullptr);
- if (AtomicCas(&JobQueue, newRoot, curRoot)) {
+ if (AtomicCas(&JobQueue, newRoot, curRoot)) {
newRoot = nullptr;
listInvertor.CopyWasUsed();
AsyncDel(curRoot, AtomicGet(curRoot->PushQueue));
@@ -343,14 +343,14 @@ public:
AsyncUnref(curRoot, toDeleteHead);
}
- bool IsEmpty() {
+ bool IsEmpty() {
AsyncRef();
TRootNode* curRoot = AtomicGet(JobQueue);
bool res = AtomicGet(curRoot->PushQueue) == nullptr && AtomicGet(curRoot->PopQueue) == nullptr;
AsyncUnref();
return res;
}
- TCounter GetCounter() {
+ TCounter GetCounter() {
AsyncRef();
TRootNode* curRoot = AtomicGet(JobQueue);
TCounter res = *(TCounter*)curRoot;
@@ -367,8 +367,8 @@ public:
inline ~TAutoLockFreeQueue() {
TRef tmp;
- while (Dequeue(&tmp)) {
- }
+ while (Dequeue(&tmp)) {
+ }
}
inline bool Dequeue(TRef* t) {