author | ironpeter <ironpeter@yandex-team.ru> | 2022-02-10 16:49:52 +0300
---|---|---
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:49:52 +0300
commit | edee5b99e1eec042f46725b89dcd81ea7e41d663 (patch) |
tree | 5d5cb817648f650d76cf1076100726fd9b8448e8 /util |
parent | ff97837ecc5972a00cb395483d8856566738375c (diff) |
download | ydb-edee5b99e1eec042f46725b89dcd81ea7e41d663.tar.gz |
Restoring authorship annotation for <ironpeter@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'util')
-rw-r--r-- | util/generic/guid.cpp | 2
-rw-r--r-- | util/memory/pool.h | 2
-rw-r--r-- | util/system/atomic_gcc.h | 2
-rw-r--r-- | util/system/atomic_win.h | 24
-rw-r--r-- | util/system/file.cpp | 4
-rw-r--r-- | util/system/hp_timer.cpp | 4
-rw-r--r-- | util/system/shmat.cpp | 2
-rw-r--r-- | util/system/shmat.h | 2
-rw-r--r-- | util/system/thread.i | 2
-rw-r--r-- | util/thread/lfqueue.h | 218
-rw-r--r-- | util/thread/lfstack.h | 92

11 files changed, 177 insertions(+), 177 deletions(-)
diff --git a/util/generic/guid.cpp b/util/generic/guid.cpp
index 285a7bd25a..8b907457bc 100644
--- a/util/generic/guid.cpp
+++ b/util/generic/guid.cpp
@@ -1,4 +1,4 @@
-#include "guid.h"
+#include "guid.h"
 
 #include "ylimits.h"
 #include "string.h"
diff --git a/util/memory/pool.h b/util/memory/pool.h
index fa36fac01d..13c8b6b9ed 100644
--- a/util/memory/pool.h
+++ b/util/memory/pool.h
@@ -4,7 +4,7 @@
 #include <util/system/align.h>
 #include <util/system/yassert.h>
 
-#include <util/generic/bitops.h>
+#include <util/generic/bitops.h>
 #include <util/generic/utility.h>
 #include <util/generic/intrlist.h>
 #include <util/generic/strbuf.h>
diff --git a/util/system/atomic_gcc.h b/util/system/atomic_gcc.h
index 00bea6fd90..ed8dc2bdc5 100644
--- a/util/system/atomic_gcc.h
+++ b/util/system/atomic_gcc.h
@@ -4,7 +4,7 @@
         : \
         : \
         : "memory")
-
+
 static inline TAtomicBase AtomicGet(const TAtomic& a) {
     TAtomicBase tmp;
 #if defined(_arm64_)
diff --git a/util/system/atomic_win.h b/util/system/atomic_win.h
index 6cbe8e86b6..65c290e6cc 100644
--- a/util/system/atomic_win.h
+++ b/util/system/atomic_win.h
@@ -49,45 +49,45 @@ static inline intptr_t AtomicGetAndCas(TAtomic* a, intptr_t exchange, intptr_t compare) {
 }
 
 #else // _x86_64_
-
+
 #pragma intrinsic(_InterlockedIncrement64)
 #pragma intrinsic(_InterlockedDecrement64)
 #pragma intrinsic(_InterlockedExchangeAdd64)
 #pragma intrinsic(_InterlockedExchange64)
 #pragma intrinsic(_InterlockedCompareExchange64)
-
+
 static inline intptr_t AtomicIncrement(TAtomic& a) {
     return _InterlockedIncrement64((volatile __int64*)&a);
-}
-
+}
+
 static inline intptr_t AtomicGetAndIncrement(TAtomic& a) {
     return _InterlockedIncrement64((volatile __int64*)&a) - 1;
 }
 
 static inline intptr_t AtomicDecrement(TAtomic& a) {
     return _InterlockedDecrement64((volatile __int64*)&a);
-}
-
+}
+
 static inline intptr_t AtomicGetAndDecrement(TAtomic& a) {
     return _InterlockedDecrement64((volatile __int64*)&a) + 1;
 }
 
 static inline intptr_t AtomicAdd(TAtomic& a, intptr_t b) {
     return _InterlockedExchangeAdd64((volatile __int64*)&a, b) + b;
-}
-
+}
+
 static inline intptr_t AtomicGetAndAdd(TAtomic& a, intptr_t b) {
     return _InterlockedExchangeAdd64((volatile __int64*)&a, b);
 }
 
 static inline intptr_t AtomicSwap(TAtomic* a, intptr_t b) {
     return _InterlockedExchange64((volatile __int64*)a, b);
-}
-
+}
+
 static inline bool AtomicCas(TAtomic* a, intptr_t exchange, intptr_t compare) {
     return _InterlockedCompareExchange64((volatile __int64*)a, exchange, compare) == compare;
-}
-
+}
+
 static inline intptr_t AtomicGetAndCas(TAtomic* a, intptr_t exchange, intptr_t compare) {
     return _InterlockedCompareExchange64((volatile __int64*)a, exchange, compare);
 }
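Every hunk in this commit is whitespace-only (trailing blanks stripped from the lines touched by the annotation-restore trick), which is why each changed line shows up as a `-`/`+` pair with identical visible text. The x86_64 branch of atomic_win.h above wraps MSVC's `_Interlocked*` intrinsics, and the return-value conventions are easy to misread: `AtomicIncrement` and `AtomicAdd` return the new value, while the `GetAnd*` variants return the previous one. A minimal sketch of the same contracts using `std::atomic`, for illustration only; `TAtomic` is util's own type and this is not the library's code:

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

int main() {
    std::atomic<intptr_t> a{41};

    // AtomicIncrement returns the incremented (new) value...
    assert(a.fetch_add(1) + 1 == 42);
    // ...while AtomicGetAndIncrement returns the previous (old) value.
    assert(a.fetch_add(1) == 42); // a is now 43

    // AtomicAdd(a, 7) would return the new value (50); AtomicGetAndAdd
    // would return the old one (43). fetch_add always returns the old value.
    assert(a.fetch_add(7) + 7 == 50);
    assert(a.fetch_add(0) == 50);

    // AtomicCas reports whether the exchange happened; AtomicGetAndCas
    // returns the value observed before the attempt, which equals
    // `compare` exactly when the exchange took place.
    intptr_t expected = 50;
    assert(a.compare_exchange_strong(expected, 100));
    assert(a.load() == 100);
    return 0;
}
```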
diff --git a/util/system/file.cpp b/util/system/file.cpp
index 60a2ea666c..4a261d020c 100644
--- a/util/system/file.cpp
+++ b/util/system/file.cpp
@@ -255,9 +255,9 @@ TFileHandle::TFileHandle(const TString& fName, EOpenMode oMode) noexcept {
         if (oMode & NoReadAhead) {
             ::posix_fadvise(Fd_, 0, 0, POSIX_FADV_RANDOM);
         }
-    }
+    }
 #endif
-
+
     //temp file
     if (Fd_ >= 0 && (oMode & Transient)) {
         unlink(fName.data());
diff --git a/util/system/hp_timer.cpp b/util/system/hp_timer.cpp
index 3b458428a0..e4c3f21e6b 100644
--- a/util/system/hp_timer.cpp
+++ b/util/system/hp_timer.cpp
@@ -1,6 +1,6 @@
-#include "hp_timer.h"
+#include "hp_timer.h"
 
-#include <util/generic/algorithm.h>
+#include <util/generic/algorithm.h>
 #include <util/generic/singleton.h>
 
 #include <util/datetime/cputimer.h>
diff --git a/util/system/shmat.cpp b/util/system/shmat.cpp
index 3e82016614..07ff0d6caa 100644
--- a/util/system/shmat.cpp
+++ b/util/system/shmat.cpp
@@ -1,6 +1,6 @@
 #include "shmat.h"
 
-#include <util/generic/guid.h>
+#include <util/generic/guid.h>
 
 #if defined(_win_)
     #include <stdio.h>
diff --git a/util/system/shmat.h b/util/system/shmat.h
index a2d289a9fd..d9da3c151a 100644
--- a/util/system/shmat.h
+++ b/util/system/shmat.h
@@ -3,7 +3,7 @@
 #include "fhandle.h"
 
 #include <util/generic/ptr.h>
-#include <util/generic/guid.h>
+#include <util/generic/guid.h>
 
 class TSharedMemory: public TThrRefBase {
     TGUID Id;
diff --git a/util/system/thread.i b/util/system/thread.i
index 77067844d7..8cba505473 100644
--- a/util/system/thread.i
+++ b/util/system/thread.i
@@ -1,5 +1,5 @@
 //do not use directly
-#pragma once
+#pragma once
 #include "platform.h"
 
 #if defined(_win_)
diff --git a/util/thread/lfqueue.h b/util/thread/lfqueue.h
index 510c91e788..ab523631e4 100644
--- a/util/thread/lfqueue.h
+++ b/util/thread/lfqueue.h
@@ -1,12 +1,12 @@
 #pragma once
-
+
 #include "fwd.h"
 #include <util/generic/ptr.h>
-#include <util/system/atomic.h>
+#include <util/system/atomic.h>
 #include <util/system/yassert.h>
 
 #include "lfstack.h"
-
+
 struct TDefaultLFCounter {
     template <class T>
     void IncCount(const T& data) {
@@ -40,137 +40,137 @@ class TLockFreeQueue: public TNonCopyable {
         }
 
         TListNode* volatile Next;
-        T Data;
-    };
-
+        T Data;
+    };
+
     // using inheritance to be able to use 0 bytes for TCounter when we don't need one
     struct TRootNode: public TCounter {
         TListNode* volatile PushQueue;
         TListNode* volatile PopQueue;
         TListNode* volatile ToDelete;
         TRootNode* volatile NextFree;
-
+
         TRootNode()
             : PushQueue(nullptr)
             , PopQueue(nullptr)
            , ToDelete(nullptr)
            , NextFree(nullptr)
-        {
-        }
+        {
+        }
        void CopyCounter(TRootNode* x) {
            *(TCounter*)this = *(TCounter*)x;
        }
-    };
-
+    };
+
    static void EraseList(TListNode* n) {
-        while (n) {
+        while (n) {
            TListNode* keepNext = AtomicGet(n->Next);
-            delete n;
-            n = keepNext;
-        }
-    }
+            delete n;
+            n = keepNext;
+        }
+    }
 
    alignas(64) TRootNode* volatile JobQueue;
    alignas(64) volatile TAtomic FreememCounter;
    alignas(64) volatile TAtomic FreeingTaskCounter;
    alignas(64) TRootNode* volatile FreePtr;
-
+
    void TryToFreeAsyncMemory() {
        TAtomic keepCounter = AtomicAdd(FreeingTaskCounter, 0);
        TRootNode* current = AtomicGet(FreePtr);
        if (current == nullptr)
-            return;
+            return;
        if (AtomicAdd(FreememCounter, 0) == 1) {
-            // we are the last thread, try to cleanup
+            // we are the last thread, try to cleanup
            // check if another thread have cleaned up
            if (keepCounter != AtomicAdd(FreeingTaskCounter, 0)) {
                return;
            }
            if (AtomicCas(&FreePtr, (TRootNode*)nullptr, current)) {
-                // free list
-                while (current) {
+                // free list
+                while (current) {
                    TRootNode* p = AtomicGet(current->NextFree);
                    EraseList(AtomicGet(current->ToDelete));
-                    delete current;
-                    current = p;
-                }
+                    delete current;
+                    current = p;
+                }
                AtomicAdd(FreeingTaskCounter, 1);
-            }
-        }
-    }
+            }
+        }
+    }
    void AsyncRef() {
        AtomicAdd(FreememCounter, 1);
-    }
+    }
    void AsyncUnref() {
-        TryToFreeAsyncMemory();
+        TryToFreeAsyncMemory();
        AtomicAdd(FreememCounter, -1);
-    }
+    }
    void AsyncDel(TRootNode* toDelete, TListNode* lst) {
        AtomicSet(toDelete->ToDelete, lst);
        for (;;) {
            AtomicSet(toDelete->NextFree, AtomicGet(FreePtr));
            if (AtomicCas(&FreePtr, toDelete, AtomicGet(toDelete->NextFree)))
-                break;
-        }
-    }
+                break;
+        }
+    }
    void AsyncUnref(TRootNode* toDelete, TListNode* lst) {
-        TryToFreeAsyncMemory();
+        TryToFreeAsyncMemory();
        if (AtomicAdd(FreememCounter, -1) == 0) {
-            // no other operations in progress, can safely reclaim memory
-            EraseList(lst);
-            delete toDelete;
-        } else {
-            // Dequeue()s in progress, put node to free list
-            AsyncDel(toDelete, lst);
-        }
-    }
-
+            // no other operations in progress, can safely reclaim memory
+            EraseList(lst);
+            delete toDelete;
+        } else {
+            // Dequeue()s in progress, put node to free list
+            AsyncDel(toDelete, lst);
+        }
+    }
+
    struct TListInvertor {
        TListNode* Copy;
        TListNode* Tail;
        TListNode* PrevFirst;
-
+
        TListInvertor()
            : Copy(nullptr)
            , Tail(nullptr)
            , PrevFirst(nullptr)
-        {
-        }
+        {
+        }
        ~TListInvertor() {
-            EraseList(Copy);
-        }
+            EraseList(Copy);
+        }
        void CopyWasUsed() {
            Copy = nullptr;
            Tail = nullptr;
            PrevFirst = nullptr;
-        }
+        }
        void DoCopy(TListNode* ptr) {
            TListNode* newFirst = ptr;
            TListNode* newCopy = nullptr;
            TListNode* newTail = nullptr;
-            while (ptr) {
-                if (ptr == PrevFirst) {
-                    // short cut, we have copied this part already
+            while (ptr) {
+                if (ptr == PrevFirst) {
+                    // short cut, we have copied this part already
                    AtomicSet(Tail->Next, newCopy);
-                    newCopy = Copy;
+                    newCopy = Copy;
                    Copy = nullptr; // do not destroy prev try
-                    if (!newTail)
-                        newTail = Tail; // tried to invert same list
-                    break;
-                }
+                    if (!newTail)
+                        newTail = Tail; // tried to invert same list
+                    break;
+                }
                TListNode* newElem = new TListNode(ptr->Data, newCopy);
-                newCopy = newElem;
+                newCopy = newElem;
                ptr = AtomicGet(ptr->Next);
-                if (!newTail)
-                    newTail = newElem;
-            }
-            EraseList(Copy); // copy was useless
-            Copy = newCopy;
-            PrevFirst = newFirst;
-            Tail = newTail;
-        }
-    };
-
+                if (!newTail)
+                    newTail = newElem;
+            }
+            EraseList(Copy); // copy was useless
+            Copy = newCopy;
+            PrevFirst = newFirst;
+            Tail = newTail;
+        }
+    };
+
    void EnqueueImpl(TListNode* head, TListNode* tail) {
        TRootNode* newRoot = new TRootNode;
        AsyncRef();
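The lfqueue.h hunk above covers the queue's whole reclamation scheme: every operation brackets itself with `AsyncRef`/`AsyncUnref`, retired roots are parked on the lock-free `FreePtr` list by `AsyncDel`, and memory is freed only by a thread that can prove it was the last operation in flight. A stripped-down sketch of that pattern with `std::atomic` in place of util's `TAtomic` wrappers (names are illustrative, not the library's; the real code also runs a lazy sweep in `TryToFreeAsyncMemory` guarded by the `FreeingTaskCounter` epoch check, omitted here):

```cpp
#include <atomic>

struct TNode {
    TNode* Next = nullptr;
};

std::atomic<int> ActiveOps{0};         // plays the role of FreememCounter
std::atomic<TNode*> FreeList{nullptr}; // plays the role of FreePtr

void Enter() { // role of AsyncRef: announce an operation in flight
    ActiveOps.fetch_add(1);
}

void Park(TNode* n) { // role of AsyncDel: defer deletion on a lock-free list
    n->Next = FreeList.load();
    while (!FreeList.compare_exchange_weak(n->Next, n)) {
        // a failed CAS refreshed n->Next with the current head; retry
    }
}

void Leave(TNode* unlinked) { // role of the two-argument AsyncUnref
    if (ActiveOps.fetch_sub(1) == 1) {
        // We were provably the only operation in flight, and `unlinked`
        // is already unreachable from the shared structure: free it now.
        delete unlinked;
    } else {
        // A concurrent operation may still be reading `unlinked`; defer.
        Park(unlinked);
    }
}
```

The key invariant is the same as in the diff: a node is only ever freed by a thread that either observed itself to be the last active operation or claimed the parked list exclusively via CAS.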
@@ -224,21 +224,21 @@ class TLockFreeQueue: public TNonCopyable {
        return lst;
    }
 
-public:
+public:
    TLockFreeQueue()
-        : JobQueue(new TRootNode)
-        , FreememCounter(0)
+        : JobQueue(new TRootNode)
+        , FreememCounter(0)
        , FreeingTaskCounter(0)
        , FreePtr(nullptr)
-    {
-    }
+    {
+    }
    ~TLockFreeQueue() {
        AsyncRef();
        AsyncUnref(); // should free FreeList
-        EraseList(JobQueue->PushQueue);
-        EraseList(JobQueue->PopQueue);
-        delete JobQueue;
-    }
+        EraseList(JobQueue->PushQueue);
+        EraseList(JobQueue->PopQueue);
+        delete JobQueue;
+    }
    template <typename U>
    void Enqueue(U&& data) {
        TListNode* newNode = new TListNode(std::forward<U>(data));
@@ -268,21 +268,21 @@ public:
        for (++i; i != dataEnd; ++i) {
            TListNode* nextNode = node;
            node = new TListNode(*i, nextNode);
-        }
+        }
        EnqueueImpl(node, tail);
-    }
+    }
    bool Dequeue(T* data) {
        TRootNode* newRoot = nullptr;
-        TListInvertor listInvertor;
-        AsyncRef();
-        for (;;) {
+        TListInvertor listInvertor;
+        AsyncRef();
+        for (;;) {
            TRootNode* curRoot = AtomicGet(JobQueue);
            TListNode* tail = AtomicGet(curRoot->PopQueue);
-            if (tail) {
-                // has elems to pop
-                if (!newRoot)
-                    newRoot = new TRootNode;
-
+            if (tail) {
+                // has elems to pop
+                if (!newRoot)
+                    newRoot = new TRootNode;
+
                AtomicSet(newRoot->PushQueue, AtomicGet(curRoot->PushQueue));
                AtomicSet(newRoot->PopQueue, AtomicGet(tail->Next));
                newRoot->CopyCounter(curRoot);
@@ -291,19 +291,19 @@ public:
                if (AtomicCas(&JobQueue, newRoot, curRoot)) {
                    *data = std::move(tail->Data);
                    AtomicSet(tail->Next, nullptr);
-                    AsyncUnref(curRoot, tail);
-                    return true;
-                }
-                continue;
-            }
+                    AsyncUnref(curRoot, tail);
+                    return true;
+                }
+                continue;
+            }
            if (AtomicGet(curRoot->PushQueue) == nullptr) {
-                delete newRoot;
-                AsyncUnref();
-                return false; // no elems to pop
-            }
-
-            if (!newRoot)
-                newRoot = new TRootNode;
+                delete newRoot;
+                AsyncUnref();
+                return false; // no elems to pop
+            }
+
+            if (!newRoot)
+                newRoot = new TRootNode;
            AtomicSet(newRoot->PushQueue, nullptr);
            listInvertor.DoCopy(AtomicGet(curRoot->PushQueue));
            AtomicSet(newRoot->PopQueue, listInvertor.Copy);
@@ -311,13 +311,13 @@ public:
            Y_ASSERT(AtomicGet(curRoot->PopQueue) == nullptr);
            if (AtomicCas(&JobQueue, newRoot, curRoot)) {
                newRoot = nullptr;
-                listInvertor.CopyWasUsed();
+                listInvertor.CopyWasUsed();
                AsyncDel(curRoot, AtomicGet(curRoot->PushQueue));
-            } else {
+            } else {
                AtomicSet(newRoot->PopQueue, nullptr);
-            }
-        }
-    }
+            }
+        }
+    }
 
    template <typename TCollection>
    void DequeueAll(TCollection* res) {
        AsyncRef();
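For orientation, a hypothetical usage sketch for the queue diffed above, assuming the util headers build as they do in this repository. `Dequeue` never blocks; it returns `false` on an empty queue, so the consumer simply retries:

```cpp
#include <util/thread/lfqueue.h>

#include <thread>

int main() {
    TLockFreeQueue<int> queue;

    std::thread producer([&queue] {
        for (int i = 1; i <= 1000; ++i) {
            queue.Enqueue(i);
        }
    });

    long long sum = 0;
    int received = 0;
    while (received < 1000) {
        int item;
        if (queue.Dequeue(&item)) { // false when the queue is momentarily empty
            sum += item;
            ++received;
        } else {
            std::this_thread::yield(); // nothing available yet; retry
        }
    }

    producer.join();
    return sum == 500500 ? 0 : 1; // 1 + 2 + ... + 1000
}
```

Note that pushes land on `PushQueue` and pops drain `PopQueue`; when the pop side runs dry, `TListInvertor::DoCopy` reverses the push list so elements still come out in FIFO order.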
@@ -344,12 +344,12 @@ public:
        AsyncUnref(curRoot, toDeleteHead);
    }
 
    bool IsEmpty() {
-        AsyncRef();
+        AsyncRef();
        TRootNode* curRoot = AtomicGet(JobQueue);
        bool res = AtomicGet(curRoot->PushQueue) == nullptr && AtomicGet(curRoot->PopQueue) == nullptr;
-        AsyncUnref();
-        return res;
-    }
+        AsyncUnref();
+        return res;
+    }
    TCounter GetCounter() {
        AsyncRef();
        TRootNode* curRoot = AtomicGet(JobQueue);
@@ -357,7 +357,7 @@ public:
        AsyncUnref();
        return res;
    }
-};
+};
 
 template <class T, class TCounter>
 class TAutoLockFreeQueue {
diff --git a/util/thread/lfstack.h b/util/thread/lfstack.h
index a85b6100b6..ca3d95f3c3 100644
--- a/util/thread/lfstack.h
+++ b/util/thread/lfstack.h
@@ -1,16 +1,16 @@
 #pragma once
-
+
 #include <util/generic/noncopyable.h>
-#include <util/system/atomic.h>
-
-//////////////////////////////
-// lock free lifo stack
+#include <util/system/atomic.h>
+
+//////////////////////////////
+// lock free lifo stack
 template <class T>
 class TLockFreeStack: TNonCopyable {
     struct TNode {
-        T Value;
+        T Value;
         TNode* Next;
-
+
         TNode() = default;
 
         template <class U>
@@ -19,29 +19,29 @@ class TLockFreeStack: TNonCopyable {
            , Next(nullptr)
        {
        }
-    };
-
+    };
+
    TNode* Head;
    TNode* FreePtr;
-    TAtomic DequeueCount;
-
+    TAtomic DequeueCount;
+
    void TryToFreeMemory() {
        TNode* current = AtomicGet(FreePtr);
-        if (!current)
-            return;
+        if (!current)
+            return;
        if (AtomicAdd(DequeueCount, 0) == 1) {
-            // node current is in free list, we are the last thread so try to cleanup
+            // node current is in free list, we are the last thread so try to cleanup
            if (AtomicCas(&FreePtr, (TNode*)nullptr, current))
-                EraseList(current);
-        }
-    }
+                EraseList(current);
+        }
+    }
    void EraseList(TNode* volatile p) {
-        while (p) {
+        while (p) {
            TNode* next = p->Next;
-            delete p;
-            p = next;
-        }
-    }
+            delete p;
+            p = next;
+        }
+    }
    void EnqueueImpl(TNode* volatile head, TNode* volatile tail) {
        for (;;) {
            tail->Next = AtomicGet(Head);
@@ -55,7 +55,7 @@ class TLockFreeStack: TNonCopyable {
        EnqueueImpl(node, node);
    }
 
-public:
+public:
    TLockFreeStack()
        : Head(nullptr)
        , FreePtr(nullptr)
@@ -63,9 +63,9 @@ public:
    {
    }
    ~TLockFreeStack() {
-        EraseList(Head);
-        EraseList(FreePtr);
-    }
+        EraseList(Head);
+        EraseList(FreePtr);
+    }
 
    void Enqueue(const T& t) {
        EnqueueImpl(t);
@@ -83,7 +83,7 @@ public:
    void EnqueueAll(TIter dataBegin, TIter dataEnd) {
        if (dataBegin == dataEnd) {
            return;
-        }
+        }
        TIter i = dataBegin;
        TNode* volatile node = new TNode(*i);
        TNode* volatile tail = node;
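The lfstack.h `Dequeue` hunk below preserves the telling comment `// delete current; // ABA problem`: if a popped node were freed immediately, the allocator could hand its address back to a concurrent `Enqueue`, and another pop's pointer-equality CAS on `Head` would then succeed against a stale snapshot and corrupt the list. That is why deletion is deferred through `DequeueCount` and the `FreePtr` free list, mirroring the queue's scheme. The push side is a classic Treiber stack; a minimal free-standing rendering with `std::atomic` (illustrative, not util's code):

```cpp
#include <atomic>
#include <utility>

template <class T>
class TTinyStack {
    struct TNode {
        T Value;
        TNode* Next;
    };

    std::atomic<TNode*> Head{nullptr};

public:
    void Push(T value) {
        TNode* node = new TNode{std::move(value), Head.load()};
        // A failed CAS reloads the current head into node->Next, so each
        // retry links against a fresh snapshot before publishing the node.
        while (!Head.compare_exchange_weak(node->Next, node)) {
        }
    }
    // Pop is the hard part: it must not delete eagerly, for the ABA reason
    // above; lfstack.h instead defers deletion through its free list.
};
```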
@@ -94,33 +94,33 @@ public:
            node->Next = nextNode;
        }
        EnqueueImpl(node, tail);
-    }
+    }
 
    bool Dequeue(T* res) {
        AtomicAdd(DequeueCount, 1);
        for (TNode* current = AtomicGet(Head); current; current = AtomicGet(Head)) {
            if (AtomicCas(&Head, AtomicGet(current->Next), current)) {
                *res = std::move(current->Value);
-                // delete current; // ABA problem
-                // even more complex node deletion
-                TryToFreeMemory();
+                // delete current; // ABA problem
+                // even more complex node deletion
+                TryToFreeMemory();
                if (AtomicAdd(DequeueCount, -1) == 0) {
-                    // no other Dequeue()s, can safely reclaim memory
-                    delete current;
-                } else {
-                    // Dequeue()s in progress, put node to free list
+                    // no other Dequeue()s, can safely reclaim memory
+                    delete current;
+                } else {
+                    // Dequeue()s in progress, put node to free list
                    for (;;) {
                        AtomicSet(current->Next, AtomicGet(FreePtr));
                        if (AtomicCas(&FreePtr, current, current->Next))
-                            break;
-                    }
-                }
-                return true;
-            }
-        }
-        TryToFreeMemory();
+                            break;
+                    }
+                }
+                return true;
+            }
+        }
+        TryToFreeMemory();
        AtomicAdd(DequeueCount, -1);
-        return false;
-    }
+        return false;
+    }
    // add all elements to *res
    // elements are returned in order of dequeue (top to bottom; see example in unittest)
    template <typename TCollection>
    void DequeueAll(TCollection* res) {
@@ -184,5 +184,5 @@ public:
    bool IsEmpty() {
        AtomicAdd(DequeueCount, 0); // mem barrier
        return AtomicGet(Head) == nullptr; // without lock, so result is approximate
-    }
-};
+    }
+};
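Closing with a hypothetical usage sketch for the stack, again assuming this repository's util headers. Per the comment above, `DequeueAll` is taken here to drain into any container supporting `push_back`, with elements arriving in pop (LIFO) order:

```cpp
#include <util/thread/lfstack.h>

#include <util/generic/vector.h>

int main() {
    TLockFreeStack<int> stack;

    for (int i = 1; i <= 3; ++i) {
        stack.Enqueue(i); // the stack now holds 3, 2, 1 (top to bottom)
    }

    TVector<int> drained;
    stack.DequeueAll(&drained); // drains in dequeue order: {3, 2, 1}

    int top = 0;
    // The stack is empty now, so Dequeue reports failure.
    return stack.Dequeue(&top) ? 1 : 0;
}
```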