diff options
author | ddoarn <ddoarn@yandex-team.ru> | 2022-02-10 16:49:53 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:49:53 +0300 |
commit | 3bf10d3f40b502d181ef52f5c4602c98cb135360 (patch) | |
tree | 5d5cb817648f650d76cf1076100726fd9b8448e8 /library/cpp/actors/util | |
parent | 0783fe3f48d91a3b741ce2ea32b11fbfc1637e7e (diff) | |
download | ydb-3bf10d3f40b502d181ef52f5c4602c98cb135360.tar.gz |
Restoring authorship annotation for <ddoarn@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/actors/util')
-rw-r--r-- | library/cpp/actors/util/affinity.cpp | 100 | ||||
-rw-r--r-- | library/cpp/actors/util/affinity.h | 82 | ||||
-rw-r--r-- | library/cpp/actors/util/defs.h | 32 | ||||
-rw-r--r-- | library/cpp/actors/util/futex.h | 26 | ||||
-rw-r--r-- | library/cpp/actors/util/intrinsics.h | 192 | ||||
-rw-r--r-- | library/cpp/actors/util/named_tuple.h | 14 | ||||
-rw-r--r-- | library/cpp/actors/util/queue_chunk.h | 54 | ||||
-rw-r--r-- | library/cpp/actors/util/queue_oneone_inplace.h | 218 | ||||
-rw-r--r-- | library/cpp/actors/util/should_continue.cpp | 46 | ||||
-rw-r--r-- | library/cpp/actors/util/should_continue.h | 44 | ||||
-rw-r--r-- | library/cpp/actors/util/thread.h | 2 | ||||
-rw-r--r-- | library/cpp/actors/util/threadparkpad.cpp | 88 | ||||
-rw-r--r-- | library/cpp/actors/util/threadparkpad.h | 32 | ||||
-rw-r--r-- | library/cpp/actors/util/ticket_lock.h | 86 | ||||
-rw-r--r-- | library/cpp/actors/util/unordered_cache.h | 134 | ||||
-rw-r--r-- | library/cpp/actors/util/ya.make | 52 |
16 files changed, 601 insertions, 601 deletions
diff --git a/library/cpp/actors/util/affinity.cpp b/library/cpp/actors/util/affinity.cpp index aab8219d6f..cc1b6e70ec 100644 --- a/library/cpp/actors/util/affinity.cpp +++ b/library/cpp/actors/util/affinity.cpp @@ -1,41 +1,41 @@ -#include "affinity.h" - -#ifdef _linux_ -#include <sched.h> -#endif - -class TAffinity::TImpl { -#ifdef _linux_ - cpu_set_t Mask; -#endif -public: - TImpl() { -#ifdef _linux_ - int ar = sched_getaffinity(0, sizeof(cpu_set_t), &Mask); +#include "affinity.h" + +#ifdef _linux_ +#include <sched.h> +#endif + +class TAffinity::TImpl { +#ifdef _linux_ + cpu_set_t Mask; +#endif +public: + TImpl() { +#ifdef _linux_ + int ar = sched_getaffinity(0, sizeof(cpu_set_t), &Mask); Y_VERIFY_DEBUG(ar == 0); -#endif - } - +#endif + } + explicit TImpl(const ui8* cpus, ui32 size) { -#ifdef _linux_ - CPU_ZERO(&Mask); +#ifdef _linux_ + CPU_ZERO(&Mask); for (ui32 i = 0; i != size; ++i) { if (cpus[i]) { CPU_SET(i, &Mask); } } -#else +#else Y_UNUSED(cpus); Y_UNUSED(size); -#endif - } - - void Set() const { -#ifdef _linux_ - int ar = sched_setaffinity(0, sizeof(cpu_set_t), &Mask); +#endif + } + + void Set() const { +#ifdef _linux_ + int ar = sched_setaffinity(0, sizeof(cpu_set_t), &Mask); Y_VERIFY_DEBUG(ar == 0); -#endif - } +#endif + } operator TCpuMask() const { TCpuMask result; @@ -48,20 +48,20 @@ public: return result; } -}; - -TAffinity::TAffinity() { -} - -TAffinity::~TAffinity() { -} - -TAffinity::TAffinity(const ui8* x, ui32 sz) { +}; + +TAffinity::TAffinity() { +} + +TAffinity::~TAffinity() { +} + +TAffinity::TAffinity(const ui8* x, ui32 sz) { if (x && sz) { - Impl.Reset(new TImpl(x, sz)); + Impl.Reset(new TImpl(x, sz)); } -} - +} + TAffinity::TAffinity(const TCpuMask& mask) { if (!mask.IsEmpty()) { static_assert(sizeof(ui8) == sizeof(mask.Cpus[0])); @@ -71,19 +71,19 @@ TAffinity::TAffinity(const TCpuMask& mask) { } } -void TAffinity::Current() { - Impl.Reset(new TImpl()); -} - -void TAffinity::Set() const { +void TAffinity::Current() { + Impl.Reset(new TImpl()); +} + +void TAffinity::Set() const { if (!!Impl) { - Impl->Set(); + Impl->Set(); } -} - -bool TAffinity::Empty() const { +} + +bool TAffinity::Empty() const { return !Impl; -} +} TAffinity::operator TCpuMask() const { if (!!Impl) { diff --git a/library/cpp/actors/util/affinity.h b/library/cpp/actors/util/affinity.h index 0a20744240..ae106ed180 100644 --- a/library/cpp/actors/util/affinity.h +++ b/library/cpp/actors/util/affinity.h @@ -1,49 +1,49 @@ -#pragma once - -#include "defs.h" +#pragma once + +#include "defs.h" #include "cpumask.h" - + // Platform-specific class to set or get thread affinity -class TAffinity: public TThrRefBase, TNonCopyable { - class TImpl; - THolder<TImpl> Impl; - -public: - TAffinity(); +class TAffinity: public TThrRefBase, TNonCopyable { + class TImpl; + THolder<TImpl> Impl; + +public: + TAffinity(); TAffinity(const ui8* cpus, ui32 size); explicit TAffinity(const TCpuMask& mask); - ~TAffinity(); - - void Current(); - void Set() const; - bool Empty() const; + ~TAffinity(); + + void Current(); + void Set() const; + bool Empty() const; operator TCpuMask() const; -}; - +}; + // Scoped affinity setter -class TAffinityGuard : TNonCopyable { - bool Stacked; - TAffinity OldAffinity; - -public: - TAffinityGuard(const TAffinity* affinity) { +class TAffinityGuard : TNonCopyable { + bool Stacked; + TAffinity OldAffinity; + +public: + TAffinityGuard(const TAffinity* affinity) { Stacked = false; - if (affinity && !affinity->Empty()) { - OldAffinity.Current(); - affinity->Set(); - Stacked = true; - } - } - - ~TAffinityGuard() { - Release(); - } - - void Release() { - if (Stacked) { - OldAffinity.Set(); - Stacked = false; - } - } -}; + if (affinity && !affinity->Empty()) { + OldAffinity.Current(); + affinity->Set(); + Stacked = true; + } + } + + ~TAffinityGuard() { + Release(); + } + + void Release() { + if (Stacked) { + OldAffinity.Set(); + Stacked = false; + } + } +}; diff --git a/library/cpp/actors/util/defs.h b/library/cpp/actors/util/defs.h index b88ce59ec6..5c3b57665b 100644 --- a/library/cpp/actors/util/defs.h +++ b/library/cpp/actors/util/defs.h @@ -1,16 +1,16 @@ -#pragma once - -// unique tag to fix pragma once gcc glueing: ./library/actors/util/defs.h - -#include <util/system/defaults.h> -#include <util/generic/bt_exception.h> -#include <util/generic/noncopyable.h> -#include <util/generic/ptr.h> -#include <util/generic/string.h> -#include <util/generic/yexception.h> -#include <util/system/atomic.h> -#include <util/system/align.h> -#include <util/generic/vector.h> -#include <util/datetime/base.h> -#include <util/generic/ylimits.h> -#include "intrinsics.h" +#pragma once + +// unique tag to fix pragma once gcc glueing: ./library/actors/util/defs.h + +#include <util/system/defaults.h> +#include <util/generic/bt_exception.h> +#include <util/generic/noncopyable.h> +#include <util/generic/ptr.h> +#include <util/generic/string.h> +#include <util/generic/yexception.h> +#include <util/system/atomic.h> +#include <util/system/align.h> +#include <util/generic/vector.h> +#include <util/datetime/base.h> +#include <util/generic/ylimits.h> +#include "intrinsics.h" diff --git a/library/cpp/actors/util/futex.h b/library/cpp/actors/util/futex.h index 9f8a7b53bc..c193f8d128 100644 --- a/library/cpp/actors/util/futex.h +++ b/library/cpp/actors/util/futex.h @@ -1,13 +1,13 @@ -#pragma once - -#ifdef _linux_ - -#include <linux/futex.h> -#include <unistd.h> -#include <sys/syscall.h> - -static long SysFutex(void* addr1, int op, int val1, struct timespec* timeout, void* addr2, int val3) { - return syscall(SYS_futex, addr1, op, val1, timeout, addr2, val3); -} - -#endif +#pragma once + +#ifdef _linux_ + +#include <linux/futex.h> +#include <unistd.h> +#include <sys/syscall.h> + +static long SysFutex(void* addr1, int op, int val1, struct timespec* timeout, void* addr2, int val3) { + return syscall(SYS_futex, addr1, op, val1, timeout, addr2, val3); +} + +#endif diff --git a/library/cpp/actors/util/intrinsics.h b/library/cpp/actors/util/intrinsics.h index 983bf4788d..df07e36896 100644 --- a/library/cpp/actors/util/intrinsics.h +++ b/library/cpp/actors/util/intrinsics.h @@ -1,97 +1,97 @@ -#pragma once - -#include <util/system/defaults.h> -#include <util/system/atomic.h> -#include <util/system/spinlock.h> - +#pragma once + +#include <util/system/defaults.h> +#include <util/system/atomic.h> +#include <util/system/spinlock.h> + #include <library/cpp/sse/sse.h> // The header chooses appropriate SSE support - -static_assert(sizeof(TAtomic) == 8, "expect sizeof(TAtomic) == 8"); - -// we need explicit 32 bit operations to keep cache-line friendly packs -// so have to define some atomics additionaly to arcadia one -#ifdef _win_ -#pragma intrinsic(_InterlockedCompareExchange) -#pragma intrinsic(_InterlockedExchangeAdd) -#pragma intrinsic(_InterlockedIncrement) -#pragma intrinsic(_InterlockedDecrement) -#endif - -inline bool AtomicUi32Cas(volatile ui32* a, ui32 exchange, ui32 compare) { -#ifdef _win_ - return _InterlockedCompareExchange((volatile long*)a, exchange, compare) == (long)compare; -#else - ui32 expected = compare; - return __atomic_compare_exchange_n(a, &expected, exchange, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST); -#endif -} - -inline ui32 AtomicUi32Add(volatile ui32* a, ui32 add) { -#ifdef _win_ - return _InterlockedExchangeAdd((volatile long*)a, add) + add; -#else - return __atomic_add_fetch(a, add, __ATOMIC_SEQ_CST); -#endif -} - -inline ui32 AtomicUi32Sub(volatile ui32* a, ui32 sub) { -#ifdef _win_ - return _InterlockedExchangeAdd((volatile long*)a, -(long)sub) - sub; -#else - return __atomic_sub_fetch(a, sub, __ATOMIC_SEQ_CST); -#endif -} - -inline ui32 AtomicUi32Increment(volatile ui32* a) { -#ifdef _win_ - return _InterlockedIncrement((volatile long*)a); -#else - return __atomic_add_fetch(a, 1, __ATOMIC_SEQ_CST); -#endif -} - -inline ui32 AtomicUi32Decrement(volatile ui32* a) { -#ifdef _win_ - return _InterlockedDecrement((volatile long*)a); -#else - return __atomic_sub_fetch(a, 1, __ATOMIC_SEQ_CST); -#endif -} - -template <typename T> -inline void AtomicStore(volatile T* a, T x) { - static_assert(std::is_integral<T>::value || std::is_pointer<T>::value, "expect std::is_integral<T>::value || std::is_pointer<T>::value"); -#ifdef _win_ - *a = x; -#else - __atomic_store_n(a, x, __ATOMIC_RELEASE); -#endif -} - -template <typename T> -inline void RelaxedStore(volatile T* a, T x) { - static_assert(std::is_integral<T>::value || std::is_pointer<T>::value, "expect std::is_integral<T>::value || std::is_pointer<T>::value"); -#ifdef _win_ - *a = x; -#else - __atomic_store_n(a, x, __ATOMIC_RELAXED); -#endif -} - -template <typename T> -inline T AtomicLoad(volatile T* a) { -#ifdef _win_ - return *a; -#else - return __atomic_load_n(a, __ATOMIC_ACQUIRE); -#endif -} - -template <typename T> -inline T RelaxedLoad(volatile T* a) { -#ifdef _win_ - return *a; -#else - return __atomic_load_n(a, __ATOMIC_RELAXED); -#endif -} + +static_assert(sizeof(TAtomic) == 8, "expect sizeof(TAtomic) == 8"); + +// we need explicit 32 bit operations to keep cache-line friendly packs +// so have to define some atomics additionaly to arcadia one +#ifdef _win_ +#pragma intrinsic(_InterlockedCompareExchange) +#pragma intrinsic(_InterlockedExchangeAdd) +#pragma intrinsic(_InterlockedIncrement) +#pragma intrinsic(_InterlockedDecrement) +#endif + +inline bool AtomicUi32Cas(volatile ui32* a, ui32 exchange, ui32 compare) { +#ifdef _win_ + return _InterlockedCompareExchange((volatile long*)a, exchange, compare) == (long)compare; +#else + ui32 expected = compare; + return __atomic_compare_exchange_n(a, &expected, exchange, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST); +#endif +} + +inline ui32 AtomicUi32Add(volatile ui32* a, ui32 add) { +#ifdef _win_ + return _InterlockedExchangeAdd((volatile long*)a, add) + add; +#else + return __atomic_add_fetch(a, add, __ATOMIC_SEQ_CST); +#endif +} + +inline ui32 AtomicUi32Sub(volatile ui32* a, ui32 sub) { +#ifdef _win_ + return _InterlockedExchangeAdd((volatile long*)a, -(long)sub) - sub; +#else + return __atomic_sub_fetch(a, sub, __ATOMIC_SEQ_CST); +#endif +} + +inline ui32 AtomicUi32Increment(volatile ui32* a) { +#ifdef _win_ + return _InterlockedIncrement((volatile long*)a); +#else + return __atomic_add_fetch(a, 1, __ATOMIC_SEQ_CST); +#endif +} + +inline ui32 AtomicUi32Decrement(volatile ui32* a) { +#ifdef _win_ + return _InterlockedDecrement((volatile long*)a); +#else + return __atomic_sub_fetch(a, 1, __ATOMIC_SEQ_CST); +#endif +} + +template <typename T> +inline void AtomicStore(volatile T* a, T x) { + static_assert(std::is_integral<T>::value || std::is_pointer<T>::value, "expect std::is_integral<T>::value || std::is_pointer<T>::value"); +#ifdef _win_ + *a = x; +#else + __atomic_store_n(a, x, __ATOMIC_RELEASE); +#endif +} + +template <typename T> +inline void RelaxedStore(volatile T* a, T x) { + static_assert(std::is_integral<T>::value || std::is_pointer<T>::value, "expect std::is_integral<T>::value || std::is_pointer<T>::value"); +#ifdef _win_ + *a = x; +#else + __atomic_store_n(a, x, __ATOMIC_RELAXED); +#endif +} + +template <typename T> +inline T AtomicLoad(volatile T* a) { +#ifdef _win_ + return *a; +#else + return __atomic_load_n(a, __ATOMIC_ACQUIRE); +#endif +} + +template <typename T> +inline T RelaxedLoad(volatile T* a) { +#ifdef _win_ + return *a; +#else + return __atomic_load_n(a, __ATOMIC_RELAXED); +#endif +} diff --git a/library/cpp/actors/util/named_tuple.h b/library/cpp/actors/util/named_tuple.h index 5a8999c4ce..67f185bba8 100644 --- a/library/cpp/actors/util/named_tuple.h +++ b/library/cpp/actors/util/named_tuple.h @@ -2,29 +2,29 @@ #include "defs.h" -template <typename TDerived> +template <typename TDerived> struct TNamedTupleBase { - friend bool operator==(const TDerived& x, const TDerived& y) { + friend bool operator==(const TDerived& x, const TDerived& y) { return x.ConvertToTuple() == y.ConvertToTuple(); } - friend bool operator!=(const TDerived& x, const TDerived& y) { + friend bool operator!=(const TDerived& x, const TDerived& y) { return x.ConvertToTuple() != y.ConvertToTuple(); } - friend bool operator<(const TDerived& x, const TDerived& y) { + friend bool operator<(const TDerived& x, const TDerived& y) { return x.ConvertToTuple() < y.ConvertToTuple(); } - friend bool operator<=(const TDerived& x, const TDerived& y) { + friend bool operator<=(const TDerived& x, const TDerived& y) { return x.ConvertToTuple() <= y.ConvertToTuple(); } - friend bool operator>(const TDerived& x, const TDerived& y) { + friend bool operator>(const TDerived& x, const TDerived& y) { return x.ConvertToTuple() > y.ConvertToTuple(); } - friend bool operator>=(const TDerived& x, const TDerived& y) { + friend bool operator>=(const TDerived& x, const TDerived& y) { return x.ConvertToTuple() >= y.ConvertToTuple(); } }; diff --git a/library/cpp/actors/util/queue_chunk.h b/library/cpp/actors/util/queue_chunk.h index 96657ee890..8a4e02d8cb 100644 --- a/library/cpp/actors/util/queue_chunk.h +++ b/library/cpp/actors/util/queue_chunk.h @@ -1,29 +1,29 @@ -#pragma once - -#include "defs.h" - -template <typename T, ui32 TSize, typename TDerived> -struct TQueueChunkDerived { - static const ui32 EntriesCount = (TSize - sizeof(TQueueChunkDerived*)) / sizeof(T); +#pragma once + +#include "defs.h" + +template <typename T, ui32 TSize, typename TDerived> +struct TQueueChunkDerived { + static const ui32 EntriesCount = (TSize - sizeof(TQueueChunkDerived*)) / sizeof(T); static_assert(EntriesCount > 0, "expect EntriesCount > 0"); - - volatile T Entries[EntriesCount]; - TDerived* volatile Next; - - TQueueChunkDerived() { - memset(this, 0, sizeof(TQueueChunkDerived)); - } -}; - -template <typename T, ui32 TSize> -struct TQueueChunk { - static const ui32 EntriesCount = (TSize - sizeof(TQueueChunk*)) / sizeof(T); + + volatile T Entries[EntriesCount]; + TDerived* volatile Next; + + TQueueChunkDerived() { + memset(this, 0, sizeof(TQueueChunkDerived)); + } +}; + +template <typename T, ui32 TSize> +struct TQueueChunk { + static const ui32 EntriesCount = (TSize - sizeof(TQueueChunk*)) / sizeof(T); static_assert(EntriesCount > 0, "expect EntriesCount > 0"); - - volatile T Entries[EntriesCount]; - TQueueChunk* volatile Next; - - TQueueChunk() { - memset(this, 0, sizeof(TQueueChunk)); - } -}; + + volatile T Entries[EntriesCount]; + TQueueChunk* volatile Next; + + TQueueChunk() { + memset(this, 0, sizeof(TQueueChunk)); + } +}; diff --git a/library/cpp/actors/util/queue_oneone_inplace.h b/library/cpp/actors/util/queue_oneone_inplace.h index 48507a5235..d7ec8bb21c 100644 --- a/library/cpp/actors/util/queue_oneone_inplace.h +++ b/library/cpp/actors/util/queue_oneone_inplace.h @@ -1,118 +1,118 @@ -#pragma once - -#include "defs.h" -#include "queue_chunk.h" - -template <typename T, ui32 TSize, typename TChunk = TQueueChunk<T, TSize>> -class TOneOneQueueInplace : TNonCopyable { +#pragma once + +#include "defs.h" +#include "queue_chunk.h" + +template <typename T, ui32 TSize, typename TChunk = TQueueChunk<T, TSize>> +class TOneOneQueueInplace : TNonCopyable { static_assert(std::is_integral<T>::value || std::is_pointer<T>::value, "expect std::is_integral<T>::value || std::is_pointer<T>::valuer"); - - TChunk* ReadFrom; - ui32 ReadPosition; - ui32 WritePosition; - TChunk* WriteTo; - - friend class TReadIterator; - -public: - class TReadIterator { - TChunk* ReadFrom; - ui32 ReadPosition; - - public: - TReadIterator(TChunk* readFrom, ui32 readPosition) - : ReadFrom(readFrom) - , ReadPosition(readPosition) - { - } - - inline T Next() { - TChunk* head = ReadFrom; - if (ReadPosition != TChunk::EntriesCount) { - return AtomicLoad(&head->Entries[ReadPosition++]); - } else if (TChunk* next = AtomicLoad(&head->Next)) { - ReadFrom = next; - ReadPosition = 0; - return Next(); - } + + TChunk* ReadFrom; + ui32 ReadPosition; + ui32 WritePosition; + TChunk* WriteTo; + + friend class TReadIterator; + +public: + class TReadIterator { + TChunk* ReadFrom; + ui32 ReadPosition; + + public: + TReadIterator(TChunk* readFrom, ui32 readPosition) + : ReadFrom(readFrom) + , ReadPosition(readPosition) + { + } + + inline T Next() { + TChunk* head = ReadFrom; + if (ReadPosition != TChunk::EntriesCount) { + return AtomicLoad(&head->Entries[ReadPosition++]); + } else if (TChunk* next = AtomicLoad(&head->Next)) { + ReadFrom = next; + ReadPosition = 0; + return Next(); + } return T{}; - } - }; - - TOneOneQueueInplace() - : ReadFrom(new TChunk()) - , ReadPosition(0) - , WritePosition(0) - , WriteTo(ReadFrom) - { - } - - ~TOneOneQueueInplace() { + } + }; + + TOneOneQueueInplace() + : ReadFrom(new TChunk()) + , ReadPosition(0) + , WritePosition(0) + , WriteTo(ReadFrom) + { + } + + ~TOneOneQueueInplace() { Y_VERIFY_DEBUG(Head() == 0); - delete ReadFrom; - } - - struct TPtrCleanDestructor { + delete ReadFrom; + } + + struct TPtrCleanDestructor { static inline void Destroy(TOneOneQueueInplace<T, TSize>* x) noexcept { - while (T head = x->Pop()) - delete head; - delete x; - } - }; - - struct TCleanDestructor { + while (T head = x->Pop()) + delete head; + delete x; + } + }; + + struct TCleanDestructor { static inline void Destroy(TOneOneQueueInplace<T, TSize>* x) noexcept { while (x->Pop() != nullptr) - continue; - delete x; - } - }; - - struct TPtrCleanInplaceMallocDestructor { - template <typename TPtrVal> + continue; + delete x; + } + }; + + struct TPtrCleanInplaceMallocDestructor { + template <typename TPtrVal> static inline void Destroy(TOneOneQueueInplace<TPtrVal*, TSize>* x) noexcept { - while (TPtrVal* head = x->Pop()) { - head->~TPtrVal(); - free(head); - } - delete x; - } - }; - + while (TPtrVal* head = x->Pop()) { + head->~TPtrVal(); + free(head); + } + delete x; + } + }; + void Push(T x) noexcept { - if (WritePosition != TChunk::EntriesCount) { - AtomicStore(&WriteTo->Entries[WritePosition], x); - ++WritePosition; - } else { - TChunk* next = new TChunk(); - next->Entries[0] = x; - AtomicStore(&WriteTo->Next, next); - WriteTo = next; - WritePosition = 1; - } - } - - T Head() { - TChunk* head = ReadFrom; - if (ReadPosition != TChunk::EntriesCount) { - return AtomicLoad(&head->Entries[ReadPosition]); - } else if (TChunk* next = AtomicLoad(&head->Next)) { - ReadFrom = next; - delete head; - ReadPosition = 0; - return Head(); - } + if (WritePosition != TChunk::EntriesCount) { + AtomicStore(&WriteTo->Entries[WritePosition], x); + ++WritePosition; + } else { + TChunk* next = new TChunk(); + next->Entries[0] = x; + AtomicStore(&WriteTo->Next, next); + WriteTo = next; + WritePosition = 1; + } + } + + T Head() { + TChunk* head = ReadFrom; + if (ReadPosition != TChunk::EntriesCount) { + return AtomicLoad(&head->Entries[ReadPosition]); + } else if (TChunk* next = AtomicLoad(&head->Next)) { + ReadFrom = next; + delete head; + ReadPosition = 0; + return Head(); + } return T{}; - } - - T Pop() { - T ret = Head(); - if (ret) - ++ReadPosition; - return ret; - } - - TReadIterator Iterator() { - return TReadIterator(ReadFrom, ReadPosition); - } -}; + } + + T Pop() { + T ret = Head(); + if (ret) + ++ReadPosition; + return ret; + } + + TReadIterator Iterator() { + return TReadIterator(ReadFrom, ReadPosition); + } +}; diff --git a/library/cpp/actors/util/should_continue.cpp b/library/cpp/actors/util/should_continue.cpp index 38bb6a28b3..258e6a0aff 100644 --- a/library/cpp/actors/util/should_continue.cpp +++ b/library/cpp/actors/util/should_continue.cpp @@ -1,23 +1,23 @@ -#include "should_continue.h" - -void TProgramShouldContinue::ShouldRestart() { - AtomicSet(State, Restart); -} - -void TProgramShouldContinue::ShouldStop(int returnCode) { - AtomicSet(ReturnCode, returnCode); - AtomicSet(State, Stop); -} - -TProgramShouldContinue::EState TProgramShouldContinue::PollState() { - return static_cast<EState>(AtomicGet(State)); -} - -int TProgramShouldContinue::GetReturnCode() { - return static_cast<int>(AtomicGet(ReturnCode)); -} - -void TProgramShouldContinue::Reset() { - AtomicSet(ReturnCode, 0); - AtomicSet(State, Continue); -} +#include "should_continue.h" + +void TProgramShouldContinue::ShouldRestart() { + AtomicSet(State, Restart); +} + +void TProgramShouldContinue::ShouldStop(int returnCode) { + AtomicSet(ReturnCode, returnCode); + AtomicSet(State, Stop); +} + +TProgramShouldContinue::EState TProgramShouldContinue::PollState() { + return static_cast<EState>(AtomicGet(State)); +} + +int TProgramShouldContinue::GetReturnCode() { + return static_cast<int>(AtomicGet(ReturnCode)); +} + +void TProgramShouldContinue::Reset() { + AtomicSet(ReturnCode, 0); + AtomicSet(State, Continue); +} diff --git a/library/cpp/actors/util/should_continue.h b/library/cpp/actors/util/should_continue.h index 8fc7f9dab0..76acc40dc4 100644 --- a/library/cpp/actors/util/should_continue.h +++ b/library/cpp/actors/util/should_continue.h @@ -1,22 +1,22 @@ -#pragma once -#include "defs.h" - -class TProgramShouldContinue { -public: - enum EState { - Continue, - Stop, - Restart, - }; - - void ShouldRestart(); - void ShouldStop(int returnCode = 0); - - EState PollState(); - int GetReturnCode(); - - void Reset(); -private: - TAtomic ReturnCode = 0; - TAtomic State = Continue; -}; +#pragma once +#include "defs.h" + +class TProgramShouldContinue { +public: + enum EState { + Continue, + Stop, + Restart, + }; + + void ShouldRestart(); + void ShouldStop(int returnCode = 0); + + EState PollState(); + int GetReturnCode(); + + void Reset(); +private: + TAtomic ReturnCode = 0; + TAtomic State = Continue; +}; diff --git a/library/cpp/actors/util/thread.h b/library/cpp/actors/util/thread.h index 0afbe10921..d742c8c585 100644 --- a/library/cpp/actors/util/thread.h +++ b/library/cpp/actors/util/thread.h @@ -8,7 +8,7 @@ #include <time.h> inline void SetCurrentThreadName(const TString& name, - const ui32 maxCharsFromProcessName = 8) { + const ui32 maxCharsFromProcessName = 8) { #if defined(_linux_) // linux limits threadname by 15 + \0 diff --git a/library/cpp/actors/util/threadparkpad.cpp b/library/cpp/actors/util/threadparkpad.cpp index 88813e270c..74069ff15b 100644 --- a/library/cpp/actors/util/threadparkpad.cpp +++ b/library/cpp/actors/util/threadparkpad.cpp @@ -1,15 +1,15 @@ -#include "threadparkpad.h" -#include <util/system/winint.h> - -#ifdef _linux_ - -#include "futex.h" - -namespace NActors { +#include "threadparkpad.h" +#include <util/system/winint.h> + +#ifdef _linux_ + +#include "futex.h" + +namespace NActors { class TThreadParkPad::TImpl { volatile bool Interrupted; int Futex; - + public: TImpl() : Interrupted(false) @@ -18,39 +18,39 @@ namespace NActors { } ~TImpl() { } - + bool Park() noexcept { __atomic_fetch_sub(&Futex, 1, __ATOMIC_SEQ_CST); while (__atomic_load_n(&Futex, __ATOMIC_ACQUIRE) == -1) SysFutex(&Futex, FUTEX_WAIT_PRIVATE, -1, nullptr, nullptr, 0); return IsInterrupted(); } - + void Unpark() noexcept { const int old = __atomic_fetch_add(&Futex, 1, __ATOMIC_SEQ_CST); if (old == -1) SysFutex(&Futex, FUTEX_WAKE_PRIVATE, -1, nullptr, nullptr, 0); } - + void Interrupt() noexcept { __atomic_store_n(&Interrupted, true, __ATOMIC_SEQ_CST); Unpark(); } - + bool IsInterrupted() const noexcept { return __atomic_load_n(&Interrupted, __ATOMIC_ACQUIRE); } }; - -#elif defined _win32_ + +#elif defined _win32_ #include <util/generic/bt_exception.h> -#include <util/generic/yexception.h> - -namespace NActors { +#include <util/generic/yexception.h> + +namespace NActors { class TThreadParkPad::TImpl { TAtomic Interrupted; HANDLE EvHandle; - + public: TImpl() : Interrupted(false) @@ -63,35 +63,35 @@ namespace NActors { if (EvHandle) ::CloseHandle(EvHandle); } - + bool Park() noexcept { ::WaitForSingleObject(EvHandle, INFINITE); return AtomicGet(Interrupted); } - + void Unpark() noexcept { ::SetEvent(EvHandle); } - + void Interrupt() noexcept { AtomicSet(Interrupted, true); Unpark(); } - + bool IsInterrupted() const noexcept { return AtomicGet(Interrupted); } }; - -#else - -#include <util/system/event.h> - -namespace NActors { + +#else + +#include <util/system/event.h> + +namespace NActors { class TThreadParkPad::TImpl { TAtomic Interrupted; TSystemEvent Ev; - + public: TImpl() : Interrupted(false) @@ -100,7 +100,7 @@ namespace NActors { } ~TImpl() { } - + bool Park() noexcept { Ev.Wait(); return AtomicGet(Interrupted); @@ -123,26 +123,26 @@ namespace NActors { TThreadParkPad::TThreadParkPad() : Impl(new TThreadParkPad::TImpl()) - { - } + { + } TThreadParkPad::~TThreadParkPad() { - } - + } + bool TThreadParkPad::Park() noexcept { return Impl->Park(); - } - + } + void TThreadParkPad::Unpark() noexcept { Impl->Unpark(); - } - + } + void TThreadParkPad::Interrupt() noexcept { Impl->Interrupt(); - } - + } + bool TThreadParkPad::Interrupted() const noexcept { return Impl->IsInterrupted(); - } - -} + } + +} diff --git a/library/cpp/actors/util/threadparkpad.h b/library/cpp/actors/util/threadparkpad.h index 30f9cffdb9..5b574ccf34 100644 --- a/library/cpp/actors/util/threadparkpad.h +++ b/library/cpp/actors/util/threadparkpad.h @@ -1,21 +1,21 @@ -#pragma once - -#include <util/generic/ptr.h> - -namespace NActors { - class TThreadParkPad { - private: - class TImpl; - THolder<TImpl> Impl; - - public: - TThreadParkPad(); +#pragma once + +#include <util/generic/ptr.h> + +namespace NActors { + class TThreadParkPad { + private: + class TImpl; + THolder<TImpl> Impl; + + public: + TThreadParkPad(); ~TThreadParkPad(); - + bool Park() noexcept; void Unpark() noexcept; void Interrupt() noexcept; bool Interrupted() const noexcept; - }; - -} + }; + +} diff --git a/library/cpp/actors/util/ticket_lock.h b/library/cpp/actors/util/ticket_lock.h index 334922a088..3b1fa80393 100644 --- a/library/cpp/actors/util/ticket_lock.h +++ b/library/cpp/actors/util/ticket_lock.h @@ -1,48 +1,48 @@ -#pragma once - -#include "intrinsics.h" -#include <util/system/guard.h> -#include <util/system/yassert.h> - -class TTicketLock : TNonCopyable { - ui32 TicketIn; - ui32 TicketOut; - -public: - TTicketLock() - : TicketIn(0) - , TicketOut(0) - { - } - +#pragma once + +#include "intrinsics.h" +#include <util/system/guard.h> +#include <util/system/yassert.h> + +class TTicketLock : TNonCopyable { + ui32 TicketIn; + ui32 TicketOut; + +public: + TTicketLock() + : TicketIn(0) + , TicketOut(0) + { + } + void Release() noexcept { - AtomicUi32Increment(&TicketOut); - } - + AtomicUi32Increment(&TicketOut); + } + ui32 Acquire() noexcept { - ui32 revolves = 0; - const ui32 ticket = AtomicUi32Increment(&TicketIn) - 1; - while (ticket != AtomicLoad(&TicketOut)) { + ui32 revolves = 0; + const ui32 ticket = AtomicUi32Increment(&TicketIn) - 1; + while (ticket != AtomicLoad(&TicketOut)) { Y_VERIFY_DEBUG(ticket >= AtomicLoad(&TicketOut)); - SpinLockPause(); - ++revolves; - } - return revolves; - } - + SpinLockPause(); + ++revolves; + } + return revolves; + } + bool TryAcquire() noexcept { - const ui32 x = AtomicLoad(&TicketOut); - if (x == AtomicLoad(&TicketIn) && AtomicUi32Cas(&TicketIn, x + 1, x)) - return true; - else - return false; - } - + const ui32 x = AtomicLoad(&TicketOut); + if (x == AtomicLoad(&TicketIn) && AtomicUi32Cas(&TicketIn, x + 1, x)) + return true; + else + return false; + } + bool IsLocked() noexcept { - const ui32 ticketIn = AtomicLoad(&TicketIn); - const ui32 ticketOut = AtomicLoad(&TicketOut); - return (ticketIn != ticketOut); - } - - typedef ::TGuard<TTicketLock> TGuard; -}; + const ui32 ticketIn = AtomicLoad(&TicketIn); + const ui32 ticketOut = AtomicLoad(&TicketOut); + return (ticketIn != ticketOut); + } + + typedef ::TGuard<TTicketLock> TGuard; +}; diff --git a/library/cpp/actors/util/unordered_cache.h b/library/cpp/actors/util/unordered_cache.h index 57b9c6663d..76f036c0cf 100644 --- a/library/cpp/actors/util/unordered_cache.h +++ b/library/cpp/actors/util/unordered_cache.h @@ -1,22 +1,22 @@ -#pragma once - -#include "defs.h" +#pragma once + +#include "defs.h" #include "queue_chunk.h" - + template <typename T, ui32 Size = 512, ui32 ConcurrencyFactor = 1, typename TChunk = TQueueChunk<T, Size>> -class TUnorderedCache : TNonCopyable { +class TUnorderedCache : TNonCopyable { static_assert(std::is_integral<T>::value || std::is_pointer<T>::value, "expect std::is_integral<T>::value || std::is_pointer<T>::value"); - -public: + +public: static constexpr ui32 Concurrency = ConcurrencyFactor * 4; - -private: + +private: struct TReadSlot { TChunk* volatile ReadFrom; volatile ui32 ReadPosition; char Padding[64 - sizeof(TChunk*) - sizeof(ui32)]; // 1 slot per cache line }; - + struct TWriteSlot { TChunk* volatile WriteTo; volatile ui32 WritePosition; @@ -31,7 +31,7 @@ private: TWriteSlot WriteSlots[Concurrency]; static_assert(sizeof(TChunk*) == sizeof(TAtomic), "expect sizeof(TChunk*) == sizeof(TAtomic)"); - + private: struct TLockedWriter { TWriteSlot* Slot; @@ -82,53 +82,53 @@ private: private: TLockedWriter LockWriter(ui64 writerRotation) { ui32 cycle = 0; - for (;;) { + for (;;) { TWriteSlot* slot = &WriteSlots[writerRotation % Concurrency]; if (AtomicLoad(&slot->WriteTo) != nullptr) { if (TChunk* writeTo = AtomicSwap(&slot->WriteTo, nullptr)) { return TLockedWriter(slot, writeTo); - } - } + } + } ++writerRotation; - + // Do a spinlock pause after a full cycle if (++cycle == Concurrency) { SpinLockPause(); cycle = 0; } } - } - + } + void WriteOne(TLockedWriter& lock, T x) { Y_VERIFY_DEBUG(x != 0); - + const ui32 pos = AtomicLoad(&lock.Slot->WritePosition); - if (pos != TChunk::EntriesCount) { + if (pos != TChunk::EntriesCount) { AtomicStore(&lock.Slot->WritePosition, pos + 1); AtomicStore(&lock.WriteTo->Entries[pos], x); - } else { + } else { TChunk* next = new TChunk(); - AtomicStore(&next->Entries[0], x); + AtomicStore(&next->Entries[0], x); AtomicStore(&lock.Slot->WritePosition, 1u); AtomicStore(&lock.WriteTo->Next, next); lock.WriteTo = next; - } - } - -public: + } + } + +public: TUnorderedCache() { for (ui32 i = 0; i < Concurrency; ++i) { ReadSlots[i].ReadFrom = new TChunk(); ReadSlots[i].ReadPosition = 0; - + WriteSlots[i].WriteTo = ReadSlots[i].ReadFrom; WriteSlots[i].WritePosition = 0; - } - } - + } + } + ~TUnorderedCache() { Y_VERIFY(!Pop(0)); - + for (ui64 i = 0; i < Concurrency; ++i) { if (ReadSlots[i].ReadFrom) { delete ReadSlots[i].ReadFrom; @@ -139,63 +139,63 @@ public: } T Pop(ui64 readerRotation) noexcept { - ui64 readerIndex = readerRotation; - const ui64 endIndex = readerIndex + Concurrency; - for (; readerIndex != endIndex; ++readerIndex) { + ui64 readerIndex = readerRotation; + const ui64 endIndex = readerIndex + Concurrency; + for (; readerIndex != endIndex; ++readerIndex) { TReadSlot* slot = &ReadSlots[readerIndex % Concurrency]; if (AtomicLoad(&slot->ReadFrom) != nullptr) { if (TChunk* readFrom = AtomicSwap(&slot->ReadFrom, nullptr)) { const ui32 pos = AtomicLoad(&slot->ReadPosition); - if (pos != TChunk::EntriesCount) { - if (T ret = AtomicLoad(&readFrom->Entries[pos])) { + if (pos != TChunk::EntriesCount) { + if (T ret = AtomicLoad(&readFrom->Entries[pos])) { AtomicStore(&slot->ReadPosition, pos + 1); AtomicStore(&slot->ReadFrom, readFrom); // release lock with same chunk return ret; // found, return - } else { + } else { AtomicStore(&slot->ReadFrom, readFrom); // release lock with same chunk - } - } else if (TChunk* next = AtomicLoad(&readFrom->Next)) { - if (T ret = AtomicLoad(&next->Entries[0])) { + } + } else if (TChunk* next = AtomicLoad(&readFrom->Next)) { + if (T ret = AtomicLoad(&next->Entries[0])) { AtomicStore(&slot->ReadPosition, 1u); AtomicStore(&slot->ReadFrom, next); // release lock with next chunk - delete readFrom; - return ret; - } else { + delete readFrom; + return ret; + } else { AtomicStore(&slot->ReadPosition, 0u); AtomicStore(&slot->ReadFrom, next); // release lock with new chunk - delete readFrom; - } - } else { + delete readFrom; + } + } else { // nothing in old chunk and no next chunk, just release lock with old chunk AtomicStore(&slot->ReadFrom, readFrom); - } - } - } - } - - return 0; // got nothing after full cycle, return - } - - void Push(T x, ui64 writerRotation) { + } + } + } + } + + return 0; // got nothing after full cycle, return + } + + void Push(T x, ui64 writerRotation) { TLockedWriter lock = LockWriter(writerRotation); WriteOne(lock, x); - } - - void PushBulk(T* x, ui32 xcount, ui64 writerRotation) { + } + + void PushBulk(T* x, ui32 xcount, ui64 writerRotation) { for (;;) { // Fill no more then one queue chunk per round - const ui32 xround = Min(xcount, (ui32)TChunk::EntriesCount); - + const ui32 xround = Min(xcount, (ui32)TChunk::EntriesCount); + { TLockedWriter lock = LockWriter(writerRotation++); for (T* end = x + xround; x != end; ++x) WriteOne(lock, *x); } - - if (xcount <= TChunk::EntriesCount) - break; - - xcount -= TChunk::EntriesCount; - } - } -}; + + if (xcount <= TChunk::EntriesCount) + break; + + xcount -= TChunk::EntriesCount; + } + } +}; diff --git a/library/cpp/actors/util/ya.make b/library/cpp/actors/util/ya.make index a8bc36c769..37488c3962 100644 --- a/library/cpp/actors/util/ya.make +++ b/library/cpp/actors/util/ya.make @@ -1,37 +1,37 @@ -LIBRARY() - +LIBRARY() + OWNER( ddoarn g:kikimr ) - -SRCS( - affinity.cpp - affinity.h + +SRCS( + affinity.cpp + affinity.h cpumask.h datetime.h - defs.h + defs.h funnel_queue.h - futex.h - intrinsics.h + futex.h + intrinsics.h local_process_key.h - named_tuple.h - queue_chunk.h - queue_oneone_inplace.h + named_tuple.h + queue_chunk.h + queue_oneone_inplace.h recentwnd.h rope.h - should_continue.cpp - should_continue.h - thread.h - threadparkpad.cpp - threadparkpad.h - ticket_lock.h + should_continue.cpp + should_continue.h + thread.h + threadparkpad.cpp + threadparkpad.h + ticket_lock.h timerfd.h - unordered_cache.h -) - -PEERDIR( - util -) - -END() + unordered_cache.h +) + +PEERDIR( + util +) + +END() |