aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/actors/util
diff options
context:
space:
mode:
authorddoarn <ddoarn@yandex-team.ru>2022-02-10 16:49:53 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:49:53 +0300
commit3bf10d3f40b502d181ef52f5c4602c98cb135360 (patch)
tree5d5cb817648f650d76cf1076100726fd9b8448e8 /library/cpp/actors/util
parent0783fe3f48d91a3b741ce2ea32b11fbfc1637e7e (diff)
downloadydb-3bf10d3f40b502d181ef52f5c4602c98cb135360.tar.gz
Restoring authorship annotation for <ddoarn@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/actors/util')
-rw-r--r--library/cpp/actors/util/affinity.cpp100
-rw-r--r--library/cpp/actors/util/affinity.h82
-rw-r--r--library/cpp/actors/util/defs.h32
-rw-r--r--library/cpp/actors/util/futex.h26
-rw-r--r--library/cpp/actors/util/intrinsics.h192
-rw-r--r--library/cpp/actors/util/named_tuple.h14
-rw-r--r--library/cpp/actors/util/queue_chunk.h54
-rw-r--r--library/cpp/actors/util/queue_oneone_inplace.h218
-rw-r--r--library/cpp/actors/util/should_continue.cpp46
-rw-r--r--library/cpp/actors/util/should_continue.h44
-rw-r--r--library/cpp/actors/util/thread.h2
-rw-r--r--library/cpp/actors/util/threadparkpad.cpp88
-rw-r--r--library/cpp/actors/util/threadparkpad.h32
-rw-r--r--library/cpp/actors/util/ticket_lock.h86
-rw-r--r--library/cpp/actors/util/unordered_cache.h134
-rw-r--r--library/cpp/actors/util/ya.make52
16 files changed, 601 insertions, 601 deletions
diff --git a/library/cpp/actors/util/affinity.cpp b/library/cpp/actors/util/affinity.cpp
index aab8219d6f..cc1b6e70ec 100644
--- a/library/cpp/actors/util/affinity.cpp
+++ b/library/cpp/actors/util/affinity.cpp
@@ -1,41 +1,41 @@
-#include "affinity.h"
-
-#ifdef _linux_
-#include <sched.h>
-#endif
-
-class TAffinity::TImpl {
-#ifdef _linux_
- cpu_set_t Mask;
-#endif
-public:
- TImpl() {
-#ifdef _linux_
- int ar = sched_getaffinity(0, sizeof(cpu_set_t), &Mask);
+#include "affinity.h"
+
+#ifdef _linux_
+#include <sched.h>
+#endif
+
+class TAffinity::TImpl {
+#ifdef _linux_
+ cpu_set_t Mask;
+#endif
+public:
+ TImpl() {
+#ifdef _linux_
+ int ar = sched_getaffinity(0, sizeof(cpu_set_t), &Mask);
Y_VERIFY_DEBUG(ar == 0);
-#endif
- }
-
+#endif
+ }
+
explicit TImpl(const ui8* cpus, ui32 size) {
-#ifdef _linux_
- CPU_ZERO(&Mask);
+#ifdef _linux_
+ CPU_ZERO(&Mask);
for (ui32 i = 0; i != size; ++i) {
if (cpus[i]) {
CPU_SET(i, &Mask);
}
}
-#else
+#else
Y_UNUSED(cpus);
Y_UNUSED(size);
-#endif
- }
-
- void Set() const {
-#ifdef _linux_
- int ar = sched_setaffinity(0, sizeof(cpu_set_t), &Mask);
+#endif
+ }
+
+ void Set() const {
+#ifdef _linux_
+ int ar = sched_setaffinity(0, sizeof(cpu_set_t), &Mask);
Y_VERIFY_DEBUG(ar == 0);
-#endif
- }
+#endif
+ }
operator TCpuMask() const {
TCpuMask result;
@@ -48,20 +48,20 @@ public:
return result;
}
-};
-
-TAffinity::TAffinity() {
-}
-
-TAffinity::~TAffinity() {
-}
-
-TAffinity::TAffinity(const ui8* x, ui32 sz) {
+};
+
+TAffinity::TAffinity() {
+}
+
+TAffinity::~TAffinity() {
+}
+
+TAffinity::TAffinity(const ui8* x, ui32 sz) {
if (x && sz) {
- Impl.Reset(new TImpl(x, sz));
+ Impl.Reset(new TImpl(x, sz));
}
-}
-
+}
+
TAffinity::TAffinity(const TCpuMask& mask) {
if (!mask.IsEmpty()) {
static_assert(sizeof(ui8) == sizeof(mask.Cpus[0]));
@@ -71,19 +71,19 @@ TAffinity::TAffinity(const TCpuMask& mask) {
}
}
-void TAffinity::Current() {
- Impl.Reset(new TImpl());
-}
-
-void TAffinity::Set() const {
+void TAffinity::Current() {
+ Impl.Reset(new TImpl());
+}
+
+void TAffinity::Set() const {
if (!!Impl) {
- Impl->Set();
+ Impl->Set();
}
-}
-
-bool TAffinity::Empty() const {
+}
+
+bool TAffinity::Empty() const {
return !Impl;
-}
+}
TAffinity::operator TCpuMask() const {
if (!!Impl) {
diff --git a/library/cpp/actors/util/affinity.h b/library/cpp/actors/util/affinity.h
index 0a20744240..ae106ed180 100644
--- a/library/cpp/actors/util/affinity.h
+++ b/library/cpp/actors/util/affinity.h
@@ -1,49 +1,49 @@
-#pragma once
-
-#include "defs.h"
+#pragma once
+
+#include "defs.h"
#include "cpumask.h"
-
+
// Platform-specific class to set or get thread affinity
-class TAffinity: public TThrRefBase, TNonCopyable {
- class TImpl;
- THolder<TImpl> Impl;
-
-public:
- TAffinity();
+class TAffinity: public TThrRefBase, TNonCopyable {
+ class TImpl;
+ THolder<TImpl> Impl;
+
+public:
+ TAffinity();
TAffinity(const ui8* cpus, ui32 size);
explicit TAffinity(const TCpuMask& mask);
- ~TAffinity();
-
- void Current();
- void Set() const;
- bool Empty() const;
+ ~TAffinity();
+
+ void Current();
+ void Set() const;
+ bool Empty() const;
operator TCpuMask() const;
-};
-
+};
+
// Scoped affinity setter
-class TAffinityGuard : TNonCopyable {
- bool Stacked;
- TAffinity OldAffinity;
-
-public:
- TAffinityGuard(const TAffinity* affinity) {
+class TAffinityGuard : TNonCopyable {
+ bool Stacked;
+ TAffinity OldAffinity;
+
+public:
+ TAffinityGuard(const TAffinity* affinity) {
Stacked = false;
- if (affinity && !affinity->Empty()) {
- OldAffinity.Current();
- affinity->Set();
- Stacked = true;
- }
- }
-
- ~TAffinityGuard() {
- Release();
- }
-
- void Release() {
- if (Stacked) {
- OldAffinity.Set();
- Stacked = false;
- }
- }
-};
+ if (affinity && !affinity->Empty()) {
+ OldAffinity.Current();
+ affinity->Set();
+ Stacked = true;
+ }
+ }
+
+ ~TAffinityGuard() {
+ Release();
+ }
+
+ void Release() {
+ if (Stacked) {
+ OldAffinity.Set();
+ Stacked = false;
+ }
+ }
+};
diff --git a/library/cpp/actors/util/defs.h b/library/cpp/actors/util/defs.h
index b88ce59ec6..5c3b57665b 100644
--- a/library/cpp/actors/util/defs.h
+++ b/library/cpp/actors/util/defs.h
@@ -1,16 +1,16 @@
-#pragma once
-
-// unique tag to fix pragma once gcc glueing: ./library/actors/util/defs.h
-
-#include <util/system/defaults.h>
-#include <util/generic/bt_exception.h>
-#include <util/generic/noncopyable.h>
-#include <util/generic/ptr.h>
-#include <util/generic/string.h>
-#include <util/generic/yexception.h>
-#include <util/system/atomic.h>
-#include <util/system/align.h>
-#include <util/generic/vector.h>
-#include <util/datetime/base.h>
-#include <util/generic/ylimits.h>
-#include "intrinsics.h"
+#pragma once
+
+// unique tag to fix pragma once gcc glueing: ./library/actors/util/defs.h
+
+#include <util/system/defaults.h>
+#include <util/generic/bt_exception.h>
+#include <util/generic/noncopyable.h>
+#include <util/generic/ptr.h>
+#include <util/generic/string.h>
+#include <util/generic/yexception.h>
+#include <util/system/atomic.h>
+#include <util/system/align.h>
+#include <util/generic/vector.h>
+#include <util/datetime/base.h>
+#include <util/generic/ylimits.h>
+#include "intrinsics.h"
diff --git a/library/cpp/actors/util/futex.h b/library/cpp/actors/util/futex.h
index 9f8a7b53bc..c193f8d128 100644
--- a/library/cpp/actors/util/futex.h
+++ b/library/cpp/actors/util/futex.h
@@ -1,13 +1,13 @@
-#pragma once
-
-#ifdef _linux_
-
-#include <linux/futex.h>
-#include <unistd.h>
-#include <sys/syscall.h>
-
-static long SysFutex(void* addr1, int op, int val1, struct timespec* timeout, void* addr2, int val3) {
- return syscall(SYS_futex, addr1, op, val1, timeout, addr2, val3);
-}
-
-#endif
+#pragma once
+
+#ifdef _linux_
+
+#include <linux/futex.h>
+#include <unistd.h>
+#include <sys/syscall.h>
+
+static long SysFutex(void* addr1, int op, int val1, struct timespec* timeout, void* addr2, int val3) {
+ return syscall(SYS_futex, addr1, op, val1, timeout, addr2, val3);
+}
+
+#endif
diff --git a/library/cpp/actors/util/intrinsics.h b/library/cpp/actors/util/intrinsics.h
index 983bf4788d..df07e36896 100644
--- a/library/cpp/actors/util/intrinsics.h
+++ b/library/cpp/actors/util/intrinsics.h
@@ -1,97 +1,97 @@
-#pragma once
-
-#include <util/system/defaults.h>
-#include <util/system/atomic.h>
-#include <util/system/spinlock.h>
-
+#pragma once
+
+#include <util/system/defaults.h>
+#include <util/system/atomic.h>
+#include <util/system/spinlock.h>
+
#include <library/cpp/sse/sse.h> // The header chooses appropriate SSE support
-
-static_assert(sizeof(TAtomic) == 8, "expect sizeof(TAtomic) == 8");
-
-// we need explicit 32 bit operations to keep cache-line friendly packs
-// so have to define some atomics additionaly to arcadia one
-#ifdef _win_
-#pragma intrinsic(_InterlockedCompareExchange)
-#pragma intrinsic(_InterlockedExchangeAdd)
-#pragma intrinsic(_InterlockedIncrement)
-#pragma intrinsic(_InterlockedDecrement)
-#endif
-
-inline bool AtomicUi32Cas(volatile ui32* a, ui32 exchange, ui32 compare) {
-#ifdef _win_
- return _InterlockedCompareExchange((volatile long*)a, exchange, compare) == (long)compare;
-#else
- ui32 expected = compare;
- return __atomic_compare_exchange_n(a, &expected, exchange, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST);
-#endif
-}
-
-inline ui32 AtomicUi32Add(volatile ui32* a, ui32 add) {
-#ifdef _win_
- return _InterlockedExchangeAdd((volatile long*)a, add) + add;
-#else
- return __atomic_add_fetch(a, add, __ATOMIC_SEQ_CST);
-#endif
-}
-
-inline ui32 AtomicUi32Sub(volatile ui32* a, ui32 sub) {
-#ifdef _win_
- return _InterlockedExchangeAdd((volatile long*)a, -(long)sub) - sub;
-#else
- return __atomic_sub_fetch(a, sub, __ATOMIC_SEQ_CST);
-#endif
-}
-
-inline ui32 AtomicUi32Increment(volatile ui32* a) {
-#ifdef _win_
- return _InterlockedIncrement((volatile long*)a);
-#else
- return __atomic_add_fetch(a, 1, __ATOMIC_SEQ_CST);
-#endif
-}
-
-inline ui32 AtomicUi32Decrement(volatile ui32* a) {
-#ifdef _win_
- return _InterlockedDecrement((volatile long*)a);
-#else
- return __atomic_sub_fetch(a, 1, __ATOMIC_SEQ_CST);
-#endif
-}
-
-template <typename T>
-inline void AtomicStore(volatile T* a, T x) {
- static_assert(std::is_integral<T>::value || std::is_pointer<T>::value, "expect std::is_integral<T>::value || std::is_pointer<T>::value");
-#ifdef _win_
- *a = x;
-#else
- __atomic_store_n(a, x, __ATOMIC_RELEASE);
-#endif
-}
-
-template <typename T>
-inline void RelaxedStore(volatile T* a, T x) {
- static_assert(std::is_integral<T>::value || std::is_pointer<T>::value, "expect std::is_integral<T>::value || std::is_pointer<T>::value");
-#ifdef _win_
- *a = x;
-#else
- __atomic_store_n(a, x, __ATOMIC_RELAXED);
-#endif
-}
-
-template <typename T>
-inline T AtomicLoad(volatile T* a) {
-#ifdef _win_
- return *a;
-#else
- return __atomic_load_n(a, __ATOMIC_ACQUIRE);
-#endif
-}
-
-template <typename T>
-inline T RelaxedLoad(volatile T* a) {
-#ifdef _win_
- return *a;
-#else
- return __atomic_load_n(a, __ATOMIC_RELAXED);
-#endif
-}
+
+static_assert(sizeof(TAtomic) == 8, "expect sizeof(TAtomic) == 8");
+
+// we need explicit 32 bit operations to keep cache-line friendly packs
+// so have to define some atomics additionaly to arcadia one
+#ifdef _win_
+#pragma intrinsic(_InterlockedCompareExchange)
+#pragma intrinsic(_InterlockedExchangeAdd)
+#pragma intrinsic(_InterlockedIncrement)
+#pragma intrinsic(_InterlockedDecrement)
+#endif
+
+inline bool AtomicUi32Cas(volatile ui32* a, ui32 exchange, ui32 compare) {
+#ifdef _win_
+ return _InterlockedCompareExchange((volatile long*)a, exchange, compare) == (long)compare;
+#else
+ ui32 expected = compare;
+ return __atomic_compare_exchange_n(a, &expected, exchange, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST);
+#endif
+}
+
+inline ui32 AtomicUi32Add(volatile ui32* a, ui32 add) {
+#ifdef _win_
+ return _InterlockedExchangeAdd((volatile long*)a, add) + add;
+#else
+ return __atomic_add_fetch(a, add, __ATOMIC_SEQ_CST);
+#endif
+}
+
+inline ui32 AtomicUi32Sub(volatile ui32* a, ui32 sub) {
+#ifdef _win_
+ return _InterlockedExchangeAdd((volatile long*)a, -(long)sub) - sub;
+#else
+ return __atomic_sub_fetch(a, sub, __ATOMIC_SEQ_CST);
+#endif
+}
+
+inline ui32 AtomicUi32Increment(volatile ui32* a) {
+#ifdef _win_
+ return _InterlockedIncrement((volatile long*)a);
+#else
+ return __atomic_add_fetch(a, 1, __ATOMIC_SEQ_CST);
+#endif
+}
+
+inline ui32 AtomicUi32Decrement(volatile ui32* a) {
+#ifdef _win_
+ return _InterlockedDecrement((volatile long*)a);
+#else
+ return __atomic_sub_fetch(a, 1, __ATOMIC_SEQ_CST);
+#endif
+}
+
+template <typename T>
+inline void AtomicStore(volatile T* a, T x) {
+ static_assert(std::is_integral<T>::value || std::is_pointer<T>::value, "expect std::is_integral<T>::value || std::is_pointer<T>::value");
+#ifdef _win_
+ *a = x;
+#else
+ __atomic_store_n(a, x, __ATOMIC_RELEASE);
+#endif
+}
+
+template <typename T>
+inline void RelaxedStore(volatile T* a, T x) {
+ static_assert(std::is_integral<T>::value || std::is_pointer<T>::value, "expect std::is_integral<T>::value || std::is_pointer<T>::value");
+#ifdef _win_
+ *a = x;
+#else
+ __atomic_store_n(a, x, __ATOMIC_RELAXED);
+#endif
+}
+
+template <typename T>
+inline T AtomicLoad(volatile T* a) {
+#ifdef _win_
+ return *a;
+#else
+ return __atomic_load_n(a, __ATOMIC_ACQUIRE);
+#endif
+}
+
+template <typename T>
+inline T RelaxedLoad(volatile T* a) {
+#ifdef _win_
+ return *a;
+#else
+ return __atomic_load_n(a, __ATOMIC_RELAXED);
+#endif
+}
diff --git a/library/cpp/actors/util/named_tuple.h b/library/cpp/actors/util/named_tuple.h
index 5a8999c4ce..67f185bba8 100644
--- a/library/cpp/actors/util/named_tuple.h
+++ b/library/cpp/actors/util/named_tuple.h
@@ -2,29 +2,29 @@
#include "defs.h"
-template <typename TDerived>
+template <typename TDerived>
struct TNamedTupleBase {
- friend bool operator==(const TDerived& x, const TDerived& y) {
+ friend bool operator==(const TDerived& x, const TDerived& y) {
return x.ConvertToTuple() == y.ConvertToTuple();
}
- friend bool operator!=(const TDerived& x, const TDerived& y) {
+ friend bool operator!=(const TDerived& x, const TDerived& y) {
return x.ConvertToTuple() != y.ConvertToTuple();
}
- friend bool operator<(const TDerived& x, const TDerived& y) {
+ friend bool operator<(const TDerived& x, const TDerived& y) {
return x.ConvertToTuple() < y.ConvertToTuple();
}
- friend bool operator<=(const TDerived& x, const TDerived& y) {
+ friend bool operator<=(const TDerived& x, const TDerived& y) {
return x.ConvertToTuple() <= y.ConvertToTuple();
}
- friend bool operator>(const TDerived& x, const TDerived& y) {
+ friend bool operator>(const TDerived& x, const TDerived& y) {
return x.ConvertToTuple() > y.ConvertToTuple();
}
- friend bool operator>=(const TDerived& x, const TDerived& y) {
+ friend bool operator>=(const TDerived& x, const TDerived& y) {
return x.ConvertToTuple() >= y.ConvertToTuple();
}
};
diff --git a/library/cpp/actors/util/queue_chunk.h b/library/cpp/actors/util/queue_chunk.h
index 96657ee890..8a4e02d8cb 100644
--- a/library/cpp/actors/util/queue_chunk.h
+++ b/library/cpp/actors/util/queue_chunk.h
@@ -1,29 +1,29 @@
-#pragma once
-
-#include "defs.h"
-
-template <typename T, ui32 TSize, typename TDerived>
-struct TQueueChunkDerived {
- static const ui32 EntriesCount = (TSize - sizeof(TQueueChunkDerived*)) / sizeof(T);
+#pragma once
+
+#include "defs.h"
+
+template <typename T, ui32 TSize, typename TDerived>
+struct TQueueChunkDerived {
+ static const ui32 EntriesCount = (TSize - sizeof(TQueueChunkDerived*)) / sizeof(T);
static_assert(EntriesCount > 0, "expect EntriesCount > 0");
-
- volatile T Entries[EntriesCount];
- TDerived* volatile Next;
-
- TQueueChunkDerived() {
- memset(this, 0, sizeof(TQueueChunkDerived));
- }
-};
-
-template <typename T, ui32 TSize>
-struct TQueueChunk {
- static const ui32 EntriesCount = (TSize - sizeof(TQueueChunk*)) / sizeof(T);
+
+ volatile T Entries[EntriesCount];
+ TDerived* volatile Next;
+
+ TQueueChunkDerived() {
+ memset(this, 0, sizeof(TQueueChunkDerived));
+ }
+};
+
+template <typename T, ui32 TSize>
+struct TQueueChunk {
+ static const ui32 EntriesCount = (TSize - sizeof(TQueueChunk*)) / sizeof(T);
static_assert(EntriesCount > 0, "expect EntriesCount > 0");
-
- volatile T Entries[EntriesCount];
- TQueueChunk* volatile Next;
-
- TQueueChunk() {
- memset(this, 0, sizeof(TQueueChunk));
- }
-};
+
+ volatile T Entries[EntriesCount];
+ TQueueChunk* volatile Next;
+
+ TQueueChunk() {
+ memset(this, 0, sizeof(TQueueChunk));
+ }
+};
diff --git a/library/cpp/actors/util/queue_oneone_inplace.h b/library/cpp/actors/util/queue_oneone_inplace.h
index 48507a5235..d7ec8bb21c 100644
--- a/library/cpp/actors/util/queue_oneone_inplace.h
+++ b/library/cpp/actors/util/queue_oneone_inplace.h
@@ -1,118 +1,118 @@
-#pragma once
-
-#include "defs.h"
-#include "queue_chunk.h"
-
-template <typename T, ui32 TSize, typename TChunk = TQueueChunk<T, TSize>>
-class TOneOneQueueInplace : TNonCopyable {
+#pragma once
+
+#include "defs.h"
+#include "queue_chunk.h"
+
+template <typename T, ui32 TSize, typename TChunk = TQueueChunk<T, TSize>>
+class TOneOneQueueInplace : TNonCopyable {
static_assert(std::is_integral<T>::value || std::is_pointer<T>::value, "expect std::is_integral<T>::value || std::is_pointer<T>::valuer");
-
- TChunk* ReadFrom;
- ui32 ReadPosition;
- ui32 WritePosition;
- TChunk* WriteTo;
-
- friend class TReadIterator;
-
-public:
- class TReadIterator {
- TChunk* ReadFrom;
- ui32 ReadPosition;
-
- public:
- TReadIterator(TChunk* readFrom, ui32 readPosition)
- : ReadFrom(readFrom)
- , ReadPosition(readPosition)
- {
- }
-
- inline T Next() {
- TChunk* head = ReadFrom;
- if (ReadPosition != TChunk::EntriesCount) {
- return AtomicLoad(&head->Entries[ReadPosition++]);
- } else if (TChunk* next = AtomicLoad(&head->Next)) {
- ReadFrom = next;
- ReadPosition = 0;
- return Next();
- }
+
+ TChunk* ReadFrom;
+ ui32 ReadPosition;
+ ui32 WritePosition;
+ TChunk* WriteTo;
+
+ friend class TReadIterator;
+
+public:
+ class TReadIterator {
+ TChunk* ReadFrom;
+ ui32 ReadPosition;
+
+ public:
+ TReadIterator(TChunk* readFrom, ui32 readPosition)
+ : ReadFrom(readFrom)
+ , ReadPosition(readPosition)
+ {
+ }
+
+ inline T Next() {
+ TChunk* head = ReadFrom;
+ if (ReadPosition != TChunk::EntriesCount) {
+ return AtomicLoad(&head->Entries[ReadPosition++]);
+ } else if (TChunk* next = AtomicLoad(&head->Next)) {
+ ReadFrom = next;
+ ReadPosition = 0;
+ return Next();
+ }
return T{};
- }
- };
-
- TOneOneQueueInplace()
- : ReadFrom(new TChunk())
- , ReadPosition(0)
- , WritePosition(0)
- , WriteTo(ReadFrom)
- {
- }
-
- ~TOneOneQueueInplace() {
+ }
+ };
+
+ TOneOneQueueInplace()
+ : ReadFrom(new TChunk())
+ , ReadPosition(0)
+ , WritePosition(0)
+ , WriteTo(ReadFrom)
+ {
+ }
+
+ ~TOneOneQueueInplace() {
Y_VERIFY_DEBUG(Head() == 0);
- delete ReadFrom;
- }
-
- struct TPtrCleanDestructor {
+ delete ReadFrom;
+ }
+
+ struct TPtrCleanDestructor {
static inline void Destroy(TOneOneQueueInplace<T, TSize>* x) noexcept {
- while (T head = x->Pop())
- delete head;
- delete x;
- }
- };
-
- struct TCleanDestructor {
+ while (T head = x->Pop())
+ delete head;
+ delete x;
+ }
+ };
+
+ struct TCleanDestructor {
static inline void Destroy(TOneOneQueueInplace<T, TSize>* x) noexcept {
while (x->Pop() != nullptr)
- continue;
- delete x;
- }
- };
-
- struct TPtrCleanInplaceMallocDestructor {
- template <typename TPtrVal>
+ continue;
+ delete x;
+ }
+ };
+
+ struct TPtrCleanInplaceMallocDestructor {
+ template <typename TPtrVal>
static inline void Destroy(TOneOneQueueInplace<TPtrVal*, TSize>* x) noexcept {
- while (TPtrVal* head = x->Pop()) {
- head->~TPtrVal();
- free(head);
- }
- delete x;
- }
- };
-
+ while (TPtrVal* head = x->Pop()) {
+ head->~TPtrVal();
+ free(head);
+ }
+ delete x;
+ }
+ };
+
void Push(T x) noexcept {
- if (WritePosition != TChunk::EntriesCount) {
- AtomicStore(&WriteTo->Entries[WritePosition], x);
- ++WritePosition;
- } else {
- TChunk* next = new TChunk();
- next->Entries[0] = x;
- AtomicStore(&WriteTo->Next, next);
- WriteTo = next;
- WritePosition = 1;
- }
- }
-
- T Head() {
- TChunk* head = ReadFrom;
- if (ReadPosition != TChunk::EntriesCount) {
- return AtomicLoad(&head->Entries[ReadPosition]);
- } else if (TChunk* next = AtomicLoad(&head->Next)) {
- ReadFrom = next;
- delete head;
- ReadPosition = 0;
- return Head();
- }
+ if (WritePosition != TChunk::EntriesCount) {
+ AtomicStore(&WriteTo->Entries[WritePosition], x);
+ ++WritePosition;
+ } else {
+ TChunk* next = new TChunk();
+ next->Entries[0] = x;
+ AtomicStore(&WriteTo->Next, next);
+ WriteTo = next;
+ WritePosition = 1;
+ }
+ }
+
+ T Head() {
+ TChunk* head = ReadFrom;
+ if (ReadPosition != TChunk::EntriesCount) {
+ return AtomicLoad(&head->Entries[ReadPosition]);
+ } else if (TChunk* next = AtomicLoad(&head->Next)) {
+ ReadFrom = next;
+ delete head;
+ ReadPosition = 0;
+ return Head();
+ }
return T{};
- }
-
- T Pop() {
- T ret = Head();
- if (ret)
- ++ReadPosition;
- return ret;
- }
-
- TReadIterator Iterator() {
- return TReadIterator(ReadFrom, ReadPosition);
- }
-};
+ }
+
+ T Pop() {
+ T ret = Head();
+ if (ret)
+ ++ReadPosition;
+ return ret;
+ }
+
+ TReadIterator Iterator() {
+ return TReadIterator(ReadFrom, ReadPosition);
+ }
+};
diff --git a/library/cpp/actors/util/should_continue.cpp b/library/cpp/actors/util/should_continue.cpp
index 38bb6a28b3..258e6a0aff 100644
--- a/library/cpp/actors/util/should_continue.cpp
+++ b/library/cpp/actors/util/should_continue.cpp
@@ -1,23 +1,23 @@
-#include "should_continue.h"
-
-void TProgramShouldContinue::ShouldRestart() {
- AtomicSet(State, Restart);
-}
-
-void TProgramShouldContinue::ShouldStop(int returnCode) {
- AtomicSet(ReturnCode, returnCode);
- AtomicSet(State, Stop);
-}
-
-TProgramShouldContinue::EState TProgramShouldContinue::PollState() {
- return static_cast<EState>(AtomicGet(State));
-}
-
-int TProgramShouldContinue::GetReturnCode() {
- return static_cast<int>(AtomicGet(ReturnCode));
-}
-
-void TProgramShouldContinue::Reset() {
- AtomicSet(ReturnCode, 0);
- AtomicSet(State, Continue);
-}
+#include "should_continue.h"
+
+void TProgramShouldContinue::ShouldRestart() {
+ AtomicSet(State, Restart);
+}
+
+void TProgramShouldContinue::ShouldStop(int returnCode) {
+ AtomicSet(ReturnCode, returnCode);
+ AtomicSet(State, Stop);
+}
+
+TProgramShouldContinue::EState TProgramShouldContinue::PollState() {
+ return static_cast<EState>(AtomicGet(State));
+}
+
+int TProgramShouldContinue::GetReturnCode() {
+ return static_cast<int>(AtomicGet(ReturnCode));
+}
+
+void TProgramShouldContinue::Reset() {
+ AtomicSet(ReturnCode, 0);
+ AtomicSet(State, Continue);
+}
diff --git a/library/cpp/actors/util/should_continue.h b/library/cpp/actors/util/should_continue.h
index 8fc7f9dab0..76acc40dc4 100644
--- a/library/cpp/actors/util/should_continue.h
+++ b/library/cpp/actors/util/should_continue.h
@@ -1,22 +1,22 @@
-#pragma once
-#include "defs.h"
-
-class TProgramShouldContinue {
-public:
- enum EState {
- Continue,
- Stop,
- Restart,
- };
-
- void ShouldRestart();
- void ShouldStop(int returnCode = 0);
-
- EState PollState();
- int GetReturnCode();
-
- void Reset();
-private:
- TAtomic ReturnCode = 0;
- TAtomic State = Continue;
-};
+#pragma once
+#include "defs.h"
+
+class TProgramShouldContinue {
+public:
+ enum EState {
+ Continue,
+ Stop,
+ Restart,
+ };
+
+ void ShouldRestart();
+ void ShouldStop(int returnCode = 0);
+
+ EState PollState();
+ int GetReturnCode();
+
+ void Reset();
+private:
+ TAtomic ReturnCode = 0;
+ TAtomic State = Continue;
+};
diff --git a/library/cpp/actors/util/thread.h b/library/cpp/actors/util/thread.h
index 0afbe10921..d742c8c585 100644
--- a/library/cpp/actors/util/thread.h
+++ b/library/cpp/actors/util/thread.h
@@ -8,7 +8,7 @@
#include <time.h>
inline void SetCurrentThreadName(const TString& name,
- const ui32 maxCharsFromProcessName = 8) {
+ const ui32 maxCharsFromProcessName = 8) {
#if defined(_linux_)
// linux limits threadname by 15 + \0
diff --git a/library/cpp/actors/util/threadparkpad.cpp b/library/cpp/actors/util/threadparkpad.cpp
index 88813e270c..74069ff15b 100644
--- a/library/cpp/actors/util/threadparkpad.cpp
+++ b/library/cpp/actors/util/threadparkpad.cpp
@@ -1,15 +1,15 @@
-#include "threadparkpad.h"
-#include <util/system/winint.h>
-
-#ifdef _linux_
-
-#include "futex.h"
-
-namespace NActors {
+#include "threadparkpad.h"
+#include <util/system/winint.h>
+
+#ifdef _linux_
+
+#include "futex.h"
+
+namespace NActors {
class TThreadParkPad::TImpl {
volatile bool Interrupted;
int Futex;
-
+
public:
TImpl()
: Interrupted(false)
@@ -18,39 +18,39 @@ namespace NActors {
}
~TImpl() {
}
-
+
bool Park() noexcept {
__atomic_fetch_sub(&Futex, 1, __ATOMIC_SEQ_CST);
while (__atomic_load_n(&Futex, __ATOMIC_ACQUIRE) == -1)
SysFutex(&Futex, FUTEX_WAIT_PRIVATE, -1, nullptr, nullptr, 0);
return IsInterrupted();
}
-
+
void Unpark() noexcept {
const int old = __atomic_fetch_add(&Futex, 1, __ATOMIC_SEQ_CST);
if (old == -1)
SysFutex(&Futex, FUTEX_WAKE_PRIVATE, -1, nullptr, nullptr, 0);
}
-
+
void Interrupt() noexcept {
__atomic_store_n(&Interrupted, true, __ATOMIC_SEQ_CST);
Unpark();
}
-
+
bool IsInterrupted() const noexcept {
return __atomic_load_n(&Interrupted, __ATOMIC_ACQUIRE);
}
};
-
-#elif defined _win32_
+
+#elif defined _win32_
#include <util/generic/bt_exception.h>
-#include <util/generic/yexception.h>
-
-namespace NActors {
+#include <util/generic/yexception.h>
+
+namespace NActors {
class TThreadParkPad::TImpl {
TAtomic Interrupted;
HANDLE EvHandle;
-
+
public:
TImpl()
: Interrupted(false)
@@ -63,35 +63,35 @@ namespace NActors {
if (EvHandle)
::CloseHandle(EvHandle);
}
-
+
bool Park() noexcept {
::WaitForSingleObject(EvHandle, INFINITE);
return AtomicGet(Interrupted);
}
-
+
void Unpark() noexcept {
::SetEvent(EvHandle);
}
-
+
void Interrupt() noexcept {
AtomicSet(Interrupted, true);
Unpark();
}
-
+
bool IsInterrupted() const noexcept {
return AtomicGet(Interrupted);
}
};
-
-#else
-
-#include <util/system/event.h>
-
-namespace NActors {
+
+#else
+
+#include <util/system/event.h>
+
+namespace NActors {
class TThreadParkPad::TImpl {
TAtomic Interrupted;
TSystemEvent Ev;
-
+
public:
TImpl()
: Interrupted(false)
@@ -100,7 +100,7 @@ namespace NActors {
}
~TImpl() {
}
-
+
bool Park() noexcept {
Ev.Wait();
return AtomicGet(Interrupted);
@@ -123,26 +123,26 @@ namespace NActors {
TThreadParkPad::TThreadParkPad()
: Impl(new TThreadParkPad::TImpl())
- {
- }
+ {
+ }
TThreadParkPad::~TThreadParkPad() {
- }
-
+ }
+
bool TThreadParkPad::Park() noexcept {
return Impl->Park();
- }
-
+ }
+
void TThreadParkPad::Unpark() noexcept {
Impl->Unpark();
- }
-
+ }
+
void TThreadParkPad::Interrupt() noexcept {
Impl->Interrupt();
- }
-
+ }
+
bool TThreadParkPad::Interrupted() const noexcept {
return Impl->IsInterrupted();
- }
-
-}
+ }
+
+}
diff --git a/library/cpp/actors/util/threadparkpad.h b/library/cpp/actors/util/threadparkpad.h
index 30f9cffdb9..5b574ccf34 100644
--- a/library/cpp/actors/util/threadparkpad.h
+++ b/library/cpp/actors/util/threadparkpad.h
@@ -1,21 +1,21 @@
-#pragma once
-
-#include <util/generic/ptr.h>
-
-namespace NActors {
- class TThreadParkPad {
- private:
- class TImpl;
- THolder<TImpl> Impl;
-
- public:
- TThreadParkPad();
+#pragma once
+
+#include <util/generic/ptr.h>
+
+namespace NActors {
+ class TThreadParkPad {
+ private:
+ class TImpl;
+ THolder<TImpl> Impl;
+
+ public:
+ TThreadParkPad();
~TThreadParkPad();
-
+
bool Park() noexcept;
void Unpark() noexcept;
void Interrupt() noexcept;
bool Interrupted() const noexcept;
- };
-
-}
+ };
+
+}
diff --git a/library/cpp/actors/util/ticket_lock.h b/library/cpp/actors/util/ticket_lock.h
index 334922a088..3b1fa80393 100644
--- a/library/cpp/actors/util/ticket_lock.h
+++ b/library/cpp/actors/util/ticket_lock.h
@@ -1,48 +1,48 @@
-#pragma once
-
-#include "intrinsics.h"
-#include <util/system/guard.h>
-#include <util/system/yassert.h>
-
-class TTicketLock : TNonCopyable {
- ui32 TicketIn;
- ui32 TicketOut;
-
-public:
- TTicketLock()
- : TicketIn(0)
- , TicketOut(0)
- {
- }
-
+#pragma once
+
+#include "intrinsics.h"
+#include <util/system/guard.h>
+#include <util/system/yassert.h>
+
+class TTicketLock : TNonCopyable {
+ ui32 TicketIn;
+ ui32 TicketOut;
+
+public:
+ TTicketLock()
+ : TicketIn(0)
+ , TicketOut(0)
+ {
+ }
+
void Release() noexcept {
- AtomicUi32Increment(&TicketOut);
- }
-
+ AtomicUi32Increment(&TicketOut);
+ }
+
ui32 Acquire() noexcept {
- ui32 revolves = 0;
- const ui32 ticket = AtomicUi32Increment(&TicketIn) - 1;
- while (ticket != AtomicLoad(&TicketOut)) {
+ ui32 revolves = 0;
+ const ui32 ticket = AtomicUi32Increment(&TicketIn) - 1;
+ while (ticket != AtomicLoad(&TicketOut)) {
Y_VERIFY_DEBUG(ticket >= AtomicLoad(&TicketOut));
- SpinLockPause();
- ++revolves;
- }
- return revolves;
- }
-
+ SpinLockPause();
+ ++revolves;
+ }
+ return revolves;
+ }
+
bool TryAcquire() noexcept {
- const ui32 x = AtomicLoad(&TicketOut);
- if (x == AtomicLoad(&TicketIn) && AtomicUi32Cas(&TicketIn, x + 1, x))
- return true;
- else
- return false;
- }
-
+ const ui32 x = AtomicLoad(&TicketOut);
+ if (x == AtomicLoad(&TicketIn) && AtomicUi32Cas(&TicketIn, x + 1, x))
+ return true;
+ else
+ return false;
+ }
+
bool IsLocked() noexcept {
- const ui32 ticketIn = AtomicLoad(&TicketIn);
- const ui32 ticketOut = AtomicLoad(&TicketOut);
- return (ticketIn != ticketOut);
- }
-
- typedef ::TGuard<TTicketLock> TGuard;
-};
+ const ui32 ticketIn = AtomicLoad(&TicketIn);
+ const ui32 ticketOut = AtomicLoad(&TicketOut);
+ return (ticketIn != ticketOut);
+ }
+
+ typedef ::TGuard<TTicketLock> TGuard;
+};
diff --git a/library/cpp/actors/util/unordered_cache.h b/library/cpp/actors/util/unordered_cache.h
index 57b9c6663d..76f036c0cf 100644
--- a/library/cpp/actors/util/unordered_cache.h
+++ b/library/cpp/actors/util/unordered_cache.h
@@ -1,22 +1,22 @@
-#pragma once
-
-#include "defs.h"
+#pragma once
+
+#include "defs.h"
#include "queue_chunk.h"
-
+
template <typename T, ui32 Size = 512, ui32 ConcurrencyFactor = 1, typename TChunk = TQueueChunk<T, Size>>
-class TUnorderedCache : TNonCopyable {
+class TUnorderedCache : TNonCopyable {
static_assert(std::is_integral<T>::value || std::is_pointer<T>::value, "expect std::is_integral<T>::value || std::is_pointer<T>::value");
-
-public:
+
+public:
static constexpr ui32 Concurrency = ConcurrencyFactor * 4;
-
-private:
+
+private:
struct TReadSlot {
TChunk* volatile ReadFrom;
volatile ui32 ReadPosition;
char Padding[64 - sizeof(TChunk*) - sizeof(ui32)]; // 1 slot per cache line
};
-
+
struct TWriteSlot {
TChunk* volatile WriteTo;
volatile ui32 WritePosition;
@@ -31,7 +31,7 @@ private:
TWriteSlot WriteSlots[Concurrency];
static_assert(sizeof(TChunk*) == sizeof(TAtomic), "expect sizeof(TChunk*) == sizeof(TAtomic)");
-
+
private:
struct TLockedWriter {
TWriteSlot* Slot;
@@ -82,53 +82,53 @@ private:
private:
TLockedWriter LockWriter(ui64 writerRotation) {
ui32 cycle = 0;
- for (;;) {
+ for (;;) {
TWriteSlot* slot = &WriteSlots[writerRotation % Concurrency];
if (AtomicLoad(&slot->WriteTo) != nullptr) {
if (TChunk* writeTo = AtomicSwap(&slot->WriteTo, nullptr)) {
return TLockedWriter(slot, writeTo);
- }
- }
+ }
+ }
++writerRotation;
-
+
// Do a spinlock pause after a full cycle
if (++cycle == Concurrency) {
SpinLockPause();
cycle = 0;
}
}
- }
-
+ }
+
void WriteOne(TLockedWriter& lock, T x) {
Y_VERIFY_DEBUG(x != 0);
-
+
const ui32 pos = AtomicLoad(&lock.Slot->WritePosition);
- if (pos != TChunk::EntriesCount) {
+ if (pos != TChunk::EntriesCount) {
AtomicStore(&lock.Slot->WritePosition, pos + 1);
AtomicStore(&lock.WriteTo->Entries[pos], x);
- } else {
+ } else {
TChunk* next = new TChunk();
- AtomicStore(&next->Entries[0], x);
+ AtomicStore(&next->Entries[0], x);
AtomicStore(&lock.Slot->WritePosition, 1u);
AtomicStore(&lock.WriteTo->Next, next);
lock.WriteTo = next;
- }
- }
-
-public:
+ }
+ }
+
+public:
TUnorderedCache() {
for (ui32 i = 0; i < Concurrency; ++i) {
ReadSlots[i].ReadFrom = new TChunk();
ReadSlots[i].ReadPosition = 0;
-
+
WriteSlots[i].WriteTo = ReadSlots[i].ReadFrom;
WriteSlots[i].WritePosition = 0;
- }
- }
-
+ }
+ }
+
~TUnorderedCache() {
Y_VERIFY(!Pop(0));
-
+
for (ui64 i = 0; i < Concurrency; ++i) {
if (ReadSlots[i].ReadFrom) {
delete ReadSlots[i].ReadFrom;
@@ -139,63 +139,63 @@ public:
}
T Pop(ui64 readerRotation) noexcept {
- ui64 readerIndex = readerRotation;
- const ui64 endIndex = readerIndex + Concurrency;
- for (; readerIndex != endIndex; ++readerIndex) {
+ ui64 readerIndex = readerRotation;
+ const ui64 endIndex = readerIndex + Concurrency;
+ for (; readerIndex != endIndex; ++readerIndex) {
TReadSlot* slot = &ReadSlots[readerIndex % Concurrency];
if (AtomicLoad(&slot->ReadFrom) != nullptr) {
if (TChunk* readFrom = AtomicSwap(&slot->ReadFrom, nullptr)) {
const ui32 pos = AtomicLoad(&slot->ReadPosition);
- if (pos != TChunk::EntriesCount) {
- if (T ret = AtomicLoad(&readFrom->Entries[pos])) {
+ if (pos != TChunk::EntriesCount) {
+ if (T ret = AtomicLoad(&readFrom->Entries[pos])) {
AtomicStore(&slot->ReadPosition, pos + 1);
AtomicStore(&slot->ReadFrom, readFrom); // release lock with same chunk
return ret; // found, return
- } else {
+ } else {
AtomicStore(&slot->ReadFrom, readFrom); // release lock with same chunk
- }
- } else if (TChunk* next = AtomicLoad(&readFrom->Next)) {
- if (T ret = AtomicLoad(&next->Entries[0])) {
+ }
+ } else if (TChunk* next = AtomicLoad(&readFrom->Next)) {
+ if (T ret = AtomicLoad(&next->Entries[0])) {
AtomicStore(&slot->ReadPosition, 1u);
AtomicStore(&slot->ReadFrom, next); // release lock with next chunk
- delete readFrom;
- return ret;
- } else {
+ delete readFrom;
+ return ret;
+ } else {
AtomicStore(&slot->ReadPosition, 0u);
AtomicStore(&slot->ReadFrom, next); // release lock with new chunk
- delete readFrom;
- }
- } else {
+ delete readFrom;
+ }
+ } else {
// nothing in old chunk and no next chunk, just release lock with old chunk
AtomicStore(&slot->ReadFrom, readFrom);
- }
- }
- }
- }
-
- return 0; // got nothing after full cycle, return
- }
-
- void Push(T x, ui64 writerRotation) {
+ }
+ }
+ }
+ }
+
+ return 0; // got nothing after full cycle, return
+ }
+
+ void Push(T x, ui64 writerRotation) {
TLockedWriter lock = LockWriter(writerRotation);
WriteOne(lock, x);
- }
-
- void PushBulk(T* x, ui32 xcount, ui64 writerRotation) {
+ }
+
+ void PushBulk(T* x, ui32 xcount, ui64 writerRotation) {
for (;;) {
// Fill no more then one queue chunk per round
- const ui32 xround = Min(xcount, (ui32)TChunk::EntriesCount);
-
+ const ui32 xround = Min(xcount, (ui32)TChunk::EntriesCount);
+
{
TLockedWriter lock = LockWriter(writerRotation++);
for (T* end = x + xround; x != end; ++x)
WriteOne(lock, *x);
}
-
- if (xcount <= TChunk::EntriesCount)
- break;
-
- xcount -= TChunk::EntriesCount;
- }
- }
-};
+
+ if (xcount <= TChunk::EntriesCount)
+ break;
+
+ xcount -= TChunk::EntriesCount;
+ }
+ }
+};
diff --git a/library/cpp/actors/util/ya.make b/library/cpp/actors/util/ya.make
index a8bc36c769..37488c3962 100644
--- a/library/cpp/actors/util/ya.make
+++ b/library/cpp/actors/util/ya.make
@@ -1,37 +1,37 @@
-LIBRARY()
-
+LIBRARY()
+
OWNER(
ddoarn
g:kikimr
)
-
-SRCS(
- affinity.cpp
- affinity.h
+
+SRCS(
+ affinity.cpp
+ affinity.h
cpumask.h
datetime.h
- defs.h
+ defs.h
funnel_queue.h
- futex.h
- intrinsics.h
+ futex.h
+ intrinsics.h
local_process_key.h
- named_tuple.h
- queue_chunk.h
- queue_oneone_inplace.h
+ named_tuple.h
+ queue_chunk.h
+ queue_oneone_inplace.h
recentwnd.h
rope.h
- should_continue.cpp
- should_continue.h
- thread.h
- threadparkpad.cpp
- threadparkpad.h
- ticket_lock.h
+ should_continue.cpp
+ should_continue.h
+ thread.h
+ threadparkpad.cpp
+ threadparkpad.h
+ ticket_lock.h
timerfd.h
- unordered_cache.h
-)
-
-PEERDIR(
- util
-)
-
-END()
+ unordered_cache.h
+)
+
+PEERDIR(
+ util
+)
+
+END()