aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/threading
diff options
context:
space:
mode:
authorAnton Samokhvalov <pg83@yandex.ru>2022-02-10 16:45:17 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:45:17 +0300
commitd3a398281c6fd1d3672036cb2d63f842d2cb28c5 (patch)
treedd4bd3ca0f36b817e96812825ffaf10d645803f2 /library/cpp/threading
parent72cb13b4aff9bc9cf22e49251bc8fd143f82538f (diff)
downloadydb-d3a398281c6fd1d3672036cb2d63f842d2cb28c5.tar.gz
Restoring authorship annotation for Anton Samokhvalov <pg83@yandex.ru>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/threading')
-rw-r--r--library/cpp/threading/atomic/bool.h12
-rw-r--r--library/cpp/threading/chunk_queue/queue.h958
-rw-r--r--library/cpp/threading/chunk_queue/queue_ut.cpp284
-rw-r--r--library/cpp/threading/future/async.h14
-rw-r--r--library/cpp/threading/future/async_ut.cpp48
-rw-r--r--library/cpp/threading/future/core/future-inl.h1200
-rw-r--r--library/cpp/threading/future/core/future.h240
-rw-r--r--library/cpp/threading/future/future_ut.cpp474
-rw-r--r--library/cpp/threading/future/legacy_future.h104
-rw-r--r--library/cpp/threading/future/legacy_future_ut.cpp78
-rw-r--r--library/cpp/threading/future/perf/main.cpp20
-rw-r--r--library/cpp/threading/future/wait/wait-inl.h10
-rw-r--r--library/cpp/threading/future/wait/wait.cpp16
-rw-r--r--library/cpp/threading/future/wait/wait.h8
-rw-r--r--library/cpp/threading/light_rw_lock/bench/lightrwlock_test.cpp2
-rw-r--r--library/cpp/threading/light_rw_lock/lightrwlock.cpp2
-rw-r--r--library/cpp/threading/light_rw_lock/lightrwlock.h144
-rw-r--r--library/cpp/threading/local_executor/local_executor.cpp2
-rw-r--r--library/cpp/threading/local_executor/local_executor.h6
-rw-r--r--library/cpp/threading/local_executor/ut/local_executor_ut.cpp492
-rw-r--r--library/cpp/threading/poor_man_openmp/thread_helper.cpp12
-rw-r--r--library/cpp/threading/poor_man_openmp/thread_helper.h108
-rw-r--r--library/cpp/threading/poor_man_openmp/thread_helper_ut.cpp42
-rw-r--r--library/cpp/threading/poor_man_openmp/ut/ya.make14
-rw-r--r--library/cpp/threading/queue/mpsc_htswap.h12
-rw-r--r--library/cpp/threading/queue/mpsc_intrusive_unordered.h2
-rw-r--r--library/cpp/threading/queue/mpsc_read_as_filled.h14
-rw-r--r--library/cpp/threading/queue/mpsc_vinfarr_obstructive.h8
-rw-r--r--library/cpp/threading/queue/queue_ut.cpp2
-rw-r--r--library/cpp/threading/queue/tune.h16
-rw-r--r--library/cpp/threading/queue/unordered_ut.cpp8
-rw-r--r--library/cpp/threading/queue/ut_helpers.h12
-rw-r--r--library/cpp/threading/skip_list/compare.h114
-rw-r--r--library/cpp/threading/skip_list/perf/main.cpp586
-rw-r--r--library/cpp/threading/skip_list/skiplist.h642
-rw-r--r--library/cpp/threading/skip_list/skiplist_ut.cpp60
-rw-r--r--library/cpp/threading/task_scheduler/task_scheduler_ut.cpp24
37 files changed, 2895 insertions, 2895 deletions
diff --git a/library/cpp/threading/atomic/bool.h b/library/cpp/threading/atomic/bool.h
index ec8f75427b..d52544e762 100644
--- a/library/cpp/threading/atomic/bool.h
+++ b/library/cpp/threading/atomic/bool.h
@@ -1,7 +1,7 @@
-#pragma once
-
+#pragma once
+
#include <util/system/atomic.h>
-
+
namespace NAtomic {
class TBool {
public:
@@ -20,12 +20,12 @@ namespace NAtomic {
return AtomicGet(Val_);
}
- const TBool& operator=(bool val) noexcept {
+ const TBool& operator=(bool val) noexcept {
AtomicSet(Val_, val);
return *this;
}
- const TBool& operator=(const TBool& src) noexcept {
+ const TBool& operator=(const TBool& src) noexcept {
AtomicSet(Val_, AtomicGet(src.Val_));
return *this;
}
@@ -33,4 +33,4 @@ namespace NAtomic {
private:
TAtomic Val_ = 0;
};
-}
+}
diff --git a/library/cpp/threading/chunk_queue/queue.h b/library/cpp/threading/chunk_queue/queue.h
index 1959b0258c..55859601a1 100644
--- a/library/cpp/threading/chunk_queue/queue.h
+++ b/library/cpp/threading/chunk_queue/queue.h
@@ -19,21 +19,21 @@ namespace NThreading {
// Platform helpers
#if !defined(PLATFORM_CACHE_LINE)
-#define PLATFORM_CACHE_LINE 64
+#define PLATFORM_CACHE_LINE 64
#endif
#if !defined(PLATFORM_PAGE_SIZE)
-#define PLATFORM_PAGE_SIZE 4 * 1024
+#define PLATFORM_PAGE_SIZE 4 * 1024
#endif
- template <typename T, size_t PadSize = PLATFORM_CACHE_LINE>
- struct TPadded: public T {
- char Pad[PadSize - sizeof(T) % PadSize];
+ template <typename T, size_t PadSize = PLATFORM_CACHE_LINE>
+ struct TPadded: public T {
+ char Pad[PadSize - sizeof(T) % PadSize];
- TPadded() {
- static_assert(sizeof(*this) % PadSize == 0, "padding does not work");
- Y_UNUSED(Pad);
- }
+ TPadded() {
+ static_assert(sizeof(*this) % PadSize == 0, "padding does not work");
+ Y_UNUSED(Pad);
+ }
template<typename... Args>
TPadded(Args&&... args)
@@ -42,280 +42,280 @@ namespace NThreading {
static_assert(sizeof(*this) % PadSize == 0, "padding does not work");
Y_UNUSED(Pad);
}
- };
+ };
+
+ ////////////////////////////////////////////////////////////////////////////////
+ // Type helpers
+
+ namespace NImpl {
+ template <typename T>
+ struct TPodTypeHelper {
+ template <typename TT>
+ static void Write(T* ptr, TT&& value) {
+ *ptr = value;
+ }
+
+ static T Read(T* ptr) {
+ return *ptr;
+ }
+
+ static void Destroy(T* ptr) {
+ Y_UNUSED(ptr);
+ }
+ };
+
+ template <typename T>
+ struct TNonPodTypeHelper {
+ template <typename TT>
+ static void Write(T* ptr, TT&& value) {
+ new (ptr) T(std::forward<TT>(value));
+ }
+
+ static T Read(T* ptr) {
+ return std::move(*ptr);
+ }
+
+ static void Destroy(T* ptr) {
+ (void)ptr; /* Make MSVC happy. */
+ ptr->~T();
+ }
+ };
+
+ template <typename T>
+ using TTypeHelper = std::conditional_t<
+ TTypeTraits<T>::IsPod,
+ TPodTypeHelper<T>,
+ TNonPodTypeHelper<T>>;
+
+ }
+
+ ////////////////////////////////////////////////////////////////////////////////
+ // One producer/one consumer chunked queue.
+
+ template <typename T, size_t ChunkSize = PLATFORM_PAGE_SIZE>
+ class TOneOneQueue: private TNonCopyable {
+ using TTypeHelper = NImpl::TTypeHelper<T>;
+
+ struct TChunk;
+
+ struct TChunkHeader {
+ size_t Count = 0;
+ TChunk* Next = nullptr;
+ };
+
+ struct TChunk: public TChunkHeader {
+ static constexpr size_t MaxCount = (ChunkSize - sizeof(TChunkHeader)) / sizeof(T);
+
+ char Entries[MaxCount * sizeof(T)];
+
+ TChunk() {
+ Y_UNUSED(Entries); // uninitialized
+ }
+
+ ~TChunk() {
+ for (size_t i = 0; i < this->Count; ++i) {
+ TTypeHelper::Destroy(GetPtr(i));
+ }
+ }
+
+ T* GetPtr(size_t i) {
+ return (T*)Entries + i;
+ }
+ };
+
+ struct TWriterState {
+ TChunk* Chunk = nullptr;
+ };
+
+ struct TReaderState {
+ TChunk* Chunk = nullptr;
+ size_t Count = 0;
+ };
+
+ private:
+ TPadded<TWriterState> Writer;
+ TPadded<TReaderState> Reader;
+
+ public:
+ using TItem = T;
+
+ TOneOneQueue() {
+ Writer.Chunk = Reader.Chunk = new TChunk();
+ }
+
+ ~TOneOneQueue() {
+ DeleteChunks(Reader.Chunk);
+ }
+
+ template <typename TT>
+ void Enqueue(TT&& value) {
+ T* ptr = PrepareWrite();
+ Y_ASSERT(ptr);
+ TTypeHelper::Write(ptr, std::forward<TT>(value));
+ CompleteWrite();
+ }
+
+ bool Dequeue(T& value) {
+ if (T* ptr = PrepareRead()) {
+ value = TTypeHelper::Read(ptr);
+ CompleteRead();
+ return true;
+ }
+ return false;
+ }
+
+ bool IsEmpty() {
+ return !PrepareRead();
+ }
+
+ protected:
+ T* PrepareWrite() {
+ TChunk* chunk = Writer.Chunk;
+ Y_ASSERT(chunk && !chunk->Next);
+
+ if (chunk->Count != TChunk::MaxCount) {
+ return chunk->GetPtr(chunk->Count);
+ }
+
+ chunk = new TChunk();
+ AtomicSet(Writer.Chunk->Next, chunk);
+ Writer.Chunk = chunk;
+ return chunk->GetPtr(0);
+ }
+
+ void CompleteWrite() {
+ AtomicSet(Writer.Chunk->Count, Writer.Chunk->Count + 1);
+ }
+
+ T* PrepareRead() {
+ TChunk* chunk = Reader.Chunk;
+ Y_ASSERT(chunk);
+
+ for (;;) {
+ size_t writerCount = AtomicGet(chunk->Count);
+ if (Reader.Count != writerCount) {
+ return chunk->GetPtr(Reader.Count);
+ }
+
+ if (writerCount != TChunk::MaxCount) {
+ return nullptr;
+ }
+
+ chunk = AtomicGet(chunk->Next);
+ if (!chunk) {
+ return nullptr;
+ }
+
+ delete Reader.Chunk;
+ Reader.Chunk = chunk;
+ Reader.Count = 0;
+ }
+ }
- ////////////////////////////////////////////////////////////////////////////////
- // Type helpers
+ void CompleteRead() {
+ ++Reader.Count;
+ }
- namespace NImpl {
- template <typename T>
- struct TPodTypeHelper {
- template <typename TT>
- static void Write(T* ptr, TT&& value) {
- *ptr = value;
- }
+ private:
+ static void DeleteChunks(TChunk* chunk) {
+ while (chunk) {
+ TChunk* next = chunk->Next;
+ delete chunk;
+ chunk = next;
+ }
+ }
+ };
- static T Read(T* ptr) {
- return *ptr;
- }
-
- static void Destroy(T* ptr) {
- Y_UNUSED(ptr);
- }
- };
+ ////////////////////////////////////////////////////////////////////////////////
+ // Multiple producers/single consumer partitioned queue.
+ // Provides FIFO guaranties for each producer.
- template <typename T>
- struct TNonPodTypeHelper {
- template <typename TT>
- static void Write(T* ptr, TT&& value) {
- new (ptr) T(std::forward<TT>(value));
- }
-
- static T Read(T* ptr) {
- return std::move(*ptr);
- }
-
- static void Destroy(T* ptr) {
- (void)ptr; /* Make MSVC happy. */
- ptr->~T();
- }
- };
-
- template <typename T>
- using TTypeHelper = std::conditional_t<
- TTypeTraits<T>::IsPod,
- TPodTypeHelper<T>,
- TNonPodTypeHelper<T>>;
-
- }
-
- ////////////////////////////////////////////////////////////////////////////////
- // One producer/one consumer chunked queue.
-
- template <typename T, size_t ChunkSize = PLATFORM_PAGE_SIZE>
- class TOneOneQueue: private TNonCopyable {
- using TTypeHelper = NImpl::TTypeHelper<T>;
-
- struct TChunk;
-
- struct TChunkHeader {
- size_t Count = 0;
- TChunk* Next = nullptr;
- };
-
- struct TChunk: public TChunkHeader {
- static constexpr size_t MaxCount = (ChunkSize - sizeof(TChunkHeader)) / sizeof(T);
-
- char Entries[MaxCount * sizeof(T)];
-
- TChunk() {
- Y_UNUSED(Entries); // uninitialized
- }
-
- ~TChunk() {
- for (size_t i = 0; i < this->Count; ++i) {
- TTypeHelper::Destroy(GetPtr(i));
- }
- }
-
- T* GetPtr(size_t i) {
- return (T*)Entries + i;
- }
- };
-
- struct TWriterState {
- TChunk* Chunk = nullptr;
- };
-
- struct TReaderState {
- TChunk* Chunk = nullptr;
- size_t Count = 0;
- };
-
- private:
- TPadded<TWriterState> Writer;
- TPadded<TReaderState> Reader;
-
- public:
- using TItem = T;
-
- TOneOneQueue() {
- Writer.Chunk = Reader.Chunk = new TChunk();
- }
-
- ~TOneOneQueue() {
- DeleteChunks(Reader.Chunk);
- }
-
- template <typename TT>
- void Enqueue(TT&& value) {
- T* ptr = PrepareWrite();
- Y_ASSERT(ptr);
- TTypeHelper::Write(ptr, std::forward<TT>(value));
- CompleteWrite();
- }
-
- bool Dequeue(T& value) {
- if (T* ptr = PrepareRead()) {
- value = TTypeHelper::Read(ptr);
- CompleteRead();
- return true;
- }
- return false;
- }
-
- bool IsEmpty() {
- return !PrepareRead();
- }
+ template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE>
+ class TManyOneQueue: private TNonCopyable {
+ using TTypeHelper = NImpl::TTypeHelper<T>;
- protected:
- T* PrepareWrite() {
- TChunk* chunk = Writer.Chunk;
- Y_ASSERT(chunk && !chunk->Next);
-
- if (chunk->Count != TChunk::MaxCount) {
- return chunk->GetPtr(chunk->Count);
- }
-
- chunk = new TChunk();
- AtomicSet(Writer.Chunk->Next, chunk);
- Writer.Chunk = chunk;
- return chunk->GetPtr(0);
- }
-
- void CompleteWrite() {
- AtomicSet(Writer.Chunk->Count, Writer.Chunk->Count + 1);
- }
+ struct TEntry {
+ T Value;
+ ui64 Tag;
+ };
+
+ struct TQueueType: public TOneOneQueue<TEntry, ChunkSize> {
+ TAtomic WriteLock = 0;
- T* PrepareRead() {
- TChunk* chunk = Reader.Chunk;
- Y_ASSERT(chunk);
+ using TOneOneQueue<TEntry, ChunkSize>::PrepareWrite;
+ using TOneOneQueue<TEntry, ChunkSize>::CompleteWrite;
+
+ using TOneOneQueue<TEntry, ChunkSize>::PrepareRead;
+ using TOneOneQueue<TEntry, ChunkSize>::CompleteRead;
+ };
- for (;;) {
- size_t writerCount = AtomicGet(chunk->Count);
- if (Reader.Count != writerCount) {
- return chunk->GetPtr(Reader.Count);
- }
-
- if (writerCount != TChunk::MaxCount) {
- return nullptr;
- }
-
- chunk = AtomicGet(chunk->Next);
- if (!chunk) {
- return nullptr;
- }
-
- delete Reader.Chunk;
- Reader.Chunk = chunk;
- Reader.Count = 0;
- }
- }
-
- void CompleteRead() {
- ++Reader.Count;
- }
-
- private:
- static void DeleteChunks(TChunk* chunk) {
- while (chunk) {
- TChunk* next = chunk->Next;
- delete chunk;
- chunk = next;
- }
- }
- };
-
- ////////////////////////////////////////////////////////////////////////////////
- // Multiple producers/single consumer partitioned queue.
- // Provides FIFO guaranties for each producer.
-
- template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE>
- class TManyOneQueue: private TNonCopyable {
- using TTypeHelper = NImpl::TTypeHelper<T>;
-
- struct TEntry {
- T Value;
- ui64 Tag;
- };
-
- struct TQueueType: public TOneOneQueue<TEntry, ChunkSize> {
- TAtomic WriteLock = 0;
-
- using TOneOneQueue<TEntry, ChunkSize>::PrepareWrite;
- using TOneOneQueue<TEntry, ChunkSize>::CompleteWrite;
-
- using TOneOneQueue<TEntry, ChunkSize>::PrepareRead;
- using TOneOneQueue<TEntry, ChunkSize>::CompleteRead;
- };
-
- private:
- union {
- TAtomic WriteTag = 0;
- char Pad[PLATFORM_CACHE_LINE];
- };
-
- TQueueType Queues[Concurrency];
-
- public:
- using TItem = T;
-
- template <typename TT>
- void Enqueue(TT&& value) {
- ui64 tag = NextTag();
- while (!TryEnqueue(std::forward<TT>(value), tag)) {
- SpinLockPause();
- }
- }
-
- bool Dequeue(T& value) {
- size_t index = 0;
- if (TEntry* entry = PrepareRead(index)) {
- value = TTypeHelper::Read(&entry->Value);
- Queues[index].CompleteRead();
- return true;
- }
- return false;
- }
+ private:
+ union {
+ TAtomic WriteTag = 0;
+ char Pad[PLATFORM_CACHE_LINE];
+ };
+
+ TQueueType Queues[Concurrency];
+
+ public:
+ using TItem = T;
+
+ template <typename TT>
+ void Enqueue(TT&& value) {
+ ui64 tag = NextTag();
+ while (!TryEnqueue(std::forward<TT>(value), tag)) {
+ SpinLockPause();
+ }
+ }
+
+ bool Dequeue(T& value) {
+ size_t index = 0;
+ if (TEntry* entry = PrepareRead(index)) {
+ value = TTypeHelper::Read(&entry->Value);
+ Queues[index].CompleteRead();
+ return true;
+ }
+ return false;
+ }
- bool IsEmpty() {
- for (size_t i = 0; i < Concurrency; ++i) {
- if (!Queues[i].IsEmpty()) {
- return false;
- }
- }
- return true;
+ bool IsEmpty() {
+ for (size_t i = 0; i < Concurrency; ++i) {
+ if (!Queues[i].IsEmpty()) {
+ return false;
+ }
+ }
+ return true;
}
- private:
- ui64 NextTag() {
- // TODO: can we avoid synchronization here? it costs 1.5x performance penalty
- // return GetCycleCount();
- return AtomicIncrement(WriteTag);
- }
+ private:
+ ui64 NextTag() {
+ // TODO: can we avoid synchronization here? it costs 1.5x performance penalty
+ // return GetCycleCount();
+ return AtomicIncrement(WriteTag);
+ }
- template <typename TT>
- bool TryEnqueue(TT&& value, ui64 tag) {
- for (size_t i = 0; i < Concurrency; ++i) {
- TQueueType& queue = Queues[i];
- if (AtomicTryAndTryLock(&queue.WriteLock)) {
- TEntry* entry = queue.PrepareWrite();
- Y_ASSERT(entry);
- TTypeHelper::Write(&entry->Value, std::forward<TT>(value));
- entry->Tag = tag;
- queue.CompleteWrite();
- AtomicUnlock(&queue.WriteLock);
- return true;
- }
+ template <typename TT>
+ bool TryEnqueue(TT&& value, ui64 tag) {
+ for (size_t i = 0; i < Concurrency; ++i) {
+ TQueueType& queue = Queues[i];
+ if (AtomicTryAndTryLock(&queue.WriteLock)) {
+ TEntry* entry = queue.PrepareWrite();
+ Y_ASSERT(entry);
+ TTypeHelper::Write(&entry->Value, std::forward<TT>(value));
+ entry->Tag = tag;
+ queue.CompleteWrite();
+ AtomicUnlock(&queue.WriteLock);
+ return true;
+ }
}
- return false;
+ return false;
}
- TEntry* PrepareRead(size_t& index) {
- TEntry* entry = nullptr;
- ui64 tag = Max();
+ TEntry* PrepareRead(size_t& index) {
+ TEntry* entry = nullptr;
+ ui64 tag = Max();
- for (size_t i = 0; i < Concurrency; ++i) {
+ for (size_t i = 0; i < Concurrency; ++i) {
TEntry* e = Queues[i].PrepareRead();
if (e && e->Tag < tag) {
index = i;
@@ -323,246 +323,246 @@ namespace NThreading {
tag = e->Tag;
}
}
-
- if (entry) {
- // need second pass to catch updates within already scanned range
- size_t candidate = index;
- for (size_t i = 0; i < candidate; ++i) {
- TEntry* e = Queues[i].PrepareRead();
- if (e && e->Tag < tag) {
- index = i;
- entry = e;
- tag = e->Tag;
- }
- }
- }
-
- return entry;
+
+ if (entry) {
+ // need second pass to catch updates within already scanned range
+ size_t candidate = index;
+ for (size_t i = 0; i < candidate; ++i) {
+ TEntry* e = Queues[i].PrepareRead();
+ if (e && e->Tag < tag) {
+ index = i;
+ entry = e;
+ tag = e->Tag;
+ }
+ }
+ }
+
+ return entry;
}
- };
+ };
- ////////////////////////////////////////////////////////////////////////////////
- // Concurrent many-many queue with strong FIFO guaranties.
- // Writers will not block readers (and vice versa), but will block each other.
+ ////////////////////////////////////////////////////////////////////////////////
+ // Concurrent many-many queue with strong FIFO guaranties.
+ // Writers will not block readers (and vice versa), but will block each other.
- template <typename T, size_t ChunkSize = PLATFORM_PAGE_SIZE, typename TLock = TAdaptiveLock>
- class TManyManyQueue: private TNonCopyable {
- private:
- TPadded<TLock> WriteLock;
- TPadded<TLock> ReadLock;
-
- TOneOneQueue<T, ChunkSize> Queue;
-
- public:
- using TItem = T;
-
- template <typename TT>
- void Enqueue(TT&& value) {
- with_lock (WriteLock) {
- Queue.Enqueue(std::forward<TT>(value));
- }
- }
-
- bool Dequeue(T& value) {
- with_lock (ReadLock) {
- return Queue.Dequeue(value);
- }
- }
-
- bool IsEmpty() {
- with_lock (ReadLock) {
- return Queue.IsEmpty();
- }
- }
- };
-
- ////////////////////////////////////////////////////////////////////////////////
- // Multiple producers/single consumer partitioned queue.
- // Because of random partitioning reordering possible - FIFO not guaranteed!
-
- template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE>
- class TRelaxedManyOneQueue: private TNonCopyable {
- struct TQueueType: public TOneOneQueue<T, ChunkSize> {
- TAtomic WriteLock = 0;
- };
-
- private:
- union {
- size_t ReadPos = 0;
- char Pad[PLATFORM_CACHE_LINE];
- };
-
- TQueueType Queues[Concurrency];
-
- public:
- using TItem = T;
-
- template <typename TT>
- void Enqueue(TT&& value) {
- while (!TryEnqueue(std::forward<TT>(value))) {
- SpinLockPause();
- }
- }
-
- bool Dequeue(T& value) {
- for (size_t i = 0; i < Concurrency; ++i) {
- TQueueType& queue = Queues[ReadPos++ % Concurrency];
- if (queue.Dequeue(value)) {
- return true;
- }
- }
- return false;
- }
-
- bool IsEmpty() {
- for (size_t i = 0; i < Concurrency; ++i) {
- if (!Queues[i].IsEmpty()) {
- return false;
- }
- }
- return true;
- }
-
- private:
- template <typename TT>
- bool TryEnqueue(TT&& value) {
- size_t writePos = GetCycleCount();
- for (size_t i = 0; i < Concurrency; ++i) {
- TQueueType& queue = Queues[writePos++ % Concurrency];
- if (AtomicTryAndTryLock(&queue.WriteLock)) {
- queue.Enqueue(std::forward<TT>(value));
- AtomicUnlock(&queue.WriteLock);
- return true;
- }
- }
- return false;
+ template <typename T, size_t ChunkSize = PLATFORM_PAGE_SIZE, typename TLock = TAdaptiveLock>
+ class TManyManyQueue: private TNonCopyable {
+ private:
+ TPadded<TLock> WriteLock;
+ TPadded<TLock> ReadLock;
+
+ TOneOneQueue<T, ChunkSize> Queue;
+
+ public:
+ using TItem = T;
+
+ template <typename TT>
+ void Enqueue(TT&& value) {
+ with_lock (WriteLock) {
+ Queue.Enqueue(std::forward<TT>(value));
+ }
}
- };
- ////////////////////////////////////////////////////////////////////////////////
- // Concurrent many-many partitioned queue.
- // Because of random partitioning reordering possible - FIFO not guaranteed!
-
- template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE>
- class TRelaxedManyManyQueue: private TNonCopyable {
- struct TQueueType: public TOneOneQueue<T, ChunkSize> {
- union {
- TAtomic WriteLock = 0;
- char Pad1[PLATFORM_CACHE_LINE];
- };
- union {
- TAtomic ReadLock = 0;
- char Pad2[PLATFORM_CACHE_LINE];
- };
+ bool Dequeue(T& value) {
+ with_lock (ReadLock) {
+ return Queue.Dequeue(value);
+ }
+ }
+
+ bool IsEmpty() {
+ with_lock (ReadLock) {
+ return Queue.IsEmpty();
+ }
+ }
+ };
+
+ ////////////////////////////////////////////////////////////////////////////////
+ // Multiple producers/single consumer partitioned queue.
+ // Because of random partitioning reordering possible - FIFO not guaranteed!
+
+ template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE>
+ class TRelaxedManyOneQueue: private TNonCopyable {
+ struct TQueueType: public TOneOneQueue<T, ChunkSize> {
+ TAtomic WriteLock = 0;
+ };
+
+ private:
+ union {
+ size_t ReadPos = 0;
+ char Pad[PLATFORM_CACHE_LINE];
};
- private:
- TQueueType Queues[Concurrency];
-
- public:
- using TItem = T;
-
- template <typename TT>
- void Enqueue(TT&& value) {
- while (!TryEnqueue(std::forward<TT>(value))) {
- SpinLockPause();
- }
- }
-
- bool Dequeue(T& value) {
- size_t readPos = GetCycleCount();
- for (size_t i = 0; i < Concurrency; ++i) {
- TQueueType& queue = Queues[readPos++ % Concurrency];
- if (AtomicTryAndTryLock(&queue.ReadLock)) {
- bool dequeued = queue.Dequeue(value);
- AtomicUnlock(&queue.ReadLock);
- if (dequeued) {
- return true;
- }
+ TQueueType Queues[Concurrency];
+
+ public:
+ using TItem = T;
+
+ template <typename TT>
+ void Enqueue(TT&& value) {
+ while (!TryEnqueue(std::forward<TT>(value))) {
+ SpinLockPause();
+ }
+ }
+
+ bool Dequeue(T& value) {
+ for (size_t i = 0; i < Concurrency; ++i) {
+ TQueueType& queue = Queues[ReadPos++ % Concurrency];
+ if (queue.Dequeue(value)) {
+ return true;
}
}
- return false;
+ return false;
}
- bool IsEmpty() {
- for (size_t i = 0; i < Concurrency; ++i) {
- TQueueType& queue = Queues[i];
- if (AtomicTryAndTryLock(&queue.ReadLock)) {
- bool empty = queue.IsEmpty();
- AtomicUnlock(&queue.ReadLock);
- if (!empty) {
- return false;
- }
+ bool IsEmpty() {
+ for (size_t i = 0; i < Concurrency; ++i) {
+ if (!Queues[i].IsEmpty()) {
+ return false;
}
}
- return true;
+ return true;
}
- private:
- template <typename TT>
- bool TryEnqueue(TT&& value) {
- size_t writePos = GetCycleCount();
- for (size_t i = 0; i < Concurrency; ++i) {
- TQueueType& queue = Queues[writePos++ % Concurrency];
- if (AtomicTryAndTryLock(&queue.WriteLock)) {
- queue.Enqueue(std::forward<TT>(value));
- AtomicUnlock(&queue.WriteLock);
- return true;
- }
+ private:
+ template <typename TT>
+ bool TryEnqueue(TT&& value) {
+ size_t writePos = GetCycleCount();
+ for (size_t i = 0; i < Concurrency; ++i) {
+ TQueueType& queue = Queues[writePos++ % Concurrency];
+ if (AtomicTryAndTryLock(&queue.WriteLock)) {
+ queue.Enqueue(std::forward<TT>(value));
+ AtomicUnlock(&queue.WriteLock);
+ return true;
+ }
}
- return false;
+ return false;
}
- };
+ };
+
+ ////////////////////////////////////////////////////////////////////////////////
+ // Concurrent many-many partitioned queue.
+ // Because of random partitioning reordering possible - FIFO not guaranteed!
+
+ template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE>
+ class TRelaxedManyManyQueue: private TNonCopyable {
+ struct TQueueType: public TOneOneQueue<T, ChunkSize> {
+ union {
+ TAtomic WriteLock = 0;
+ char Pad1[PLATFORM_CACHE_LINE];
+ };
+ union {
+ TAtomic ReadLock = 0;
+ char Pad2[PLATFORM_CACHE_LINE];
+ };
+ };
- ////////////////////////////////////////////////////////////////////////////////
- // Simple wrapper to deal with AutoPtrs
+ private:
+ TQueueType Queues[Concurrency];
- template <typename T, typename TImpl>
- class TAutoQueueBase: private TNonCopyable {
- private:
- TImpl Impl;
+ public:
+ using TItem = T;
- public:
- using TItem = TAutoPtr<T>;
+ template <typename TT>
+ void Enqueue(TT&& value) {
+ while (!TryEnqueue(std::forward<TT>(value))) {
+ SpinLockPause();
+ }
+ }
- ~TAutoQueueBase() {
- TAutoPtr<T> value;
- while (Dequeue(value)) {
- // do nothing
- }
+ bool Dequeue(T& value) {
+ size_t readPos = GetCycleCount();
+ for (size_t i = 0; i < Concurrency; ++i) {
+ TQueueType& queue = Queues[readPos++ % Concurrency];
+ if (AtomicTryAndTryLock(&queue.ReadLock)) {
+ bool dequeued = queue.Dequeue(value);
+ AtomicUnlock(&queue.ReadLock);
+ if (dequeued) {
+ return true;
+ }
+ }
+ }
+ return false;
}
- void Enqueue(TAutoPtr<T> value) {
- Impl.Enqueue(value.Get());
+ bool IsEmpty() {
+ for (size_t i = 0; i < Concurrency; ++i) {
+ TQueueType& queue = Queues[i];
+ if (AtomicTryAndTryLock(&queue.ReadLock)) {
+ bool empty = queue.IsEmpty();
+ AtomicUnlock(&queue.ReadLock);
+ if (!empty) {
+ return false;
+ }
+ }
+ }
+ return true;
+ }
+
+ private:
+ template <typename TT>
+ bool TryEnqueue(TT&& value) {
+ size_t writePos = GetCycleCount();
+ for (size_t i = 0; i < Concurrency; ++i) {
+ TQueueType& queue = Queues[writePos++ % Concurrency];
+ if (AtomicTryAndTryLock(&queue.WriteLock)) {
+ queue.Enqueue(std::forward<TT>(value));
+ AtomicUnlock(&queue.WriteLock);
+ return true;
+ }
+ }
+ return false;
+ }
+ };
+
+ ////////////////////////////////////////////////////////////////////////////////
+ // Simple wrapper to deal with AutoPtrs
+
+ template <typename T, typename TImpl>
+ class TAutoQueueBase: private TNonCopyable {
+ private:
+ TImpl Impl;
+
+ public:
+ using TItem = TAutoPtr<T>;
+
+ ~TAutoQueueBase() {
+ TAutoPtr<T> value;
+ while (Dequeue(value)) {
+ // do nothing
+ }
+ }
+
+ void Enqueue(TAutoPtr<T> value) {
+ Impl.Enqueue(value.Get());
Y_UNUSED(value.Release());
- }
+ }
- bool Dequeue(TAutoPtr<T>& value) {
- T* ptr = nullptr;
- if (Impl.Dequeue(ptr)) {
- value.Reset(ptr);
- return true;
- }
- return false;
+ bool Dequeue(TAutoPtr<T>& value) {
+ T* ptr = nullptr;
+ if (Impl.Dequeue(ptr)) {
+ value.Reset(ptr);
+ return true;
+ }
+ return false;
}
- bool IsEmpty() {
- return Impl.IsEmpty();
- }
- };
+ bool IsEmpty() {
+ return Impl.IsEmpty();
+ }
+ };
- template <typename T, size_t ChunkSize = PLATFORM_PAGE_SIZE>
- using TAutoOneOneQueue = TAutoQueueBase<T, TOneOneQueue<T*, ChunkSize>>;
+ template <typename T, size_t ChunkSize = PLATFORM_PAGE_SIZE>
+ using TAutoOneOneQueue = TAutoQueueBase<T, TOneOneQueue<T*, ChunkSize>>;
- template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE>
- using TAutoManyOneQueue = TAutoQueueBase<T, TManyOneQueue<T*, Concurrency, ChunkSize>>;
+ template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE>
+ using TAutoManyOneQueue = TAutoQueueBase<T, TManyOneQueue<T*, Concurrency, ChunkSize>>;
- template <typename T, size_t ChunkSize = PLATFORM_PAGE_SIZE, typename TLock = TAdaptiveLock>
- using TAutoManyManyQueue = TAutoQueueBase<T, TManyManyQueue<T*, ChunkSize, TLock>>;
+ template <typename T, size_t ChunkSize = PLATFORM_PAGE_SIZE, typename TLock = TAdaptiveLock>
+ using TAutoManyManyQueue = TAutoQueueBase<T, TManyManyQueue<T*, ChunkSize, TLock>>;
- template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE>
- using TAutoRelaxedManyOneQueue = TAutoQueueBase<T, TRelaxedManyOneQueue<T*, Concurrency, ChunkSize>>;
+ template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE>
+ using TAutoRelaxedManyOneQueue = TAutoQueueBase<T, TRelaxedManyOneQueue<T*, Concurrency, ChunkSize>>;
- template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE>
- using TAutoRelaxedManyManyQueue = TAutoQueueBase<T, TRelaxedManyManyQueue<T*, Concurrency, ChunkSize>>;
-}
+ template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE>
+ using TAutoRelaxedManyManyQueue = TAutoQueueBase<T, TRelaxedManyManyQueue<T*, Concurrency, ChunkSize>>;
+}
diff --git a/library/cpp/threading/chunk_queue/queue_ut.cpp b/library/cpp/threading/chunk_queue/queue_ut.cpp
index 406b71dd4c..8cb36d8dd1 100644
--- a/library/cpp/threading/chunk_queue/queue_ut.cpp
+++ b/library/cpp/threading/chunk_queue/queue_ut.cpp
@@ -5,55 +5,55 @@
#include <util/generic/set.h>
namespace NThreading {
- ////////////////////////////////////////////////////////////////////////////////
+ ////////////////////////////////////////////////////////////////////////////////
Y_UNIT_TEST_SUITE(TOneOneQueueTest){
Y_UNIT_TEST(ShouldBeEmptyAtStart){
- TOneOneQueue<int> queue;
+ TOneOneQueue<int> queue;
- int result = 0;
- UNIT_ASSERT(queue.IsEmpty());
- UNIT_ASSERT(!queue.Dequeue(result));
-}
+ int result = 0;
+ UNIT_ASSERT(queue.IsEmpty());
+ UNIT_ASSERT(!queue.Dequeue(result));
+}
Y_UNIT_TEST(ShouldReturnEntries) {
- TOneOneQueue<int> queue;
- queue.Enqueue(1);
- queue.Enqueue(2);
- queue.Enqueue(3);
+ TOneOneQueue<int> queue;
+ queue.Enqueue(1);
+ queue.Enqueue(2);
+ queue.Enqueue(3);
- int result = 0;
- UNIT_ASSERT(!queue.IsEmpty());
- UNIT_ASSERT(queue.Dequeue(result));
- UNIT_ASSERT_EQUAL(result, 1);
+ int result = 0;
+ UNIT_ASSERT(!queue.IsEmpty());
+ UNIT_ASSERT(queue.Dequeue(result));
+ UNIT_ASSERT_EQUAL(result, 1);
- UNIT_ASSERT(!queue.IsEmpty());
- UNIT_ASSERT(queue.Dequeue(result));
- UNIT_ASSERT_EQUAL(result, 2);
+ UNIT_ASSERT(!queue.IsEmpty());
+ UNIT_ASSERT(queue.Dequeue(result));
+ UNIT_ASSERT_EQUAL(result, 2);
- UNIT_ASSERT(!queue.IsEmpty());
- UNIT_ASSERT(queue.Dequeue(result));
- UNIT_ASSERT_EQUAL(result, 3);
+ UNIT_ASSERT(!queue.IsEmpty());
+ UNIT_ASSERT(queue.Dequeue(result));
+ UNIT_ASSERT_EQUAL(result, 3);
- UNIT_ASSERT(queue.IsEmpty());
- UNIT_ASSERT(!queue.Dequeue(result));
-}
+ UNIT_ASSERT(queue.IsEmpty());
+ UNIT_ASSERT(!queue.Dequeue(result));
+}
Y_UNIT_TEST(ShouldStoreMultipleChunks) {
- TOneOneQueue<int, 100> queue;
- for (int i = 0; i < 1000; ++i) {
- queue.Enqueue(i);
+ TOneOneQueue<int, 100> queue;
+ for (int i = 0; i < 1000; ++i) {
+ queue.Enqueue(i);
}
- for (int i = 0; i < 1000; ++i) {
- int result = 0;
- UNIT_ASSERT(!queue.IsEmpty());
- UNIT_ASSERT(queue.Dequeue(result));
- UNIT_ASSERT_EQUAL(result, i);
+ for (int i = 0; i < 1000; ++i) {
+ int result = 0;
+ UNIT_ASSERT(!queue.IsEmpty());
+ UNIT_ASSERT(queue.Dequeue(result));
+ UNIT_ASSERT_EQUAL(result, i);
}
-}
-}
-;
+}
+}
+;
////////////////////////////////////////////////////////////////////////////////
@@ -61,35 +61,35 @@ Y_UNIT_TEST_SUITE(TManyOneQueueTest){
Y_UNIT_TEST(ShouldBeEmptyAtStart){
TManyOneQueue<int> queue;
-int result;
-UNIT_ASSERT(queue.IsEmpty());
-UNIT_ASSERT(!queue.Dequeue(result));
-}
+int result;
+UNIT_ASSERT(queue.IsEmpty());
+UNIT_ASSERT(!queue.Dequeue(result));
+}
Y_UNIT_TEST(ShouldReturnEntries) {
- TManyOneQueue<int> queue;
- queue.Enqueue(1);
- queue.Enqueue(2);
- queue.Enqueue(3);
-
- int result = 0;
- UNIT_ASSERT(!queue.IsEmpty());
- UNIT_ASSERT(queue.Dequeue(result));
- UNIT_ASSERT_EQUAL(result, 1);
-
- UNIT_ASSERT(!queue.IsEmpty());
- UNIT_ASSERT(queue.Dequeue(result));
- UNIT_ASSERT_EQUAL(result, 2);
-
- UNIT_ASSERT(!queue.IsEmpty());
- UNIT_ASSERT(queue.Dequeue(result));
- UNIT_ASSERT_EQUAL(result, 3);
-
- UNIT_ASSERT(queue.IsEmpty());
- UNIT_ASSERT(!queue.Dequeue(result));
-}
-}
-;
+ TManyOneQueue<int> queue;
+ queue.Enqueue(1);
+ queue.Enqueue(2);
+ queue.Enqueue(3);
+
+ int result = 0;
+ UNIT_ASSERT(!queue.IsEmpty());
+ UNIT_ASSERT(queue.Dequeue(result));
+ UNIT_ASSERT_EQUAL(result, 1);
+
+ UNIT_ASSERT(!queue.IsEmpty());
+ UNIT_ASSERT(queue.Dequeue(result));
+ UNIT_ASSERT_EQUAL(result, 2);
+
+ UNIT_ASSERT(!queue.IsEmpty());
+ UNIT_ASSERT(queue.Dequeue(result));
+ UNIT_ASSERT_EQUAL(result, 3);
+
+ UNIT_ASSERT(queue.IsEmpty());
+ UNIT_ASSERT(!queue.Dequeue(result));
+}
+}
+;
////////////////////////////////////////////////////////////////////////////////
@@ -97,35 +97,35 @@ Y_UNIT_TEST_SUITE(TManyManyQueueTest){
Y_UNIT_TEST(ShouldBeEmptyAtStart){
TManyManyQueue<int> queue;
-int result = 0;
-UNIT_ASSERT(queue.IsEmpty());
-UNIT_ASSERT(!queue.Dequeue(result));
-}
+int result = 0;
+UNIT_ASSERT(queue.IsEmpty());
+UNIT_ASSERT(!queue.Dequeue(result));
+}
Y_UNIT_TEST(ShouldReturnEntries) {
- TManyManyQueue<int> queue;
- queue.Enqueue(1);
- queue.Enqueue(2);
- queue.Enqueue(3);
-
- int result = 0;
- UNIT_ASSERT(!queue.IsEmpty());
- UNIT_ASSERT(queue.Dequeue(result));
- UNIT_ASSERT_EQUAL(result, 1);
-
- UNIT_ASSERT(!queue.IsEmpty());
- UNIT_ASSERT(queue.Dequeue(result));
- UNIT_ASSERT_EQUAL(result, 2);
-
- UNIT_ASSERT(!queue.IsEmpty());
- UNIT_ASSERT(queue.Dequeue(result));
- UNIT_ASSERT_EQUAL(result, 3);
-
- UNIT_ASSERT(queue.IsEmpty());
- UNIT_ASSERT(!queue.Dequeue(result));
-}
-}
-;
+ TManyManyQueue<int> queue;
+ queue.Enqueue(1);
+ queue.Enqueue(2);
+ queue.Enqueue(3);
+
+ int result = 0;
+ UNIT_ASSERT(!queue.IsEmpty());
+ UNIT_ASSERT(queue.Dequeue(result));
+ UNIT_ASSERT_EQUAL(result, 1);
+
+ UNIT_ASSERT(!queue.IsEmpty());
+ UNIT_ASSERT(queue.Dequeue(result));
+ UNIT_ASSERT_EQUAL(result, 2);
+
+ UNIT_ASSERT(!queue.IsEmpty());
+ UNIT_ASSERT(queue.Dequeue(result));
+ UNIT_ASSERT_EQUAL(result, 3);
+
+ UNIT_ASSERT(queue.IsEmpty());
+ UNIT_ASSERT(!queue.Dequeue(result));
+}
+}
+;
////////////////////////////////////////////////////////////////////////////////
@@ -133,37 +133,37 @@ Y_UNIT_TEST_SUITE(TRelaxedManyOneQueueTest){
Y_UNIT_TEST(ShouldBeEmptyAtStart){
TRelaxedManyOneQueue<int> queue;
-int result;
-UNIT_ASSERT(queue.IsEmpty());
-UNIT_ASSERT(!queue.Dequeue(result));
-}
+int result;
+UNIT_ASSERT(queue.IsEmpty());
+UNIT_ASSERT(!queue.Dequeue(result));
+}
Y_UNIT_TEST(ShouldReturnEntries) {
- TSet<int> items = {1, 2, 3};
+ TSet<int> items = {1, 2, 3};
- TRelaxedManyOneQueue<int> queue;
- for (int item : items) {
- queue.Enqueue(item);
- }
+ TRelaxedManyOneQueue<int> queue;
+ for (int item : items) {
+ queue.Enqueue(item);
+ }
- int result = 0;
- UNIT_ASSERT(!queue.IsEmpty());
- UNIT_ASSERT(queue.Dequeue(result));
- UNIT_ASSERT(items.erase(result));
+ int result = 0;
+ UNIT_ASSERT(!queue.IsEmpty());
+ UNIT_ASSERT(queue.Dequeue(result));
+ UNIT_ASSERT(items.erase(result));
- UNIT_ASSERT(!queue.IsEmpty());
- UNIT_ASSERT(queue.Dequeue(result));
- UNIT_ASSERT(items.erase(result));
+ UNIT_ASSERT(!queue.IsEmpty());
+ UNIT_ASSERT(queue.Dequeue(result));
+ UNIT_ASSERT(items.erase(result));
- UNIT_ASSERT(!queue.IsEmpty());
- UNIT_ASSERT(queue.Dequeue(result));
- UNIT_ASSERT(items.erase(result));
+ UNIT_ASSERT(!queue.IsEmpty());
+ UNIT_ASSERT(queue.Dequeue(result));
+ UNIT_ASSERT(items.erase(result));
- UNIT_ASSERT(queue.IsEmpty());
- UNIT_ASSERT(!queue.Dequeue(result));
-}
-}
-;
+ UNIT_ASSERT(queue.IsEmpty());
+ UNIT_ASSERT(!queue.Dequeue(result));
+}
+}
+;
////////////////////////////////////////////////////////////////////////////////
@@ -171,35 +171,35 @@ Y_UNIT_TEST_SUITE(TRelaxedManyManyQueueTest){
Y_UNIT_TEST(ShouldBeEmptyAtStart){
TRelaxedManyManyQueue<int> queue;
-int result = 0;
-UNIT_ASSERT(queue.IsEmpty());
-UNIT_ASSERT(!queue.Dequeue(result));
-}
+int result = 0;
+UNIT_ASSERT(queue.IsEmpty());
+UNIT_ASSERT(!queue.Dequeue(result));
+}
Y_UNIT_TEST(ShouldReturnEntries) {
- TSet<int> items = {1, 2, 3};
-
- TRelaxedManyManyQueue<int> queue;
- for (int item : items) {
- queue.Enqueue(item);
- }
-
- int result = 0;
- UNIT_ASSERT(!queue.IsEmpty());
- UNIT_ASSERT(queue.Dequeue(result));
- UNIT_ASSERT(items.erase(result));
-
- UNIT_ASSERT(!queue.IsEmpty());
- UNIT_ASSERT(queue.Dequeue(result));
- UNIT_ASSERT(items.erase(result));
-
- UNIT_ASSERT(!queue.IsEmpty());
- UNIT_ASSERT(queue.Dequeue(result));
- UNIT_ASSERT(items.erase(result));
-
- UNIT_ASSERT(queue.IsEmpty());
- UNIT_ASSERT(!queue.Dequeue(result));
-}
-}
-;
-}
+ TSet<int> items = {1, 2, 3};
+
+ TRelaxedManyManyQueue<int> queue;
+ for (int item : items) {
+ queue.Enqueue(item);
+ }
+
+ int result = 0;
+ UNIT_ASSERT(!queue.IsEmpty());
+ UNIT_ASSERT(queue.Dequeue(result));
+ UNIT_ASSERT(items.erase(result));
+
+ UNIT_ASSERT(!queue.IsEmpty());
+ UNIT_ASSERT(queue.Dequeue(result));
+ UNIT_ASSERT(items.erase(result));
+
+ UNIT_ASSERT(!queue.IsEmpty());
+ UNIT_ASSERT(queue.Dequeue(result));
+ UNIT_ASSERT(items.erase(result));
+
+ UNIT_ASSERT(queue.IsEmpty());
+ UNIT_ASSERT(!queue.Dequeue(result));
+}
+}
+;
+}
diff --git a/library/cpp/threading/future/async.h b/library/cpp/threading/future/async.h
index f964d2dc88..8543fdd5c6 100644
--- a/library/cpp/threading/future/async.h
+++ b/library/cpp/threading/future/async.h
@@ -2,11 +2,11 @@
#include "future.h"
-#include <util/generic/function.h>
+#include <util/generic/function.h>
#include <util/thread/pool.h>
namespace NThreading {
- /**
+ /**
* @brief Asynchronously executes @arg func in @arg queue returning a future for the result.
*
* @arg func should be a callable object with signature T().
@@ -17,15 +17,15 @@ namespace NThreading {
* If you want to use another queue for execution just write an overload, @see ExtensionExample
* unittest.
*/
- template <typename Func>
+ template <typename Func>
TFuture<TFutureType<TFunctionResult<Func>>> Async(Func&& func, IThreadPool& queue) {
- auto promise = NewPromise<TFutureType<TFunctionResult<Func>>>();
+ auto promise = NewPromise<TFutureType<TFunctionResult<Func>>>();
auto lambda = [promise, func = std::forward<Func>(func)]() mutable {
- NImpl::SetValue(promise, func);
+ NImpl::SetValue(promise, func);
};
queue.SafeAddFunc(std::move(lambda));
- return promise.GetFuture();
- }
+ return promise.GetFuture();
+ }
}
diff --git a/library/cpp/threading/future/async_ut.cpp b/library/cpp/threading/future/async_ut.cpp
index a452965dbc..a3699744e4 100644
--- a/library/cpp/threading/future/async_ut.cpp
+++ b/library/cpp/threading/future/async_ut.cpp
@@ -6,13 +6,13 @@
#include <util/generic/vector.h>
namespace {
- struct TMySuperTaskQueue {
- };
+ struct TMySuperTaskQueue {
+ };
}
namespace NThreading {
- /* Here we provide an Async overload for TMySuperTaskQueue indide NThreading namespace
+ /* Here we provide an Async overload for TMySuperTaskQueue indide NThreading namespace
* so that we can call it in the way
*
* TMySuperTaskQueue queue;
@@ -20,38 +20,38 @@ namespace NThreading {
*
* See also ExtensionExample unittest.
*/
- template <typename Func>
- TFuture<TFunctionResult<Func>> Async(Func func, TMySuperTaskQueue&) {
- return MakeFuture(func());
- }
+ template <typename Func>
+ TFuture<TFunctionResult<Func>> Async(Func func, TMySuperTaskQueue&) {
+ return MakeFuture(func());
+ }
}
Y_UNIT_TEST_SUITE(Async) {
Y_UNIT_TEST(ExtensionExample) {
- TMySuperTaskQueue queue;
- auto future = NThreading::Async([]() { return 5; }, queue);
- future.Wait();
- UNIT_ASSERT_VALUES_EQUAL(future.GetValue(), 5);
- }
+ TMySuperTaskQueue queue;
+ auto future = NThreading::Async([]() { return 5; }, queue);
+ future.Wait();
+ UNIT_ASSERT_VALUES_EQUAL(future.GetValue(), 5);
+ }
Y_UNIT_TEST(WorksWithIMtpQueue) {
auto queue = MakeHolder<TThreadPool>();
- queue->Start(1);
+ queue->Start(1);
- auto future = NThreading::Async([]() { return 5; }, *queue);
- future.Wait();
- UNIT_ASSERT_VALUES_EQUAL(future.GetValue(), 5);
- }
+ auto future = NThreading::Async([]() { return 5; }, *queue);
+ future.Wait();
+ UNIT_ASSERT_VALUES_EQUAL(future.GetValue(), 5);
+ }
Y_UNIT_TEST(ProperlyDeducesFutureType) {
- // Compileability test
+ // Compileability test
auto queue = CreateThreadPool(1);
- NThreading::TFuture<void> f1 = NThreading::Async([]() {}, *queue);
- NThreading::TFuture<int> f2 = NThreading::Async([]() { return 5; }, *queue);
- NThreading::TFuture<double> f3 = NThreading::Async([]() { return 5.0; }, *queue);
- NThreading::TFuture<TVector<int>> f4 = NThreading::Async([]() { return TVector<int>(); }, *queue);
- NThreading::TFuture<int> f5 = NThreading::Async([]() { return NThreading::MakeFuture(5); }, *queue);
- }
+ NThreading::TFuture<void> f1 = NThreading::Async([]() {}, *queue);
+ NThreading::TFuture<int> f2 = NThreading::Async([]() { return 5; }, *queue);
+ NThreading::TFuture<double> f3 = NThreading::Async([]() { return 5.0; }, *queue);
+ NThreading::TFuture<TVector<int>> f4 = NThreading::Async([]() { return TVector<int>(); }, *queue);
+ NThreading::TFuture<int> f5 = NThreading::Async([]() { return NThreading::MakeFuture(5); }, *queue);
+ }
}
diff --git a/library/cpp/threading/future/core/future-inl.h b/library/cpp/threading/future/core/future-inl.h
index a0e06c1891..5fd4296a93 100644
--- a/library/cpp/threading/future/core/future-inl.h
+++ b/library/cpp/threading/future/core/future-inl.h
@@ -2,92 +2,92 @@
#if !defined(INCLUDE_FUTURE_INL_H)
#error "you should never include future-inl.h directly"
-#endif // INCLUDE_FUTURE_INL_H
+#endif // INCLUDE_FUTURE_INL_H
namespace NThreading {
- namespace NImpl {
- ////////////////////////////////////////////////////////////////////////////////
+ namespace NImpl {
+ ////////////////////////////////////////////////////////////////////////////////
- template <typename T>
- using TCallback = std::function<void(const TFuture<T>&)>;
+ template <typename T>
+ using TCallback = std::function<void(const TFuture<T>&)>;
- template <typename T>
- using TCallbackList = TVector<TCallback<T>>; // TODO: small vector
+ template <typename T>
+ using TCallbackList = TVector<TCallback<T>>; // TODO: small vector
- ////////////////////////////////////////////////////////////////////////////////
+ ////////////////////////////////////////////////////////////////////////////////
enum class TError {
Error
};
- template <typename T>
- class TFutureState: public TAtomicRefCount<TFutureState<T>> {
- enum {
- NotReady,
- ExceptionSet,
- ValueMoved, // keep the ordering of this and following values
- ValueSet,
- ValueRead,
- };
-
- private:
- mutable TAtomic State;
- TAdaptiveLock StateLock;
-
- TCallbackList<T> Callbacks;
+ template <typename T>
+ class TFutureState: public TAtomicRefCount<TFutureState<T>> {
+ enum {
+ NotReady,
+ ExceptionSet,
+ ValueMoved, // keep the ordering of this and following values
+ ValueSet,
+ ValueRead,
+ };
+
+ private:
+ mutable TAtomic State;
+ TAdaptiveLock StateLock;
+
+ TCallbackList<T> Callbacks;
mutable THolder<TSystemEvent> ReadyEvent;
- std::exception_ptr Exception;
-
- union {
- char NullValue;
- T Value;
- };
-
- void AccessValue(TDuration timeout, int acquireState) const {
- int state = AtomicGet(State);
- if (Y_UNLIKELY(state == NotReady)) {
- if (timeout == TDuration::Zero()) {
- ythrow TFutureException() << "value not set";
- }
-
- if (!Wait(timeout)) {
- ythrow TFutureException() << "wait timeout";
- }
-
- state = AtomicGet(State);
- }
-
+ std::exception_ptr Exception;
+
+ union {
+ char NullValue;
+ T Value;
+ };
+
+ void AccessValue(TDuration timeout, int acquireState) const {
+ int state = AtomicGet(State);
+ if (Y_UNLIKELY(state == NotReady)) {
+ if (timeout == TDuration::Zero()) {
+ ythrow TFutureException() << "value not set";
+ }
+
+ if (!Wait(timeout)) {
+ ythrow TFutureException() << "wait timeout";
+ }
+
+ state = AtomicGet(State);
+ }
+
TryRethrowWithState(state);
-
- switch (AtomicGetAndCas(&State, acquireState, ValueSet)) {
- case ValueSet:
- break;
- case ValueRead:
- if (acquireState != ValueRead) {
- ythrow TFutureException() << "value being read";
- }
- break;
- case ValueMoved:
- ythrow TFutureException() << "value was moved";
- default:
- Y_ASSERT(state == ValueSet);
- }
- }
-
- public:
- TFutureState()
- : State(NotReady)
- , NullValue(0)
- {
- }
-
- template <typename TT>
- TFutureState(TT&& value)
- : State(ValueSet)
- , Value(std::forward<TT>(value))
- {
- }
+
+ switch (AtomicGetAndCas(&State, acquireState, ValueSet)) {
+ case ValueSet:
+ break;
+ case ValueRead:
+ if (acquireState != ValueRead) {
+ ythrow TFutureException() << "value being read";
+ }
+ break;
+ case ValueMoved:
+ ythrow TFutureException() << "value was moved";
+ default:
+ Y_ASSERT(state == ValueSet);
+ }
+ }
+
+ public:
+ TFutureState()
+ : State(NotReady)
+ , NullValue(0)
+ {
+ }
+
+ template <typename TT>
+ TFutureState(TT&& value)
+ : State(ValueSet)
+ , Value(std::forward<TT>(value))
+ {
+ }
TFutureState(std::exception_ptr exception, TError)
: State(ExceptionSet)
@@ -96,14 +96,14 @@ namespace NThreading {
{
}
- ~TFutureState() {
- if (State >= ValueMoved) { // ValueMoved, ValueSet, ValueRead
- Value.~T();
- }
- }
+ ~TFutureState() {
+ if (State >= ValueMoved) { // ValueMoved, ValueSet, ValueRead
+ Value.~T();
+ }
+ }
- bool HasValue() const {
- return AtomicGet(State) >= ValueMoved; // ValueMoved, ValueSet, ValueRead
+ bool HasValue() const {
+ return AtomicGet(State) >= ValueMoved; // ValueMoved, ValueSet, ValueRead
}
void TryRethrow() const {
@@ -111,22 +111,22 @@ namespace NThreading {
TryRethrowWithState(state);
}
- bool HasException() const {
- return AtomicGet(State) == ExceptionSet;
- }
+ bool HasException() const {
+ return AtomicGet(State) == ExceptionSet;
+ }
- const T& GetValue(TDuration timeout = TDuration::Zero()) const {
- AccessValue(timeout, ValueRead);
- return Value;
- }
+ const T& GetValue(TDuration timeout = TDuration::Zero()) const {
+ AccessValue(timeout, ValueRead);
+ return Value;
+ }
- T ExtractValue(TDuration timeout = TDuration::Zero()) {
- AccessValue(timeout, ValueMoved);
- return std::move(Value);
- }
+ T ExtractValue(TDuration timeout = TDuration::Zero()) {
+ AccessValue(timeout, ValueMoved);
+ return std::move(Value);
+ }
- template <typename TT>
- void SetValue(TT&& value) {
+ template <typename TT>
+ void SetValue(TT&& value) {
bool success = TrySetValue(std::forward<TT>(value));
if (Y_UNLIKELY(!success)) {
ythrow TFutureException() << "value already set";
@@ -136,37 +136,37 @@ namespace NThreading {
template <typename TT>
bool TrySetValue(TT&& value) {
TSystemEvent* readyEvent = nullptr;
- TCallbackList<T> callbacks;
+ TCallbackList<T> callbacks;
- with_lock (StateLock) {
- int state = AtomicGet(State);
- if (Y_UNLIKELY(state != NotReady)) {
+ with_lock (StateLock) {
+ int state = AtomicGet(State);
+ if (Y_UNLIKELY(state != NotReady)) {
return false;
- }
+ }
+
+ new (&Value) T(std::forward<TT>(value));
- new (&Value) T(std::forward<TT>(value));
+ readyEvent = ReadyEvent.Get();
+ callbacks = std::move(Callbacks);
- readyEvent = ReadyEvent.Get();
- callbacks = std::move(Callbacks);
+ AtomicSet(State, ValueSet);
+ }
- AtomicSet(State, ValueSet);
- }
+ if (readyEvent) {
+ readyEvent->Signal();
+ }
- if (readyEvent) {
- readyEvent->Signal();
- }
-
- if (callbacks) {
- TFuture<T> temp(this);
- for (auto& callback : callbacks) {
- callback(temp);
- }
- }
+ if (callbacks) {
+ TFuture<T> temp(this);
+ for (auto& callback : callbacks) {
+ callback(temp);
+ }
+ }
return true;
}
- void SetException(std::exception_ptr e) {
+ void SetException(std::exception_ptr e) {
bool success = TrySetException(std::move(e));
if (Y_UNLIKELY(!success)) {
ythrow TFutureException() << "value already set";
@@ -175,73 +175,73 @@ namespace NThreading {
bool TrySetException(std::exception_ptr e) {
TSystemEvent* readyEvent;
- TCallbackList<T> callbacks;
+ TCallbackList<T> callbacks;
- with_lock (StateLock) {
- int state = AtomicGet(State);
- if (Y_UNLIKELY(state != NotReady)) {
+ with_lock (StateLock) {
+ int state = AtomicGet(State);
+ if (Y_UNLIKELY(state != NotReady)) {
return false;
- }
-
- Exception = std::move(e);
-
- readyEvent = ReadyEvent.Get();
- callbacks = std::move(Callbacks);
-
- AtomicSet(State, ExceptionSet);
- }
-
- if (readyEvent) {
- readyEvent->Signal();
- }
-
- if (callbacks) {
- TFuture<T> temp(this);
- for (auto& callback : callbacks) {
- callback(temp);
- }
- }
+ }
+
+ Exception = std::move(e);
+
+ readyEvent = ReadyEvent.Get();
+ callbacks = std::move(Callbacks);
+
+ AtomicSet(State, ExceptionSet);
+ }
+
+ if (readyEvent) {
+ readyEvent->Signal();
+ }
+
+ if (callbacks) {
+ TFuture<T> temp(this);
+ for (auto& callback : callbacks) {
+ callback(temp);
+ }
+ }
return true;
}
- template <typename F>
- bool Subscribe(F&& func) {
- with_lock (StateLock) {
- int state = AtomicGet(State);
- if (state == NotReady) {
- Callbacks.emplace_back(std::forward<F>(func));
- return true;
- }
- }
- return false;
- }
+ template <typename F>
+ bool Subscribe(F&& func) {
+ with_lock (StateLock) {
+ int state = AtomicGet(State);
+ if (state == NotReady) {
+ Callbacks.emplace_back(std::forward<F>(func));
+ return true;
+ }
+ }
+ return false;
+ }
- void Wait() const {
- Wait(TInstant::Max());
+ void Wait() const {
+ Wait(TInstant::Max());
}
- bool Wait(TDuration timeout) const {
- return Wait(timeout.ToDeadLine());
- }
+ bool Wait(TDuration timeout) const {
+ return Wait(timeout.ToDeadLine());
+ }
- bool Wait(TInstant deadline) const {
+ bool Wait(TInstant deadline) const {
TSystemEvent* readyEvent = nullptr;
- with_lock (StateLock) {
- int state = AtomicGet(State);
- if (state != NotReady) {
- return true;
- }
+ with_lock (StateLock) {
+ int state = AtomicGet(State);
+ if (state != NotReady) {
+ return true;
+ }
- if (!ReadyEvent) {
+ if (!ReadyEvent) {
ReadyEvent.Reset(new TSystemEvent());
- }
- readyEvent = ReadyEvent.Get();
- }
+ }
+ readyEvent = ReadyEvent.Get();
+ }
- Y_ASSERT(readyEvent);
- return readyEvent->WaitD(deadline);
+ Y_ASSERT(readyEvent);
+ return readyEvent->WaitD(deadline);
}
void TryRethrowWithState(int state) const {
@@ -250,31 +250,31 @@ namespace NThreading {
std::rethrow_exception(Exception);
}
}
- };
+ };
- ////////////////////////////////////////////////////////////////////////////////
+ ////////////////////////////////////////////////////////////////////////////////
- template <>
- class TFutureState<void>: public TAtomicRefCount<TFutureState<void>> {
- enum {
- NotReady,
- ValueSet,
- ExceptionSet,
- };
+ template <>
+ class TFutureState<void>: public TAtomicRefCount<TFutureState<void>> {
+ enum {
+ NotReady,
+ ValueSet,
+ ExceptionSet,
+ };
- private:
- TAtomic State;
- TAdaptiveLock StateLock;
+ private:
+ TAtomic State;
+ TAdaptiveLock StateLock;
- TCallbackList<void> Callbacks;
+ TCallbackList<void> Callbacks;
mutable THolder<TSystemEvent> ReadyEvent;
- std::exception_ptr Exception;
-
- public:
- TFutureState(bool valueSet = false)
- : State(valueSet ? ValueSet : NotReady)
- {
+ std::exception_ptr Exception;
+
+ public:
+ TFutureState(bool valueSet = false)
+ : State(valueSet ? ValueSet : NotReady)
+ {
}
TFutureState(std::exception_ptr exception, TError)
@@ -283,8 +283,8 @@ namespace NThreading {
{
}
- bool HasValue() const {
- return AtomicGet(State) == ValueSet;
+ bool HasValue() const {
+ return AtomicGet(State) == ValueSet;
}
void TryRethrow() const {
@@ -292,30 +292,30 @@ namespace NThreading {
TryRethrowWithState(state);
}
- bool HasException() const {
- return AtomicGet(State) == ExceptionSet;
- }
+ bool HasException() const {
+ return AtomicGet(State) == ExceptionSet;
+ }
- void GetValue(TDuration timeout = TDuration::Zero()) const {
- int state = AtomicGet(State);
- if (Y_UNLIKELY(state == NotReady)) {
- if (timeout == TDuration::Zero()) {
- ythrow TFutureException() << "value not set";
- }
+ void GetValue(TDuration timeout = TDuration::Zero()) const {
+ int state = AtomicGet(State);
+ if (Y_UNLIKELY(state == NotReady)) {
+ if (timeout == TDuration::Zero()) {
+ ythrow TFutureException() << "value not set";
+ }
- if (!Wait(timeout)) {
- ythrow TFutureException() << "wait timeout";
- }
+ if (!Wait(timeout)) {
+ ythrow TFutureException() << "wait timeout";
+ }
- state = AtomicGet(State);
- }
+ state = AtomicGet(State);
+ }
TryRethrowWithState(state);
- Y_ASSERT(state == ValueSet);
- }
+ Y_ASSERT(state == ValueSet);
+ }
- void SetValue() {
+ void SetValue() {
bool success = TrySetValue();
if (Y_UNLIKELY(!success)) {
ythrow TFutureException() << "value already set";
@@ -324,35 +324,35 @@ namespace NThreading {
bool TrySetValue() {
TSystemEvent* readyEvent = nullptr;
- TCallbackList<void> callbacks;
+ TCallbackList<void> callbacks;
- with_lock (StateLock) {
- int state = AtomicGet(State);
- if (Y_UNLIKELY(state != NotReady)) {
+ with_lock (StateLock) {
+ int state = AtomicGet(State);
+ if (Y_UNLIKELY(state != NotReady)) {
return false;
- }
+ }
- readyEvent = ReadyEvent.Get();
- callbacks = std::move(Callbacks);
+ readyEvent = ReadyEvent.Get();
+ callbacks = std::move(Callbacks);
- AtomicSet(State, ValueSet);
- }
+ AtomicSet(State, ValueSet);
+ }
- if (readyEvent) {
- readyEvent->Signal();
- }
-
- if (callbacks) {
- TFuture<void> temp(this);
- for (auto& callback : callbacks) {
- callback(temp);
- }
- }
+ if (readyEvent) {
+ readyEvent->Signal();
+ }
+
+ if (callbacks) {
+ TFuture<void> temp(this);
+ for (auto& callback : callbacks) {
+ callback(temp);
+ }
+ }
return true;
}
- void SetException(std::exception_ptr e) {
+ void SetException(std::exception_ptr e) {
bool success = TrySetException(std::move(e));
if (Y_UNLIKELY(!success)) {
ythrow TFutureException() << "value already set";
@@ -361,73 +361,73 @@ namespace NThreading {
bool TrySetException(std::exception_ptr e) {
TSystemEvent* readyEvent = nullptr;
- TCallbackList<void> callbacks;
+ TCallbackList<void> callbacks;
- with_lock (StateLock) {
- int state = AtomicGet(State);
- if (Y_UNLIKELY(state != NotReady)) {
+ with_lock (StateLock) {
+ int state = AtomicGet(State);
+ if (Y_UNLIKELY(state != NotReady)) {
return false;
- }
+ }
- Exception = std::move(e);
+ Exception = std::move(e);
- readyEvent = ReadyEvent.Get();
- callbacks = std::move(Callbacks);
+ readyEvent = ReadyEvent.Get();
+ callbacks = std::move(Callbacks);
- AtomicSet(State, ExceptionSet);
- }
+ AtomicSet(State, ExceptionSet);
+ }
- if (readyEvent) {
- readyEvent->Signal();
- }
+ if (readyEvent) {
+ readyEvent->Signal();
+ }
- if (callbacks) {
- TFuture<void> temp(this);
- for (auto& callback : callbacks) {
- callback(temp);
- }
- }
+ if (callbacks) {
+ TFuture<void> temp(this);
+ for (auto& callback : callbacks) {
+ callback(temp);
+ }
+ }
return true;
- }
+ }
- template <typename F>
- bool Subscribe(F&& func) {
- with_lock (StateLock) {
- int state = AtomicGet(State);
- if (state == NotReady) {
- Callbacks.emplace_back(std::forward<F>(func));
- return true;
- }
- }
- return false;
- }
+ template <typename F>
+ bool Subscribe(F&& func) {
+ with_lock (StateLock) {
+ int state = AtomicGet(State);
+ if (state == NotReady) {
+ Callbacks.emplace_back(std::forward<F>(func));
+ return true;
+ }
+ }
+ return false;
+ }
- void Wait() const {
- Wait(TInstant::Max());
+ void Wait() const {
+ Wait(TInstant::Max());
}
- bool Wait(TDuration timeout) const {
- return Wait(timeout.ToDeadLine());
- }
+ bool Wait(TDuration timeout) const {
+ return Wait(timeout.ToDeadLine());
+ }
- bool Wait(TInstant deadline) const {
+ bool Wait(TInstant deadline) const {
TSystemEvent* readyEvent = nullptr;
-
- with_lock (StateLock) {
- int state = AtomicGet(State);
- if (state != NotReady) {
- return true;
- }
-
- if (!ReadyEvent) {
+
+ with_lock (StateLock) {
+ int state = AtomicGet(State);
+ if (state != NotReady) {
+ return true;
+ }
+
+ if (!ReadyEvent) {
ReadyEvent.Reset(new TSystemEvent());
- }
- readyEvent = ReadyEvent.Get();
- }
-
- Y_ASSERT(readyEvent);
- return readyEvent->WaitD(deadline);
+ }
+ readyEvent = ReadyEvent.Get();
+ }
+
+ Y_ASSERT(readyEvent);
+ return readyEvent->WaitD(deadline);
}
void TryRethrowWithState(int state) const {
@@ -436,53 +436,53 @@ namespace NThreading {
std::rethrow_exception(Exception);
}
}
- };
+ };
- ////////////////////////////////////////////////////////////////////////////////
+ ////////////////////////////////////////////////////////////////////////////////
- template <typename T>
- inline void SetValueImpl(TPromise<T>& promise, const T& value) {
- promise.SetValue(value);
- }
+ template <typename T>
+ inline void SetValueImpl(TPromise<T>& promise, const T& value) {
+ promise.SetValue(value);
+ }
- template <typename T>
- inline void SetValueImpl(TPromise<T>& promise, T&& value) {
- promise.SetValue(std::move(value));
+ template <typename T>
+ inline void SetValueImpl(TPromise<T>& promise, T&& value) {
+ promise.SetValue(std::move(value));
}
- template <typename T>
+ template <typename T>
inline void SetValueImpl(TPromise<T>& promise, const TFuture<T>& future,
std::enable_if_t<!std::is_void<T>::value, bool> = false) {
- future.Subscribe([=](const TFuture<T>& f) mutable {
+ future.Subscribe([=](const TFuture<T>& f) mutable {
T const* value;
- try {
+ try {
value = &f.GetValue();
- } catch (...) {
- promise.SetException(std::current_exception());
+ } catch (...) {
+ promise.SetException(std::current_exception());
return;
- }
+ }
promise.SetValue(*value);
- });
+ });
}
template <typename T>
inline void SetValueImpl(TPromise<void>& promise, const TFuture<T>& future) {
future.Subscribe([=](const TFuture<T>& f) mutable {
- try {
+ try {
f.TryRethrow();
- } catch (...) {
- promise.SetException(std::current_exception());
+ } catch (...) {
+ promise.SetException(std::current_exception());
return;
- }
+ }
promise.SetValue();
- });
- }
-
- template <typename T, typename F>
- inline void SetValue(TPromise<T>& promise, F&& func) {
- try {
- SetValueImpl(promise, func());
- } catch (...) {
+ });
+ }
+
+ template <typename T, typename F>
+ inline void SetValue(TPromise<T>& promise, F&& func) {
+ try {
+ SetValueImpl(promise, func());
+ } catch (...) {
const bool success = promise.TrySetException(std::current_exception());
if (Y_UNLIKELY(!success)) {
throw;
@@ -490,21 +490,21 @@ namespace NThreading {
}
}
- template <typename F>
- inline void SetValue(TPromise<void>& promise, F&& func,
- std::enable_if_t<std::is_void<TFunctionResult<F>>::value, bool> = false) {
- try {
- func();
- } catch (...) {
- promise.SetException(std::current_exception());
+ template <typename F>
+ inline void SetValue(TPromise<void>& promise, F&& func,
+ std::enable_if_t<std::is_void<TFunctionResult<F>>::value, bool> = false) {
+ try {
+ func();
+ } catch (...) {
+ promise.SetException(std::current_exception());
return;
}
promise.SetValue();
}
- }
+ }
- ////////////////////////////////////////////////////////////////////////////////
+ ////////////////////////////////////////////////////////////////////////////////
class TFutureStateId {
private:
@@ -532,45 +532,45 @@ namespace NThreading {
////////////////////////////////////////////////////////////////////////////////
- template <typename T>
+ template <typename T>
inline TFuture<T>::TFuture(const TIntrusivePtr<TFutureState>& state) noexcept
- : State(state)
+ : State(state)
{
}
- template <typename T>
- inline void TFuture<T>::Swap(TFuture<T>& other) {
- State.Swap(other.State);
- }
+ template <typename T>
+ inline void TFuture<T>::Swap(TFuture<T>& other) {
+ State.Swap(other.State);
+ }
- template <typename T>
- inline bool TFuture<T>::HasValue() const {
- return State && State->HasValue();
- }
+ template <typename T>
+ inline bool TFuture<T>::HasValue() const {
+ return State && State->HasValue();
+ }
- template <typename T>
- inline const T& TFuture<T>::GetValue(TDuration timeout) const {
- EnsureInitialized();
- return State->GetValue(timeout);
+ template <typename T>
+ inline const T& TFuture<T>::GetValue(TDuration timeout) const {
+ EnsureInitialized();
+ return State->GetValue(timeout);
}
- template <typename T>
- inline T TFuture<T>::ExtractValue(TDuration timeout) {
- EnsureInitialized();
- return State->ExtractValue(timeout);
- }
+ template <typename T>
+ inline T TFuture<T>::ExtractValue(TDuration timeout) {
+ EnsureInitialized();
+ return State->ExtractValue(timeout);
+ }
- template <typename T>
- inline const T& TFuture<T>::GetValueSync() const {
- return GetValue(TDuration::Max());
- }
+ template <typename T>
+ inline const T& TFuture<T>::GetValueSync() const {
+ return GetValue(TDuration::Max());
+ }
- template <typename T>
- inline T TFuture<T>::ExtractValueSync() {
- return ExtractValue(TDuration::Max());
- }
+ template <typename T>
+ inline T TFuture<T>::ExtractValueSync() {
+ return ExtractValue(TDuration::Max());
+ }
- template <typename T>
+ template <typename T>
inline void TFuture<T>::TryRethrow() const {
if (State) {
State->TryRethrow();
@@ -578,40 +578,40 @@ namespace NThreading {
}
template <typename T>
- inline bool TFuture<T>::HasException() const {
- return State && State->HasException();
- }
-
- template <typename T>
- inline void TFuture<T>::Wait() const {
- EnsureInitialized();
- return State->Wait();
- }
-
- template <typename T>
- inline bool TFuture<T>::Wait(TDuration timeout) const {
- EnsureInitialized();
- return State->Wait(timeout);
- }
-
- template <typename T>
- inline bool TFuture<T>::Wait(TInstant deadline) const {
- EnsureInitialized();
- return State->Wait(deadline);
- }
-
- template <typename T>
- template <typename F>
- inline const TFuture<T>& TFuture<T>::Subscribe(F&& func) const {
- EnsureInitialized();
- if (!State->Subscribe(std::forward<F>(func))) {
- func(*this);
- }
- return *this;
- }
-
- template <typename T>
- template <typename F>
+ inline bool TFuture<T>::HasException() const {
+ return State && State->HasException();
+ }
+
+ template <typename T>
+ inline void TFuture<T>::Wait() const {
+ EnsureInitialized();
+ return State->Wait();
+ }
+
+ template <typename T>
+ inline bool TFuture<T>::Wait(TDuration timeout) const {
+ EnsureInitialized();
+ return State->Wait(timeout);
+ }
+
+ template <typename T>
+ inline bool TFuture<T>::Wait(TInstant deadline) const {
+ EnsureInitialized();
+ return State->Wait(deadline);
+ }
+
+ template <typename T>
+ template <typename F>
+ inline const TFuture<T>& TFuture<T>::Subscribe(F&& func) const {
+ EnsureInitialized();
+ if (!State->Subscribe(std::forward<F>(func))) {
+ func(*this);
+ }
+ return *this;
+ }
+
+ template <typename T>
+ template <typename F>
inline const TFuture<T>& TFuture<T>::NoexceptSubscribe(F&& func) const noexcept {
return Subscribe(std::forward<F>(func));
}
@@ -623,59 +623,59 @@ namespace NThreading {
auto promise = NewPromise<TFutureType<TFutureCallResult<F, T>>>();
Subscribe([promise, func = std::forward<F>(func)](const TFuture<T>& future) mutable {
NImpl::SetValue(promise, [&]() { return func(future); });
- });
- return promise;
- }
-
- template <typename T>
- inline TFuture<void> TFuture<T>::IgnoreResult() const {
- auto promise = NewPromise();
- Subscribe([=](const TFuture<T>& future) mutable {
+ });
+ return promise;
+ }
+
+ template <typename T>
+ inline TFuture<void> TFuture<T>::IgnoreResult() const {
+ auto promise = NewPromise();
+ Subscribe([=](const TFuture<T>& future) mutable {
NImpl::SetValueImpl(promise, future);
- });
- return promise;
- }
+ });
+ return promise;
+ }
- template <typename T>
- inline bool TFuture<T>::Initialized() const {
- return bool(State);
+ template <typename T>
+ inline bool TFuture<T>::Initialized() const {
+ return bool(State);
}
- template <typename T>
+ template <typename T>
inline TMaybe<TFutureStateId> TFuture<T>::StateId() const noexcept {
return State != nullptr ? MakeMaybe<TFutureStateId>(*State) : Nothing();
}
template <typename T>
- inline void TFuture<T>::EnsureInitialized() const {
- if (!State) {
- ythrow TFutureException() << "state not initialized";
+ inline void TFuture<T>::EnsureInitialized() const {
+ if (!State) {
+ ythrow TFutureException() << "state not initialized";
}
- }
+ }
- ////////////////////////////////////////////////////////////////////////////////
+ ////////////////////////////////////////////////////////////////////////////////
inline TFuture<void>::TFuture(const TIntrusivePtr<TFutureState>& state) noexcept
- : State(state)
- {
- }
+ : State(state)
+ {
+ }
- inline void TFuture<void>::Swap(TFuture<void>& other) {
- State.Swap(other.State);
- }
+ inline void TFuture<void>::Swap(TFuture<void>& other) {
+ State.Swap(other.State);
+ }
- inline bool TFuture<void>::HasValue() const {
- return State && State->HasValue();
- }
+ inline bool TFuture<void>::HasValue() const {
+ return State && State->HasValue();
+ }
- inline void TFuture<void>::GetValue(TDuration timeout) const {
- EnsureInitialized();
- State->GetValue(timeout);
- }
+ inline void TFuture<void>::GetValue(TDuration timeout) const {
+ EnsureInitialized();
+ State->GetValue(timeout);
+ }
- inline void TFuture<void>::GetValueSync() const {
- GetValue(TDuration::Max());
- }
+ inline void TFuture<void>::GetValueSync() const {
+ GetValue(TDuration::Max());
+ }
inline void TFuture<void>::TryRethrow() const {
if (State) {
@@ -683,35 +683,35 @@ namespace NThreading {
}
}
- inline bool TFuture<void>::HasException() const {
- return State && State->HasException();
- }
+ inline bool TFuture<void>::HasException() const {
+ return State && State->HasException();
+ }
- inline void TFuture<void>::Wait() const {
- EnsureInitialized();
- return State->Wait();
- }
+ inline void TFuture<void>::Wait() const {
+ EnsureInitialized();
+ return State->Wait();
+ }
- inline bool TFuture<void>::Wait(TDuration timeout) const {
- EnsureInitialized();
- return State->Wait(timeout);
- }
+ inline bool TFuture<void>::Wait(TDuration timeout) const {
+ EnsureInitialized();
+ return State->Wait(timeout);
+ }
- inline bool TFuture<void>::Wait(TInstant deadline) const {
- EnsureInitialized();
- return State->Wait(deadline);
- }
+ inline bool TFuture<void>::Wait(TInstant deadline) const {
+ EnsureInitialized();
+ return State->Wait(deadline);
+ }
- template <typename F>
- inline const TFuture<void>& TFuture<void>::Subscribe(F&& func) const {
- EnsureInitialized();
- if (!State->Subscribe(std::forward<F>(func))) {
- func(*this);
- }
- return *this;
- }
+ template <typename F>
+ inline const TFuture<void>& TFuture<void>::Subscribe(F&& func) const {
+ EnsureInitialized();
+ if (!State->Subscribe(std::forward<F>(func))) {
+ func(*this);
+ }
+ return *this;
+ }
- template <typename F>
+ template <typename F>
inline const TFuture<void>& TFuture<void>::NoexceptSubscribe(F&& func) const noexcept {
return Subscribe(std::forward<F>(func));
}
@@ -722,82 +722,82 @@ namespace NThreading {
auto promise = NewPromise<TFutureType<TFutureCallResult<F, void>>>();
Subscribe([promise, func = std::forward<F>(func)](const TFuture<void>& future) mutable {
NImpl::SetValue(promise, [&]() { return func(future); });
- });
- return promise;
- }
-
- template <typename R>
- inline TFuture<R> TFuture<void>::Return(const R& value) const {
- auto promise = NewPromise<R>();
- Subscribe([=](const TFuture<void>& future) mutable {
- try {
+ });
+ return promise;
+ }
+
+ template <typename R>
+ inline TFuture<R> TFuture<void>::Return(const R& value) const {
+ auto promise = NewPromise<R>();
+ Subscribe([=](const TFuture<void>& future) mutable {
+ try {
future.TryRethrow();
- } catch (...) {
- promise.SetException(std::current_exception());
+ } catch (...) {
+ promise.SetException(std::current_exception());
return;
- }
+ }
promise.SetValue(value);
- });
- return promise;
+ });
+ return promise;
}
- inline bool TFuture<void>::Initialized() const {
- return bool(State);
- }
+ inline bool TFuture<void>::Initialized() const {
+ return bool(State);
+ }
inline TMaybe<TFutureStateId> TFuture<void>::StateId() const noexcept {
return State != nullptr ? MakeMaybe<TFutureStateId>(*State) : Nothing();
}
- inline void TFuture<void>::EnsureInitialized() const {
- if (!State) {
- ythrow TFutureException() << "state not initialized";
+ inline void TFuture<void>::EnsureInitialized() const {
+ if (!State) {
+ ythrow TFutureException() << "state not initialized";
}
- }
+ }
- ////////////////////////////////////////////////////////////////////////////////
+ ////////////////////////////////////////////////////////////////////////////////
- template <typename T>
+ template <typename T>
inline TPromise<T>::TPromise(const TIntrusivePtr<TFutureState>& state) noexcept
- : State(state)
- {
- }
-
- template <typename T>
- inline void TPromise<T>::Swap(TPromise<T>& other) {
- State.Swap(other.State);
- }
-
- template <typename T>
- inline const T& TPromise<T>::GetValue() const {
- EnsureInitialized();
- return State->GetValue();
- }
-
- template <typename T>
- inline T TPromise<T>::ExtractValue() {
- EnsureInitialized();
- return State->ExtractValue();
- }
-
- template <typename T>
- inline bool TPromise<T>::HasValue() const {
- return State && State->HasValue();
- }
-
- template <typename T>
- inline void TPromise<T>::SetValue(const T& value) {
- EnsureInitialized();
- State->SetValue(value);
- }
-
- template <typename T>
- inline void TPromise<T>::SetValue(T&& value) {
- EnsureInitialized();
- State->SetValue(std::move(value));
- }
-
- template <typename T>
+ : State(state)
+ {
+ }
+
+ template <typename T>
+ inline void TPromise<T>::Swap(TPromise<T>& other) {
+ State.Swap(other.State);
+ }
+
+ template <typename T>
+ inline const T& TPromise<T>::GetValue() const {
+ EnsureInitialized();
+ return State->GetValue();
+ }
+
+ template <typename T>
+ inline T TPromise<T>::ExtractValue() {
+ EnsureInitialized();
+ return State->ExtractValue();
+ }
+
+ template <typename T>
+ inline bool TPromise<T>::HasValue() const {
+ return State && State->HasValue();
+ }
+
+ template <typename T>
+ inline void TPromise<T>::SetValue(const T& value) {
+ EnsureInitialized();
+ State->SetValue(value);
+ }
+
+ template <typename T>
+ inline void TPromise<T>::SetValue(T&& value) {
+ EnsureInitialized();
+ State->SetValue(std::move(value));
+ }
+
+ template <typename T>
inline bool TPromise<T>::TrySetValue(const T& value) {
EnsureInitialized();
return State->TrySetValue(value);
@@ -817,75 +817,75 @@ namespace NThreading {
}
template <typename T>
- inline bool TPromise<T>::HasException() const {
- return State && State->HasException();
- }
+ inline bool TPromise<T>::HasException() const {
+ return State && State->HasException();
+ }
- template <typename T>
- inline void TPromise<T>::SetException(const TString& e) {
- EnsureInitialized();
- State->SetException(std::make_exception_ptr(yexception() << e));
- }
+ template <typename T>
+ inline void TPromise<T>::SetException(const TString& e) {
+ EnsureInitialized();
+ State->SetException(std::make_exception_ptr(yexception() << e));
+ }
- template <typename T>
- inline void TPromise<T>::SetException(std::exception_ptr e) {
- EnsureInitialized();
- State->SetException(std::move(e));
- }
+ template <typename T>
+ inline void TPromise<T>::SetException(std::exception_ptr e) {
+ EnsureInitialized();
+ State->SetException(std::move(e));
+ }
- template <typename T>
+ template <typename T>
inline bool TPromise<T>::TrySetException(std::exception_ptr e) {
EnsureInitialized();
return State->TrySetException(std::move(e));
}
template <typename T>
- inline TFuture<T> TPromise<T>::GetFuture() const {
- EnsureInitialized();
- return TFuture<T>(State);
- }
+ inline TFuture<T> TPromise<T>::GetFuture() const {
+ EnsureInitialized();
+ return TFuture<T>(State);
+ }
- template <typename T>
- inline TPromise<T>::operator TFuture<T>() const {
- return GetFuture();
- }
+ template <typename T>
+ inline TPromise<T>::operator TFuture<T>() const {
+ return GetFuture();
+ }
- template <typename T>
- inline bool TPromise<T>::Initialized() const {
- return bool(State);
- }
+ template <typename T>
+ inline bool TPromise<T>::Initialized() const {
+ return bool(State);
+ }
- template <typename T>
- inline void TPromise<T>::EnsureInitialized() const {
- if (!State) {
- ythrow TFutureException() << "state not initialized";
- }
- }
+ template <typename T>
+ inline void TPromise<T>::EnsureInitialized() const {
+ if (!State) {
+ ythrow TFutureException() << "state not initialized";
+ }
+ }
- ////////////////////////////////////////////////////////////////////////////////
+ ////////////////////////////////////////////////////////////////////////////////
inline TPromise<void>::TPromise(const TIntrusivePtr<TFutureState>& state) noexcept
- : State(state)
- {
- }
+ : State(state)
+ {
+ }
- inline void TPromise<void>::Swap(TPromise<void>& other) {
- State.Swap(other.State);
- }
+ inline void TPromise<void>::Swap(TPromise<void>& other) {
+ State.Swap(other.State);
+ }
- inline void TPromise<void>::GetValue() const {
- EnsureInitialized();
- State->GetValue();
- }
+ inline void TPromise<void>::GetValue() const {
+ EnsureInitialized();
+ State->GetValue();
+ }
- inline bool TPromise<void>::HasValue() const {
- return State && State->HasValue();
- }
+ inline bool TPromise<void>::HasValue() const {
+ return State && State->HasValue();
+ }
- inline void TPromise<void>::SetValue() {
- EnsureInitialized();
- State->SetValue();
- }
+ inline void TPromise<void>::SetValue() {
+ EnsureInitialized();
+ State->SetValue();
+ }
inline bool TPromise<void>::TrySetValue() {
EnsureInitialized();
@@ -898,78 +898,78 @@ namespace NThreading {
}
}
- inline bool TPromise<void>::HasException() const {
- return State && State->HasException();
- }
+ inline bool TPromise<void>::HasException() const {
+ return State && State->HasException();
+ }
- inline void TPromise<void>::SetException(const TString& e) {
- EnsureInitialized();
- State->SetException(std::make_exception_ptr(yexception() << e));
- }
+ inline void TPromise<void>::SetException(const TString& e) {
+ EnsureInitialized();
+ State->SetException(std::make_exception_ptr(yexception() << e));
+ }
- inline void TPromise<void>::SetException(std::exception_ptr e) {
- EnsureInitialized();
- State->SetException(std::move(e));
- }
+ inline void TPromise<void>::SetException(std::exception_ptr e) {
+ EnsureInitialized();
+ State->SetException(std::move(e));
+ }
inline bool TPromise<void>::TrySetException(std::exception_ptr e) {
EnsureInitialized();
return State->TrySetException(std::move(e));
}
- inline TFuture<void> TPromise<void>::GetFuture() const {
- EnsureInitialized();
- return TFuture<void>(State);
- }
+ inline TFuture<void> TPromise<void>::GetFuture() const {
+ EnsureInitialized();
+ return TFuture<void>(State);
+ }
- inline TPromise<void>::operator TFuture<void>() const {
- return GetFuture();
- }
+ inline TPromise<void>::operator TFuture<void>() const {
+ return GetFuture();
+ }
- inline bool TPromise<void>::Initialized() const {
- return bool(State);
- }
+ inline bool TPromise<void>::Initialized() const {
+ return bool(State);
+ }
- inline void TPromise<void>::EnsureInitialized() const {
- if (!State) {
- ythrow TFutureException() << "state not initialized";
- }
- }
+ inline void TPromise<void>::EnsureInitialized() const {
+ if (!State) {
+ ythrow TFutureException() << "state not initialized";
+ }
+ }
- ////////////////////////////////////////////////////////////////////////////////
+ ////////////////////////////////////////////////////////////////////////////////
- template <typename T>
- inline TPromise<T> NewPromise() {
- return {new NImpl::TFutureState<T>()};
+ template <typename T>
+ inline TPromise<T> NewPromise() {
+ return {new NImpl::TFutureState<T>()};
}
- inline TPromise<void> NewPromise() {
- return {new NImpl::TFutureState<void>()};
- }
+ inline TPromise<void> NewPromise() {
+ return {new NImpl::TFutureState<void>()};
+ }
- template <typename T>
- inline TFuture<T> MakeFuture(const T& value) {
- return {new NImpl::TFutureState<T>(value)};
- }
+ template <typename T>
+ inline TFuture<T> MakeFuture(const T& value) {
+ return {new NImpl::TFutureState<T>(value)};
+ }
- template <typename T>
- inline TFuture<std::remove_reference_t<T>> MakeFuture(T&& value) {
- return {new NImpl::TFutureState<std::remove_reference_t<T>>(std::forward<T>(value))};
- }
+ template <typename T>
+ inline TFuture<std::remove_reference_t<T>> MakeFuture(T&& value) {
+ return {new NImpl::TFutureState<std::remove_reference_t<T>>(std::forward<T>(value))};
+ }
- template <typename T>
- inline TFuture<T> MakeFuture() {
- struct TCache {
- TFuture<T> Instance{new NImpl::TFutureState<T>(Default<T>())};
+ template <typename T>
+ inline TFuture<T> MakeFuture() {
+ struct TCache {
+ TFuture<T> Instance{new NImpl::TFutureState<T>(Default<T>())};
TCache() {
// Immediately advance state from ValueSet to ValueRead.
// This should prevent corrupting shared value with an ExtractValue() call.
Y_UNUSED(Instance.GetValue());
}
- };
- return Singleton<TCache>()->Instance;
- }
+ };
+ return Singleton<TCache>()->Instance;
+ }
template <typename T>
inline TFuture<T> MakeErrorFuture(std::exception_ptr exception)
@@ -977,10 +977,10 @@ namespace NThreading {
return {new NImpl::TFutureState<T>(std::move(exception), NImpl::TError::Error)};
}
- inline TFuture<void> MakeFuture() {
- struct TCache {
- TFuture<void> Instance{new NImpl::TFutureState<void>(true)};
- };
- return Singleton<TCache>()->Instance;
- }
+ inline TFuture<void> MakeFuture() {
+ struct TCache {
+ TFuture<void> Instance{new NImpl::TFutureState<void>(true)};
+ };
+ return Singleton<TCache>()->Instance;
+ }
}
diff --git a/library/cpp/threading/future/core/future.h b/library/cpp/threading/future/core/future.h
index 12623389ca..2e82bb953e 100644
--- a/library/cpp/threading/future/core/future.h
+++ b/library/cpp/threading/future/core/future.h
@@ -12,51 +12,51 @@
#include <util/system/spinlock.h>
namespace NThreading {
- ////////////////////////////////////////////////////////////////////////////////
-
- struct TFutureException: public yexception {};
-
- // creates unset promise
- template <typename T>
- TPromise<T> NewPromise();
- TPromise<void> NewPromise();
-
- // creates preset future
- template <typename T>
- TFuture<T> MakeFuture(const T& value);
- template <typename T>
- TFuture<std::remove_reference_t<T>> MakeFuture(T&& value);
- template <typename T>
- TFuture<T> MakeFuture();
+ ////////////////////////////////////////////////////////////////////////////////
+
+ struct TFutureException: public yexception {};
+
+ // creates unset promise
+ template <typename T>
+ TPromise<T> NewPromise();
+ TPromise<void> NewPromise();
+
+ // creates preset future
+ template <typename T>
+ TFuture<T> MakeFuture(const T& value);
+ template <typename T>
+ TFuture<std::remove_reference_t<T>> MakeFuture(T&& value);
+ template <typename T>
+ TFuture<T> MakeFuture();
template <typename T>
TFuture<T> MakeErrorFuture(std::exception_ptr exception);
- TFuture<void> MakeFuture();
+ TFuture<void> MakeFuture();
- ////////////////////////////////////////////////////////////////////////////////
+ ////////////////////////////////////////////////////////////////////////////////
- namespace NImpl {
- template <typename T>
- class TFutureState;
+ namespace NImpl {
+ template <typename T>
+ class TFutureState;
- template <typename T>
- struct TFutureType {
- using TType = T;
- };
+ template <typename T>
+ struct TFutureType {
+ using TType = T;
+ };
- template <typename T>
- struct TFutureType<TFuture<T>> {
- using TType = typename TFutureType<T>::TType;
- };
+ template <typename T>
+ struct TFutureType<TFuture<T>> {
+ using TType = typename TFutureType<T>::TType;
+ };
template <typename F, typename T>
struct TFutureCallResult {
// NOTE: separate class for msvc compatibility
using TType = decltype(std::declval<F&>()(std::declval<const TFuture<T>&>()));
};
- }
+ }
- template <typename F>
- using TFutureType = typename NImpl::TFutureType<F>::TType;
+ template <typename F>
+ using TFutureType = typename NImpl::TFutureType<F>::TType;
template <typename F, typename T>
using TFutureCallResult = typename NImpl::TFutureCallResult<F, T>::TType;
@@ -64,16 +64,16 @@ namespace NThreading {
//! Type of the future/promise state identifier
class TFutureStateId;
- ////////////////////////////////////////////////////////////////////////////////
+ ////////////////////////////////////////////////////////////////////////////////
- template <typename T>
- class TFuture {
- using TFutureState = NImpl::TFutureState<T>;
+ template <typename T>
+ class TFuture {
+ using TFutureState = NImpl::TFutureState<T>;
- private:
- TIntrusivePtr<TFutureState> State;
+ private:
+ TIntrusivePtr<TFutureState> State;
- public:
+ public:
using value_type = T;
TFuture() noexcept = default;
@@ -83,54 +83,54 @@ namespace NThreading {
TFuture<T>& operator=(const TFuture<T>& other) noexcept = default;
TFuture<T>& operator=(TFuture<T>&& other) noexcept = default;
- void Swap(TFuture<T>& other);
+ void Swap(TFuture<T>& other);
- bool Initialized() const;
+ bool Initialized() const;
- bool HasValue() const;
- const T& GetValue(TDuration timeout = TDuration::Zero()) const;
- const T& GetValueSync() const;
- T ExtractValue(TDuration timeout = TDuration::Zero());
- T ExtractValueSync();
+ bool HasValue() const;
+ const T& GetValue(TDuration timeout = TDuration::Zero()) const;
+ const T& GetValueSync() const;
+ T ExtractValue(TDuration timeout = TDuration::Zero());
+ T ExtractValueSync();
void TryRethrow() const;
- bool HasException() const;
+ bool HasException() const;
- void Wait() const;
- bool Wait(TDuration timeout) const;
- bool Wait(TInstant deadline) const;
+ void Wait() const;
+ bool Wait(TDuration timeout) const;
+ bool Wait(TInstant deadline) const;
- template <typename F>
- const TFuture<T>& Subscribe(F&& callback) const;
+ template <typename F>
+ const TFuture<T>& Subscribe(F&& callback) const;
// precondition: EnsureInitialized() passes
// postcondition: std::terminate is highly unlikely
- template <typename F>
+ template <typename F>
const TFuture<T>& NoexceptSubscribe(F&& callback) const noexcept;
template <typename F>
TFuture<TFutureType<TFutureCallResult<F, T>>> Apply(F&& func) const;
- TFuture<void> IgnoreResult() const;
+ TFuture<void> IgnoreResult() const;
//! If the future is initialized returns the future state identifier. Otherwise returns an empty optional
/** The state identifier is guaranteed to be unique during the future state lifetime and could be reused after its death
**/
TMaybe<TFutureStateId> StateId() const noexcept;
- void EnsureInitialized() const;
- };
+ void EnsureInitialized() const;
+ };
- ////////////////////////////////////////////////////////////////////////////////
+ ////////////////////////////////////////////////////////////////////////////////
- template <>
- class TFuture<void> {
- using TFutureState = NImpl::TFutureState<void>;
+ template <>
+ class TFuture<void> {
+ using TFutureState = NImpl::TFutureState<void>;
- private:
+ private:
TIntrusivePtr<TFutureState> State = nullptr;
- public:
+ public:
using value_type = void;
TFuture() noexcept = default;
@@ -140,34 +140,34 @@ namespace NThreading {
TFuture<void>& operator=(const TFuture<void>& other) noexcept = default;
TFuture<void>& operator=(TFuture<void>&& other) noexcept = default;
- void Swap(TFuture<void>& other);
+ void Swap(TFuture<void>& other);
- bool Initialized() const;
+ bool Initialized() const;
- bool HasValue() const;
- void GetValue(TDuration timeout = TDuration::Zero()) const;
- void GetValueSync() const;
+ bool HasValue() const;
+ void GetValue(TDuration timeout = TDuration::Zero()) const;
+ void GetValueSync() const;
void TryRethrow() const;
- bool HasException() const;
+ bool HasException() const;
- void Wait() const;
- bool Wait(TDuration timeout) const;
- bool Wait(TInstant deadline) const;
+ void Wait() const;
+ bool Wait(TDuration timeout) const;
+ bool Wait(TInstant deadline) const;
- template <typename F>
- const TFuture<void>& Subscribe(F&& callback) const;
+ template <typename F>
+ const TFuture<void>& Subscribe(F&& callback) const;
// precondition: EnsureInitialized() passes
// postcondition: std::terminate is highly unlikely
- template <typename F>
+ template <typename F>
const TFuture<void>& NoexceptSubscribe(F&& callback) const noexcept;
template <typename F>
TFuture<TFutureType<TFutureCallResult<F, void>>> Apply(F&& func) const;
- template <typename R>
- TFuture<R> Return(const R& value) const;
+ template <typename R>
+ TFuture<R> Return(const R& value) const;
TFuture<void> IgnoreResult() const {
return *this;
@@ -178,19 +178,19 @@ namespace NThreading {
**/
TMaybe<TFutureStateId> StateId() const noexcept;
- void EnsureInitialized() const;
- };
+ void EnsureInitialized() const;
+ };
- ////////////////////////////////////////////////////////////////////////////////
+ ////////////////////////////////////////////////////////////////////////////////
- template <typename T>
- class TPromise {
- using TFutureState = NImpl::TFutureState<T>;
+ template <typename T>
+ class TPromise {
+ using TFutureState = NImpl::TFutureState<T>;
- private:
+ private:
TIntrusivePtr<TFutureState> State = nullptr;
- public:
+ public:
TPromise() noexcept = default;
TPromise(const TPromise<T>& other) noexcept = default;
TPromise(TPromise<T>&& other) noexcept = default;
@@ -198,43 +198,43 @@ namespace NThreading {
TPromise<T>& operator=(const TPromise<T>& other) noexcept = default;
TPromise<T>& operator=(TPromise<T>&& other) noexcept = default;
- void Swap(TPromise<T>& other);
+ void Swap(TPromise<T>& other);
- bool Initialized() const;
+ bool Initialized() const;
- bool HasValue() const;
- const T& GetValue() const;
- T ExtractValue();
+ bool HasValue() const;
+ const T& GetValue() const;
+ T ExtractValue();
- void SetValue(const T& value);
- void SetValue(T&& value);
+ void SetValue(const T& value);
+ void SetValue(T&& value);
bool TrySetValue(const T& value);
bool TrySetValue(T&& value);
void TryRethrow() const;
- bool HasException() const;
- void SetException(const TString& e);
- void SetException(std::exception_ptr e);
+ bool HasException() const;
+ void SetException(const TString& e);
+ void SetException(std::exception_ptr e);
bool TrySetException(std::exception_ptr e);
- TFuture<T> GetFuture() const;
- operator TFuture<T>() const;
+ TFuture<T> GetFuture() const;
+ operator TFuture<T>() const;
- private:
- void EnsureInitialized() const;
- };
+ private:
+ void EnsureInitialized() const;
+ };
- ////////////////////////////////////////////////////////////////////////////////
+ ////////////////////////////////////////////////////////////////////////////////
- template <>
- class TPromise<void> {
- using TFutureState = NImpl::TFutureState<void>;
+ template <>
+ class TPromise<void> {
+ using TFutureState = NImpl::TFutureState<void>;
- private:
- TIntrusivePtr<TFutureState> State;
+ private:
+ TIntrusivePtr<TFutureState> State;
- public:
+ public:
TPromise() noexcept = default;
TPromise(const TPromise<void>& other) noexcept = default;
TPromise(TPromise<void>&& other) noexcept = default;
@@ -242,30 +242,30 @@ namespace NThreading {
TPromise<void>& operator=(const TPromise<void>& other) noexcept = default;
TPromise<void>& operator=(TPromise<void>&& other) noexcept = default;
- void Swap(TPromise<void>& other);
+ void Swap(TPromise<void>& other);
- bool Initialized() const;
+ bool Initialized() const;
- bool HasValue() const;
- void GetValue() const;
+ bool HasValue() const;
+ void GetValue() const;
- void SetValue();
+ void SetValue();
bool TrySetValue();
void TryRethrow() const;
- bool HasException() const;
- void SetException(const TString& e);
- void SetException(std::exception_ptr e);
+ bool HasException() const;
+ void SetException(const TString& e);
+ void SetException(std::exception_ptr e);
bool TrySetException(std::exception_ptr e);
- TFuture<void> GetFuture() const;
- operator TFuture<void>() const;
+ TFuture<void> GetFuture() const;
+ operator TFuture<void>() const;
- private:
- void EnsureInitialized() const;
- };
+ private:
+ void EnsureInitialized() const;
+ };
-}
+}
#define INCLUDE_FUTURE_INL_H
#include "future-inl.h"
diff --git a/library/cpp/threading/future/future_ut.cpp b/library/cpp/threading/future/future_ut.cpp
index 636b113f2f..05950a568d 100644
--- a/library/cpp/threading/future/future_ut.cpp
+++ b/library/cpp/threading/future/future_ut.cpp
@@ -62,180 +62,180 @@ namespace {
}
- ////////////////////////////////////////////////////////////////////////////////
+ ////////////////////////////////////////////////////////////////////////////////
Y_UNIT_TEST_SUITE(TFutureTest) {
Y_UNIT_TEST(ShouldInitiallyHasNoValue) {
- TPromise<int> promise;
- UNIT_ASSERT(!promise.HasValue());
+ TPromise<int> promise;
+ UNIT_ASSERT(!promise.HasValue());
- promise = NewPromise<int>();
- UNIT_ASSERT(!promise.HasValue());
+ promise = NewPromise<int>();
+ UNIT_ASSERT(!promise.HasValue());
- TFuture<int> future;
- UNIT_ASSERT(!future.HasValue());
+ TFuture<int> future;
+ UNIT_ASSERT(!future.HasValue());
- future = promise.GetFuture();
- UNIT_ASSERT(!future.HasValue());
- }
+ future = promise.GetFuture();
+ UNIT_ASSERT(!future.HasValue());
+ }
Y_UNIT_TEST(ShouldInitiallyHasNoValueVoid) {
- TPromise<void> promise;
- UNIT_ASSERT(!promise.HasValue());
+ TPromise<void> promise;
+ UNIT_ASSERT(!promise.HasValue());
- promise = NewPromise();
- UNIT_ASSERT(!promise.HasValue());
+ promise = NewPromise();
+ UNIT_ASSERT(!promise.HasValue());
- TFuture<void> future;
- UNIT_ASSERT(!future.HasValue());
+ TFuture<void> future;
+ UNIT_ASSERT(!future.HasValue());
- future = promise.GetFuture();
- UNIT_ASSERT(!future.HasValue());
- }
+ future = promise.GetFuture();
+ UNIT_ASSERT(!future.HasValue());
+ }
Y_UNIT_TEST(ShouldStoreValue) {
- TPromise<int> promise = NewPromise<int>();
- promise.SetValue(123);
- UNIT_ASSERT(promise.HasValue());
- UNIT_ASSERT_EQUAL(promise.GetValue(), 123);
+ TPromise<int> promise = NewPromise<int>();
+ promise.SetValue(123);
+ UNIT_ASSERT(promise.HasValue());
+ UNIT_ASSERT_EQUAL(promise.GetValue(), 123);
- TFuture<int> future = promise.GetFuture();
- UNIT_ASSERT(future.HasValue());
- UNIT_ASSERT_EQUAL(future.GetValue(), 123);
+ TFuture<int> future = promise.GetFuture();
+ UNIT_ASSERT(future.HasValue());
+ UNIT_ASSERT_EQUAL(future.GetValue(), 123);
- future = MakeFuture(345);
- UNIT_ASSERT(future.HasValue());
- UNIT_ASSERT_EQUAL(future.GetValue(), 345);
- }
+ future = MakeFuture(345);
+ UNIT_ASSERT(future.HasValue());
+ UNIT_ASSERT_EQUAL(future.GetValue(), 345);
+ }
Y_UNIT_TEST(ShouldStoreValueVoid) {
- TPromise<void> promise = NewPromise();
- promise.SetValue();
- UNIT_ASSERT(promise.HasValue());
+ TPromise<void> promise = NewPromise();
+ promise.SetValue();
+ UNIT_ASSERT(promise.HasValue());
- TFuture<void> future = promise.GetFuture();
- UNIT_ASSERT(future.HasValue());
+ TFuture<void> future = promise.GetFuture();
+ UNIT_ASSERT(future.HasValue());
- future = MakeFuture();
- UNIT_ASSERT(future.HasValue());
- }
+ future = MakeFuture();
+ UNIT_ASSERT(future.HasValue());
+ }
- struct TTestCallback {
- int Value;
+ struct TTestCallback {
+ int Value;
- TTestCallback(int value)
- : Value(value)
- {
- }
+ TTestCallback(int value)
+ : Value(value)
+ {
+ }
- void Callback(const TFuture<int>& future) {
- Value += future.GetValue();
- }
+ void Callback(const TFuture<int>& future) {
+ Value += future.GetValue();
+ }
- int Func(const TFuture<int>& future) {
- return (Value += future.GetValue());
- }
+ int Func(const TFuture<int>& future) {
+ return (Value += future.GetValue());
+ }
- void VoidFunc(const TFuture<int>& future) {
- future.GetValue();
- }
+ void VoidFunc(const TFuture<int>& future) {
+ future.GetValue();
+ }
- TFuture<int> FutureFunc(const TFuture<int>& future) {
- return MakeFuture(Value += future.GetValue());
- }
+ TFuture<int> FutureFunc(const TFuture<int>& future) {
+ return MakeFuture(Value += future.GetValue());
+ }
- TPromise<void> Signal = NewPromise();
- TFuture<void> FutureVoidFunc(const TFuture<int>& future) {
- future.GetValue();
- return Signal;
- }
- };
+ TPromise<void> Signal = NewPromise();
+ TFuture<void> FutureVoidFunc(const TFuture<int>& future) {
+ future.GetValue();
+ return Signal;
+ }
+ };
Y_UNIT_TEST(ShouldInvokeCallback) {
- TPromise<int> promise = NewPromise<int>();
+ TPromise<int> promise = NewPromise<int>();
- TTestCallback callback(123);
- TFuture<int> future = promise.GetFuture()
- .Subscribe([&](const TFuture<int>& theFuture) { return callback.Callback(theFuture); });
+ TTestCallback callback(123);
+ TFuture<int> future = promise.GetFuture()
+ .Subscribe([&](const TFuture<int>& theFuture) { return callback.Callback(theFuture); });
- promise.SetValue(456);
- UNIT_ASSERT_EQUAL(future.GetValue(), 456);
- UNIT_ASSERT_EQUAL(callback.Value, 123 + 456);
- }
+ promise.SetValue(456);
+ UNIT_ASSERT_EQUAL(future.GetValue(), 456);
+ UNIT_ASSERT_EQUAL(callback.Value, 123 + 456);
+ }
Y_UNIT_TEST(ShouldApplyFunc) {
- TPromise<int> promise = NewPromise<int>();
+ TPromise<int> promise = NewPromise<int>();
- TTestCallback callback(123);
- TFuture<int> future = promise.GetFuture()
+ TTestCallback callback(123);
+ TFuture<int> future = promise.GetFuture()
.Apply([&](const auto& theFuture) { return callback.Func(theFuture); });
- promise.SetValue(456);
- UNIT_ASSERT_EQUAL(future.GetValue(), 123 + 456);
- UNIT_ASSERT_EQUAL(callback.Value, 123 + 456);
- }
+ promise.SetValue(456);
+ UNIT_ASSERT_EQUAL(future.GetValue(), 123 + 456);
+ UNIT_ASSERT_EQUAL(callback.Value, 123 + 456);
+ }
Y_UNIT_TEST(ShouldApplyVoidFunc) {
- TPromise<int> promise = NewPromise<int>();
+ TPromise<int> promise = NewPromise<int>();
- TTestCallback callback(123);
- TFuture<void> future = promise.GetFuture()
+ TTestCallback callback(123);
+ TFuture<void> future = promise.GetFuture()
.Apply([&](const auto& theFuture) { return callback.VoidFunc(theFuture); });
- promise.SetValue(456);
- UNIT_ASSERT(future.HasValue());
- }
+ promise.SetValue(456);
+ UNIT_ASSERT(future.HasValue());
+ }
Y_UNIT_TEST(ShouldApplyFutureFunc) {
- TPromise<int> promise = NewPromise<int>();
+ TPromise<int> promise = NewPromise<int>();
- TTestCallback callback(123);
- TFuture<int> future = promise.GetFuture()
+ TTestCallback callback(123);
+ TFuture<int> future = promise.GetFuture()
.Apply([&](const auto& theFuture) { return callback.FutureFunc(theFuture); });
- promise.SetValue(456);
- UNIT_ASSERT_EQUAL(future.GetValue(), 123 + 456);
- UNIT_ASSERT_EQUAL(callback.Value, 123 + 456);
- }
+ promise.SetValue(456);
+ UNIT_ASSERT_EQUAL(future.GetValue(), 123 + 456);
+ UNIT_ASSERT_EQUAL(callback.Value, 123 + 456);
+ }
Y_UNIT_TEST(ShouldApplyFutureVoidFunc) {
- TPromise<int> promise = NewPromise<int>();
+ TPromise<int> promise = NewPromise<int>();
- TTestCallback callback(123);
- TFuture<void> future = promise.GetFuture()
+ TTestCallback callback(123);
+ TFuture<void> future = promise.GetFuture()
.Apply([&](const auto& theFuture) { return callback.FutureVoidFunc(theFuture); });
- promise.SetValue(456);
- UNIT_ASSERT(!future.HasValue());
+ promise.SetValue(456);
+ UNIT_ASSERT(!future.HasValue());
- callback.Signal.SetValue();
- UNIT_ASSERT(future.HasValue());
- }
+ callback.Signal.SetValue();
+ UNIT_ASSERT(future.HasValue());
+ }
Y_UNIT_TEST(ShouldIgnoreResultIfAsked) {
- TPromise<int> promise = NewPromise<int>();
+ TPromise<int> promise = NewPromise<int>();
- TTestCallback callback(123);
- TFuture<int> future = promise.GetFuture().IgnoreResult().Return(42);
+ TTestCallback callback(123);
+ TFuture<int> future = promise.GetFuture().IgnoreResult().Return(42);
- promise.SetValue(456);
- UNIT_ASSERT_EQUAL(future.GetValue(), 42);
- }
+ promise.SetValue(456);
+ UNIT_ASSERT_EQUAL(future.GetValue(), 42);
+ }
- class TCustomException: public yexception {
- };
+ class TCustomException: public yexception {
+ };
Y_UNIT_TEST(ShouldRethrowException) {
- TPromise<int> promise = NewPromise<int>();
- try {
- ythrow TCustomException();
- } catch (...) {
- promise.SetException(std::current_exception());
- }
-
- UNIT_ASSERT(!promise.HasValue());
- UNIT_ASSERT(promise.HasException());
- UNIT_ASSERT_EXCEPTION(promise.GetValue(), TCustomException);
+ TPromise<int> promise = NewPromise<int>();
+ try {
+ ythrow TCustomException();
+ } catch (...) {
+ promise.SetException(std::current_exception());
+ }
+
+ UNIT_ASSERT(!promise.HasValue());
+ UNIT_ASSERT(promise.HasException());
+ UNIT_ASSERT_EXCEPTION(promise.GetValue(), TCustomException);
UNIT_ASSERT_EXCEPTION(promise.TryRethrow(), TCustomException);
}
@@ -261,36 +261,36 @@ namespace {
Y_UNIT_TEST(ShouldWaitExceptionOrAll) {
- TPromise<void> promise1 = NewPromise();
- TPromise<void> promise2 = NewPromise();
+ TPromise<void> promise1 = NewPromise();
+ TPromise<void> promise2 = NewPromise();
TFuture<void> future = WaitExceptionOrAll(promise1, promise2);
- UNIT_ASSERT(!future.HasValue());
+ UNIT_ASSERT(!future.HasValue());
- promise1.SetValue();
- UNIT_ASSERT(!future.HasValue());
+ promise1.SetValue();
+ UNIT_ASSERT(!future.HasValue());
- promise2.SetValue();
- UNIT_ASSERT(future.HasValue());
- }
+ promise2.SetValue();
+ UNIT_ASSERT(future.HasValue());
+ }
Y_UNIT_TEST(ShouldWaitExceptionOrAllVector) {
- TPromise<void> promise1 = NewPromise();
- TPromise<void> promise2 = NewPromise();
+ TPromise<void> promise1 = NewPromise();
+ TPromise<void> promise2 = NewPromise();
- TVector<TFuture<void>> promises;
- promises.push_back(promise1);
- promises.push_back(promise2);
+ TVector<TFuture<void>> promises;
+ promises.push_back(promise1);
+ promises.push_back(promise2);
TFuture<void> future = WaitExceptionOrAll(promises);
- UNIT_ASSERT(!future.HasValue());
+ UNIT_ASSERT(!future.HasValue());
- promise1.SetValue();
- UNIT_ASSERT(!future.HasValue());
+ promise1.SetValue();
+ UNIT_ASSERT(!future.HasValue());
- promise2.SetValue();
- UNIT_ASSERT(future.HasValue());
- }
+ promise2.SetValue();
+ UNIT_ASSERT(future.HasValue());
+ }
Y_UNIT_TEST(ShouldWaitExceptionOrAllVectorWithValueType) {
TPromise<int> promise1 = NewPromise<int>();
@@ -311,47 +311,47 @@ namespace {
}
Y_UNIT_TEST(ShouldWaitExceptionOrAllList) {
- TPromise<void> promise1 = NewPromise();
- TPromise<void> promise2 = NewPromise();
+ TPromise<void> promise1 = NewPromise();
+ TPromise<void> promise2 = NewPromise();
- std::list<TFuture<void>> promises;
- promises.push_back(promise1);
- promises.push_back(promise2);
+ std::list<TFuture<void>> promises;
+ promises.push_back(promise1);
+ promises.push_back(promise2);
TFuture<void> future = WaitExceptionOrAll(promises);
- UNIT_ASSERT(!future.HasValue());
+ UNIT_ASSERT(!future.HasValue());
- promise1.SetValue();
- UNIT_ASSERT(!future.HasValue());
+ promise1.SetValue();
+ UNIT_ASSERT(!future.HasValue());
- promise2.SetValue();
- UNIT_ASSERT(future.HasValue());
- }
+ promise2.SetValue();
+ UNIT_ASSERT(future.HasValue());
+ }
Y_UNIT_TEST(ShouldWaitExceptionOrAllVectorEmpty) {
- TVector<TFuture<void>> promises;
+ TVector<TFuture<void>> promises;
TFuture<void> future = WaitExceptionOrAll(promises);
- UNIT_ASSERT(future.HasValue());
- }
+ UNIT_ASSERT(future.HasValue());
+ }
Y_UNIT_TEST(ShouldWaitAnyVector) {
- TPromise<void> promise1 = NewPromise();
- TPromise<void> promise2 = NewPromise();
+ TPromise<void> promise1 = NewPromise();
+ TPromise<void> promise2 = NewPromise();
- TVector<TFuture<void>> promises;
- promises.push_back(promise1);
- promises.push_back(promise2);
+ TVector<TFuture<void>> promises;
+ promises.push_back(promise1);
+ promises.push_back(promise2);
- TFuture<void> future = WaitAny(promises);
- UNIT_ASSERT(!future.HasValue());
+ TFuture<void> future = WaitAny(promises);
+ UNIT_ASSERT(!future.HasValue());
- promise1.SetValue();
- UNIT_ASSERT(future.HasValue());
+ promise1.SetValue();
+ UNIT_ASSERT(future.HasValue());
- promise2.SetValue();
- UNIT_ASSERT(future.HasValue());
- }
+ promise2.SetValue();
+ UNIT_ASSERT(future.HasValue());
+ }
Y_UNIT_TEST(ShouldWaitAnyVectorWithValueType) {
@@ -373,112 +373,112 @@ namespace {
}
Y_UNIT_TEST(ShouldWaitAnyList) {
- TPromise<void> promise1 = NewPromise();
- TPromise<void> promise2 = NewPromise();
+ TPromise<void> promise1 = NewPromise();
+ TPromise<void> promise2 = NewPromise();
- std::list<TFuture<void>> promises;
- promises.push_back(promise1);
- promises.push_back(promise2);
+ std::list<TFuture<void>> promises;
+ promises.push_back(promise1);
+ promises.push_back(promise2);
- TFuture<void> future = WaitAny(promises);
- UNIT_ASSERT(!future.HasValue());
+ TFuture<void> future = WaitAny(promises);
+ UNIT_ASSERT(!future.HasValue());
- promise1.SetValue();
- UNIT_ASSERT(future.HasValue());
+ promise1.SetValue();
+ UNIT_ASSERT(future.HasValue());
- promise2.SetValue();
- UNIT_ASSERT(future.HasValue());
- }
+ promise2.SetValue();
+ UNIT_ASSERT(future.HasValue());
+ }
Y_UNIT_TEST(ShouldWaitAnyVectorEmpty) {
- TVector<TFuture<void>> promises;
+ TVector<TFuture<void>> promises;
- TFuture<void> future = WaitAny(promises);
- UNIT_ASSERT(future.HasValue());
- }
+ TFuture<void> future = WaitAny(promises);
+ UNIT_ASSERT(future.HasValue());
+ }
Y_UNIT_TEST(ShouldWaitAny) {
- TPromise<void> promise1 = NewPromise();
- TPromise<void> promise2 = NewPromise();
+ TPromise<void> promise1 = NewPromise();
+ TPromise<void> promise2 = NewPromise();
- TFuture<void> future = WaitAny(promise1, promise2);
- UNIT_ASSERT(!future.HasValue());
+ TFuture<void> future = WaitAny(promise1, promise2);
+ UNIT_ASSERT(!future.HasValue());
- promise1.SetValue();
- UNIT_ASSERT(future.HasValue());
+ promise1.SetValue();
+ UNIT_ASSERT(future.HasValue());
- promise2.SetValue();
- UNIT_ASSERT(future.HasValue());
- }
+ promise2.SetValue();
+ UNIT_ASSERT(future.HasValue());
+ }
Y_UNIT_TEST(ShouldStoreTypesWithoutDefaultConstructor) {
- // compileability test
- struct TRec {
- explicit TRec(int) {
- }
- };
+ // compileability test
+ struct TRec {
+ explicit TRec(int) {
+ }
+ };
- auto promise = NewPromise<TRec>();
- promise.SetValue(TRec(1));
+ auto promise = NewPromise<TRec>();
+ promise.SetValue(TRec(1));
- auto future = MakeFuture(TRec(1));
- const auto& rec = future.GetValue();
- Y_UNUSED(rec);
- }
+ auto future = MakeFuture(TRec(1));
+ const auto& rec = future.GetValue();
+ Y_UNUSED(rec);
+ }
Y_UNIT_TEST(ShouldStoreMovableTypes) {
- // compileability test
- struct TRec : TMoveOnly {
- explicit TRec(int) {
- }
- };
+ // compileability test
+ struct TRec : TMoveOnly {
+ explicit TRec(int) {
+ }
+ };
- auto promise = NewPromise<TRec>();
- promise.SetValue(TRec(1));
+ auto promise = NewPromise<TRec>();
+ promise.SetValue(TRec(1));
- auto future = MakeFuture(TRec(1));
- const auto& rec = future.GetValue();
- Y_UNUSED(rec);
- }
+ auto future = MakeFuture(TRec(1));
+ const auto& rec = future.GetValue();
+ Y_UNUSED(rec);
+ }
Y_UNIT_TEST(ShouldMoveMovableTypes) {
- // compileability test
- struct TRec : TMoveOnly {
- explicit TRec(int) {
- }
- };
+ // compileability test
+ struct TRec : TMoveOnly {
+ explicit TRec(int) {
+ }
+ };
- auto promise = NewPromise<TRec>();
- promise.SetValue(TRec(1));
+ auto promise = NewPromise<TRec>();
+ promise.SetValue(TRec(1));
- auto future = MakeFuture(TRec(1));
- auto rec = future.ExtractValue();
- Y_UNUSED(rec);
- }
+ auto future = MakeFuture(TRec(1));
+ auto rec = future.ExtractValue();
+ Y_UNUSED(rec);
+ }
Y_UNIT_TEST(ShouldNotExtractAfterGet) {
- TPromise<int> promise = NewPromise<int>();
- promise.SetValue(123);
- UNIT_ASSERT(promise.HasValue());
- UNIT_ASSERT_EQUAL(promise.GetValue(), 123);
- UNIT_CHECK_GENERATED_EXCEPTION(promise.ExtractValue(), TFutureException);
- }
+ TPromise<int> promise = NewPromise<int>();
+ promise.SetValue(123);
+ UNIT_ASSERT(promise.HasValue());
+ UNIT_ASSERT_EQUAL(promise.GetValue(), 123);
+ UNIT_CHECK_GENERATED_EXCEPTION(promise.ExtractValue(), TFutureException);
+ }
Y_UNIT_TEST(ShouldNotGetAfterExtract) {
- TPromise<int> promise = NewPromise<int>();
- promise.SetValue(123);
- UNIT_ASSERT(promise.HasValue());
- UNIT_ASSERT_EQUAL(promise.ExtractValue(), 123);
- UNIT_CHECK_GENERATED_EXCEPTION(promise.GetValue(), TFutureException);
- }
+ TPromise<int> promise = NewPromise<int>();
+ promise.SetValue(123);
+ UNIT_ASSERT(promise.HasValue());
+ UNIT_ASSERT_EQUAL(promise.ExtractValue(), 123);
+ UNIT_CHECK_GENERATED_EXCEPTION(promise.GetValue(), TFutureException);
+ }
Y_UNIT_TEST(ShouldNotExtractAfterExtract) {
- TPromise<int> promise = NewPromise<int>();
- promise.SetValue(123);
- UNIT_ASSERT(promise.HasValue());
- UNIT_ASSERT_EQUAL(promise.ExtractValue(), 123);
- UNIT_CHECK_GENERATED_EXCEPTION(promise.ExtractValue(), TFutureException);
- }
+ TPromise<int> promise = NewPromise<int>();
+ promise.SetValue(123);
+ UNIT_ASSERT(promise.HasValue());
+ UNIT_ASSERT_EQUAL(promise.ExtractValue(), 123);
+ UNIT_CHECK_GENERATED_EXCEPTION(promise.ExtractValue(), TFutureException);
+ }
Y_UNIT_TEST(ShouldNotExtractFromSharedDefault) {
UNIT_CHECK_GENERATED_EXCEPTION(MakeFuture<int>().ExtractValue(), TFutureException);
diff --git a/library/cpp/threading/future/legacy_future.h b/library/cpp/threading/future/legacy_future.h
index c699aadf5c..6f1eabad73 100644
--- a/library/cpp/threading/future/legacy_future.h
+++ b/library/cpp/threading/future/legacy_future.h
@@ -4,79 +4,79 @@
#include "future.h"
#include <util/thread/factory.h>
-
+
#include <functional>
-
+
namespace NThreading {
template <typename TR, bool IgnoreException>
class TLegacyFuture: public IThreadFactory::IThreadAble, TNonCopyable {
- public:
- typedef TR(TFunctionSignature)();
- using TFunctionObjectType = std::function<TFunctionSignature>;
- using TResult = typename TFunctionObjectType::result_type;
+ public:
+ typedef TR(TFunctionSignature)();
+ using TFunctionObjectType = std::function<TFunctionSignature>;
+ using TResult = typename TFunctionObjectType::result_type;
- private:
- TFunctionObjectType Func_;
- TPromise<TResult> Result_;
+ private:
+ TFunctionObjectType Func_;
+ TPromise<TResult> Result_;
THolder<IThreadFactory::IThread> Thread_;
- public:
+ public:
inline TLegacyFuture(const TFunctionObjectType func, IThreadFactory* pool = SystemThreadFactory())
- : Func_(func)
- , Result_(NewPromise<TResult>())
- , Thread_(pool->Run(this))
- {
- }
+ : Func_(func)
+ , Result_(NewPromise<TResult>())
+ , Thread_(pool->Run(this))
+ {
+ }
- inline ~TLegacyFuture() override {
- this->Join();
- }
+ inline ~TLegacyFuture() override {
+ this->Join();
+ }
- inline TResult Get() {
- this->Join();
- return Result_.GetValue();
- }
+ inline TResult Get() {
+ this->Join();
+ return Result_.GetValue();
+ }
- private:
- inline void Join() {
- if (Thread_) {
- Thread_->Join();
- Thread_.Destroy();
- }
+ private:
+ inline void Join() {
+ if (Thread_) {
+ Thread_->Join();
+ Thread_.Destroy();
+ }
}
- template <typename Result, bool IgnoreException_>
- struct TExecutor {
- static void SetPromise(TPromise<Result>& promise, const TFunctionObjectType& func) {
- if (IgnoreException_) {
- try {
- promise.SetValue(func());
- } catch (...) {
- }
- } else {
+ template <typename Result, bool IgnoreException_>
+ struct TExecutor {
+ static void SetPromise(TPromise<Result>& promise, const TFunctionObjectType& func) {
+ if (IgnoreException_) {
+ try {
+ promise.SetValue(func());
+ } catch (...) {
+ }
+ } else {
promise.SetValue(func());
}
}
- };
+ };
- template <bool IgnoreException_>
- struct TExecutor<void, IgnoreException_> {
- static void SetPromise(TPromise<void>& promise, const TFunctionObjectType& func) {
- if (IgnoreException_) {
- try {
- func();
- promise.SetValue();
- } catch (...) {
- }
- } else {
+ template <bool IgnoreException_>
+ struct TExecutor<void, IgnoreException_> {
+ static void SetPromise(TPromise<void>& promise, const TFunctionObjectType& func) {
+ if (IgnoreException_) {
+ try {
+ func();
+ promise.SetValue();
+ } catch (...) {
+ }
+ } else {
func();
promise.SetValue();
}
}
- };
-
- void DoExecute() override {
- TExecutor<TResult, IgnoreException>::SetPromise(Result_, Func_);
+ };
+
+ void DoExecute() override {
+ TExecutor<TResult, IgnoreException>::SetPromise(Result_, Func_);
}
};
diff --git a/library/cpp/threading/future/legacy_future_ut.cpp b/library/cpp/threading/future/legacy_future_ut.cpp
index 96b46ccebf..ff63db1725 100644
--- a/library/cpp/threading/future/legacy_future_ut.cpp
+++ b/library/cpp/threading/future/legacy_future_ut.cpp
@@ -4,69 +4,69 @@
namespace NThreading {
Y_UNIT_TEST_SUITE(TLegacyFutureTest) {
- int intf() {
- return 17;
- }
+ int intf() {
+ return 17;
+ }
Y_UNIT_TEST(TestIntFunction) {
- TLegacyFuture<int> f((&intf));
- UNIT_ASSERT_VALUES_EQUAL(17, f.Get());
- }
+ TLegacyFuture<int> f((&intf));
+ UNIT_ASSERT_VALUES_EQUAL(17, f.Get());
+ }
- static int r;
+ static int r;
- void voidf() {
- r = 18;
- }
+ void voidf() {
+ r = 18;
+ }
Y_UNIT_TEST(TestVoidFunction) {
- r = 0;
- TLegacyFuture<> f((&voidf));
- f.Get();
- UNIT_ASSERT_VALUES_EQUAL(18, r);
- }
+ r = 0;
+ TLegacyFuture<> f((&voidf));
+ f.Get();
+ UNIT_ASSERT_VALUES_EQUAL(18, r);
+ }
- struct TSampleClass {
- int mValue;
+ struct TSampleClass {
+ int mValue;
- TSampleClass(int value)
- : mValue(value)
- {
- }
+ TSampleClass(int value)
+ : mValue(value)
+ {
+ }
- int Calc() {
- return mValue + 1;
- }
- };
+ int Calc() {
+ return mValue + 1;
+ }
+ };
Y_UNIT_TEST(TestMethod) {
- TLegacyFuture<int> f11(std::bind(&TSampleClass::Calc, TSampleClass(3)));
- UNIT_ASSERT_VALUES_EQUAL(4, f11.Get());
+ TLegacyFuture<int> f11(std::bind(&TSampleClass::Calc, TSampleClass(3)));
+ UNIT_ASSERT_VALUES_EQUAL(4, f11.Get());
TLegacyFuture<int> f12(std::bind(&TSampleClass::Calc, TSampleClass(3)), SystemThreadFactory());
- UNIT_ASSERT_VALUES_EQUAL(4, f12.Get());
+ UNIT_ASSERT_VALUES_EQUAL(4, f12.Get());
- TSampleClass c(5);
+ TSampleClass c(5);
- TLegacyFuture<int> f21(std::bind(&TSampleClass::Calc, std::ref(c)));
- UNIT_ASSERT_VALUES_EQUAL(6, f21.Get());
+ TLegacyFuture<int> f21(std::bind(&TSampleClass::Calc, std::ref(c)));
+ UNIT_ASSERT_VALUES_EQUAL(6, f21.Get());
TLegacyFuture<int> f22(std::bind(&TSampleClass::Calc, std::ref(c)), SystemThreadFactory());
- UNIT_ASSERT_VALUES_EQUAL(6, f22.Get());
- }
+ UNIT_ASSERT_VALUES_EQUAL(6, f22.Get());
+ }
struct TSomeThreadPool: public IThreadFactory {};
Y_UNIT_TEST(TestFunction) {
- std::function<int()> f((&intf));
+ std::function<int()> f((&intf));
- UNIT_ASSERT_VALUES_EQUAL(17, TLegacyFuture<int>(f).Get());
+ UNIT_ASSERT_VALUES_EQUAL(17, TLegacyFuture<int>(f).Get());
UNIT_ASSERT_VALUES_EQUAL(17, TLegacyFuture<int>(f, SystemThreadFactory()).Get());
- if (false) {
- TSomeThreadPool* q = nullptr;
- TLegacyFuture<int>(f, q); // just check compiles, do not start
- }
+ if (false) {
+ TSomeThreadPool* q = nullptr;
+ TLegacyFuture<int>(f, q); // just check compiles, do not start
+ }
}
}
diff --git a/library/cpp/threading/future/perf/main.cpp b/library/cpp/threading/future/perf/main.cpp
index c7da5a51f3..5a0690af47 100644
--- a/library/cpp/threading/future/perf/main.cpp
+++ b/library/cpp/threading/future/perf/main.cpp
@@ -7,44 +7,44 @@
using namespace NThreading;
template <typename T>
-void TestAllocPromise(const NBench::NCpu::TParams& iface) {
- for (const auto it : xrange(iface.Iterations())) {
+void TestAllocPromise(const NBench::NCpu::TParams& iface) {
+ for (const auto it : xrange(iface.Iterations())) {
Y_UNUSED(it);
Y_DO_NOT_OPTIMIZE_AWAY(NewPromise<T>());
}
}
template <typename T>
-TPromise<T> SetPromise(T value) {
+TPromise<T> SetPromise(T value) {
auto promise = NewPromise<T>();
promise.SetValue(value);
return promise;
}
template <typename T>
-void TestSetPromise(const NBench::NCpu::TParams& iface, T value) {
- for (const auto it : xrange(iface.Iterations())) {
+void TestSetPromise(const NBench::NCpu::TParams& iface, T value) {
+ for (const auto it : xrange(iface.Iterations())) {
Y_UNUSED(it);
Y_DO_NOT_OPTIMIZE_AWAY(SetPromise(value));
}
}
-Y_CPU_BENCHMARK(AllocPromiseVoid, iface) {
+Y_CPU_BENCHMARK(AllocPromiseVoid, iface) {
TestAllocPromise<void>(iface);
}
-Y_CPU_BENCHMARK(AllocPromiseUI64, iface) {
+Y_CPU_BENCHMARK(AllocPromiseUI64, iface) {
TestAllocPromise<ui64>(iface);
}
-Y_CPU_BENCHMARK(AllocPromiseStroka, iface) {
+Y_CPU_BENCHMARK(AllocPromiseStroka, iface) {
TestAllocPromise<TString>(iface);
}
-Y_CPU_BENCHMARK(SetPromiseUI64, iface) {
+Y_CPU_BENCHMARK(SetPromiseUI64, iface) {
TestSetPromise<ui64>(iface, 1234567890ull);
}
-Y_CPU_BENCHMARK(SetPromiseStroka, iface) {
+Y_CPU_BENCHMARK(SetPromiseStroka, iface) {
TestSetPromise<TString>(iface, "test test test");
}
diff --git a/library/cpp/threading/future/wait/wait-inl.h b/library/cpp/threading/future/wait/wait-inl.h
index 9d056ff777..2753d5446c 100644
--- a/library/cpp/threading/future/wait/wait-inl.h
+++ b/library/cpp/threading/future/wait/wait-inl.h
@@ -2,10 +2,10 @@
#if !defined(INCLUDE_FUTURE_INL_H)
#error "you should never include wait-inl.h directly"
-#endif // INCLUDE_FUTURE_INL_H
+#endif // INCLUDE_FUTURE_INL_H
namespace NThreading {
- namespace NImpl {
+ namespace NImpl {
template <typename TContainer>
TVector<TFuture<void>> ToVoidFutures(const TContainer& futures) {
TVector<TFuture<void>> voidFutures;
@@ -17,19 +17,19 @@ namespace NThreading {
return voidFutures;
}
- }
+ }
template <typename TContainer>
[[nodiscard]] NImpl::EnableGenericWait<TContainer> WaitAll(const TContainer& futures) {
return WaitAll(NImpl::ToVoidFutures(futures));
}
- template <typename TContainer>
+ template <typename TContainer>
[[nodiscard]] NImpl::EnableGenericWait<TContainer> WaitExceptionOrAll(const TContainer& futures) {
return WaitExceptionOrAll(NImpl::ToVoidFutures(futures));
}
- template <typename TContainer>
+ template <typename TContainer>
[[nodiscard]] NImpl::EnableGenericWait<TContainer> WaitAny(const TContainer& futures) {
return WaitAny(NImpl::ToVoidFutures(futures));
}
diff --git a/library/cpp/threading/future/wait/wait.cpp b/library/cpp/threading/future/wait/wait.cpp
index e0a1c3bbd3..a173833a7f 100644
--- a/library/cpp/threading/future/wait/wait.cpp
+++ b/library/cpp/threading/future/wait/wait.cpp
@@ -31,13 +31,13 @@ namespace NThreading {
TWaitGroup<WaitPolicy> wg;
for (const auto& fut : futures) {
wg.Add(fut);
- }
+ }
return std::move(wg).Finish();
}
- }
+ }
- ////////////////////////////////////////////////////////////////////////////////
+ ////////////////////////////////////////////////////////////////////////////////
TFuture<void> WaitAll(const TFuture<void>& f1) {
return WaitGeneric<TWaitPolicy::TAll>(f1);
@@ -56,25 +56,25 @@ namespace NThreading {
TFuture<void> WaitExceptionOrAll(const TFuture<void>& f1) {
return WaitGeneric<TWaitPolicy::TExceptionOrAll>(f1);
- }
+ }
TFuture<void> WaitExceptionOrAll(const TFuture<void>& f1, const TFuture<void>& f2) {
return WaitGeneric<TWaitPolicy::TExceptionOrAll>(f1, f2);
- }
+ }
TFuture<void> WaitExceptionOrAll(TArrayRef<const TFuture<void>> futures) {
return WaitGeneric<TWaitPolicy::TExceptionOrAll>(futures);
}
- ////////////////////////////////////////////////////////////////////////////////
+ ////////////////////////////////////////////////////////////////////////////////
TFuture<void> WaitAny(const TFuture<void>& f1) {
return WaitGeneric<TWaitPolicy::TAny>(f1);
- }
+ }
TFuture<void> WaitAny(const TFuture<void>& f1, const TFuture<void>& f2) {
return WaitGeneric<TWaitPolicy::TAny>(f1, f2);
- }
+ }
TFuture<void> WaitAny(TArrayRef<const TFuture<void>> futures) {
return WaitGeneric<TWaitPolicy::TAny>(futures);
diff --git a/library/cpp/threading/future/wait/wait.h b/library/cpp/threading/future/wait/wait.h
index 60ec5b6a63..6ff7d57baa 100644
--- a/library/cpp/threading/future/wait/wait.h
+++ b/library/cpp/threading/future/wait/wait.h
@@ -25,16 +25,16 @@ namespace NThreading {
[[nodiscard]] TFuture<void> WaitExceptionOrAll(const TFuture<void>& f1);
[[nodiscard]] TFuture<void> WaitExceptionOrAll(const TFuture<void>& f1, const TFuture<void>& f2);
[[nodiscard]] TFuture<void> WaitExceptionOrAll(TArrayRef<const TFuture<void>> futures);
- template <typename TContainer>
+ template <typename TContainer>
[[nodiscard]] NImpl::EnableGenericWait<TContainer> WaitExceptionOrAll(const TContainer& futures);
- // waits for any future
+ // waits for any future
[[nodiscard]] TFuture<void> WaitAny(const TFuture<void>& f1);
[[nodiscard]] TFuture<void> WaitAny(const TFuture<void>& f1, const TFuture<void>& f2);
[[nodiscard]] TFuture<void> WaitAny(TArrayRef<const TFuture<void>> futures);
- template <typename TContainer>
+ template <typename TContainer>
[[nodiscard]] NImpl::EnableGenericWait<TContainer> WaitAny(const TContainer& futures);
-}
+}
#define INCLUDE_FUTURE_INL_H
#include "wait-inl.h"
diff --git a/library/cpp/threading/light_rw_lock/bench/lightrwlock_test.cpp b/library/cpp/threading/light_rw_lock/bench/lightrwlock_test.cpp
index 4faa53fc2a..c3027ea544 100644
--- a/library/cpp/threading/light_rw_lock/bench/lightrwlock_test.cpp
+++ b/library/cpp/threading/light_rw_lock/bench/lightrwlock_test.cpp
@@ -28,7 +28,7 @@ static int Y_FORCE_INLINE AtomicFetchAdd(volatile int& item, int value) {
}
#elif defined(__GNUC__)
#else
-#error unsupported platform
+#error unsupported platform
#endif
class TPosixRWLock {
diff --git a/library/cpp/threading/light_rw_lock/lightrwlock.cpp b/library/cpp/threading/light_rw_lock/lightrwlock.cpp
index 107c8ec3bf..fbb63fd47f 100644
--- a/library/cpp/threading/light_rw_lock/lightrwlock.cpp
+++ b/library/cpp/threading/light_rw_lock/lightrwlock.cpp
@@ -79,7 +79,7 @@ void TLightRWLock::WaitForUntrappedAndAcquireRead() {
}
skip_lock_try:
- if (AtomicLoad(UnshareFutex_) && (AtomicLoad(Counter_) & 0x7FFFFFFF) == 0) {
+ if (AtomicLoad(UnshareFutex_) && (AtomicLoad(Counter_) & 0x7FFFFFFF) == 0) {
SequenceStore(UnshareFutex_, 0);
FutexWake(UnshareFutex_, 1);
}
diff --git a/library/cpp/threading/light_rw_lock/lightrwlock.h b/library/cpp/threading/light_rw_lock/lightrwlock.h
index 4c648fc2a3..931a1817bc 100644
--- a/library/cpp/threading/light_rw_lock/lightrwlock.h
+++ b/library/cpp/threading/light_rw_lock/lightrwlock.h
@@ -1,5 +1,5 @@
#pragma once
-
+
#include <util/system/rwlock.h>
#include <util/system/sanitizers.h>
@@ -34,55 +34,55 @@
#include <errno.h>
namespace NS_LightRWLock {
- static int Y_FORCE_INLINE AtomicFetchAdd(volatile int& item, int value) {
- return __atomic_fetch_add(&item, value, __ATOMIC_SEQ_CST);
- }
+ static int Y_FORCE_INLINE AtomicFetchAdd(volatile int& item, int value) {
+ return __atomic_fetch_add(&item, value, __ATOMIC_SEQ_CST);
+ }
#if defined(_x86_64_) || defined(_i386_)
- static char Y_FORCE_INLINE AtomicSetBit(volatile int& item, unsigned bit) {
- char ret;
- __asm__ __volatile__(
- "lock bts %2,%0\n"
- "setc %1\n"
- : "+m"(item), "=rm"(ret)
- : "r"(bit)
- : "cc");
+ static char Y_FORCE_INLINE AtomicSetBit(volatile int& item, unsigned bit) {
+ char ret;
+ __asm__ __volatile__(
+ "lock bts %2,%0\n"
+ "setc %1\n"
+ : "+m"(item), "=rm"(ret)
+ : "r"(bit)
+ : "cc");
// msan doesn't treat ret as initialized
NSan::Unpoison(&ret, sizeof(ret));
- return ret;
- }
+ return ret;
+ }
- static char Y_FORCE_INLINE AtomicClearBit(volatile int& item, unsigned bit) {
- char ret;
- __asm__ __volatile__(
- "lock btc %2,%0\n"
- "setc %1\n"
- : "+m"(item), "=rm"(ret)
- : "r"(bit)
- : "cc");
+ static char Y_FORCE_INLINE AtomicClearBit(volatile int& item, unsigned bit) {
+ char ret;
+ __asm__ __volatile__(
+ "lock btc %2,%0\n"
+ "setc %1\n"
+ : "+m"(item), "=rm"(ret)
+ : "r"(bit)
+ : "cc");
// msan doesn't treat ret as initialized
NSan::Unpoison(&ret, sizeof(ret));
- return ret;
- }
+ return ret;
+ }
#else
- static char Y_FORCE_INLINE AtomicSetBit(volatile int& item, unsigned bit) {
- int prev = __atomic_fetch_or(&item, 1 << bit, __ATOMIC_SEQ_CST);
- return (prev & (1 << bit)) != 0 ? 1 : 0;
- }
+ static char Y_FORCE_INLINE AtomicSetBit(volatile int& item, unsigned bit) {
+ int prev = __atomic_fetch_or(&item, 1 << bit, __ATOMIC_SEQ_CST);
+ return (prev & (1 << bit)) != 0 ? 1 : 0;
+ }
- static char Y_FORCE_INLINE
- AtomicClearBit(volatile int& item, unsigned bit) {
- int prev = __atomic_fetch_and(&item, ~(1 << bit), __ATOMIC_SEQ_CST);
- return (prev & (1 << bit)) != 0 ? 1 : 0;
- }
+ static char Y_FORCE_INLINE
+ AtomicClearBit(volatile int& item, unsigned bit) {
+ int prev = __atomic_fetch_and(&item, ~(1 << bit), __ATOMIC_SEQ_CST);
+ return (prev & (1 << bit)) != 0 ? 1 : 0;
+ }
#endif
#if defined(_x86_64_) || defined(_i386_) || defined (__aarch64__) || defined (__powerpc64__)
@@ -100,42 +100,42 @@ namespace NS_LightRWLock {
#endif
- template <typename TInt>
- static void Y_FORCE_INLINE AtomicStore(volatile TInt& var, TInt value) {
- __atomic_store_n(&var, value, __ATOMIC_RELEASE);
- }
-
- template <typename TInt>
- static void Y_FORCE_INLINE SequenceStore(volatile TInt& var, TInt value) {
- __atomic_store_n(&var, value, __ATOMIC_SEQ_CST);
- }
-
- template <typename TInt>
- static TInt Y_FORCE_INLINE AtomicLoad(const volatile TInt& var) {
- return __atomic_load_n(&var, __ATOMIC_ACQUIRE);
- }
-
- static void Y_FORCE_INLINE FutexWait(volatile int& fvar, int value) {
- for (;;) {
- int result =
- syscall(SYS_futex, &fvar, FUTEX_WAIT_PRIVATE, value, NULL, NULL, 0);
- if (Y_UNLIKELY(result == -1)) {
- if (errno == EWOULDBLOCK)
- return;
- if (errno == EINTR)
- continue;
- Y_FAIL("futex error");
- }
+ template <typename TInt>
+ static void Y_FORCE_INLINE AtomicStore(volatile TInt& var, TInt value) {
+ __atomic_store_n(&var, value, __ATOMIC_RELEASE);
+ }
+
+ template <typename TInt>
+ static void Y_FORCE_INLINE SequenceStore(volatile TInt& var, TInt value) {
+ __atomic_store_n(&var, value, __ATOMIC_SEQ_CST);
+ }
+
+ template <typename TInt>
+ static TInt Y_FORCE_INLINE AtomicLoad(const volatile TInt& var) {
+ return __atomic_load_n(&var, __ATOMIC_ACQUIRE);
+ }
+
+ static void Y_FORCE_INLINE FutexWait(volatile int& fvar, int value) {
+ for (;;) {
+ int result =
+ syscall(SYS_futex, &fvar, FUTEX_WAIT_PRIVATE, value, NULL, NULL, 0);
+ if (Y_UNLIKELY(result == -1)) {
+ if (errno == EWOULDBLOCK)
+ return;
+ if (errno == EINTR)
+ continue;
+ Y_FAIL("futex error");
+ }
}
}
- static void Y_FORCE_INLINE FutexWake(volatile int& fvar, int amount) {
- const int result =
- syscall(SYS_futex, &fvar, FUTEX_WAKE_PRIVATE, amount, NULL, NULL, 0);
- if (Y_UNLIKELY(result == -1))
- Y_FAIL("futex error");
- }
-
+ static void Y_FORCE_INLINE FutexWake(volatile int& fvar, int amount) {
+ const int result =
+ syscall(SYS_futex, &fvar, FUTEX_WAKE_PRIVATE, amount, NULL, NULL, 0);
+ if (Y_UNLIKELY(result == -1))
+ Y_FAIL("futex error");
+ }
+
}
class alignas(64) TLightRWLock {
@@ -145,8 +145,8 @@ public:
, TrappedFutex_(0)
, UnshareFutex_(0)
, SpinCount_(spinCount)
- {
- }
+ {
+ }
TLightRWLock(const TLightRWLock&) = delete;
void operator=(const TLightRWLock&) = delete;
@@ -208,10 +208,10 @@ private:
class TLightRWLock: public TRWMutex {
public:
- TLightRWLock() {
- }
- TLightRWLock(ui32) {
- }
+ TLightRWLock() {
+ }
+ TLightRWLock(ui32) {
+ }
};
#endif
diff --git a/library/cpp/threading/local_executor/local_executor.cpp b/library/cpp/threading/local_executor/local_executor.cpp
index f2b825681f..1d3fbb4bf4 100644
--- a/library/cpp/threading/local_executor/local_executor.cpp
+++ b/library/cpp/threading/local_executor/local_executor.cpp
@@ -40,7 +40,7 @@ namespace {
NPar::TLocallyExecutableFunction Exec;
int FirstId, LastId;
TVector<NThreading::TPromise<void>> Promises;
-
+
public:
TFunctionWrapperWithPromise(NPar::TLocallyExecutableFunction exec, int firstId, int lastId)
: Exec(std::move(exec))
diff --git a/library/cpp/threading/local_executor/local_executor.h b/library/cpp/threading/local_executor/local_executor.h
index 33476722b8..c1c824f67c 100644
--- a/library/cpp/threading/local_executor/local_executor.h
+++ b/library/cpp/threading/local_executor/local_executor.h
@@ -5,7 +5,7 @@
#include <util/generic/cast.h>
#include <util/generic/fwd.h>
#include <util/generic/noncopyable.h>
-#include <util/generic/ptr.h>
+#include <util/generic/ptr.h>
#include <util/generic/singleton.h>
#include <util/generic/ymath.h>
@@ -135,9 +135,9 @@ namespace NPar {
//
TVector<NThreading::TFuture<void>> ExecRangeWithFutures(TLocallyExecutableFunction exec, int firstId, int lastId, int flags);
- template <typename TBody>
+ template <typename TBody>
static inline auto BlockedLoopBody(const TExecRangeParams& params, const TBody& body) {
- return [=](int blockId) {
+ return [=](int blockId) {
const int blockFirstId = params.FirstId + blockId * params.GetBlockSize();
const int blockLastId = Min(params.LastId, blockFirstId + params.GetBlockSize());
for (int i = blockFirstId; i < blockLastId; ++i) {
diff --git a/library/cpp/threading/local_executor/ut/local_executor_ut.cpp b/library/cpp/threading/local_executor/ut/local_executor_ut.cpp
index ccc833c1b9..ac5737717c 100644
--- a/library/cpp/threading/local_executor/ut/local_executor_ut.cpp
+++ b/library/cpp/threading/local_executor/ut/local_executor_ut.cpp
@@ -15,315 +15,315 @@ static const int DefaultThreadsCount = 41;
static const int DefaultRangeSize = 999;
Y_UNIT_TEST_SUITE(ExecRangeWithFutures){
- bool AllOf(const TVector<int>& vec, int value){
+ bool AllOf(const TVector<int>& vec, int value){
return AllOf(vec, [value](int element) { return value == element; });
-}
-
-void AsyncRunAndWaitFuturesReady(int rangeSize, int threads) {
- TLocalExecutor localExecutor;
- localExecutor.RunAdditionalThreads(threads);
- TAtomic signal = 0;
- TVector<int> data(rangeSize, 0);
- TVector<NThreading::TFuture<void>> futures = localExecutor.ExecRangeWithFutures([&signal, &data](int i) {
- UNIT_ASSERT(data[i] == 0);
- while (AtomicGet(signal) == 0)
- ;
- data[i] += 1;
- },
- 0, rangeSize, TLocalExecutor::HIGH_PRIORITY);
- UNIT_ASSERT(AllOf(data, 0));
- for (auto& future : futures)
- UNIT_ASSERT(!future.HasValue());
- AtomicSet(signal, 1);
- for (auto& future : futures) {
- future.GetValueSync();
+}
+
+void AsyncRunAndWaitFuturesReady(int rangeSize, int threads) {
+ TLocalExecutor localExecutor;
+ localExecutor.RunAdditionalThreads(threads);
+ TAtomic signal = 0;
+ TVector<int> data(rangeSize, 0);
+ TVector<NThreading::TFuture<void>> futures = localExecutor.ExecRangeWithFutures([&signal, &data](int i) {
+ UNIT_ASSERT(data[i] == 0);
+ while (AtomicGet(signal) == 0)
+ ;
+ data[i] += 1;
+ },
+ 0, rangeSize, TLocalExecutor::HIGH_PRIORITY);
+ UNIT_ASSERT(AllOf(data, 0));
+ for (auto& future : futures)
+ UNIT_ASSERT(!future.HasValue());
+ AtomicSet(signal, 1);
+ for (auto& future : futures) {
+ future.GetValueSync();
}
- UNIT_ASSERT(AllOf(data, 1));
-}
+ UNIT_ASSERT(AllOf(data, 1));
+}
Y_UNIT_TEST(AsyncRunRangeAndWaitFuturesReady) {
- AsyncRunAndWaitFuturesReady(DefaultRangeSize, DefaultThreadsCount);
-}
+ AsyncRunAndWaitFuturesReady(DefaultRangeSize, DefaultThreadsCount);
+}
Y_UNIT_TEST(AsyncRunOneTaskAndWaitFuturesReady) {
- AsyncRunAndWaitFuturesReady(1, DefaultThreadsCount);
-}
+ AsyncRunAndWaitFuturesReady(1, DefaultThreadsCount);
+}
Y_UNIT_TEST(AsyncRunRangeAndWaitFuturesReadyOneExtraThread) {
- AsyncRunAndWaitFuturesReady(DefaultRangeSize, 1);
-}
+ AsyncRunAndWaitFuturesReady(DefaultRangeSize, 1);
+}
Y_UNIT_TEST(AsyncRunOneThreadAndWaitFuturesReadyOneExtraThread) {
- AsyncRunAndWaitFuturesReady(1, 1);
-}
+ AsyncRunAndWaitFuturesReady(1, 1);
+}
Y_UNIT_TEST(AsyncRunTwoRangesAndWaitFuturesReady) {
- TLocalExecutor localExecutor;
- localExecutor.RunAdditionalThreads(DefaultThreadsCount);
- TAtomic signal = 0;
- TVector<int> data1(DefaultRangeSize, 0);
- TVector<NThreading::TFuture<void>> futures1 = localExecutor.ExecRangeWithFutures([&signal, &data1](int i) {
- UNIT_ASSERT(data1[i] == 0);
- while (AtomicGet(signal) == 0)
- ;
- data1[i] += 1;
- },
- 0, DefaultRangeSize, TLocalExecutor::HIGH_PRIORITY);
- TVector<int> data2(DefaultRangeSize, 0);
- TVector<NThreading::TFuture<void>> futures2 = localExecutor.ExecRangeWithFutures([&signal, &data2](int i) {
- UNIT_ASSERT(data2[i] == 0);
- while (AtomicGet(signal) == 0)
- ;
- data2[i] += 2;
- },
- 0, DefaultRangeSize, TLocalExecutor::HIGH_PRIORITY);
- UNIT_ASSERT(AllOf(data1, 0));
- UNIT_ASSERT(AllOf(data2, 0));
- AtomicSet(signal, 1);
- for (int i = 0; i < DefaultRangeSize; ++i) {
- futures1[i].GetValueSync();
- futures2[i].GetValueSync();
+ TLocalExecutor localExecutor;
+ localExecutor.RunAdditionalThreads(DefaultThreadsCount);
+ TAtomic signal = 0;
+ TVector<int> data1(DefaultRangeSize, 0);
+ TVector<NThreading::TFuture<void>> futures1 = localExecutor.ExecRangeWithFutures([&signal, &data1](int i) {
+ UNIT_ASSERT(data1[i] == 0);
+ while (AtomicGet(signal) == 0)
+ ;
+ data1[i] += 1;
+ },
+ 0, DefaultRangeSize, TLocalExecutor::HIGH_PRIORITY);
+ TVector<int> data2(DefaultRangeSize, 0);
+ TVector<NThreading::TFuture<void>> futures2 = localExecutor.ExecRangeWithFutures([&signal, &data2](int i) {
+ UNIT_ASSERT(data2[i] == 0);
+ while (AtomicGet(signal) == 0)
+ ;
+ data2[i] += 2;
+ },
+ 0, DefaultRangeSize, TLocalExecutor::HIGH_PRIORITY);
+ UNIT_ASSERT(AllOf(data1, 0));
+ UNIT_ASSERT(AllOf(data2, 0));
+ AtomicSet(signal, 1);
+ for (int i = 0; i < DefaultRangeSize; ++i) {
+ futures1[i].GetValueSync();
+ futures2[i].GetValueSync();
}
- UNIT_ASSERT(AllOf(data1, 1));
- UNIT_ASSERT(AllOf(data2, 2));
-}
-
-void AsyncRunRangeAndWaitExceptions(int rangeSize, int threadsCount) {
- TLocalExecutor localExecutor;
- localExecutor.RunAdditionalThreads(threadsCount);
- TAtomic signal = 0;
- TVector<int> data(rangeSize, 0);
- TVector<NThreading::TFuture<void>> futures = localExecutor.ExecRangeWithFutures([&signal, &data](int i) {
- UNIT_ASSERT(data[i] == 0);
- while (AtomicGet(signal) == 0)
- ;
- data[i] += 1;
- throw 10000 + i;
- },
- 0, rangeSize, TLocalExecutor::HIGH_PRIORITY);
- UNIT_ASSERT(AllOf(data, 0));
- UNIT_ASSERT(futures.ysize() == rangeSize);
- AtomicSet(signal, 1);
- int exceptionsCaught = 0;
- for (int i = 0; i < rangeSize; ++i) {
- try {
- futures[i].GetValueSync();
- } catch (int& e) {
- if (e == 10000 + i) {
- ++exceptionsCaught;
+ UNIT_ASSERT(AllOf(data1, 1));
+ UNIT_ASSERT(AllOf(data2, 2));
+}
+
+void AsyncRunRangeAndWaitExceptions(int rangeSize, int threadsCount) {
+ TLocalExecutor localExecutor;
+ localExecutor.RunAdditionalThreads(threadsCount);
+ TAtomic signal = 0;
+ TVector<int> data(rangeSize, 0);
+ TVector<NThreading::TFuture<void>> futures = localExecutor.ExecRangeWithFutures([&signal, &data](int i) {
+ UNIT_ASSERT(data[i] == 0);
+ while (AtomicGet(signal) == 0)
+ ;
+ data[i] += 1;
+ throw 10000 + i;
+ },
+ 0, rangeSize, TLocalExecutor::HIGH_PRIORITY);
+ UNIT_ASSERT(AllOf(data, 0));
+ UNIT_ASSERT(futures.ysize() == rangeSize);
+ AtomicSet(signal, 1);
+ int exceptionsCaught = 0;
+ for (int i = 0; i < rangeSize; ++i) {
+ try {
+ futures[i].GetValueSync();
+ } catch (int& e) {
+ if (e == 10000 + i) {
+ ++exceptionsCaught;
}
}
}
- UNIT_ASSERT(exceptionsCaught == rangeSize);
- UNIT_ASSERT(AllOf(data, 1));
-}
+ UNIT_ASSERT(exceptionsCaught == rangeSize);
+ UNIT_ASSERT(AllOf(data, 1));
+}
Y_UNIT_TEST(AsyncRunRangeAndWaitExceptions) {
- AsyncRunRangeAndWaitExceptions(DefaultRangeSize, DefaultThreadsCount);
-}
+ AsyncRunRangeAndWaitExceptions(DefaultRangeSize, DefaultThreadsCount);
+}
Y_UNIT_TEST(AsyncRunOneTaskAndWaitExceptions) {
- AsyncRunRangeAndWaitExceptions(1, DefaultThreadsCount);
-}
+ AsyncRunRangeAndWaitExceptions(1, DefaultThreadsCount);
+}
Y_UNIT_TEST(AsyncRunRangeAndWaitExceptionsOneExtraThread) {
- AsyncRunRangeAndWaitExceptions(DefaultRangeSize, 1);
-}
+ AsyncRunRangeAndWaitExceptions(DefaultRangeSize, 1);
+}
Y_UNIT_TEST(AsyncRunOneTaskAndWaitExceptionsOneExtraThread) {
- AsyncRunRangeAndWaitExceptions(1, 1);
-}
+ AsyncRunRangeAndWaitExceptions(1, 1);
+}
Y_UNIT_TEST(AsyncRunTwoRangesAndWaitExceptions) {
- TLocalExecutor localExecutor;
- localExecutor.RunAdditionalThreads(DefaultThreadsCount);
- TAtomic signal = 0;
- TVector<int> data1(DefaultRangeSize, 0);
- TVector<NThreading::TFuture<void>> futures1 = localExecutor.ExecRangeWithFutures([&signal, &data1](int i) {
- UNIT_ASSERT(data1[i] == 0);
- while (AtomicGet(signal) == 0)
- ;
- data1[i] += 1;
- throw 15000 + i;
- },
- 0, DefaultRangeSize, TLocalExecutor::LOW_PRIORITY);
- TVector<int> data2(DefaultRangeSize, 0);
- TVector<NThreading::TFuture<void>> futures2 = localExecutor.ExecRangeWithFutures([&signal, &data2](int i) {
- UNIT_ASSERT(data2[i] == 0);
- while (AtomicGet(signal) == 0)
- ;
- data2[i] += 2;
- throw 16000 + i;
- },
- 0, DefaultRangeSize, TLocalExecutor::HIGH_PRIORITY);
-
- UNIT_ASSERT(AllOf(data1, 0));
- UNIT_ASSERT(AllOf(data2, 0));
- UNIT_ASSERT(futures1.size() == DefaultRangeSize);
- UNIT_ASSERT(futures2.size() == DefaultRangeSize);
- AtomicSet(signal, 1);
- int exceptionsCaught = 0;
- for (int i = 0; i < DefaultRangeSize; ++i) {
- try {
- futures1[i].GetValueSync();
- } catch (int& e) {
- if (e == 15000 + i) {
- ++exceptionsCaught;
+ TLocalExecutor localExecutor;
+ localExecutor.RunAdditionalThreads(DefaultThreadsCount);
+ TAtomic signal = 0;
+ TVector<int> data1(DefaultRangeSize, 0);
+ TVector<NThreading::TFuture<void>> futures1 = localExecutor.ExecRangeWithFutures([&signal, &data1](int i) {
+ UNIT_ASSERT(data1[i] == 0);
+ while (AtomicGet(signal) == 0)
+ ;
+ data1[i] += 1;
+ throw 15000 + i;
+ },
+ 0, DefaultRangeSize, TLocalExecutor::LOW_PRIORITY);
+ TVector<int> data2(DefaultRangeSize, 0);
+ TVector<NThreading::TFuture<void>> futures2 = localExecutor.ExecRangeWithFutures([&signal, &data2](int i) {
+ UNIT_ASSERT(data2[i] == 0);
+ while (AtomicGet(signal) == 0)
+ ;
+ data2[i] += 2;
+ throw 16000 + i;
+ },
+ 0, DefaultRangeSize, TLocalExecutor::HIGH_PRIORITY);
+
+ UNIT_ASSERT(AllOf(data1, 0));
+ UNIT_ASSERT(AllOf(data2, 0));
+ UNIT_ASSERT(futures1.size() == DefaultRangeSize);
+ UNIT_ASSERT(futures2.size() == DefaultRangeSize);
+ AtomicSet(signal, 1);
+ int exceptionsCaught = 0;
+ for (int i = 0; i < DefaultRangeSize; ++i) {
+ try {
+ futures1[i].GetValueSync();
+ } catch (int& e) {
+ if (e == 15000 + i) {
+ ++exceptionsCaught;
}
- }
- try {
- futures2[i].GetValueSync();
- } catch (int& e) {
- if (e == 16000 + i) {
- ++exceptionsCaught;
+ }
+ try {
+ futures2[i].GetValueSync();
+ } catch (int& e) {
+ if (e == 16000 + i) {
+ ++exceptionsCaught;
}
}
}
- UNIT_ASSERT(exceptionsCaught == 2 * DefaultRangeSize);
- UNIT_ASSERT(AllOf(data1, 1));
- UNIT_ASSERT(AllOf(data2, 2));
-}
-
-void RunRangeAndCheckExceptionsWithWaitComplete(int rangeSize, int threadsCount) {
- TLocalExecutor localExecutor;
- localExecutor.RunAdditionalThreads(threadsCount);
- TVector<int> data(rangeSize, 0);
- TVector<NThreading::TFuture<void>> futures = localExecutor.ExecRangeWithFutures([&data](int i) {
- UNIT_ASSERT(data[i] == 0);
- data[i] += 1;
- throw 30000 + i;
- },
- 0, rangeSize, TLocalExecutor::EFlags::WAIT_COMPLETE);
- UNIT_ASSERT(AllOf(data, 1));
- int exceptionsCaught = 0;
- for (int i = 0; i < rangeSize; ++i) {
- try {
- futures[i].GetValueSync();
- } catch (int& e) {
- if (e == 30000 + i) {
- ++exceptionsCaught;
+ UNIT_ASSERT(exceptionsCaught == 2 * DefaultRangeSize);
+ UNIT_ASSERT(AllOf(data1, 1));
+ UNIT_ASSERT(AllOf(data2, 2));
+}
+
+void RunRangeAndCheckExceptionsWithWaitComplete(int rangeSize, int threadsCount) {
+ TLocalExecutor localExecutor;
+ localExecutor.RunAdditionalThreads(threadsCount);
+ TVector<int> data(rangeSize, 0);
+ TVector<NThreading::TFuture<void>> futures = localExecutor.ExecRangeWithFutures([&data](int i) {
+ UNIT_ASSERT(data[i] == 0);
+ data[i] += 1;
+ throw 30000 + i;
+ },
+ 0, rangeSize, TLocalExecutor::EFlags::WAIT_COMPLETE);
+ UNIT_ASSERT(AllOf(data, 1));
+ int exceptionsCaught = 0;
+ for (int i = 0; i < rangeSize; ++i) {
+ try {
+ futures[i].GetValueSync();
+ } catch (int& e) {
+ if (e == 30000 + i) {
+ ++exceptionsCaught;
}
}
}
- UNIT_ASSERT(exceptionsCaught == rangeSize);
- UNIT_ASSERT(AllOf(data, 1));
-}
+ UNIT_ASSERT(exceptionsCaught == rangeSize);
+ UNIT_ASSERT(AllOf(data, 1));
+}
Y_UNIT_TEST(RunRangeAndCheckExceptionsWithWaitComplete) {
- RunRangeAndCheckExceptionsWithWaitComplete(DefaultRangeSize, DefaultThreadsCount);
-}
+ RunRangeAndCheckExceptionsWithWaitComplete(DefaultRangeSize, DefaultThreadsCount);
+}
Y_UNIT_TEST(RunOneAndCheckExceptionsWithWaitComplete) {
- RunRangeAndCheckExceptionsWithWaitComplete(1, DefaultThreadsCount);
-}
+ RunRangeAndCheckExceptionsWithWaitComplete(1, DefaultThreadsCount);
+}
Y_UNIT_TEST(RunRangeAndCheckExceptionsWithWaitCompleteOneExtraThread) {
- RunRangeAndCheckExceptionsWithWaitComplete(DefaultRangeSize, 1);
-}
+ RunRangeAndCheckExceptionsWithWaitComplete(DefaultRangeSize, 1);
+}
Y_UNIT_TEST(RunOneAndCheckExceptionsWithWaitCompleteOneExtraThread) {
- RunRangeAndCheckExceptionsWithWaitComplete(1, 1);
-}
+ RunRangeAndCheckExceptionsWithWaitComplete(1, 1);
+}
Y_UNIT_TEST(RunRangeAndCheckExceptionsWithWaitCompleteZeroExtraThreads) {
- RunRangeAndCheckExceptionsWithWaitComplete(DefaultRangeSize, 0);
-}
+ RunRangeAndCheckExceptionsWithWaitComplete(DefaultRangeSize, 0);
+}
Y_UNIT_TEST(RunOneAndCheckExceptionsWithWaitCompleteZeroExtraThreads) {
- RunRangeAndCheckExceptionsWithWaitComplete(1, 0);
-}
-}
-;
+ RunRangeAndCheckExceptionsWithWaitComplete(1, 0);
+}
+}
+;
Y_UNIT_TEST_SUITE(ExecRangeWithThrow){
- void RunParallelWhichThrowsTTestException(int rangeStart, int rangeSize, int threadsCount, int flags, TAtomic& processed){
- AtomicSet(processed, 0);
-TLocalExecutor localExecutor;
-localExecutor.RunAdditionalThreads(threadsCount);
-localExecutor.ExecRangeWithThrow([&processed](int) {
- AtomicAdd(processed, 1);
- throw TTestException();
-},
- rangeStart, rangeStart + rangeSize, flags);
-}
+ void RunParallelWhichThrowsTTestException(int rangeStart, int rangeSize, int threadsCount, int flags, TAtomic& processed){
+ AtomicSet(processed, 0);
+TLocalExecutor localExecutor;
+localExecutor.RunAdditionalThreads(threadsCount);
+localExecutor.ExecRangeWithThrow([&processed](int) {
+ AtomicAdd(processed, 1);
+ throw TTestException();
+},
+ rangeStart, rangeStart + rangeSize, flags);
+}
Y_UNIT_TEST(RunParallelWhichThrowsTTestException) {
- TAtomic processed = 0;
- UNIT_ASSERT_EXCEPTION(
- RunParallelWhichThrowsTTestException(10, 40, DefaultThreadsCount,
- TLocalExecutor::EFlags::WAIT_COMPLETE, processed),
- TTestException);
- UNIT_ASSERT(AtomicGet(processed) == 40);
-}
-
-void ThrowAndCatchTTestException(int rangeSize, int threadsCount, int flags) {
- TAtomic processed = 0;
- UNIT_ASSERT_EXCEPTION(
- RunParallelWhichThrowsTTestException(0, rangeSize, threadsCount, flags, processed),
- TTestException);
- UNIT_ASSERT(AtomicGet(processed) == rangeSize);
-}
+ TAtomic processed = 0;
+ UNIT_ASSERT_EXCEPTION(
+ RunParallelWhichThrowsTTestException(10, 40, DefaultThreadsCount,
+ TLocalExecutor::EFlags::WAIT_COMPLETE, processed),
+ TTestException);
+ UNIT_ASSERT(AtomicGet(processed) == 40);
+}
+
+void ThrowAndCatchTTestException(int rangeSize, int threadsCount, int flags) {
+ TAtomic processed = 0;
+ UNIT_ASSERT_EXCEPTION(
+ RunParallelWhichThrowsTTestException(0, rangeSize, threadsCount, flags, processed),
+ TTestException);
+ UNIT_ASSERT(AtomicGet(processed) == rangeSize);
+}
Y_UNIT_TEST(ThrowAndCatchTTestExceptionLowPriority) {
- ThrowAndCatchTTestException(DefaultRangeSize, DefaultThreadsCount,
- TLocalExecutor::EFlags::WAIT_COMPLETE | TLocalExecutor::EFlags::LOW_PRIORITY);
-}
+ ThrowAndCatchTTestException(DefaultRangeSize, DefaultThreadsCount,
+ TLocalExecutor::EFlags::WAIT_COMPLETE | TLocalExecutor::EFlags::LOW_PRIORITY);
+}
Y_UNIT_TEST(ThrowAndCatchTTestExceptionMedPriority) {
- ThrowAndCatchTTestException(DefaultRangeSize, DefaultThreadsCount,
- TLocalExecutor::EFlags::WAIT_COMPLETE | TLocalExecutor::EFlags::MED_PRIORITY);
-}
+ ThrowAndCatchTTestException(DefaultRangeSize, DefaultThreadsCount,
+ TLocalExecutor::EFlags::WAIT_COMPLETE | TLocalExecutor::EFlags::MED_PRIORITY);
+}
Y_UNIT_TEST(ThrowAndCatchTTestExceptionHighPriority) {
- ThrowAndCatchTTestException(DefaultRangeSize, DefaultThreadsCount,
- TLocalExecutor::EFlags::WAIT_COMPLETE | TLocalExecutor::EFlags::HIGH_PRIORITY);
-}
+ ThrowAndCatchTTestException(DefaultRangeSize, DefaultThreadsCount,
+ TLocalExecutor::EFlags::WAIT_COMPLETE | TLocalExecutor::EFlags::HIGH_PRIORITY);
+}
Y_UNIT_TEST(ThrowAndCatchTTestExceptionWaitComplete) {
- ThrowAndCatchTTestException(DefaultRangeSize, DefaultThreadsCount,
- TLocalExecutor::EFlags::WAIT_COMPLETE);
-}
+ ThrowAndCatchTTestException(DefaultRangeSize, DefaultThreadsCount,
+ TLocalExecutor::EFlags::WAIT_COMPLETE);
+}
Y_UNIT_TEST(RethrowExeptionSequentialWaitComplete) {
- ThrowAndCatchTTestException(DefaultRangeSize, 0,
- TLocalExecutor::EFlags::WAIT_COMPLETE);
-}
+ ThrowAndCatchTTestException(DefaultRangeSize, 0,
+ TLocalExecutor::EFlags::WAIT_COMPLETE);
+}
Y_UNIT_TEST(RethrowExeptionOneExtraThreadWaitComplete) {
- ThrowAndCatchTTestException(DefaultRangeSize, 1,
- TLocalExecutor::EFlags::WAIT_COMPLETE);
-}
-
-void ThrowsTTestExceptionFromNested(TLocalExecutor& localExecutor) {
- localExecutor.ExecRangeWithThrow([](int) {
- throw TTestException();
- },
- 0, 10, TLocalExecutor::EFlags::WAIT_COMPLETE);
-}
-
-void CatchTTestExceptionFromNested(TAtomic& processed1, TAtomic& processed2) {
- TLocalExecutor localExecutor;
- localExecutor.RunAdditionalThreads(DefaultThreadsCount);
- localExecutor.ExecRangeWithThrow([&processed1, &processed2, &localExecutor](int) {
- AtomicAdd(processed1, 1);
- UNIT_ASSERT_EXCEPTION(
- ThrowsTTestExceptionFromNested(localExecutor),
- TTestException);
- AtomicAdd(processed2, 1);
- },
- 0, DefaultRangeSize, TLocalExecutor::EFlags::WAIT_COMPLETE);
-}
+ ThrowAndCatchTTestException(DefaultRangeSize, 1,
+ TLocalExecutor::EFlags::WAIT_COMPLETE);
+}
+
+void ThrowsTTestExceptionFromNested(TLocalExecutor& localExecutor) {
+ localExecutor.ExecRangeWithThrow([](int) {
+ throw TTestException();
+ },
+ 0, 10, TLocalExecutor::EFlags::WAIT_COMPLETE);
+}
+
+void CatchTTestExceptionFromNested(TAtomic& processed1, TAtomic& processed2) {
+ TLocalExecutor localExecutor;
+ localExecutor.RunAdditionalThreads(DefaultThreadsCount);
+ localExecutor.ExecRangeWithThrow([&processed1, &processed2, &localExecutor](int) {
+ AtomicAdd(processed1, 1);
+ UNIT_ASSERT_EXCEPTION(
+ ThrowsTTestExceptionFromNested(localExecutor),
+ TTestException);
+ AtomicAdd(processed2, 1);
+ },
+ 0, DefaultRangeSize, TLocalExecutor::EFlags::WAIT_COMPLETE);
+}
Y_UNIT_TEST(NestedParallelExceptionsDoNotLeak) {
- TAtomic processed1 = 0;
- TAtomic processed2 = 0;
- UNIT_ASSERT_NO_EXCEPTION(
- CatchTTestExceptionFromNested(processed1, processed2));
- UNIT_ASSERT_EQUAL(AtomicGet(processed1), DefaultRangeSize);
- UNIT_ASSERT_EQUAL(AtomicGet(processed2), DefaultRangeSize);
-}
-}
-;
+ TAtomic processed1 = 0;
+ TAtomic processed2 = 0;
+ UNIT_ASSERT_NO_EXCEPTION(
+ CatchTTestExceptionFromNested(processed1, processed2));
+ UNIT_ASSERT_EQUAL(AtomicGet(processed1), DefaultRangeSize);
+ UNIT_ASSERT_EQUAL(AtomicGet(processed2), DefaultRangeSize);
+}
+}
+;
Y_UNIT_TEST_SUITE(ExecLargeRangeWithThrow){
diff --git a/library/cpp/threading/poor_man_openmp/thread_helper.cpp b/library/cpp/threading/poor_man_openmp/thread_helper.cpp
index b4ec5c7879..34cb6507b9 100644
--- a/library/cpp/threading/poor_man_openmp/thread_helper.cpp
+++ b/library/cpp/threading/poor_man_openmp/thread_helper.cpp
@@ -1,7 +1,7 @@
#include "thread_helper.h"
-
-#include <util/generic/singleton.h>
-
-TMtpQueueHelper& TMtpQueueHelper::Instance() {
- return *Singleton<TMtpQueueHelper>();
-}
+
+#include <util/generic/singleton.h>
+
+TMtpQueueHelper& TMtpQueueHelper::Instance() {
+ return *Singleton<TMtpQueueHelper>();
+}
diff --git a/library/cpp/threading/poor_man_openmp/thread_helper.h b/library/cpp/threading/poor_man_openmp/thread_helper.h
index 1536c186cb..0ecee0590b 100644
--- a/library/cpp/threading/poor_man_openmp/thread_helper.h
+++ b/library/cpp/threading/poor_man_openmp/thread_helper.h
@@ -2,17 +2,17 @@
#include <util/thread/pool.h>
#include <util/generic/utility.h>
-#include <util/generic/yexception.h>
+#include <util/generic/yexception.h>
#include <util/system/info.h>
#include <util/system/atomic.h>
#include <util/system/condvar.h>
#include <util/system/mutex.h>
-#include <util/stream/output.h>
+#include <util/stream/output.h>
#include <functional>
-#include <cstdlib>
+#include <cstdlib>
-class TMtpQueueHelper {
+class TMtpQueueHelper {
public:
TMtpQueueHelper() {
SetThreadCount(NSystemInfo::CachedNumberOfCpus());
@@ -27,79 +27,79 @@ public:
ThreadCount = threads;
q = CreateThreadPool(ThreadCount);
}
-
- static TMtpQueueHelper& Instance();
-
+
+ static TMtpQueueHelper& Instance();
+
private:
size_t ThreadCount;
TAutoPtr<IThreadPool> q;
};
-namespace NYmp {
+namespace NYmp {
inline void SetThreadCount(size_t threads) {
- TMtpQueueHelper::Instance().SetThreadCount(threads);
+ TMtpQueueHelper::Instance().SetThreadCount(threads);
}
inline size_t GetThreadCount() {
- return TMtpQueueHelper::Instance().GetThreadCount();
+ return TMtpQueueHelper::Instance().GetThreadCount();
}
- template <typename T>
+ template <typename T>
inline void ParallelForStaticChunk(T begin, T end, size_t chunkSize, std::function<void(T)> func) {
- chunkSize = Max<size_t>(chunkSize, 1);
-
- size_t threadCount = TMtpQueueHelper::Instance().GetThreadCount();
+ chunkSize = Max<size_t>(chunkSize, 1);
+
+ size_t threadCount = TMtpQueueHelper::Instance().GetThreadCount();
IThreadPool* queue = TMtpQueueHelper::Instance().Get();
TCondVar cv;
TMutex mutex;
TAtomic counter = threadCount;
- std::exception_ptr err;
-
- for (size_t i = 0; i < threadCount; ++i) {
- queue->SafeAddFunc([&cv, &counter, &mutex, &func, i, begin, end, chunkSize, threadCount, &err]() {
- try {
- T currentChunkStart = begin + static_cast<decltype(T() - T())>(i * chunkSize);
-
- while (currentChunkStart < end) {
- T currentChunkEnd = Min<T>(end, currentChunkStart + chunkSize);
-
- for (T val = currentChunkStart; val < currentChunkEnd; ++val) {
- func(val);
- }
-
- currentChunkStart += chunkSize * threadCount;
+ std::exception_ptr err;
+
+ for (size_t i = 0; i < threadCount; ++i) {
+ queue->SafeAddFunc([&cv, &counter, &mutex, &func, i, begin, end, chunkSize, threadCount, &err]() {
+ try {
+ T currentChunkStart = begin + static_cast<decltype(T() - T())>(i * chunkSize);
+
+ while (currentChunkStart < end) {
+ T currentChunkEnd = Min<T>(end, currentChunkStart + chunkSize);
+
+ for (T val = currentChunkStart; val < currentChunkEnd; ++val) {
+ func(val);
+ }
+
+ currentChunkStart += chunkSize * threadCount;
+ }
+ } catch (...) {
+ with_lock (mutex) {
+ err = std::current_exception();
+ }
+ }
+
+ with_lock (mutex) {
+ if (AtomicDecrement(counter) == 0) {
+ //last one
+ cv.Signal();
}
- } catch (...) {
- with_lock (mutex) {
- err = std::current_exception();
- }
}
-
- with_lock (mutex) {
- if (AtomicDecrement(counter) == 0) {
- //last one
- cv.Signal();
- }
- }
});
}
-
- with_lock (mutex) {
- while (AtomicGet(counter) > 0) {
- cv.WaitI(mutex);
- }
+
+ with_lock (mutex) {
+ while (AtomicGet(counter) > 0) {
+ cv.WaitI(mutex);
+ }
+ }
+
+ if (err) {
+ std::rethrow_exception(err);
}
-
- if (err) {
- std::rethrow_exception(err);
- }
}
- template <typename T>
+ template <typename T>
inline void ParallelForStaticAutoChunk(T begin, T end, std::function<void(T)> func) {
- const size_t taskSize = end - begin;
- const size_t threadCount = TMtpQueueHelper::Instance().GetThreadCount();
-
+ const size_t taskSize = end - begin;
+ const size_t threadCount = TMtpQueueHelper::Instance().GetThreadCount();
+
ParallelForStaticChunk(begin, end, (taskSize + threadCount - 1) / threadCount, func);
}
-}
+}
diff --git a/library/cpp/threading/poor_man_openmp/thread_helper_ut.cpp b/library/cpp/threading/poor_man_openmp/thread_helper_ut.cpp
index 79c7a14b5e..7417636864 100644
--- a/library/cpp/threading/poor_man_openmp/thread_helper_ut.cpp
+++ b/library/cpp/threading/poor_man_openmp/thread_helper_ut.cpp
@@ -1,26 +1,26 @@
-#include "thread_helper.h"
-
+#include "thread_helper.h"
+
#include <library/cpp/testing/unittest/registar.h>
-
+
#include <util/generic/string.h>
-#include <util/generic/yexception.h>
-
+#include <util/generic/yexception.h>
+
Y_UNIT_TEST_SUITE(TestMP) {
Y_UNIT_TEST(TestErr) {
- std::function<void(int)> f = [](int x) {
- if (x == 5) {
- ythrow yexception() << "oops";
- }
- };
-
+ std::function<void(int)> f = [](int x) {
+ if (x == 5) {
+ ythrow yexception() << "oops";
+ }
+ };
+
TString s;
-
- try {
- NYmp::ParallelForStaticAutoChunk(0, 10, f);
- } catch (...) {
- s = CurrentExceptionMessage();
- }
-
- UNIT_ASSERT(s.find("oops") > 0);
- }
-}
+
+ try {
+ NYmp::ParallelForStaticAutoChunk(0, 10, f);
+ } catch (...) {
+ s = CurrentExceptionMessage();
+ }
+
+ UNIT_ASSERT(s.find("oops") > 0);
+ }
+}
diff --git a/library/cpp/threading/poor_man_openmp/ut/ya.make b/library/cpp/threading/poor_man_openmp/ut/ya.make
index 7305d14b99..6d7aa123ed 100644
--- a/library/cpp/threading/poor_man_openmp/ut/ya.make
+++ b/library/cpp/threading/poor_man_openmp/ut/ya.make
@@ -1,12 +1,12 @@
UNITTEST_FOR(library/cpp/threading/poor_man_openmp)
-
+
OWNER(
pg
agorodilov
)
-
-SRCS(
- thread_helper_ut.cpp
-)
-
-END()
+
+SRCS(
+ thread_helper_ut.cpp
+)
+
+END()
diff --git a/library/cpp/threading/queue/mpsc_htswap.h b/library/cpp/threading/queue/mpsc_htswap.h
index 420b1e8829..c42caa7ac0 100644
--- a/library/cpp/threading/queue/mpsc_htswap.h
+++ b/library/cpp/threading/queue/mpsc_htswap.h
@@ -28,8 +28,8 @@ namespace NThreading {
namespace NHTSwapPrivate {
template <typename T, typename TTuneup>
struct TNode
- : public TTuneup::TNodeBase,
- public TTuneup::template TNodeLayout<TNode<T, TTuneup>, T> {
+ : public TTuneup::TNodeBase,
+ public TTuneup::template TNodeLayout<TNode<T, TTuneup>, T> {
TNode(const T& item) {
this->Next = nullptr;
this->Item = item;
@@ -60,7 +60,7 @@ namespace NThreading {
template <typename T, typename TTuneup>
class THTSwapQueueImpl
- : protected TTuneup::template TQueueLayout<TNode<T, TTuneup>> {
+ : protected TTuneup::template TQueueLayout<TNode<T, TTuneup>> {
protected:
using TTunedNode = TNode<T, TTuneup>;
@@ -124,9 +124,9 @@ namespace NThreading {
DeclareTuneTypeParam(THTSwapNodeLayout, TNodeLayout);
DeclareTuneTypeParam(THTSwapQueueLayout, TQueueLayout);
- template <typename T = void*, typename... TParams>
+ template <typename T = void*, typename... TParams>
class THTSwapQueue
- : public NHTSwapPrivate::THTSwapQueueImpl<T,
- TTune<NHTSwapPrivate::TDefaultTuneup, TParams...>> {
+ : public NHTSwapPrivate::THTSwapQueueImpl<T,
+ TTune<NHTSwapPrivate::TDefaultTuneup, TParams...>> {
};
}
diff --git a/library/cpp/threading/queue/mpsc_intrusive_unordered.h b/library/cpp/threading/queue/mpsc_intrusive_unordered.h
index 97e6131dd4..6ac7537ae9 100644
--- a/library/cpp/threading/queue/mpsc_intrusive_unordered.h
+++ b/library/cpp/threading/queue/mpsc_intrusive_unordered.h
@@ -25,7 +25,7 @@ namespace NThreading {
void Push(void* node) noexcept {
Push(reinterpret_cast<TIntrusiveNode*>(node));
}
-
+
private:
TIntrusiveNode* HeadForCaS = nullptr;
TIntrusiveNode* HeadForSwap = nullptr;
diff --git a/library/cpp/threading/queue/mpsc_read_as_filled.h b/library/cpp/threading/queue/mpsc_read_as_filled.h
index 621517328e..be33ba5a58 100644
--- a/library/cpp/threading/queue/mpsc_read_as_filled.h
+++ b/library/cpp/threading/queue/mpsc_read_as_filled.h
@@ -132,7 +132,7 @@ namespace NThreading {
TMsgBunch* volatile NextToken;
/* this push can return PUSH_RESULT_BLOCKED */
- inline TPushResult Push(TMsgLink msg, ui64 slot, TAux auxiliary) {
+ inline TPushResult Push(TMsgLink msg, ui64 slot, TAux auxiliary) {
if (Y_UNLIKELY(slot < FirstSlot)) {
return PUSH_RESULT_BACKWARD;
}
@@ -194,7 +194,7 @@ namespace NThreading {
// the object could be destroyed after this method
inline void SetNextToken(TMsgBunch* next) {
AtomicSet(NextToken, next);
- if (Y_UNLIKELY(AtomicAdd(Token, RELEASE_SIZE) == RELEASE_SIZE)) {
+ if (Y_UNLIKELY(AtomicAdd(Token, RELEASE_SIZE) == RELEASE_SIZE)) {
Release(this);
next->DecrementToken();
}
@@ -317,8 +317,8 @@ namespace NThreading {
}
};
- template <typename TWBucket = TWriteBucket<>,
- template <typename, typename...> class TContainer = TDeque>
+ template <typename TWBucket = TWriteBucket<>,
+ template <typename, typename...> class TContainer = TDeque>
class TReadBucket {
public:
using TAux = typename TWBucket::TUsingAux;
@@ -543,7 +543,7 @@ namespace NThreading {
static constexpr ui32 BUNCH_SIZE = DEFAULT_BUNCH_SIZE;
using TBunchBase = TEmpty;
- template <typename TElem, typename... TRest>
+ template <typename TElem, typename... TRest>
using TContainer = TDeque<TElem, TRest...>;
static constexpr bool DeleteItems = true;
@@ -556,7 +556,7 @@ namespace NThreading {
DeclareTuneContainer(TRaFQueueSkipContainer, TContainer);
DeclareTuneValueParam(TRaFQueueDeleteItems, bool, DeleteItems);
- template <typename TItem = void, typename... TParams>
+ template <typename TItem = void, typename... TParams>
class TReadAsFilledQueue {
private:
using TTuned = TTune<NReadAsFilledPrivate::TDefaultParams, TParams...>;
@@ -565,7 +565,7 @@ namespace NThreading {
using TBunchBase = typename TTuned::TBunchBase;
- template <typename TElem, typename... TRest>
+ template <typename TElem, typename... TRest>
using TContainer =
typename TTuned::template TContainer<TElem, TRest...>;
diff --git a/library/cpp/threading/queue/mpsc_vinfarr_obstructive.h b/library/cpp/threading/queue/mpsc_vinfarr_obstructive.h
index 4c85bef6ec..5f91f1b5a8 100644
--- a/library/cpp/threading/queue/mpsc_vinfarr_obstructive.h
+++ b/library/cpp/threading/queue/mpsc_vinfarr_obstructive.h
@@ -469,7 +469,7 @@ namespace NThreading {
DeclareTuneTypeParam(TObstructiveQueueBunchBase, TBunchBase);
DeclareTuneTypeParam(TObstructiveQueueAux, TAux);
- template <typename TItem = void, typename... TParams>
+ template <typename TItem = void, typename... TParams>
class TObstructiveConsumerAuxQueue {
private:
using TTuned =
@@ -522,7 +522,7 @@ namespace NThreading {
template <typename TItem = void, bool DeleteItems = true>
class TObstructiveConsumerQueue
- : public TObstructiveConsumerAuxQueue<TItem,
- TObstructiveQueueDeleteItems<DeleteItems>> {
+ : public TObstructiveConsumerAuxQueue<TItem,
+ TObstructiveQueueDeleteItems<DeleteItems>> {
};
-}
+}
diff --git a/library/cpp/threading/queue/queue_ut.cpp b/library/cpp/threading/queue/queue_ut.cpp
index a55f952cbc..80eca147da 100644
--- a/library/cpp/threading/queue/queue_ut.cpp
+++ b/library/cpp/threading/queue/queue_ut.cpp
@@ -12,7 +12,7 @@ private:
UNIT_TEST(Threads2_Push1M_Threads1_Pop2M)
UNIT_TEST(Threads4_Push1M_Threads1_Pop4M)
UNIT_TEST(Threads8_RndPush100K_Threads8_Queues)
- /*
+ /*
UNIT_TEST(Threads24_RndPush100K_Threads24_Queues)
UNIT_TEST(Threads24_RndPush100K_Threads8_Queues)
UNIT_TEST(Threads24_RndPush100K_Threads4_Queues)
diff --git a/library/cpp/threading/queue/tune.h b/library/cpp/threading/queue/tune.h
index 1072342620..50fc3dc17c 100644
--- a/library/cpp/threading/queue/tune.h
+++ b/library/cpp/threading/queue/tune.h
@@ -96,14 +96,14 @@
}; \
}
-#define DeclareTuneContainer(TParamName, InternalName) \
- template <template <typename, typename...> class TNewContainer> \
- struct TParamName { \
- template <typename TBase> \
- struct TApply: public TBase { \
- template <typename TElem, typename... TRest> \
- using InternalName = TNewContainer<TElem, TRest...>; \
- }; \
+#define DeclareTuneContainer(TParamName, InternalName) \
+ template <template <typename, typename...> class TNewContainer> \
+ struct TParamName { \
+ template <typename TBase> \
+ struct TApply: public TBase { \
+ template <typename TElem, typename... TRest> \
+ using InternalName = TNewContainer<TElem, TRest...>; \
+ }; \
}
namespace NTunePrivate {
diff --git a/library/cpp/threading/queue/unordered_ut.cpp b/library/cpp/threading/queue/unordered_ut.cpp
index 49ebd4a1cf..a43b7f520e 100644
--- a/library/cpp/threading/queue/unordered_ut.cpp
+++ b/library/cpp/threading/queue/unordered_ut.cpp
@@ -59,9 +59,9 @@ public:
class TWorker: public ISimpleThread {
public:
TWorker(
- TQueueType* queues_,
- ui16 mine,
- TAtomic* pushDone)
+ TQueueType* queues_,
+ ui16 mine,
+ TAtomic* pushDone)
: Queues(queues_)
, MineQueue(mine)
, PushDone(pushDone)
@@ -132,7 +132,7 @@ public:
for (ui32 i = 0; i < COUNT; ++i) {
workers[i]->Join();
all.insert(all.begin(),
- workers[i]->Received.begin(), workers[i]->Received.end());
+ workers[i]->Received.begin(), workers[i]->Received.end());
}
std::sort(all.begin(), all.end());
diff --git a/library/cpp/threading/queue/ut_helpers.h b/library/cpp/threading/queue/ut_helpers.h
index b017763794..2756b52601 100644
--- a/library/cpp/threading/queue/ut_helpers.h
+++ b/library/cpp/threading/queue/ut_helpers.h
@@ -13,11 +13,11 @@ struct TBasicReadAsFilled: public NThreading::TReadAsFilledQueue<> {
};
struct TBasicObstructiveConsumer
- : public NThreading::TObstructiveConsumerQueue<> {
+ : public NThreading::TObstructiveConsumerQueue<> {
};
struct TBasicMPSCIntrusiveUnordered
- : public NThreading::TMPSCIntrusiveUnordered {
+ : public NThreading::TMPSCIntrusiveUnordered {
};
struct TIntrusiveLink: public NThreading::TIntrusiveNode {
@@ -30,11 +30,11 @@ struct TMPMCUnorderedRing: public NThreading::TMPMCUnorderedRing {
}
};
-#define REGISTER_TESTS_FOR_ALL_ORDERED_QUEUES(TestTemplate) \
- UNIT_TEST_SUITE_REGISTRATION(TestTemplate<TBasicHTSwap>); \
- UNIT_TEST_SUITE_REGISTRATION(TestTemplate<TBasicReadAsFilled>); \
+#define REGISTER_TESTS_FOR_ALL_ORDERED_QUEUES(TestTemplate) \
+ UNIT_TEST_SUITE_REGISTRATION(TestTemplate<TBasicHTSwap>); \
+ UNIT_TEST_SUITE_REGISTRATION(TestTemplate<TBasicReadAsFilled>); \
UNIT_TEST_SUITE_REGISTRATION(TestTemplate<TBasicObstructiveConsumer>)
-#define REGISTER_TESTS_FOR_ALL_UNORDERED_QUEUES(TestTemplate) \
+#define REGISTER_TESTS_FOR_ALL_UNORDERED_QUEUES(TestTemplate) \
UNIT_TEST_SUITE_REGISTRATION(TestTemplate<TBasicMPSCIntrusiveUnordered>); \
UNIT_TEST_SUITE_REGISTRATION(TestTemplate<TMPMCUnorderedRing>);
diff --git a/library/cpp/threading/skip_list/compare.h b/library/cpp/threading/skip_list/compare.h
index 336582a1b8..ac98b3e1ce 100644
--- a/library/cpp/threading/skip_list/compare.h
+++ b/library/cpp/threading/skip_list/compare.h
@@ -4,74 +4,74 @@
#include <util/str_stl.h>
namespace NThreading {
- namespace NImpl {
- Y_HAS_MEMBER(compare);
- Y_HAS_MEMBER(Compare);
+ namespace NImpl {
+ Y_HAS_MEMBER(compare);
+ Y_HAS_MEMBER(Compare);
template <typename T>
- inline int CompareImpl(const T& l, const T& r) {
- if (l < r) {
- return -1;
- } else if (r < l) {
- return +1;
- } else {
- return 0;
- }
+ inline int CompareImpl(const T& l, const T& r) {
+ if (l < r) {
+ return -1;
+ } else if (r < l) {
+ return +1;
+ } else {
+ return 0;
+ }
}
- template <bool val>
- struct TSmallCompareSelector {
- template <typename T>
- static inline int Compare(const T& l, const T& r) {
- return CompareImpl(l, r);
- }
- };
+ template <bool val>
+ struct TSmallCompareSelector {
+ template <typename T>
+ static inline int Compare(const T& l, const T& r) {
+ return CompareImpl(l, r);
+ }
+ };
- template <>
- struct TSmallCompareSelector<true> {
- template <typename T>
- static inline int Compare(const T& l, const T& r) {
- return l.compare(r);
- }
- };
+ template <>
+ struct TSmallCompareSelector<true> {
+ template <typename T>
+ static inline int Compare(const T& l, const T& r) {
+ return l.compare(r);
+ }
+ };
- template <bool val>
- struct TBigCompareSelector {
- template <typename T>
- static inline int Compare(const T& l, const T& r) {
+ template <bool val>
+ struct TBigCompareSelector {
+ template <typename T>
+ static inline int Compare(const T& l, const T& r) {
return TSmallCompareSelector<THascompare<T>::value>::Compare(l, r);
- }
- };
-
- template <>
- struct TBigCompareSelector<true> {
- template <typename T>
- static inline int Compare(const T& l, const T& r) {
- return l.Compare(r);
- }
- };
-
+ }
+ };
+
+ template <>
+ struct TBigCompareSelector<true> {
+ template <typename T>
+ static inline int Compare(const T& l, const T& r) {
+ return l.Compare(r);
+ }
+ };
+
template <typename T>
struct TCompareSelector: public TBigCompareSelector<THasCompare<T>::value> {
- };
- }
+ };
+ }
+
+ ////////////////////////////////////////////////////////////////////////////////
+ // Generic compare function
- ////////////////////////////////////////////////////////////////////////////////
- // Generic compare function
-
template <typename T>
- inline int Compare(const T& l, const T& r) {
- return NImpl::TCompareSelector<T>::Compare(l, r);
- }
-
- ////////////////////////////////////////////////////////////////////////////////
- // Generic compare functor
-
- template <typename T>
- struct TCompare {
- inline int operator()(const T& l, const T& r) const {
- return Compare(l, r);
- }
+ inline int Compare(const T& l, const T& r) {
+ return NImpl::TCompareSelector<T>::Compare(l, r);
+ }
+
+ ////////////////////////////////////////////////////////////////////////////////
+ // Generic compare functor
+
+ template <typename T>
+ struct TCompare {
+ inline int operator()(const T& l, const T& r) const {
+ return Compare(l, r);
+ }
};
}
diff --git a/library/cpp/threading/skip_list/perf/main.cpp b/library/cpp/threading/skip_list/perf/main.cpp
index d722d43436..4ad52049e7 100644
--- a/library/cpp/threading/skip_list/perf/main.cpp
+++ b/library/cpp/threading/skip_list/perf/main.cpp
@@ -14,345 +14,345 @@
#include <util/system/thread.h>
namespace {
- using namespace NThreading;
+ using namespace NThreading;
- ////////////////////////////////////////////////////////////////////////////////
+ ////////////////////////////////////////////////////////////////////////////////
IOutputStream& LogInfo() {
- return Cerr << TInstant::Now() << " INFO: ";
- }
+ return Cerr << TInstant::Now() << " INFO: ";
+ }
IOutputStream& LogError() {
- return Cerr << TInstant::Now() << " ERROR: ";
- }
-
- ////////////////////////////////////////////////////////////////////////////////
-
- struct TListItem {
- TStringBuf Key;
- TStringBuf Value;
-
- TListItem(const TStringBuf& key, const TStringBuf& value)
- : Key(key)
- , Value(value)
- {
- }
-
- int Compare(const TListItem& other) const {
- return Key.compare(other.Key);
- }
- };
-
- using TListType = TSkipList<TListItem>;
-
- ////////////////////////////////////////////////////////////////////////////////
-
- class TRandomData {
- private:
- TVector<char> Buffer;
-
- public:
- TRandomData()
- : Buffer(1024 * 1024)
- {
- for (size_t i = 0; i < Buffer.size(); ++i) {
- Buffer[i] = RandomNumber<char>();
- }
- }
-
- TStringBuf GetString(size_t len) const {
- size_t start = RandomNumber(Buffer.size() - len);
- return TStringBuf(&Buffer[start], len);
+ return Cerr << TInstant::Now() << " ERROR: ";
+ }
+
+ ////////////////////////////////////////////////////////////////////////////////
+
+ struct TListItem {
+ TStringBuf Key;
+ TStringBuf Value;
+
+ TListItem(const TStringBuf& key, const TStringBuf& value)
+ : Key(key)
+ , Value(value)
+ {
+ }
+
+ int Compare(const TListItem& other) const {
+ return Key.compare(other.Key);
+ }
+ };
+
+ using TListType = TSkipList<TListItem>;
+
+ ////////////////////////////////////////////////////////////////////////////////
+
+ class TRandomData {
+ private:
+ TVector<char> Buffer;
+
+ public:
+ TRandomData()
+ : Buffer(1024 * 1024)
+ {
+ for (size_t i = 0; i < Buffer.size(); ++i) {
+ Buffer[i] = RandomNumber<char>();
+ }
+ }
+
+ TStringBuf GetString(size_t len) const {
+ size_t start = RandomNumber(Buffer.size() - len);
+ return TStringBuf(&Buffer[start], len);
+ }
+
+ TStringBuf GetString(size_t min, size_t max) const {
+ return GetString(min + RandomNumber(max - min));
}
+ };
- TStringBuf GetString(size_t min, size_t max) const {
- return GetString(min + RandomNumber(max - min));
- }
- };
-
- ////////////////////////////////////////////////////////////////////////////////
-
- class TWorkerThread: public ISimpleThread {
- private:
- std::function<void()> Func;
- TDuration Time;
-
- public:
- TWorkerThread(std::function<void()> func)
- : Func(func)
- {
- }
-
- TDuration GetTime() const {
- return Time;
- }
-
- private:
- void* ThreadProc() noexcept override {
- TInstant started = TInstant::Now();
- Func();
- Time = TInstant::Now() - started;
- return nullptr;
- }
- };
-
- inline TAutoPtr<TWorkerThread> StartThread(std::function<void()> func) {
- TAutoPtr<TWorkerThread> thread = new TWorkerThread(func);
- thread->Start();
- return thread;
+ ////////////////////////////////////////////////////////////////////////////////
+
+ class TWorkerThread: public ISimpleThread {
+ private:
+ std::function<void()> Func;
+ TDuration Time;
+
+ public:
+ TWorkerThread(std::function<void()> func)
+ : Func(func)
+ {
+ }
+
+ TDuration GetTime() const {
+ return Time;
+ }
+
+ private:
+ void* ThreadProc() noexcept override {
+ TInstant started = TInstant::Now();
+ Func();
+ Time = TInstant::Now() - started;
+ return nullptr;
+ }
+ };
+
+ inline TAutoPtr<TWorkerThread> StartThread(std::function<void()> func) {
+ TAutoPtr<TWorkerThread> thread = new TWorkerThread(func);
+ thread->Start();
+ return thread;
}
- ////////////////////////////////////////////////////////////////////////////////
-
- typedef std::function<void()> TTestFunc;
-
- struct TTest {
- TString Name;
- TTestFunc Func;
-
- TTest() {
- }
-
- TTest(const TString& name, const TTestFunc& func)
- : Name(name)
- , Func(func)
- {
- }
- };
-
- ////////////////////////////////////////////////////////////////////////////////
-
- class TTestSuite {
- private:
- size_t Iterations = 1000000;
- size_t KeyLen = 10;
- size_t ValueLen = 100;
- size_t NumReaders = 4;
- size_t NumWriters = 1;
- size_t BatchSize = 20;
-
- TMemoryPool MemoryPool;
- TListType List;
- TMutex Mutex;
- TRandomData Random;
-
- TMap<TCiString, TTest> AllTests;
- TVector<TTest> Tests;
-
- public:
- TTestSuite()
- : MemoryPool(64 * 1024)
- , List(MemoryPool)
- {
- }
-
- bool Init(int argc, const char* argv[]) {
- TVector<TString> tests;
- try {
- NLastGetopt::TOpts opts;
- opts.AddHelpOption();
-
-#define OPTION(opt, x) \
- opts.AddLongOption(opt, #x) \
- .Optional() \
- .DefaultValue(ToString(x)) \
- .StoreResult(&x) // end of OPTION
-
- OPTION('i', Iterations);
- OPTION('k', KeyLen);
- OPTION('v', ValueLen);
- OPTION('r', NumReaders);
- OPTION('w', NumWriters);
- OPTION('b', BatchSize);
+ ////////////////////////////////////////////////////////////////////////////////
+
+ typedef std::function<void()> TTestFunc;
+
+ struct TTest {
+ TString Name;
+ TTestFunc Func;
+
+ TTest() {
+ }
+
+ TTest(const TString& name, const TTestFunc& func)
+ : Name(name)
+ , Func(func)
+ {
+ }
+ };
+
+ ////////////////////////////////////////////////////////////////////////////////
+
+ class TTestSuite {
+ private:
+ size_t Iterations = 1000000;
+ size_t KeyLen = 10;
+ size_t ValueLen = 100;
+ size_t NumReaders = 4;
+ size_t NumWriters = 1;
+ size_t BatchSize = 20;
+
+ TMemoryPool MemoryPool;
+ TListType List;
+ TMutex Mutex;
+ TRandomData Random;
+
+ TMap<TCiString, TTest> AllTests;
+ TVector<TTest> Tests;
+
+ public:
+ TTestSuite()
+ : MemoryPool(64 * 1024)
+ , List(MemoryPool)
+ {
+ }
+
+ bool Init(int argc, const char* argv[]) {
+ TVector<TString> tests;
+ try {
+ NLastGetopt::TOpts opts;
+ opts.AddHelpOption();
+
+#define OPTION(opt, x) \
+ opts.AddLongOption(opt, #x) \
+ .Optional() \
+ .DefaultValue(ToString(x)) \
+ .StoreResult(&x) // end of OPTION
+
+ OPTION('i', Iterations);
+ OPTION('k', KeyLen);
+ OPTION('v', ValueLen);
+ OPTION('r', NumReaders);
+ OPTION('w', NumWriters);
+ OPTION('b', BatchSize);
#undef OPTION
- NLastGetopt::TOptsParseResultException optsRes(&opts, argc, argv);
- for (const auto& opt : opts.Opts_) {
- const NLastGetopt::TOptParseResult* r = optsRes.FindOptParseResult(opt.Get(), true);
- if (r) {
- LogInfo() << "[-" << opt->GetChar() << "] " << opt->GetName() << ": " << r->Back() << Endl;
- }
+ NLastGetopt::TOptsParseResultException optsRes(&opts, argc, argv);
+ for (const auto& opt : opts.Opts_) {
+ const NLastGetopt::TOptParseResult* r = optsRes.FindOptParseResult(opt.Get(), true);
+ if (r) {
+ LogInfo() << "[-" << opt->GetChar() << "] " << opt->GetName() << ": " << r->Back() << Endl;
+ }
}
- tests = optsRes.GetFreeArgs();
- } catch (...) {
- LogError() << CurrentExceptionMessage() << Endl;
- return false;
+ tests = optsRes.GetFreeArgs();
+ } catch (...) {
+ LogError() << CurrentExceptionMessage() << Endl;
+ return false;
}
#define TEST(type) \
- AddTest(#type, std::bind(&TTestSuite::Y_CAT(TEST_, type), this)) // end of TEST
+ AddTest(#type, std::bind(&TTestSuite::Y_CAT(TEST_, type), this)) // end of TEST
- TEST(Clear);
- TEST(InsertRandom);
- TEST(InsertSequential);
- TEST(InsertSequentialSimple);
- TEST(LookupRandom);
- TEST(Concurrent);
+ TEST(Clear);
+ TEST(InsertRandom);
+ TEST(InsertSequential);
+ TEST(InsertSequentialSimple);
+ TEST(LookupRandom);
+ TEST(Concurrent);
#undef TEST
- if (tests.empty()) {
- LogError() << "no tests specified, choose from: " << PrintTests() << Endl;
+ if (tests.empty()) {
+ LogError() << "no tests specified, choose from: " << PrintTests() << Endl;
return false;
}
-
- for (size_t i = 0; i < tests.size(); ++i) {
+
+ for (size_t i = 0; i < tests.size(); ++i) {
if (!AllTests.contains(tests[i])) {
- LogError() << "unknown test name: " << tests[i] << Endl;
- return false;
- }
- Tests.push_back(AllTests[tests[i]]);
- }
-
- return true;
+ LogError() << "unknown test name: " << tests[i] << Endl;
+ return false;
+ }
+ Tests.push_back(AllTests[tests[i]]);
+ }
+
+ return true;
}
- void Run() {
+ void Run() {
#if !defined(NDEBUG)
- LogInfo() << "*** DEBUG build! ***" << Endl;
+ LogInfo() << "*** DEBUG build! ***" << Endl;
#endif
- for (const TTest& test : Tests) {
- LogInfo() << "Starting test " << test.Name << Endl;
-
- TInstant started = TInstant::Now();
- try {
- test.Func();
- } catch (...) {
- LogError() << "test " << test.Name
- << " failed: " << CurrentExceptionMessage()
- << Endl;
- }
-
- LogInfo() << "List size = " << List.GetSize() << Endl;
-
- TDuration duration = TInstant::Now() - started;
- LogInfo() << "test " << test.Name
- << " duration: " << duration
- << " (" << (double)duration.MicroSeconds() / (Iterations * NumWriters) << "us per iteration)"
- << Endl;
- LogInfo() << "Finished test " << test.Name << Endl;
- }
+ for (const TTest& test : Tests) {
+ LogInfo() << "Starting test " << test.Name << Endl;
+
+ TInstant started = TInstant::Now();
+ try {
+ test.Func();
+ } catch (...) {
+ LogError() << "test " << test.Name
+ << " failed: " << CurrentExceptionMessage()
+ << Endl;
+ }
+
+ LogInfo() << "List size = " << List.GetSize() << Endl;
+
+ TDuration duration = TInstant::Now() - started;
+ LogInfo() << "test " << test.Name
+ << " duration: " << duration
+ << " (" << (double)duration.MicroSeconds() / (Iterations * NumWriters) << "us per iteration)"
+ << Endl;
+ LogInfo() << "Finished test " << test.Name << Endl;
+ }
+ }
+
+ private:
+ void AddTest(const char* name, TTestFunc func) {
+ AllTests[name] = TTest(name, func);
}
- private:
- void AddTest(const char* name, TTestFunc func) {
- AllTests[name] = TTest(name, func);
- }
-
- TString PrintTests() const {
- TVector<TString> names;
- for (const auto& it : AllTests) {
- names.push_back(it.first);
- }
- return JoinSeq(", ", names);
+ TString PrintTests() const {
+ TVector<TString> names;
+ for (const auto& it : AllTests) {
+ names.push_back(it.first);
+ }
+ return JoinSeq(", ", names);
}
- void TEST_Clear() {
- List.Clear();
- }
+ void TEST_Clear() {
+ List.Clear();
+ }
- void TEST_InsertRandom() {
- for (size_t i = 0; i < Iterations; ++i) {
- List.Insert(TListItem(Random.GetString(KeyLen), Random.GetString(ValueLen)));
- }
+ void TEST_InsertRandom() {
+ for (size_t i = 0; i < Iterations; ++i) {
+ List.Insert(TListItem(Random.GetString(KeyLen), Random.GetString(ValueLen)));
+ }
}
- void TEST_InsertSequential() {
- TString key;
- for (size_t i = 0; i < Iterations;) {
- key.assign(Random.GetString(KeyLen));
- size_t batch = BatchSize / 2 + RandomNumber(BatchSize);
- for (size_t j = 0; j < batch; ++j, ++i) {
- key.resize(KeyLen - 1);
- key.append((char)j);
- List.Insert(TListItem(key, Random.GetString(ValueLen)));
- }
+ void TEST_InsertSequential() {
+ TString key;
+ for (size_t i = 0; i < Iterations;) {
+ key.assign(Random.GetString(KeyLen));
+ size_t batch = BatchSize / 2 + RandomNumber(BatchSize);
+ for (size_t j = 0; j < batch; ++j, ++i) {
+ key.resize(KeyLen - 1);
+ key.append((char)j);
+ List.Insert(TListItem(key, Random.GetString(ValueLen)));
+ }
}
}
- void TEST_InsertSequentialSimple() {
- for (size_t i = 0; i < Iterations; ++i) {
- List.Insert(TListItem(Random.GetString(KeyLen), Random.GetString(ValueLen)));
- }
+ void TEST_InsertSequentialSimple() {
+ for (size_t i = 0; i < Iterations; ++i) {
+ List.Insert(TListItem(Random.GetString(KeyLen), Random.GetString(ValueLen)));
+ }
}
- void TEST_LookupRandom() {
- for (size_t i = 0; i < Iterations; ++i) {
- List.SeekTo(TListItem(Random.GetString(KeyLen), TStringBuf()));
- }
+ void TEST_LookupRandom() {
+ for (size_t i = 0; i < Iterations; ++i) {
+ List.SeekTo(TListItem(Random.GetString(KeyLen), TStringBuf()));
+ }
}
- void TEST_Concurrent() {
- LogInfo() << "starting producers..." << Endl;
-
- TVector<TAutoPtr<TWorkerThread>> producers(NumWriters);
- for (size_t i1 = 0; i1 < producers.size(); ++i1) {
- producers[i1] = StartThread([&] {
- TInstant started = TInstant::Now();
- for (size_t i2 = 0; i2 < Iterations; ++i2) {
- {
- TGuard<TMutex> guard(Mutex);
- List.Insert(TListItem(Random.GetString(KeyLen), Random.GetString(ValueLen)));
- }
+ void TEST_Concurrent() {
+ LogInfo() << "starting producers..." << Endl;
+
+ TVector<TAutoPtr<TWorkerThread>> producers(NumWriters);
+ for (size_t i1 = 0; i1 < producers.size(); ++i1) {
+ producers[i1] = StartThread([&] {
+ TInstant started = TInstant::Now();
+ for (size_t i2 = 0; i2 < Iterations; ++i2) {
+ {
+ TGuard<TMutex> guard(Mutex);
+ List.Insert(TListItem(Random.GetString(KeyLen), Random.GetString(ValueLen)));
+ }
+ }
+ TDuration duration = TInstant::Now() - started;
+ LogInfo()
+ << "Average time for producer = "
+ << (double)duration.MicroSeconds() / Iterations << "us per iteration"
+ << Endl;
+ });
+ }
+
+ LogInfo() << "starting consumers..." << Endl;
+
+ TVector<TAutoPtr<TWorkerThread>> consumers(NumReaders);
+ for (size_t i1 = 0; i1 < consumers.size(); ++i1) {
+ consumers[i1] = StartThread([&] {
+ TInstant started = TInstant::Now();
+ for (size_t i2 = 0; i2 < Iterations; ++i2) {
+ List.SeekTo(TListItem(Random.GetString(KeyLen), TStringBuf()));
}
- TDuration duration = TInstant::Now() - started;
- LogInfo()
- << "Average time for producer = "
- << (double)duration.MicroSeconds() / Iterations << "us per iteration"
- << Endl;
- });
- }
-
- LogInfo() << "starting consumers..." << Endl;
-
- TVector<TAutoPtr<TWorkerThread>> consumers(NumReaders);
- for (size_t i1 = 0; i1 < consumers.size(); ++i1) {
- consumers[i1] = StartThread([&] {
- TInstant started = TInstant::Now();
- for (size_t i2 = 0; i2 < Iterations; ++i2) {
- List.SeekTo(TListItem(Random.GetString(KeyLen), TStringBuf()));
- }
- TDuration duration = TInstant::Now() - started;
- LogInfo()
- << "Average time for consumer = "
- << (double)duration.MicroSeconds() / Iterations << "us per iteration"
- << Endl;
- });
- }
-
- LogInfo() << "wait for producers..." << Endl;
-
- TDuration producerTime;
- for (size_t i = 0; i < producers.size(); ++i) {
- producers[i]->Join();
- producerTime += producers[i]->GetTime();
- }
-
- LogInfo() << "wait for consumers..." << Endl;
-
- TDuration consumerTime;
- for (size_t i = 0; i < consumers.size(); ++i) {
- consumers[i]->Join();
- consumerTime += consumers[i]->GetTime();
- }
-
- LogInfo() << "average producer time: "
- << producerTime.SecondsFloat() / producers.size() << " seconds"
- << Endl;
-
- LogInfo() << "average consumer time: "
- << consumerTime.SecondsFloat() / consumers.size() << " seconds"
- << Endl;
- }
- };
-
-}
+ TDuration duration = TInstant::Now() - started;
+ LogInfo()
+ << "Average time for consumer = "
+ << (double)duration.MicroSeconds() / Iterations << "us per iteration"
+ << Endl;
+ });
+ }
+
+ LogInfo() << "wait for producers..." << Endl;
+
+ TDuration producerTime;
+ for (size_t i = 0; i < producers.size(); ++i) {
+ producers[i]->Join();
+ producerTime += producers[i]->GetTime();
+ }
+
+ LogInfo() << "wait for consumers..." << Endl;
+
+ TDuration consumerTime;
+ for (size_t i = 0; i < consumers.size(); ++i) {
+ consumers[i]->Join();
+ consumerTime += consumers[i]->GetTime();
+ }
+
+ LogInfo() << "average producer time: "
+ << producerTime.SecondsFloat() / producers.size() << " seconds"
+ << Endl;
+
+ LogInfo() << "average consumer time: "
+ << consumerTime.SecondsFloat() / consumers.size() << " seconds"
+ << Endl;
+ }
+ };
+
+}
////////////////////////////////////////////////////////////////////////////////
-int main(int argc, const char* argv[]) {
+int main(int argc, const char* argv[]) {
TTestSuite suite;
if (!suite.Init(argc, argv)) {
return -1;
diff --git a/library/cpp/threading/skip_list/skiplist.h b/library/cpp/threading/skip_list/skiplist.h
index c1ed46c4aa..914a7c6ee7 100644
--- a/library/cpp/threading/skip_list/skiplist.h
+++ b/library/cpp/threading/skip_list/skiplist.h
@@ -10,399 +10,399 @@
#include <util/system/atomic.h>
namespace NThreading {
- ////////////////////////////////////////////////////////////////////////////////
+ ////////////////////////////////////////////////////////////////////////////////
- class TNopCounter {
- protected:
- template <typename T>
- void OnInsert(const T&) {
- }
+ class TNopCounter {
+ protected:
+ template <typename T>
+ void OnInsert(const T&) {
+ }
- template <typename T>
- void OnUpdate(const T&) {
- }
+ template <typename T>
+ void OnUpdate(const T&) {
+ }
- void Reset() {
- }
- };
+ void Reset() {
+ }
+ };
- ////////////////////////////////////////////////////////////////////////////////
+ ////////////////////////////////////////////////////////////////////////////////
- class TSizeCounter {
+ class TSizeCounter {
private:
- size_t Size;
+ size_t Size;
public:
- TSizeCounter()
- : Size(0)
+ TSizeCounter()
+ : Size(0)
{
}
- size_t GetSize() const {
- return Size;
+ size_t GetSize() const {
+ return Size;
}
- protected:
- template <typename T>
- void OnInsert(const T&) {
- ++Size;
+ protected:
+ template <typename T>
+ void OnInsert(const T&) {
+ ++Size;
}
- template <typename T>
- void OnUpdate(const T&) {
+ template <typename T>
+ void OnUpdate(const T&) {
}
- void Reset() {
- Size = 0;
+ void Reset() {
+ Size = 0;
}
};
- ////////////////////////////////////////////////////////////////////////////////
- // Append-only concurrent skip-list
- //
- // Readers do not require any synchronization.
- // Writers should be externally synchronized.
- // Nodes will be allocated using TMemoryPool instance.
-
- template <
- typename T,
- typename TComparer = TCompare<T>,
- typename TAllocator = TMemoryPool,
- typename TCounter = TSizeCounter,
- int MaxHeight = 12,
- int Branching = 4>
- class TSkipList: public TCounter, private TNonCopyable {
- class TNode {
- private:
- T Value; // should be immutable after insert
- TNode* Next[]; // variable-size array maximum of MaxHeight values
-
- public:
+ ////////////////////////////////////////////////////////////////////////////////
+ // Append-only concurrent skip-list
+ //
+ // Readers do not require any synchronization.
+ // Writers should be externally synchronized.
+ // Nodes will be allocated using TMemoryPool instance.
+
+ template <
+ typename T,
+ typename TComparer = TCompare<T>,
+ typename TAllocator = TMemoryPool,
+ typename TCounter = TSizeCounter,
+ int MaxHeight = 12,
+ int Branching = 4>
+ class TSkipList: public TCounter, private TNonCopyable {
+ class TNode {
+ private:
+ T Value; // should be immutable after insert
+ TNode* Next[]; // variable-size array maximum of MaxHeight values
+
+ public:
TNode(T&& value)
: Value(std::move(value))
- {
- Y_UNUSED(Next);
- }
-
- const T& GetValue() const {
- return Value;
- }
-
- T& GetValue() {
- return Value;
- }
-
- TNode* GetNext(int height) const {
- return AtomicGet(Next[height]);
- }
-
- void Link(int height, TNode** prev) {
- for (int i = 0; i < height; ++i) {
- Next[i] = prev[i]->Next[i];
- AtomicSet(prev[i]->Next[i], this);
- }
- }
- };
-
+ {
+ Y_UNUSED(Next);
+ }
+
+ const T& GetValue() const {
+ return Value;
+ }
+
+ T& GetValue() {
+ return Value;
+ }
+
+ TNode* GetNext(int height) const {
+ return AtomicGet(Next[height]);
+ }
+
+ void Link(int height, TNode** prev) {
+ for (int i = 0; i < height; ++i) {
+ Next[i] = prev[i]->Next[i];
+ AtomicSet(prev[i]->Next[i], this);
+ }
+ }
+ };
+
public:
- class TIterator {
- private:
- const TSkipList* List;
- const TNode* Node;
-
- public:
- TIterator()
- : List(nullptr)
- , Node(nullptr)
- {
- }
-
- TIterator(const TSkipList* list, const TNode* node)
- : List(list)
- , Node(node)
- {
- }
-
- TIterator(const TIterator& other)
- : List(other.List)
- , Node(other.Node)
- {
- }
-
- TIterator& operator=(const TIterator& other) {
- List = other.List;
- Node = other.Node;
- return *this;
- }
-
- void Next() {
- Node = Node ? Node->GetNext(0) : nullptr;
+ class TIterator {
+ private:
+ const TSkipList* List;
+ const TNode* Node;
+
+ public:
+ TIterator()
+ : List(nullptr)
+ , Node(nullptr)
+ {
+ }
+
+ TIterator(const TSkipList* list, const TNode* node)
+ : List(list)
+ , Node(node)
+ {
+ }
+
+ TIterator(const TIterator& other)
+ : List(other.List)
+ , Node(other.Node)
+ {
}
- // much less efficient than Next as our list is single-linked
- void Prev() {
- if (Node) {
- TNode* node = List->FindLessThan(Node->GetValue(), nullptr);
- Node = (node != List->Head ? node : nullptr);
- }
- }
+ TIterator& operator=(const TIterator& other) {
+ List = other.List;
+ Node = other.Node;
+ return *this;
+ }
- void Reset() {
- Node = nullptr;
- }
+ void Next() {
+ Node = Node ? Node->GetNext(0) : nullptr;
+ }
- bool IsValid() const {
- return Node != nullptr;
- }
+ // much less efficient than Next as our list is single-linked
+ void Prev() {
+ if (Node) {
+ TNode* node = List->FindLessThan(Node->GetValue(), nullptr);
+ Node = (node != List->Head ? node : nullptr);
+ }
+ }
+
+ void Reset() {
+ Node = nullptr;
+ }
+
+ bool IsValid() const {
+ return Node != nullptr;
+ }
- const T& GetValue() const {
- Y_ASSERT(IsValid());
- return Node->GetValue();
- }
- };
+ const T& GetValue() const {
+ Y_ASSERT(IsValid());
+ return Node->GetValue();
+ }
+ };
- private:
- TAllocator& Allocator;
- TComparer Comparer;
+ private:
+ TAllocator& Allocator;
+ TComparer Comparer;
- TNode* Head;
- TAtomic Height;
- TCounter Counter;
+ TNode* Head;
+ TAtomic Height;
+ TCounter Counter;
- TNode* Prev[MaxHeight];
+ TNode* Prev[MaxHeight];
- template <typename TValue>
+ template <typename TValue>
using TComparerReturnType = std::invoke_result_t<TComparer, const T&, const TValue&>;
- public:
- TSkipList(TAllocator& allocator, const TComparer& comparer = TComparer())
- : Allocator(allocator)
- , Comparer(comparer)
- {
- Init();
- }
-
- ~TSkipList() {
- CallDtors();
- }
-
- void Clear() {
- CallDtors();
- Allocator.ClearKeepFirstChunk();
- Init();
+ public:
+ TSkipList(TAllocator& allocator, const TComparer& comparer = TComparer())
+ : Allocator(allocator)
+ , Comparer(comparer)
+ {
+ Init();
+ }
+
+ ~TSkipList() {
+ CallDtors();
+ }
+
+ void Clear() {
+ CallDtors();
+ Allocator.ClearKeepFirstChunk();
+ Init();
}
bool Insert(T value) {
- TNode* node = PrepareInsert(value);
- if (Y_UNLIKELY(node && Compare(node, value) == 0)) {
- // we do not allow duplicates
- return false;
+ TNode* node = PrepareInsert(value);
+ if (Y_UNLIKELY(node && Compare(node, value) == 0)) {
+ // we do not allow duplicates
+ return false;
}
node = DoInsert(std::move(value));
- TCounter::OnInsert(node->GetValue());
- return true;
+ TCounter::OnInsert(node->GetValue());
+ return true;
}
- template <typename TInsertAction, typename TUpdateAction>
- bool Insert(const T& value, TInsertAction insert, TUpdateAction update) {
- TNode* node = PrepareInsert(value);
- if (Y_UNLIKELY(node && Compare(node, value) == 0)) {
- if (update(node->GetValue())) {
- TCounter::OnUpdate(node->GetValue());
- return true;
- }
- // we do not allow duplicates
- return false;
- }
- node = DoInsert(insert(value));
- TCounter::OnInsert(node->GetValue());
- return true;
- }
-
- template <typename TValue>
- bool Contains(const TValue& value) const {
- TNode* node = FindGreaterThanOrEqual(value);
- return node && Compare(node, value) == 0;
- }
-
- TIterator SeekToFirst() const {
- return TIterator(this, FindFirst());
- }
-
- TIterator SeekToLast() const {
- TNode* last = FindLast();
- return TIterator(this, last != Head ? last : nullptr);
- }
-
- template <typename TValue>
- TIterator SeekTo(const TValue& value) const {
- return TIterator(this, FindGreaterThanOrEqual(value));
+ template <typename TInsertAction, typename TUpdateAction>
+ bool Insert(const T& value, TInsertAction insert, TUpdateAction update) {
+ TNode* node = PrepareInsert(value);
+ if (Y_UNLIKELY(node && Compare(node, value) == 0)) {
+ if (update(node->GetValue())) {
+ TCounter::OnUpdate(node->GetValue());
+ return true;
+ }
+ // we do not allow duplicates
+ return false;
+ }
+ node = DoInsert(insert(value));
+ TCounter::OnInsert(node->GetValue());
+ return true;
+ }
+
+ template <typename TValue>
+ bool Contains(const TValue& value) const {
+ TNode* node = FindGreaterThanOrEqual(value);
+ return node && Compare(node, value) == 0;
+ }
+
+ TIterator SeekToFirst() const {
+ return TIterator(this, FindFirst());
+ }
+
+ TIterator SeekToLast() const {
+ TNode* last = FindLast();
+ return TIterator(this, last != Head ? last : nullptr);
+ }
+
+ template <typename TValue>
+ TIterator SeekTo(const TValue& value) const {
+ return TIterator(this, FindGreaterThanOrEqual(value));
}
- private:
- static int RandomHeight() {
- int height = 1;
- while (height < MaxHeight && (RandomNumber<unsigned int>() % Branching) == 0) {
- ++height;
- }
- return height;
- }
-
- void Init() {
- Head = AllocateRootNode();
- Height = 1;
- TCounter::Reset();
-
- for (int i = 0; i < MaxHeight; ++i) {
- Prev[i] = Head;
- }
+ private:
+ static int RandomHeight() {
+ int height = 1;
+ while (height < MaxHeight && (RandomNumber<unsigned int>() % Branching) == 0) {
+ ++height;
+ }
+ return height;
}
- void CallDtors() {
- if (!TTypeTraits<T>::IsPod) {
- // we should explicitly call destructors for our nodes
- TNode* node = Head->GetNext(0);
- while (node) {
- TNode* next = node->GetNext(0);
- node->~TNode();
- node = next;
- }
+ void Init() {
+ Head = AllocateRootNode();
+ Height = 1;
+ TCounter::Reset();
+
+ for (int i = 0; i < MaxHeight; ++i) {
+ Prev[i] = Head;
}
}
- TNode* AllocateRootNode() {
- size_t size = sizeof(TNode) + sizeof(TNode*) * MaxHeight;
- void* buffer = Allocator.Allocate(size);
- memset(buffer, 0, size);
- return static_cast<TNode*>(buffer);
- }
+ void CallDtors() {
+ if (!TTypeTraits<T>::IsPod) {
+ // we should explicitly call destructors for our nodes
+ TNode* node = Head->GetNext(0);
+ while (node) {
+ TNode* next = node->GetNext(0);
+ node->~TNode();
+ node = next;
+ }
+ }
+ }
+
+ TNode* AllocateRootNode() {
+ size_t size = sizeof(TNode) + sizeof(TNode*) * MaxHeight;
+ void* buffer = Allocator.Allocate(size);
+ memset(buffer, 0, size);
+ return static_cast<TNode*>(buffer);
+ }
TNode* AllocateNode(T&& value, int height) {
- size_t size = sizeof(TNode) + sizeof(TNode*) * height;
- void* buffer = Allocator.Allocate(size);
- memset(buffer, 0, size);
+ size_t size = sizeof(TNode) + sizeof(TNode*) * height;
+ void* buffer = Allocator.Allocate(size);
+ memset(buffer, 0, size);
return new (buffer) TNode(std::move(value));
- }
-
- TNode* FindFirst() const {
- return Head->GetNext(0);
- }
-
- TNode* FindLast() const {
- TNode* node = Head;
- int height = AtomicGet(Height) - 1;
-
- while (true) {
- TNode* next = node->GetNext(height);
- if (next) {
- node = next;
- continue;
- }
-
- if (height) {
- --height;
- } else {
- return node;
- }
+ }
+
+ TNode* FindFirst() const {
+ return Head->GetNext(0);
+ }
+
+ TNode* FindLast() const {
+ TNode* node = Head;
+ int height = AtomicGet(Height) - 1;
+
+ while (true) {
+ TNode* next = node->GetNext(height);
+ if (next) {
+ node = next;
+ continue;
+ }
+
+ if (height) {
+ --height;
+ } else {
+ return node;
+ }
}
}
- template <typename TValue>
- TComparerReturnType<TValue> Compare(const TNode* node, const TValue& value) const {
- return Comparer(node->GetValue(), value);
- }
-
- template <typename TValue>
- TNode* FindLessThan(const TValue& value, TNode** links) const {
- TNode* node = Head;
- int height = AtomicGet(Height) - 1;
-
- TNode* prev = nullptr;
- while (true) {
- TNode* next = node->GetNext(height);
- if (next && next != prev) {
- TComparerReturnType<TValue> cmp = Compare(next, value);
- if (cmp < 0) {
- node = next;
- continue;
- }
+ template <typename TValue>
+ TComparerReturnType<TValue> Compare(const TNode* node, const TValue& value) const {
+ return Comparer(node->GetValue(), value);
+ }
+
+ template <typename TValue>
+ TNode* FindLessThan(const TValue& value, TNode** links) const {
+ TNode* node = Head;
+ int height = AtomicGet(Height) - 1;
+
+ TNode* prev = nullptr;
+ while (true) {
+ TNode* next = node->GetNext(height);
+ if (next && next != prev) {
+ TComparerReturnType<TValue> cmp = Compare(next, value);
+ if (cmp < 0) {
+ node = next;
+ continue;
+ }
}
- if (links) {
- // collect links from upper levels
- links[height] = node;
- }
-
- if (height) {
- prev = next;
- --height;
- } else {
- return node;
- }
+ if (links) {
+ // collect links from upper levels
+ links[height] = node;
+ }
+
+ if (height) {
+ prev = next;
+ --height;
+ } else {
+ return node;
+ }
}
}
- template <typename TValue>
- TNode* FindGreaterThanOrEqual(const TValue& value) const {
- TNode* node = Head;
- int height = AtomicGet(Height) - 1;
-
- TNode* prev = nullptr;
- while (true) {
- TNode* next = node->GetNext(height);
- if (next && next != prev) {
- TComparerReturnType<TValue> cmp = Compare(next, value);
- if (cmp < 0) {
- node = next;
- continue;
- }
- if (cmp == 0) {
- return next;
- }
+ template <typename TValue>
+ TNode* FindGreaterThanOrEqual(const TValue& value) const {
+ TNode* node = Head;
+ int height = AtomicGet(Height) - 1;
+
+ TNode* prev = nullptr;
+ while (true) {
+ TNode* next = node->GetNext(height);
+ if (next && next != prev) {
+ TComparerReturnType<TValue> cmp = Compare(next, value);
+ if (cmp < 0) {
+ node = next;
+ continue;
+ }
+ if (cmp == 0) {
+ return next;
+ }
}
-
- if (height) {
- prev = next;
- --height;
- } else {
+
+ if (height) {
+ prev = next;
+ --height;
+ } else {
return next;
}
}
- }
+ }
- TNode* PrepareInsert(const T& value) {
- TNode* prev = Prev[0];
- TNode* next = prev->GetNext(0);
- if ((prev == Head || Compare(prev, value) < 0) && (next == nullptr || Compare(next, value) >= 0)) {
- // avoid seek in case of sequential insert
+ TNode* PrepareInsert(const T& value) {
+ TNode* prev = Prev[0];
+ TNode* next = prev->GetNext(0);
+ if ((prev == Head || Compare(prev, value) < 0) && (next == nullptr || Compare(next, value) >= 0)) {
+ // avoid seek in case of sequential insert
} else {
- prev = FindLessThan(value, Prev);
- next = prev->GetNext(0);
+ prev = FindLessThan(value, Prev);
+ next = prev->GetNext(0);
}
- return next;
+ return next;
}
TNode* DoInsert(T&& value) {
- // choose level to place new node
- int currentHeight = AtomicGet(Height);
- int height = RandomHeight();
- if (height > currentHeight) {
- for (int i = currentHeight; i < height; ++i) {
- // head should link to all levels
- Prev[i] = Head;
- }
- AtomicSet(Height, height);
+ // choose level to place new node
+ int currentHeight = AtomicGet(Height);
+ int height = RandomHeight();
+ if (height > currentHeight) {
+ for (int i = currentHeight; i < height; ++i) {
+ // head should link to all levels
+ Prev[i] = Head;
+ }
+ AtomicSet(Height, height);
}
TNode* node = AllocateNode(std::move(value), height);
- node->Link(height, Prev);
+ node->Link(height, Prev);
- // keep last inserted node to optimize sequential inserts
- for (int i = 0; i < height; i++) {
- Prev[i] = node;
- }
- return node;
+ // keep last inserted node to optimize sequential inserts
+ for (int i = 0; i < height; i++) {
+ Prev[i] = node;
+ }
+ return node;
}
- };
+ };
-}
+}
diff --git a/library/cpp/threading/skip_list/skiplist_ut.cpp b/library/cpp/threading/skip_list/skiplist_ut.cpp
index fdc831dffd..52fcffda66 100644
--- a/library/cpp/threading/skip_list/skiplist_ut.cpp
+++ b/library/cpp/threading/skip_list/skiplist_ut.cpp
@@ -3,41 +3,41 @@
#include <library/cpp/testing/unittest/registar.h>
namespace NThreading {
- namespace {
- struct TTestObject {
- static size_t Count;
- int Tag;
-
- TTestObject(int tag)
- : Tag(tag)
- {
- ++Count;
- }
-
- TTestObject(const TTestObject& other)
- : Tag(other.Tag)
- {
- ++Count;
- }
-
- ~TTestObject() {
- --Count;
- }
-
- bool operator<(const TTestObject& other) const {
- return Tag < other.Tag;
- }
- };
-
- size_t TTestObject::Count = 0;
+ namespace {
+ struct TTestObject {
+ static size_t Count;
+ int Tag;
+
+ TTestObject(int tag)
+ : Tag(tag)
+ {
+ ++Count;
+ }
+
+ TTestObject(const TTestObject& other)
+ : Tag(other.Tag)
+ {
+ ++Count;
+ }
+
+ ~TTestObject() {
+ --Count;
+ }
+
+ bool operator<(const TTestObject& other) const {
+ return Tag < other.Tag;
+ }
+ };
+
+ size_t TTestObject::Count = 0;
}
- ////////////////////////////////////////////////////////////////////////////////
+ ////////////////////////////////////////////////////////////////////////////////
Y_UNIT_TEST_SUITE(TSkipListTest) {
Y_UNIT_TEST(ShouldBeEmptyAfterCreation) {
- TMemoryPool pool(1024);
+ TMemoryPool pool(1024);
TSkipList<int> list(pool);
UNIT_ASSERT_EQUAL(list.GetSize(), 0);
@@ -182,4 +182,4 @@ namespace NThreading {
}
}
-}
+}
diff --git a/library/cpp/threading/task_scheduler/task_scheduler_ut.cpp b/library/cpp/threading/task_scheduler/task_scheduler_ut.cpp
index 2223dce650..3b5203194a 100644
--- a/library/cpp/threading/task_scheduler/task_scheduler_ut.cpp
+++ b/library/cpp/threading/task_scheduler/task_scheduler_ut.cpp
@@ -14,9 +14,9 @@ class TTaskSchedulerTest: public TTestBase {
class TCheckTask: public TTaskScheduler::IRepeatedTask {
public:
- TCheckTask(const TDuration& delay)
- : Start_(Now())
- , Delay_(delay)
+ TCheckTask(const TDuration& delay)
+ : Start_(Now())
+ , Delay_(delay)
{
AtomicIncrement(ScheduledTaskCounter_);
}
@@ -25,15 +25,15 @@ class TTaskSchedulerTest: public TTestBase {
}
bool Process() override {
- const TDuration delay = Now() - Start_;
+ const TDuration delay = Now() - Start_;
- if (delay < Delay_) {
+ if (delay < Delay_) {
AtomicIncrement(BadTimeoutCounter_);
}
AtomicIncrement(ExecutedTaskCounter_);
-
- return false;
+
+ return false;
}
static bool AllTaskExecuted() {
@@ -45,8 +45,8 @@ class TTaskSchedulerTest: public TTestBase {
}
private:
- TInstant Start_;
- TDuration Delay_;
+ TInstant Start_;
+ TDuration Delay_;
static TAtomic BadTimeoutCounter_;
static TAtomic ScheduledTaskCounter_;
static TAtomic ExecutedTaskCounter_;
@@ -60,7 +60,7 @@ class TTaskSchedulerTest: public TTestBase {
ScheduleCheckTask(10000);
ScheduleCheckTask(5000);
- Scheduler_.Start();
+ Scheduler_.Start();
usleep(1000000);
@@ -70,8 +70,8 @@ class TTaskSchedulerTest: public TTestBase {
private:
void ScheduleCheckTask(size_t delay) {
- TDuration d = TDuration::MicroSeconds(delay);
-
+ TDuration d = TDuration::MicroSeconds(delay);
+
Scheduler_.Add(new TCheckTask(d), d);
}