diff options
author | Anton Samokhvalov <pg83@yandex.ru> | 2022-02-10 16:45:17 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:45:17 +0300 |
commit | d3a398281c6fd1d3672036cb2d63f842d2cb28c5 (patch) | |
tree | dd4bd3ca0f36b817e96812825ffaf10d645803f2 /library/cpp/threading | |
parent | 72cb13b4aff9bc9cf22e49251bc8fd143f82538f (diff) | |
download | ydb-d3a398281c6fd1d3672036cb2d63f842d2cb28c5.tar.gz |
Restoring authorship annotation for Anton Samokhvalov <pg83@yandex.ru>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/threading')
37 files changed, 2895 insertions, 2895 deletions
diff --git a/library/cpp/threading/atomic/bool.h b/library/cpp/threading/atomic/bool.h index ec8f75427b..d52544e762 100644 --- a/library/cpp/threading/atomic/bool.h +++ b/library/cpp/threading/atomic/bool.h @@ -1,7 +1,7 @@ -#pragma once - +#pragma once + #include <util/system/atomic.h> - + namespace NAtomic { class TBool { public: @@ -20,12 +20,12 @@ namespace NAtomic { return AtomicGet(Val_); } - const TBool& operator=(bool val) noexcept { + const TBool& operator=(bool val) noexcept { AtomicSet(Val_, val); return *this; } - const TBool& operator=(const TBool& src) noexcept { + const TBool& operator=(const TBool& src) noexcept { AtomicSet(Val_, AtomicGet(src.Val_)); return *this; } @@ -33,4 +33,4 @@ namespace NAtomic { private: TAtomic Val_ = 0; }; -} +} diff --git a/library/cpp/threading/chunk_queue/queue.h b/library/cpp/threading/chunk_queue/queue.h index 1959b0258c..55859601a1 100644 --- a/library/cpp/threading/chunk_queue/queue.h +++ b/library/cpp/threading/chunk_queue/queue.h @@ -19,21 +19,21 @@ namespace NThreading { // Platform helpers #if !defined(PLATFORM_CACHE_LINE) -#define PLATFORM_CACHE_LINE 64 +#define PLATFORM_CACHE_LINE 64 #endif #if !defined(PLATFORM_PAGE_SIZE) -#define PLATFORM_PAGE_SIZE 4 * 1024 +#define PLATFORM_PAGE_SIZE 4 * 1024 #endif - template <typename T, size_t PadSize = PLATFORM_CACHE_LINE> - struct TPadded: public T { - char Pad[PadSize - sizeof(T) % PadSize]; + template <typename T, size_t PadSize = PLATFORM_CACHE_LINE> + struct TPadded: public T { + char Pad[PadSize - sizeof(T) % PadSize]; - TPadded() { - static_assert(sizeof(*this) % PadSize == 0, "padding does not work"); - Y_UNUSED(Pad); - } + TPadded() { + static_assert(sizeof(*this) % PadSize == 0, "padding does not work"); + Y_UNUSED(Pad); + } template<typename... Args> TPadded(Args&&... args) @@ -42,280 +42,280 @@ namespace NThreading { static_assert(sizeof(*this) % PadSize == 0, "padding does not work"); Y_UNUSED(Pad); } - }; + }; + + //////////////////////////////////////////////////////////////////////////////// + // Type helpers + + namespace NImpl { + template <typename T> + struct TPodTypeHelper { + template <typename TT> + static void Write(T* ptr, TT&& value) { + *ptr = value; + } + + static T Read(T* ptr) { + return *ptr; + } + + static void Destroy(T* ptr) { + Y_UNUSED(ptr); + } + }; + + template <typename T> + struct TNonPodTypeHelper { + template <typename TT> + static void Write(T* ptr, TT&& value) { + new (ptr) T(std::forward<TT>(value)); + } + + static T Read(T* ptr) { + return std::move(*ptr); + } + + static void Destroy(T* ptr) { + (void)ptr; /* Make MSVC happy. */ + ptr->~T(); + } + }; + + template <typename T> + using TTypeHelper = std::conditional_t< + TTypeTraits<T>::IsPod, + TPodTypeHelper<T>, + TNonPodTypeHelper<T>>; + + } + + //////////////////////////////////////////////////////////////////////////////// + // One producer/one consumer chunked queue. + + template <typename T, size_t ChunkSize = PLATFORM_PAGE_SIZE> + class TOneOneQueue: private TNonCopyable { + using TTypeHelper = NImpl::TTypeHelper<T>; + + struct TChunk; + + struct TChunkHeader { + size_t Count = 0; + TChunk* Next = nullptr; + }; + + struct TChunk: public TChunkHeader { + static constexpr size_t MaxCount = (ChunkSize - sizeof(TChunkHeader)) / sizeof(T); + + char Entries[MaxCount * sizeof(T)]; + + TChunk() { + Y_UNUSED(Entries); // uninitialized + } + + ~TChunk() { + for (size_t i = 0; i < this->Count; ++i) { + TTypeHelper::Destroy(GetPtr(i)); + } + } + + T* GetPtr(size_t i) { + return (T*)Entries + i; + } + }; + + struct TWriterState { + TChunk* Chunk = nullptr; + }; + + struct TReaderState { + TChunk* Chunk = nullptr; + size_t Count = 0; + }; + + private: + TPadded<TWriterState> Writer; + TPadded<TReaderState> Reader; + + public: + using TItem = T; + + TOneOneQueue() { + Writer.Chunk = Reader.Chunk = new TChunk(); + } + + ~TOneOneQueue() { + DeleteChunks(Reader.Chunk); + } + + template <typename TT> + void Enqueue(TT&& value) { + T* ptr = PrepareWrite(); + Y_ASSERT(ptr); + TTypeHelper::Write(ptr, std::forward<TT>(value)); + CompleteWrite(); + } + + bool Dequeue(T& value) { + if (T* ptr = PrepareRead()) { + value = TTypeHelper::Read(ptr); + CompleteRead(); + return true; + } + return false; + } + + bool IsEmpty() { + return !PrepareRead(); + } + + protected: + T* PrepareWrite() { + TChunk* chunk = Writer.Chunk; + Y_ASSERT(chunk && !chunk->Next); + + if (chunk->Count != TChunk::MaxCount) { + return chunk->GetPtr(chunk->Count); + } + + chunk = new TChunk(); + AtomicSet(Writer.Chunk->Next, chunk); + Writer.Chunk = chunk; + return chunk->GetPtr(0); + } + + void CompleteWrite() { + AtomicSet(Writer.Chunk->Count, Writer.Chunk->Count + 1); + } + + T* PrepareRead() { + TChunk* chunk = Reader.Chunk; + Y_ASSERT(chunk); + + for (;;) { + size_t writerCount = AtomicGet(chunk->Count); + if (Reader.Count != writerCount) { + return chunk->GetPtr(Reader.Count); + } + + if (writerCount != TChunk::MaxCount) { + return nullptr; + } + + chunk = AtomicGet(chunk->Next); + if (!chunk) { + return nullptr; + } + + delete Reader.Chunk; + Reader.Chunk = chunk; + Reader.Count = 0; + } + } - //////////////////////////////////////////////////////////////////////////////// - // Type helpers + void CompleteRead() { + ++Reader.Count; + } - namespace NImpl { - template <typename T> - struct TPodTypeHelper { - template <typename TT> - static void Write(T* ptr, TT&& value) { - *ptr = value; - } + private: + static void DeleteChunks(TChunk* chunk) { + while (chunk) { + TChunk* next = chunk->Next; + delete chunk; + chunk = next; + } + } + }; - static T Read(T* ptr) { - return *ptr; - } - - static void Destroy(T* ptr) { - Y_UNUSED(ptr); - } - }; + //////////////////////////////////////////////////////////////////////////////// + // Multiple producers/single consumer partitioned queue. + // Provides FIFO guaranties for each producer. - template <typename T> - struct TNonPodTypeHelper { - template <typename TT> - static void Write(T* ptr, TT&& value) { - new (ptr) T(std::forward<TT>(value)); - } - - static T Read(T* ptr) { - return std::move(*ptr); - } - - static void Destroy(T* ptr) { - (void)ptr; /* Make MSVC happy. */ - ptr->~T(); - } - }; - - template <typename T> - using TTypeHelper = std::conditional_t< - TTypeTraits<T>::IsPod, - TPodTypeHelper<T>, - TNonPodTypeHelper<T>>; - - } - - //////////////////////////////////////////////////////////////////////////////// - // One producer/one consumer chunked queue. - - template <typename T, size_t ChunkSize = PLATFORM_PAGE_SIZE> - class TOneOneQueue: private TNonCopyable { - using TTypeHelper = NImpl::TTypeHelper<T>; - - struct TChunk; - - struct TChunkHeader { - size_t Count = 0; - TChunk* Next = nullptr; - }; - - struct TChunk: public TChunkHeader { - static constexpr size_t MaxCount = (ChunkSize - sizeof(TChunkHeader)) / sizeof(T); - - char Entries[MaxCount * sizeof(T)]; - - TChunk() { - Y_UNUSED(Entries); // uninitialized - } - - ~TChunk() { - for (size_t i = 0; i < this->Count; ++i) { - TTypeHelper::Destroy(GetPtr(i)); - } - } - - T* GetPtr(size_t i) { - return (T*)Entries + i; - } - }; - - struct TWriterState { - TChunk* Chunk = nullptr; - }; - - struct TReaderState { - TChunk* Chunk = nullptr; - size_t Count = 0; - }; - - private: - TPadded<TWriterState> Writer; - TPadded<TReaderState> Reader; - - public: - using TItem = T; - - TOneOneQueue() { - Writer.Chunk = Reader.Chunk = new TChunk(); - } - - ~TOneOneQueue() { - DeleteChunks(Reader.Chunk); - } - - template <typename TT> - void Enqueue(TT&& value) { - T* ptr = PrepareWrite(); - Y_ASSERT(ptr); - TTypeHelper::Write(ptr, std::forward<TT>(value)); - CompleteWrite(); - } - - bool Dequeue(T& value) { - if (T* ptr = PrepareRead()) { - value = TTypeHelper::Read(ptr); - CompleteRead(); - return true; - } - return false; - } - - bool IsEmpty() { - return !PrepareRead(); - } + template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE> + class TManyOneQueue: private TNonCopyable { + using TTypeHelper = NImpl::TTypeHelper<T>; - protected: - T* PrepareWrite() { - TChunk* chunk = Writer.Chunk; - Y_ASSERT(chunk && !chunk->Next); - - if (chunk->Count != TChunk::MaxCount) { - return chunk->GetPtr(chunk->Count); - } - - chunk = new TChunk(); - AtomicSet(Writer.Chunk->Next, chunk); - Writer.Chunk = chunk; - return chunk->GetPtr(0); - } - - void CompleteWrite() { - AtomicSet(Writer.Chunk->Count, Writer.Chunk->Count + 1); - } + struct TEntry { + T Value; + ui64 Tag; + }; + + struct TQueueType: public TOneOneQueue<TEntry, ChunkSize> { + TAtomic WriteLock = 0; - T* PrepareRead() { - TChunk* chunk = Reader.Chunk; - Y_ASSERT(chunk); + using TOneOneQueue<TEntry, ChunkSize>::PrepareWrite; + using TOneOneQueue<TEntry, ChunkSize>::CompleteWrite; + + using TOneOneQueue<TEntry, ChunkSize>::PrepareRead; + using TOneOneQueue<TEntry, ChunkSize>::CompleteRead; + }; - for (;;) { - size_t writerCount = AtomicGet(chunk->Count); - if (Reader.Count != writerCount) { - return chunk->GetPtr(Reader.Count); - } - - if (writerCount != TChunk::MaxCount) { - return nullptr; - } - - chunk = AtomicGet(chunk->Next); - if (!chunk) { - return nullptr; - } - - delete Reader.Chunk; - Reader.Chunk = chunk; - Reader.Count = 0; - } - } - - void CompleteRead() { - ++Reader.Count; - } - - private: - static void DeleteChunks(TChunk* chunk) { - while (chunk) { - TChunk* next = chunk->Next; - delete chunk; - chunk = next; - } - } - }; - - //////////////////////////////////////////////////////////////////////////////// - // Multiple producers/single consumer partitioned queue. - // Provides FIFO guaranties for each producer. - - template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE> - class TManyOneQueue: private TNonCopyable { - using TTypeHelper = NImpl::TTypeHelper<T>; - - struct TEntry { - T Value; - ui64 Tag; - }; - - struct TQueueType: public TOneOneQueue<TEntry, ChunkSize> { - TAtomic WriteLock = 0; - - using TOneOneQueue<TEntry, ChunkSize>::PrepareWrite; - using TOneOneQueue<TEntry, ChunkSize>::CompleteWrite; - - using TOneOneQueue<TEntry, ChunkSize>::PrepareRead; - using TOneOneQueue<TEntry, ChunkSize>::CompleteRead; - }; - - private: - union { - TAtomic WriteTag = 0; - char Pad[PLATFORM_CACHE_LINE]; - }; - - TQueueType Queues[Concurrency]; - - public: - using TItem = T; - - template <typename TT> - void Enqueue(TT&& value) { - ui64 tag = NextTag(); - while (!TryEnqueue(std::forward<TT>(value), tag)) { - SpinLockPause(); - } - } - - bool Dequeue(T& value) { - size_t index = 0; - if (TEntry* entry = PrepareRead(index)) { - value = TTypeHelper::Read(&entry->Value); - Queues[index].CompleteRead(); - return true; - } - return false; - } + private: + union { + TAtomic WriteTag = 0; + char Pad[PLATFORM_CACHE_LINE]; + }; + + TQueueType Queues[Concurrency]; + + public: + using TItem = T; + + template <typename TT> + void Enqueue(TT&& value) { + ui64 tag = NextTag(); + while (!TryEnqueue(std::forward<TT>(value), tag)) { + SpinLockPause(); + } + } + + bool Dequeue(T& value) { + size_t index = 0; + if (TEntry* entry = PrepareRead(index)) { + value = TTypeHelper::Read(&entry->Value); + Queues[index].CompleteRead(); + return true; + } + return false; + } - bool IsEmpty() { - for (size_t i = 0; i < Concurrency; ++i) { - if (!Queues[i].IsEmpty()) { - return false; - } - } - return true; + bool IsEmpty() { + for (size_t i = 0; i < Concurrency; ++i) { + if (!Queues[i].IsEmpty()) { + return false; + } + } + return true; } - private: - ui64 NextTag() { - // TODO: can we avoid synchronization here? it costs 1.5x performance penalty - // return GetCycleCount(); - return AtomicIncrement(WriteTag); - } + private: + ui64 NextTag() { + // TODO: can we avoid synchronization here? it costs 1.5x performance penalty + // return GetCycleCount(); + return AtomicIncrement(WriteTag); + } - template <typename TT> - bool TryEnqueue(TT&& value, ui64 tag) { - for (size_t i = 0; i < Concurrency; ++i) { - TQueueType& queue = Queues[i]; - if (AtomicTryAndTryLock(&queue.WriteLock)) { - TEntry* entry = queue.PrepareWrite(); - Y_ASSERT(entry); - TTypeHelper::Write(&entry->Value, std::forward<TT>(value)); - entry->Tag = tag; - queue.CompleteWrite(); - AtomicUnlock(&queue.WriteLock); - return true; - } + template <typename TT> + bool TryEnqueue(TT&& value, ui64 tag) { + for (size_t i = 0; i < Concurrency; ++i) { + TQueueType& queue = Queues[i]; + if (AtomicTryAndTryLock(&queue.WriteLock)) { + TEntry* entry = queue.PrepareWrite(); + Y_ASSERT(entry); + TTypeHelper::Write(&entry->Value, std::forward<TT>(value)); + entry->Tag = tag; + queue.CompleteWrite(); + AtomicUnlock(&queue.WriteLock); + return true; + } } - return false; + return false; } - TEntry* PrepareRead(size_t& index) { - TEntry* entry = nullptr; - ui64 tag = Max(); + TEntry* PrepareRead(size_t& index) { + TEntry* entry = nullptr; + ui64 tag = Max(); - for (size_t i = 0; i < Concurrency; ++i) { + for (size_t i = 0; i < Concurrency; ++i) { TEntry* e = Queues[i].PrepareRead(); if (e && e->Tag < tag) { index = i; @@ -323,246 +323,246 @@ namespace NThreading { tag = e->Tag; } } - - if (entry) { - // need second pass to catch updates within already scanned range - size_t candidate = index; - for (size_t i = 0; i < candidate; ++i) { - TEntry* e = Queues[i].PrepareRead(); - if (e && e->Tag < tag) { - index = i; - entry = e; - tag = e->Tag; - } - } - } - - return entry; + + if (entry) { + // need second pass to catch updates within already scanned range + size_t candidate = index; + for (size_t i = 0; i < candidate; ++i) { + TEntry* e = Queues[i].PrepareRead(); + if (e && e->Tag < tag) { + index = i; + entry = e; + tag = e->Tag; + } + } + } + + return entry; } - }; + }; - //////////////////////////////////////////////////////////////////////////////// - // Concurrent many-many queue with strong FIFO guaranties. - // Writers will not block readers (and vice versa), but will block each other. + //////////////////////////////////////////////////////////////////////////////// + // Concurrent many-many queue with strong FIFO guaranties. + // Writers will not block readers (and vice versa), but will block each other. - template <typename T, size_t ChunkSize = PLATFORM_PAGE_SIZE, typename TLock = TAdaptiveLock> - class TManyManyQueue: private TNonCopyable { - private: - TPadded<TLock> WriteLock; - TPadded<TLock> ReadLock; - - TOneOneQueue<T, ChunkSize> Queue; - - public: - using TItem = T; - - template <typename TT> - void Enqueue(TT&& value) { - with_lock (WriteLock) { - Queue.Enqueue(std::forward<TT>(value)); - } - } - - bool Dequeue(T& value) { - with_lock (ReadLock) { - return Queue.Dequeue(value); - } - } - - bool IsEmpty() { - with_lock (ReadLock) { - return Queue.IsEmpty(); - } - } - }; - - //////////////////////////////////////////////////////////////////////////////// - // Multiple producers/single consumer partitioned queue. - // Because of random partitioning reordering possible - FIFO not guaranteed! - - template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE> - class TRelaxedManyOneQueue: private TNonCopyable { - struct TQueueType: public TOneOneQueue<T, ChunkSize> { - TAtomic WriteLock = 0; - }; - - private: - union { - size_t ReadPos = 0; - char Pad[PLATFORM_CACHE_LINE]; - }; - - TQueueType Queues[Concurrency]; - - public: - using TItem = T; - - template <typename TT> - void Enqueue(TT&& value) { - while (!TryEnqueue(std::forward<TT>(value))) { - SpinLockPause(); - } - } - - bool Dequeue(T& value) { - for (size_t i = 0; i < Concurrency; ++i) { - TQueueType& queue = Queues[ReadPos++ % Concurrency]; - if (queue.Dequeue(value)) { - return true; - } - } - return false; - } - - bool IsEmpty() { - for (size_t i = 0; i < Concurrency; ++i) { - if (!Queues[i].IsEmpty()) { - return false; - } - } - return true; - } - - private: - template <typename TT> - bool TryEnqueue(TT&& value) { - size_t writePos = GetCycleCount(); - for (size_t i = 0; i < Concurrency; ++i) { - TQueueType& queue = Queues[writePos++ % Concurrency]; - if (AtomicTryAndTryLock(&queue.WriteLock)) { - queue.Enqueue(std::forward<TT>(value)); - AtomicUnlock(&queue.WriteLock); - return true; - } - } - return false; + template <typename T, size_t ChunkSize = PLATFORM_PAGE_SIZE, typename TLock = TAdaptiveLock> + class TManyManyQueue: private TNonCopyable { + private: + TPadded<TLock> WriteLock; + TPadded<TLock> ReadLock; + + TOneOneQueue<T, ChunkSize> Queue; + + public: + using TItem = T; + + template <typename TT> + void Enqueue(TT&& value) { + with_lock (WriteLock) { + Queue.Enqueue(std::forward<TT>(value)); + } } - }; - //////////////////////////////////////////////////////////////////////////////// - // Concurrent many-many partitioned queue. - // Because of random partitioning reordering possible - FIFO not guaranteed! - - template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE> - class TRelaxedManyManyQueue: private TNonCopyable { - struct TQueueType: public TOneOneQueue<T, ChunkSize> { - union { - TAtomic WriteLock = 0; - char Pad1[PLATFORM_CACHE_LINE]; - }; - union { - TAtomic ReadLock = 0; - char Pad2[PLATFORM_CACHE_LINE]; - }; + bool Dequeue(T& value) { + with_lock (ReadLock) { + return Queue.Dequeue(value); + } + } + + bool IsEmpty() { + with_lock (ReadLock) { + return Queue.IsEmpty(); + } + } + }; + + //////////////////////////////////////////////////////////////////////////////// + // Multiple producers/single consumer partitioned queue. + // Because of random partitioning reordering possible - FIFO not guaranteed! + + template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE> + class TRelaxedManyOneQueue: private TNonCopyable { + struct TQueueType: public TOneOneQueue<T, ChunkSize> { + TAtomic WriteLock = 0; + }; + + private: + union { + size_t ReadPos = 0; + char Pad[PLATFORM_CACHE_LINE]; }; - private: - TQueueType Queues[Concurrency]; - - public: - using TItem = T; - - template <typename TT> - void Enqueue(TT&& value) { - while (!TryEnqueue(std::forward<TT>(value))) { - SpinLockPause(); - } - } - - bool Dequeue(T& value) { - size_t readPos = GetCycleCount(); - for (size_t i = 0; i < Concurrency; ++i) { - TQueueType& queue = Queues[readPos++ % Concurrency]; - if (AtomicTryAndTryLock(&queue.ReadLock)) { - bool dequeued = queue.Dequeue(value); - AtomicUnlock(&queue.ReadLock); - if (dequeued) { - return true; - } + TQueueType Queues[Concurrency]; + + public: + using TItem = T; + + template <typename TT> + void Enqueue(TT&& value) { + while (!TryEnqueue(std::forward<TT>(value))) { + SpinLockPause(); + } + } + + bool Dequeue(T& value) { + for (size_t i = 0; i < Concurrency; ++i) { + TQueueType& queue = Queues[ReadPos++ % Concurrency]; + if (queue.Dequeue(value)) { + return true; } } - return false; + return false; } - bool IsEmpty() { - for (size_t i = 0; i < Concurrency; ++i) { - TQueueType& queue = Queues[i]; - if (AtomicTryAndTryLock(&queue.ReadLock)) { - bool empty = queue.IsEmpty(); - AtomicUnlock(&queue.ReadLock); - if (!empty) { - return false; - } + bool IsEmpty() { + for (size_t i = 0; i < Concurrency; ++i) { + if (!Queues[i].IsEmpty()) { + return false; } } - return true; + return true; } - private: - template <typename TT> - bool TryEnqueue(TT&& value) { - size_t writePos = GetCycleCount(); - for (size_t i = 0; i < Concurrency; ++i) { - TQueueType& queue = Queues[writePos++ % Concurrency]; - if (AtomicTryAndTryLock(&queue.WriteLock)) { - queue.Enqueue(std::forward<TT>(value)); - AtomicUnlock(&queue.WriteLock); - return true; - } + private: + template <typename TT> + bool TryEnqueue(TT&& value) { + size_t writePos = GetCycleCount(); + for (size_t i = 0; i < Concurrency; ++i) { + TQueueType& queue = Queues[writePos++ % Concurrency]; + if (AtomicTryAndTryLock(&queue.WriteLock)) { + queue.Enqueue(std::forward<TT>(value)); + AtomicUnlock(&queue.WriteLock); + return true; + } } - return false; + return false; } - }; + }; + + //////////////////////////////////////////////////////////////////////////////// + // Concurrent many-many partitioned queue. + // Because of random partitioning reordering possible - FIFO not guaranteed! + + template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE> + class TRelaxedManyManyQueue: private TNonCopyable { + struct TQueueType: public TOneOneQueue<T, ChunkSize> { + union { + TAtomic WriteLock = 0; + char Pad1[PLATFORM_CACHE_LINE]; + }; + union { + TAtomic ReadLock = 0; + char Pad2[PLATFORM_CACHE_LINE]; + }; + }; - //////////////////////////////////////////////////////////////////////////////// - // Simple wrapper to deal with AutoPtrs + private: + TQueueType Queues[Concurrency]; - template <typename T, typename TImpl> - class TAutoQueueBase: private TNonCopyable { - private: - TImpl Impl; + public: + using TItem = T; - public: - using TItem = TAutoPtr<T>; + template <typename TT> + void Enqueue(TT&& value) { + while (!TryEnqueue(std::forward<TT>(value))) { + SpinLockPause(); + } + } - ~TAutoQueueBase() { - TAutoPtr<T> value; - while (Dequeue(value)) { - // do nothing - } + bool Dequeue(T& value) { + size_t readPos = GetCycleCount(); + for (size_t i = 0; i < Concurrency; ++i) { + TQueueType& queue = Queues[readPos++ % Concurrency]; + if (AtomicTryAndTryLock(&queue.ReadLock)) { + bool dequeued = queue.Dequeue(value); + AtomicUnlock(&queue.ReadLock); + if (dequeued) { + return true; + } + } + } + return false; } - void Enqueue(TAutoPtr<T> value) { - Impl.Enqueue(value.Get()); + bool IsEmpty() { + for (size_t i = 0; i < Concurrency; ++i) { + TQueueType& queue = Queues[i]; + if (AtomicTryAndTryLock(&queue.ReadLock)) { + bool empty = queue.IsEmpty(); + AtomicUnlock(&queue.ReadLock); + if (!empty) { + return false; + } + } + } + return true; + } + + private: + template <typename TT> + bool TryEnqueue(TT&& value) { + size_t writePos = GetCycleCount(); + for (size_t i = 0; i < Concurrency; ++i) { + TQueueType& queue = Queues[writePos++ % Concurrency]; + if (AtomicTryAndTryLock(&queue.WriteLock)) { + queue.Enqueue(std::forward<TT>(value)); + AtomicUnlock(&queue.WriteLock); + return true; + } + } + return false; + } + }; + + //////////////////////////////////////////////////////////////////////////////// + // Simple wrapper to deal with AutoPtrs + + template <typename T, typename TImpl> + class TAutoQueueBase: private TNonCopyable { + private: + TImpl Impl; + + public: + using TItem = TAutoPtr<T>; + + ~TAutoQueueBase() { + TAutoPtr<T> value; + while (Dequeue(value)) { + // do nothing + } + } + + void Enqueue(TAutoPtr<T> value) { + Impl.Enqueue(value.Get()); Y_UNUSED(value.Release()); - } + } - bool Dequeue(TAutoPtr<T>& value) { - T* ptr = nullptr; - if (Impl.Dequeue(ptr)) { - value.Reset(ptr); - return true; - } - return false; + bool Dequeue(TAutoPtr<T>& value) { + T* ptr = nullptr; + if (Impl.Dequeue(ptr)) { + value.Reset(ptr); + return true; + } + return false; } - bool IsEmpty() { - return Impl.IsEmpty(); - } - }; + bool IsEmpty() { + return Impl.IsEmpty(); + } + }; - template <typename T, size_t ChunkSize = PLATFORM_PAGE_SIZE> - using TAutoOneOneQueue = TAutoQueueBase<T, TOneOneQueue<T*, ChunkSize>>; + template <typename T, size_t ChunkSize = PLATFORM_PAGE_SIZE> + using TAutoOneOneQueue = TAutoQueueBase<T, TOneOneQueue<T*, ChunkSize>>; - template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE> - using TAutoManyOneQueue = TAutoQueueBase<T, TManyOneQueue<T*, Concurrency, ChunkSize>>; + template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE> + using TAutoManyOneQueue = TAutoQueueBase<T, TManyOneQueue<T*, Concurrency, ChunkSize>>; - template <typename T, size_t ChunkSize = PLATFORM_PAGE_SIZE, typename TLock = TAdaptiveLock> - using TAutoManyManyQueue = TAutoQueueBase<T, TManyManyQueue<T*, ChunkSize, TLock>>; + template <typename T, size_t ChunkSize = PLATFORM_PAGE_SIZE, typename TLock = TAdaptiveLock> + using TAutoManyManyQueue = TAutoQueueBase<T, TManyManyQueue<T*, ChunkSize, TLock>>; - template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE> - using TAutoRelaxedManyOneQueue = TAutoQueueBase<T, TRelaxedManyOneQueue<T*, Concurrency, ChunkSize>>; + template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE> + using TAutoRelaxedManyOneQueue = TAutoQueueBase<T, TRelaxedManyOneQueue<T*, Concurrency, ChunkSize>>; - template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE> - using TAutoRelaxedManyManyQueue = TAutoQueueBase<T, TRelaxedManyManyQueue<T*, Concurrency, ChunkSize>>; -} + template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE> + using TAutoRelaxedManyManyQueue = TAutoQueueBase<T, TRelaxedManyManyQueue<T*, Concurrency, ChunkSize>>; +} diff --git a/library/cpp/threading/chunk_queue/queue_ut.cpp b/library/cpp/threading/chunk_queue/queue_ut.cpp index 406b71dd4c..8cb36d8dd1 100644 --- a/library/cpp/threading/chunk_queue/queue_ut.cpp +++ b/library/cpp/threading/chunk_queue/queue_ut.cpp @@ -5,55 +5,55 @@ #include <util/generic/set.h> namespace NThreading { - //////////////////////////////////////////////////////////////////////////////// + //////////////////////////////////////////////////////////////////////////////// Y_UNIT_TEST_SUITE(TOneOneQueueTest){ Y_UNIT_TEST(ShouldBeEmptyAtStart){ - TOneOneQueue<int> queue; + TOneOneQueue<int> queue; - int result = 0; - UNIT_ASSERT(queue.IsEmpty()); - UNIT_ASSERT(!queue.Dequeue(result)); -} + int result = 0; + UNIT_ASSERT(queue.IsEmpty()); + UNIT_ASSERT(!queue.Dequeue(result)); +} Y_UNIT_TEST(ShouldReturnEntries) { - TOneOneQueue<int> queue; - queue.Enqueue(1); - queue.Enqueue(2); - queue.Enqueue(3); + TOneOneQueue<int> queue; + queue.Enqueue(1); + queue.Enqueue(2); + queue.Enqueue(3); - int result = 0; - UNIT_ASSERT(!queue.IsEmpty()); - UNIT_ASSERT(queue.Dequeue(result)); - UNIT_ASSERT_EQUAL(result, 1); + int result = 0; + UNIT_ASSERT(!queue.IsEmpty()); + UNIT_ASSERT(queue.Dequeue(result)); + UNIT_ASSERT_EQUAL(result, 1); - UNIT_ASSERT(!queue.IsEmpty()); - UNIT_ASSERT(queue.Dequeue(result)); - UNIT_ASSERT_EQUAL(result, 2); + UNIT_ASSERT(!queue.IsEmpty()); + UNIT_ASSERT(queue.Dequeue(result)); + UNIT_ASSERT_EQUAL(result, 2); - UNIT_ASSERT(!queue.IsEmpty()); - UNIT_ASSERT(queue.Dequeue(result)); - UNIT_ASSERT_EQUAL(result, 3); + UNIT_ASSERT(!queue.IsEmpty()); + UNIT_ASSERT(queue.Dequeue(result)); + UNIT_ASSERT_EQUAL(result, 3); - UNIT_ASSERT(queue.IsEmpty()); - UNIT_ASSERT(!queue.Dequeue(result)); -} + UNIT_ASSERT(queue.IsEmpty()); + UNIT_ASSERT(!queue.Dequeue(result)); +} Y_UNIT_TEST(ShouldStoreMultipleChunks) { - TOneOneQueue<int, 100> queue; - for (int i = 0; i < 1000; ++i) { - queue.Enqueue(i); + TOneOneQueue<int, 100> queue; + for (int i = 0; i < 1000; ++i) { + queue.Enqueue(i); } - for (int i = 0; i < 1000; ++i) { - int result = 0; - UNIT_ASSERT(!queue.IsEmpty()); - UNIT_ASSERT(queue.Dequeue(result)); - UNIT_ASSERT_EQUAL(result, i); + for (int i = 0; i < 1000; ++i) { + int result = 0; + UNIT_ASSERT(!queue.IsEmpty()); + UNIT_ASSERT(queue.Dequeue(result)); + UNIT_ASSERT_EQUAL(result, i); } -} -} -; +} +} +; //////////////////////////////////////////////////////////////////////////////// @@ -61,35 +61,35 @@ Y_UNIT_TEST_SUITE(TManyOneQueueTest){ Y_UNIT_TEST(ShouldBeEmptyAtStart){ TManyOneQueue<int> queue; -int result; -UNIT_ASSERT(queue.IsEmpty()); -UNIT_ASSERT(!queue.Dequeue(result)); -} +int result; +UNIT_ASSERT(queue.IsEmpty()); +UNIT_ASSERT(!queue.Dequeue(result)); +} Y_UNIT_TEST(ShouldReturnEntries) { - TManyOneQueue<int> queue; - queue.Enqueue(1); - queue.Enqueue(2); - queue.Enqueue(3); - - int result = 0; - UNIT_ASSERT(!queue.IsEmpty()); - UNIT_ASSERT(queue.Dequeue(result)); - UNIT_ASSERT_EQUAL(result, 1); - - UNIT_ASSERT(!queue.IsEmpty()); - UNIT_ASSERT(queue.Dequeue(result)); - UNIT_ASSERT_EQUAL(result, 2); - - UNIT_ASSERT(!queue.IsEmpty()); - UNIT_ASSERT(queue.Dequeue(result)); - UNIT_ASSERT_EQUAL(result, 3); - - UNIT_ASSERT(queue.IsEmpty()); - UNIT_ASSERT(!queue.Dequeue(result)); -} -} -; + TManyOneQueue<int> queue; + queue.Enqueue(1); + queue.Enqueue(2); + queue.Enqueue(3); + + int result = 0; + UNIT_ASSERT(!queue.IsEmpty()); + UNIT_ASSERT(queue.Dequeue(result)); + UNIT_ASSERT_EQUAL(result, 1); + + UNIT_ASSERT(!queue.IsEmpty()); + UNIT_ASSERT(queue.Dequeue(result)); + UNIT_ASSERT_EQUAL(result, 2); + + UNIT_ASSERT(!queue.IsEmpty()); + UNIT_ASSERT(queue.Dequeue(result)); + UNIT_ASSERT_EQUAL(result, 3); + + UNIT_ASSERT(queue.IsEmpty()); + UNIT_ASSERT(!queue.Dequeue(result)); +} +} +; //////////////////////////////////////////////////////////////////////////////// @@ -97,35 +97,35 @@ Y_UNIT_TEST_SUITE(TManyManyQueueTest){ Y_UNIT_TEST(ShouldBeEmptyAtStart){ TManyManyQueue<int> queue; -int result = 0; -UNIT_ASSERT(queue.IsEmpty()); -UNIT_ASSERT(!queue.Dequeue(result)); -} +int result = 0; +UNIT_ASSERT(queue.IsEmpty()); +UNIT_ASSERT(!queue.Dequeue(result)); +} Y_UNIT_TEST(ShouldReturnEntries) { - TManyManyQueue<int> queue; - queue.Enqueue(1); - queue.Enqueue(2); - queue.Enqueue(3); - - int result = 0; - UNIT_ASSERT(!queue.IsEmpty()); - UNIT_ASSERT(queue.Dequeue(result)); - UNIT_ASSERT_EQUAL(result, 1); - - UNIT_ASSERT(!queue.IsEmpty()); - UNIT_ASSERT(queue.Dequeue(result)); - UNIT_ASSERT_EQUAL(result, 2); - - UNIT_ASSERT(!queue.IsEmpty()); - UNIT_ASSERT(queue.Dequeue(result)); - UNIT_ASSERT_EQUAL(result, 3); - - UNIT_ASSERT(queue.IsEmpty()); - UNIT_ASSERT(!queue.Dequeue(result)); -} -} -; + TManyManyQueue<int> queue; + queue.Enqueue(1); + queue.Enqueue(2); + queue.Enqueue(3); + + int result = 0; + UNIT_ASSERT(!queue.IsEmpty()); + UNIT_ASSERT(queue.Dequeue(result)); + UNIT_ASSERT_EQUAL(result, 1); + + UNIT_ASSERT(!queue.IsEmpty()); + UNIT_ASSERT(queue.Dequeue(result)); + UNIT_ASSERT_EQUAL(result, 2); + + UNIT_ASSERT(!queue.IsEmpty()); + UNIT_ASSERT(queue.Dequeue(result)); + UNIT_ASSERT_EQUAL(result, 3); + + UNIT_ASSERT(queue.IsEmpty()); + UNIT_ASSERT(!queue.Dequeue(result)); +} +} +; //////////////////////////////////////////////////////////////////////////////// @@ -133,37 +133,37 @@ Y_UNIT_TEST_SUITE(TRelaxedManyOneQueueTest){ Y_UNIT_TEST(ShouldBeEmptyAtStart){ TRelaxedManyOneQueue<int> queue; -int result; -UNIT_ASSERT(queue.IsEmpty()); -UNIT_ASSERT(!queue.Dequeue(result)); -} +int result; +UNIT_ASSERT(queue.IsEmpty()); +UNIT_ASSERT(!queue.Dequeue(result)); +} Y_UNIT_TEST(ShouldReturnEntries) { - TSet<int> items = {1, 2, 3}; + TSet<int> items = {1, 2, 3}; - TRelaxedManyOneQueue<int> queue; - for (int item : items) { - queue.Enqueue(item); - } + TRelaxedManyOneQueue<int> queue; + for (int item : items) { + queue.Enqueue(item); + } - int result = 0; - UNIT_ASSERT(!queue.IsEmpty()); - UNIT_ASSERT(queue.Dequeue(result)); - UNIT_ASSERT(items.erase(result)); + int result = 0; + UNIT_ASSERT(!queue.IsEmpty()); + UNIT_ASSERT(queue.Dequeue(result)); + UNIT_ASSERT(items.erase(result)); - UNIT_ASSERT(!queue.IsEmpty()); - UNIT_ASSERT(queue.Dequeue(result)); - UNIT_ASSERT(items.erase(result)); + UNIT_ASSERT(!queue.IsEmpty()); + UNIT_ASSERT(queue.Dequeue(result)); + UNIT_ASSERT(items.erase(result)); - UNIT_ASSERT(!queue.IsEmpty()); - UNIT_ASSERT(queue.Dequeue(result)); - UNIT_ASSERT(items.erase(result)); + UNIT_ASSERT(!queue.IsEmpty()); + UNIT_ASSERT(queue.Dequeue(result)); + UNIT_ASSERT(items.erase(result)); - UNIT_ASSERT(queue.IsEmpty()); - UNIT_ASSERT(!queue.Dequeue(result)); -} -} -; + UNIT_ASSERT(queue.IsEmpty()); + UNIT_ASSERT(!queue.Dequeue(result)); +} +} +; //////////////////////////////////////////////////////////////////////////////// @@ -171,35 +171,35 @@ Y_UNIT_TEST_SUITE(TRelaxedManyManyQueueTest){ Y_UNIT_TEST(ShouldBeEmptyAtStart){ TRelaxedManyManyQueue<int> queue; -int result = 0; -UNIT_ASSERT(queue.IsEmpty()); -UNIT_ASSERT(!queue.Dequeue(result)); -} +int result = 0; +UNIT_ASSERT(queue.IsEmpty()); +UNIT_ASSERT(!queue.Dequeue(result)); +} Y_UNIT_TEST(ShouldReturnEntries) { - TSet<int> items = {1, 2, 3}; - - TRelaxedManyManyQueue<int> queue; - for (int item : items) { - queue.Enqueue(item); - } - - int result = 0; - UNIT_ASSERT(!queue.IsEmpty()); - UNIT_ASSERT(queue.Dequeue(result)); - UNIT_ASSERT(items.erase(result)); - - UNIT_ASSERT(!queue.IsEmpty()); - UNIT_ASSERT(queue.Dequeue(result)); - UNIT_ASSERT(items.erase(result)); - - UNIT_ASSERT(!queue.IsEmpty()); - UNIT_ASSERT(queue.Dequeue(result)); - UNIT_ASSERT(items.erase(result)); - - UNIT_ASSERT(queue.IsEmpty()); - UNIT_ASSERT(!queue.Dequeue(result)); -} -} -; -} + TSet<int> items = {1, 2, 3}; + + TRelaxedManyManyQueue<int> queue; + for (int item : items) { + queue.Enqueue(item); + } + + int result = 0; + UNIT_ASSERT(!queue.IsEmpty()); + UNIT_ASSERT(queue.Dequeue(result)); + UNIT_ASSERT(items.erase(result)); + + UNIT_ASSERT(!queue.IsEmpty()); + UNIT_ASSERT(queue.Dequeue(result)); + UNIT_ASSERT(items.erase(result)); + + UNIT_ASSERT(!queue.IsEmpty()); + UNIT_ASSERT(queue.Dequeue(result)); + UNIT_ASSERT(items.erase(result)); + + UNIT_ASSERT(queue.IsEmpty()); + UNIT_ASSERT(!queue.Dequeue(result)); +} +} +; +} diff --git a/library/cpp/threading/future/async.h b/library/cpp/threading/future/async.h index f964d2dc88..8543fdd5c6 100644 --- a/library/cpp/threading/future/async.h +++ b/library/cpp/threading/future/async.h @@ -2,11 +2,11 @@ #include "future.h" -#include <util/generic/function.h> +#include <util/generic/function.h> #include <util/thread/pool.h> namespace NThreading { - /** + /** * @brief Asynchronously executes @arg func in @arg queue returning a future for the result. * * @arg func should be a callable object with signature T(). @@ -17,15 +17,15 @@ namespace NThreading { * If you want to use another queue for execution just write an overload, @see ExtensionExample * unittest. */ - template <typename Func> + template <typename Func> TFuture<TFutureType<TFunctionResult<Func>>> Async(Func&& func, IThreadPool& queue) { - auto promise = NewPromise<TFutureType<TFunctionResult<Func>>>(); + auto promise = NewPromise<TFutureType<TFunctionResult<Func>>>(); auto lambda = [promise, func = std::forward<Func>(func)]() mutable { - NImpl::SetValue(promise, func); + NImpl::SetValue(promise, func); }; queue.SafeAddFunc(std::move(lambda)); - return promise.GetFuture(); - } + return promise.GetFuture(); + } } diff --git a/library/cpp/threading/future/async_ut.cpp b/library/cpp/threading/future/async_ut.cpp index a452965dbc..a3699744e4 100644 --- a/library/cpp/threading/future/async_ut.cpp +++ b/library/cpp/threading/future/async_ut.cpp @@ -6,13 +6,13 @@ #include <util/generic/vector.h> namespace { - struct TMySuperTaskQueue { - }; + struct TMySuperTaskQueue { + }; } namespace NThreading { - /* Here we provide an Async overload for TMySuperTaskQueue indide NThreading namespace + /* Here we provide an Async overload for TMySuperTaskQueue indide NThreading namespace * so that we can call it in the way * * TMySuperTaskQueue queue; @@ -20,38 +20,38 @@ namespace NThreading { * * See also ExtensionExample unittest. */ - template <typename Func> - TFuture<TFunctionResult<Func>> Async(Func func, TMySuperTaskQueue&) { - return MakeFuture(func()); - } + template <typename Func> + TFuture<TFunctionResult<Func>> Async(Func func, TMySuperTaskQueue&) { + return MakeFuture(func()); + } } Y_UNIT_TEST_SUITE(Async) { Y_UNIT_TEST(ExtensionExample) { - TMySuperTaskQueue queue; - auto future = NThreading::Async([]() { return 5; }, queue); - future.Wait(); - UNIT_ASSERT_VALUES_EQUAL(future.GetValue(), 5); - } + TMySuperTaskQueue queue; + auto future = NThreading::Async([]() { return 5; }, queue); + future.Wait(); + UNIT_ASSERT_VALUES_EQUAL(future.GetValue(), 5); + } Y_UNIT_TEST(WorksWithIMtpQueue) { auto queue = MakeHolder<TThreadPool>(); - queue->Start(1); + queue->Start(1); - auto future = NThreading::Async([]() { return 5; }, *queue); - future.Wait(); - UNIT_ASSERT_VALUES_EQUAL(future.GetValue(), 5); - } + auto future = NThreading::Async([]() { return 5; }, *queue); + future.Wait(); + UNIT_ASSERT_VALUES_EQUAL(future.GetValue(), 5); + } Y_UNIT_TEST(ProperlyDeducesFutureType) { - // Compileability test + // Compileability test auto queue = CreateThreadPool(1); - NThreading::TFuture<void> f1 = NThreading::Async([]() {}, *queue); - NThreading::TFuture<int> f2 = NThreading::Async([]() { return 5; }, *queue); - NThreading::TFuture<double> f3 = NThreading::Async([]() { return 5.0; }, *queue); - NThreading::TFuture<TVector<int>> f4 = NThreading::Async([]() { return TVector<int>(); }, *queue); - NThreading::TFuture<int> f5 = NThreading::Async([]() { return NThreading::MakeFuture(5); }, *queue); - } + NThreading::TFuture<void> f1 = NThreading::Async([]() {}, *queue); + NThreading::TFuture<int> f2 = NThreading::Async([]() { return 5; }, *queue); + NThreading::TFuture<double> f3 = NThreading::Async([]() { return 5.0; }, *queue); + NThreading::TFuture<TVector<int>> f4 = NThreading::Async([]() { return TVector<int>(); }, *queue); + NThreading::TFuture<int> f5 = NThreading::Async([]() { return NThreading::MakeFuture(5); }, *queue); + } } diff --git a/library/cpp/threading/future/core/future-inl.h b/library/cpp/threading/future/core/future-inl.h index a0e06c1891..5fd4296a93 100644 --- a/library/cpp/threading/future/core/future-inl.h +++ b/library/cpp/threading/future/core/future-inl.h @@ -2,92 +2,92 @@ #if !defined(INCLUDE_FUTURE_INL_H) #error "you should never include future-inl.h directly" -#endif // INCLUDE_FUTURE_INL_H +#endif // INCLUDE_FUTURE_INL_H namespace NThreading { - namespace NImpl { - //////////////////////////////////////////////////////////////////////////////// + namespace NImpl { + //////////////////////////////////////////////////////////////////////////////// - template <typename T> - using TCallback = std::function<void(const TFuture<T>&)>; + template <typename T> + using TCallback = std::function<void(const TFuture<T>&)>; - template <typename T> - using TCallbackList = TVector<TCallback<T>>; // TODO: small vector + template <typename T> + using TCallbackList = TVector<TCallback<T>>; // TODO: small vector - //////////////////////////////////////////////////////////////////////////////// + //////////////////////////////////////////////////////////////////////////////// enum class TError { Error }; - template <typename T> - class TFutureState: public TAtomicRefCount<TFutureState<T>> { - enum { - NotReady, - ExceptionSet, - ValueMoved, // keep the ordering of this and following values - ValueSet, - ValueRead, - }; - - private: - mutable TAtomic State; - TAdaptiveLock StateLock; - - TCallbackList<T> Callbacks; + template <typename T> + class TFutureState: public TAtomicRefCount<TFutureState<T>> { + enum { + NotReady, + ExceptionSet, + ValueMoved, // keep the ordering of this and following values + ValueSet, + ValueRead, + }; + + private: + mutable TAtomic State; + TAdaptiveLock StateLock; + + TCallbackList<T> Callbacks; mutable THolder<TSystemEvent> ReadyEvent; - std::exception_ptr Exception; - - union { - char NullValue; - T Value; - }; - - void AccessValue(TDuration timeout, int acquireState) const { - int state = AtomicGet(State); - if (Y_UNLIKELY(state == NotReady)) { - if (timeout == TDuration::Zero()) { - ythrow TFutureException() << "value not set"; - } - - if (!Wait(timeout)) { - ythrow TFutureException() << "wait timeout"; - } - - state = AtomicGet(State); - } - + std::exception_ptr Exception; + + union { + char NullValue; + T Value; + }; + + void AccessValue(TDuration timeout, int acquireState) const { + int state = AtomicGet(State); + if (Y_UNLIKELY(state == NotReady)) { + if (timeout == TDuration::Zero()) { + ythrow TFutureException() << "value not set"; + } + + if (!Wait(timeout)) { + ythrow TFutureException() << "wait timeout"; + } + + state = AtomicGet(State); + } + TryRethrowWithState(state); - - switch (AtomicGetAndCas(&State, acquireState, ValueSet)) { - case ValueSet: - break; - case ValueRead: - if (acquireState != ValueRead) { - ythrow TFutureException() << "value being read"; - } - break; - case ValueMoved: - ythrow TFutureException() << "value was moved"; - default: - Y_ASSERT(state == ValueSet); - } - } - - public: - TFutureState() - : State(NotReady) - , NullValue(0) - { - } - - template <typename TT> - TFutureState(TT&& value) - : State(ValueSet) - , Value(std::forward<TT>(value)) - { - } + + switch (AtomicGetAndCas(&State, acquireState, ValueSet)) { + case ValueSet: + break; + case ValueRead: + if (acquireState != ValueRead) { + ythrow TFutureException() << "value being read"; + } + break; + case ValueMoved: + ythrow TFutureException() << "value was moved"; + default: + Y_ASSERT(state == ValueSet); + } + } + + public: + TFutureState() + : State(NotReady) + , NullValue(0) + { + } + + template <typename TT> + TFutureState(TT&& value) + : State(ValueSet) + , Value(std::forward<TT>(value)) + { + } TFutureState(std::exception_ptr exception, TError) : State(ExceptionSet) @@ -96,14 +96,14 @@ namespace NThreading { { } - ~TFutureState() { - if (State >= ValueMoved) { // ValueMoved, ValueSet, ValueRead - Value.~T(); - } - } + ~TFutureState() { + if (State >= ValueMoved) { // ValueMoved, ValueSet, ValueRead + Value.~T(); + } + } - bool HasValue() const { - return AtomicGet(State) >= ValueMoved; // ValueMoved, ValueSet, ValueRead + bool HasValue() const { + return AtomicGet(State) >= ValueMoved; // ValueMoved, ValueSet, ValueRead } void TryRethrow() const { @@ -111,22 +111,22 @@ namespace NThreading { TryRethrowWithState(state); } - bool HasException() const { - return AtomicGet(State) == ExceptionSet; - } + bool HasException() const { + return AtomicGet(State) == ExceptionSet; + } - const T& GetValue(TDuration timeout = TDuration::Zero()) const { - AccessValue(timeout, ValueRead); - return Value; - } + const T& GetValue(TDuration timeout = TDuration::Zero()) const { + AccessValue(timeout, ValueRead); + return Value; + } - T ExtractValue(TDuration timeout = TDuration::Zero()) { - AccessValue(timeout, ValueMoved); - return std::move(Value); - } + T ExtractValue(TDuration timeout = TDuration::Zero()) { + AccessValue(timeout, ValueMoved); + return std::move(Value); + } - template <typename TT> - void SetValue(TT&& value) { + template <typename TT> + void SetValue(TT&& value) { bool success = TrySetValue(std::forward<TT>(value)); if (Y_UNLIKELY(!success)) { ythrow TFutureException() << "value already set"; @@ -136,37 +136,37 @@ namespace NThreading { template <typename TT> bool TrySetValue(TT&& value) { TSystemEvent* readyEvent = nullptr; - TCallbackList<T> callbacks; + TCallbackList<T> callbacks; - with_lock (StateLock) { - int state = AtomicGet(State); - if (Y_UNLIKELY(state != NotReady)) { + with_lock (StateLock) { + int state = AtomicGet(State); + if (Y_UNLIKELY(state != NotReady)) { return false; - } + } + + new (&Value) T(std::forward<TT>(value)); - new (&Value) T(std::forward<TT>(value)); + readyEvent = ReadyEvent.Get(); + callbacks = std::move(Callbacks); - readyEvent = ReadyEvent.Get(); - callbacks = std::move(Callbacks); + AtomicSet(State, ValueSet); + } - AtomicSet(State, ValueSet); - } + if (readyEvent) { + readyEvent->Signal(); + } - if (readyEvent) { - readyEvent->Signal(); - } - - if (callbacks) { - TFuture<T> temp(this); - for (auto& callback : callbacks) { - callback(temp); - } - } + if (callbacks) { + TFuture<T> temp(this); + for (auto& callback : callbacks) { + callback(temp); + } + } return true; } - void SetException(std::exception_ptr e) { + void SetException(std::exception_ptr e) { bool success = TrySetException(std::move(e)); if (Y_UNLIKELY(!success)) { ythrow TFutureException() << "value already set"; @@ -175,73 +175,73 @@ namespace NThreading { bool TrySetException(std::exception_ptr e) { TSystemEvent* readyEvent; - TCallbackList<T> callbacks; + TCallbackList<T> callbacks; - with_lock (StateLock) { - int state = AtomicGet(State); - if (Y_UNLIKELY(state != NotReady)) { + with_lock (StateLock) { + int state = AtomicGet(State); + if (Y_UNLIKELY(state != NotReady)) { return false; - } - - Exception = std::move(e); - - readyEvent = ReadyEvent.Get(); - callbacks = std::move(Callbacks); - - AtomicSet(State, ExceptionSet); - } - - if (readyEvent) { - readyEvent->Signal(); - } - - if (callbacks) { - TFuture<T> temp(this); - for (auto& callback : callbacks) { - callback(temp); - } - } + } + + Exception = std::move(e); + + readyEvent = ReadyEvent.Get(); + callbacks = std::move(Callbacks); + + AtomicSet(State, ExceptionSet); + } + + if (readyEvent) { + readyEvent->Signal(); + } + + if (callbacks) { + TFuture<T> temp(this); + for (auto& callback : callbacks) { + callback(temp); + } + } return true; } - template <typename F> - bool Subscribe(F&& func) { - with_lock (StateLock) { - int state = AtomicGet(State); - if (state == NotReady) { - Callbacks.emplace_back(std::forward<F>(func)); - return true; - } - } - return false; - } + template <typename F> + bool Subscribe(F&& func) { + with_lock (StateLock) { + int state = AtomicGet(State); + if (state == NotReady) { + Callbacks.emplace_back(std::forward<F>(func)); + return true; + } + } + return false; + } - void Wait() const { - Wait(TInstant::Max()); + void Wait() const { + Wait(TInstant::Max()); } - bool Wait(TDuration timeout) const { - return Wait(timeout.ToDeadLine()); - } + bool Wait(TDuration timeout) const { + return Wait(timeout.ToDeadLine()); + } - bool Wait(TInstant deadline) const { + bool Wait(TInstant deadline) const { TSystemEvent* readyEvent = nullptr; - with_lock (StateLock) { - int state = AtomicGet(State); - if (state != NotReady) { - return true; - } + with_lock (StateLock) { + int state = AtomicGet(State); + if (state != NotReady) { + return true; + } - if (!ReadyEvent) { + if (!ReadyEvent) { ReadyEvent.Reset(new TSystemEvent()); - } - readyEvent = ReadyEvent.Get(); - } + } + readyEvent = ReadyEvent.Get(); + } - Y_ASSERT(readyEvent); - return readyEvent->WaitD(deadline); + Y_ASSERT(readyEvent); + return readyEvent->WaitD(deadline); } void TryRethrowWithState(int state) const { @@ -250,31 +250,31 @@ namespace NThreading { std::rethrow_exception(Exception); } } - }; + }; - //////////////////////////////////////////////////////////////////////////////// + //////////////////////////////////////////////////////////////////////////////// - template <> - class TFutureState<void>: public TAtomicRefCount<TFutureState<void>> { - enum { - NotReady, - ValueSet, - ExceptionSet, - }; + template <> + class TFutureState<void>: public TAtomicRefCount<TFutureState<void>> { + enum { + NotReady, + ValueSet, + ExceptionSet, + }; - private: - TAtomic State; - TAdaptiveLock StateLock; + private: + TAtomic State; + TAdaptiveLock StateLock; - TCallbackList<void> Callbacks; + TCallbackList<void> Callbacks; mutable THolder<TSystemEvent> ReadyEvent; - std::exception_ptr Exception; - - public: - TFutureState(bool valueSet = false) - : State(valueSet ? ValueSet : NotReady) - { + std::exception_ptr Exception; + + public: + TFutureState(bool valueSet = false) + : State(valueSet ? ValueSet : NotReady) + { } TFutureState(std::exception_ptr exception, TError) @@ -283,8 +283,8 @@ namespace NThreading { { } - bool HasValue() const { - return AtomicGet(State) == ValueSet; + bool HasValue() const { + return AtomicGet(State) == ValueSet; } void TryRethrow() const { @@ -292,30 +292,30 @@ namespace NThreading { TryRethrowWithState(state); } - bool HasException() const { - return AtomicGet(State) == ExceptionSet; - } + bool HasException() const { + return AtomicGet(State) == ExceptionSet; + } - void GetValue(TDuration timeout = TDuration::Zero()) const { - int state = AtomicGet(State); - if (Y_UNLIKELY(state == NotReady)) { - if (timeout == TDuration::Zero()) { - ythrow TFutureException() << "value not set"; - } + void GetValue(TDuration timeout = TDuration::Zero()) const { + int state = AtomicGet(State); + if (Y_UNLIKELY(state == NotReady)) { + if (timeout == TDuration::Zero()) { + ythrow TFutureException() << "value not set"; + } - if (!Wait(timeout)) { - ythrow TFutureException() << "wait timeout"; - } + if (!Wait(timeout)) { + ythrow TFutureException() << "wait timeout"; + } - state = AtomicGet(State); - } + state = AtomicGet(State); + } TryRethrowWithState(state); - Y_ASSERT(state == ValueSet); - } + Y_ASSERT(state == ValueSet); + } - void SetValue() { + void SetValue() { bool success = TrySetValue(); if (Y_UNLIKELY(!success)) { ythrow TFutureException() << "value already set"; @@ -324,35 +324,35 @@ namespace NThreading { bool TrySetValue() { TSystemEvent* readyEvent = nullptr; - TCallbackList<void> callbacks; + TCallbackList<void> callbacks; - with_lock (StateLock) { - int state = AtomicGet(State); - if (Y_UNLIKELY(state != NotReady)) { + with_lock (StateLock) { + int state = AtomicGet(State); + if (Y_UNLIKELY(state != NotReady)) { return false; - } + } - readyEvent = ReadyEvent.Get(); - callbacks = std::move(Callbacks); + readyEvent = ReadyEvent.Get(); + callbacks = std::move(Callbacks); - AtomicSet(State, ValueSet); - } + AtomicSet(State, ValueSet); + } - if (readyEvent) { - readyEvent->Signal(); - } - - if (callbacks) { - TFuture<void> temp(this); - for (auto& callback : callbacks) { - callback(temp); - } - } + if (readyEvent) { + readyEvent->Signal(); + } + + if (callbacks) { + TFuture<void> temp(this); + for (auto& callback : callbacks) { + callback(temp); + } + } return true; } - void SetException(std::exception_ptr e) { + void SetException(std::exception_ptr e) { bool success = TrySetException(std::move(e)); if (Y_UNLIKELY(!success)) { ythrow TFutureException() << "value already set"; @@ -361,73 +361,73 @@ namespace NThreading { bool TrySetException(std::exception_ptr e) { TSystemEvent* readyEvent = nullptr; - TCallbackList<void> callbacks; + TCallbackList<void> callbacks; - with_lock (StateLock) { - int state = AtomicGet(State); - if (Y_UNLIKELY(state != NotReady)) { + with_lock (StateLock) { + int state = AtomicGet(State); + if (Y_UNLIKELY(state != NotReady)) { return false; - } + } - Exception = std::move(e); + Exception = std::move(e); - readyEvent = ReadyEvent.Get(); - callbacks = std::move(Callbacks); + readyEvent = ReadyEvent.Get(); + callbacks = std::move(Callbacks); - AtomicSet(State, ExceptionSet); - } + AtomicSet(State, ExceptionSet); + } - if (readyEvent) { - readyEvent->Signal(); - } + if (readyEvent) { + readyEvent->Signal(); + } - if (callbacks) { - TFuture<void> temp(this); - for (auto& callback : callbacks) { - callback(temp); - } - } + if (callbacks) { + TFuture<void> temp(this); + for (auto& callback : callbacks) { + callback(temp); + } + } return true; - } + } - template <typename F> - bool Subscribe(F&& func) { - with_lock (StateLock) { - int state = AtomicGet(State); - if (state == NotReady) { - Callbacks.emplace_back(std::forward<F>(func)); - return true; - } - } - return false; - } + template <typename F> + bool Subscribe(F&& func) { + with_lock (StateLock) { + int state = AtomicGet(State); + if (state == NotReady) { + Callbacks.emplace_back(std::forward<F>(func)); + return true; + } + } + return false; + } - void Wait() const { - Wait(TInstant::Max()); + void Wait() const { + Wait(TInstant::Max()); } - bool Wait(TDuration timeout) const { - return Wait(timeout.ToDeadLine()); - } + bool Wait(TDuration timeout) const { + return Wait(timeout.ToDeadLine()); + } - bool Wait(TInstant deadline) const { + bool Wait(TInstant deadline) const { TSystemEvent* readyEvent = nullptr; - - with_lock (StateLock) { - int state = AtomicGet(State); - if (state != NotReady) { - return true; - } - - if (!ReadyEvent) { + + with_lock (StateLock) { + int state = AtomicGet(State); + if (state != NotReady) { + return true; + } + + if (!ReadyEvent) { ReadyEvent.Reset(new TSystemEvent()); - } - readyEvent = ReadyEvent.Get(); - } - - Y_ASSERT(readyEvent); - return readyEvent->WaitD(deadline); + } + readyEvent = ReadyEvent.Get(); + } + + Y_ASSERT(readyEvent); + return readyEvent->WaitD(deadline); } void TryRethrowWithState(int state) const { @@ -436,53 +436,53 @@ namespace NThreading { std::rethrow_exception(Exception); } } - }; + }; - //////////////////////////////////////////////////////////////////////////////// + //////////////////////////////////////////////////////////////////////////////// - template <typename T> - inline void SetValueImpl(TPromise<T>& promise, const T& value) { - promise.SetValue(value); - } + template <typename T> + inline void SetValueImpl(TPromise<T>& promise, const T& value) { + promise.SetValue(value); + } - template <typename T> - inline void SetValueImpl(TPromise<T>& promise, T&& value) { - promise.SetValue(std::move(value)); + template <typename T> + inline void SetValueImpl(TPromise<T>& promise, T&& value) { + promise.SetValue(std::move(value)); } - template <typename T> + template <typename T> inline void SetValueImpl(TPromise<T>& promise, const TFuture<T>& future, std::enable_if_t<!std::is_void<T>::value, bool> = false) { - future.Subscribe([=](const TFuture<T>& f) mutable { + future.Subscribe([=](const TFuture<T>& f) mutable { T const* value; - try { + try { value = &f.GetValue(); - } catch (...) { - promise.SetException(std::current_exception()); + } catch (...) { + promise.SetException(std::current_exception()); return; - } + } promise.SetValue(*value); - }); + }); } template <typename T> inline void SetValueImpl(TPromise<void>& promise, const TFuture<T>& future) { future.Subscribe([=](const TFuture<T>& f) mutable { - try { + try { f.TryRethrow(); - } catch (...) { - promise.SetException(std::current_exception()); + } catch (...) { + promise.SetException(std::current_exception()); return; - } + } promise.SetValue(); - }); - } - - template <typename T, typename F> - inline void SetValue(TPromise<T>& promise, F&& func) { - try { - SetValueImpl(promise, func()); - } catch (...) { + }); + } + + template <typename T, typename F> + inline void SetValue(TPromise<T>& promise, F&& func) { + try { + SetValueImpl(promise, func()); + } catch (...) { const bool success = promise.TrySetException(std::current_exception()); if (Y_UNLIKELY(!success)) { throw; @@ -490,21 +490,21 @@ namespace NThreading { } } - template <typename F> - inline void SetValue(TPromise<void>& promise, F&& func, - std::enable_if_t<std::is_void<TFunctionResult<F>>::value, bool> = false) { - try { - func(); - } catch (...) { - promise.SetException(std::current_exception()); + template <typename F> + inline void SetValue(TPromise<void>& promise, F&& func, + std::enable_if_t<std::is_void<TFunctionResult<F>>::value, bool> = false) { + try { + func(); + } catch (...) { + promise.SetException(std::current_exception()); return; } promise.SetValue(); } - } + } - //////////////////////////////////////////////////////////////////////////////// + //////////////////////////////////////////////////////////////////////////////// class TFutureStateId { private: @@ -532,45 +532,45 @@ namespace NThreading { //////////////////////////////////////////////////////////////////////////////// - template <typename T> + template <typename T> inline TFuture<T>::TFuture(const TIntrusivePtr<TFutureState>& state) noexcept - : State(state) + : State(state) { } - template <typename T> - inline void TFuture<T>::Swap(TFuture<T>& other) { - State.Swap(other.State); - } + template <typename T> + inline void TFuture<T>::Swap(TFuture<T>& other) { + State.Swap(other.State); + } - template <typename T> - inline bool TFuture<T>::HasValue() const { - return State && State->HasValue(); - } + template <typename T> + inline bool TFuture<T>::HasValue() const { + return State && State->HasValue(); + } - template <typename T> - inline const T& TFuture<T>::GetValue(TDuration timeout) const { - EnsureInitialized(); - return State->GetValue(timeout); + template <typename T> + inline const T& TFuture<T>::GetValue(TDuration timeout) const { + EnsureInitialized(); + return State->GetValue(timeout); } - template <typename T> - inline T TFuture<T>::ExtractValue(TDuration timeout) { - EnsureInitialized(); - return State->ExtractValue(timeout); - } + template <typename T> + inline T TFuture<T>::ExtractValue(TDuration timeout) { + EnsureInitialized(); + return State->ExtractValue(timeout); + } - template <typename T> - inline const T& TFuture<T>::GetValueSync() const { - return GetValue(TDuration::Max()); - } + template <typename T> + inline const T& TFuture<T>::GetValueSync() const { + return GetValue(TDuration::Max()); + } - template <typename T> - inline T TFuture<T>::ExtractValueSync() { - return ExtractValue(TDuration::Max()); - } + template <typename T> + inline T TFuture<T>::ExtractValueSync() { + return ExtractValue(TDuration::Max()); + } - template <typename T> + template <typename T> inline void TFuture<T>::TryRethrow() const { if (State) { State->TryRethrow(); @@ -578,40 +578,40 @@ namespace NThreading { } template <typename T> - inline bool TFuture<T>::HasException() const { - return State && State->HasException(); - } - - template <typename T> - inline void TFuture<T>::Wait() const { - EnsureInitialized(); - return State->Wait(); - } - - template <typename T> - inline bool TFuture<T>::Wait(TDuration timeout) const { - EnsureInitialized(); - return State->Wait(timeout); - } - - template <typename T> - inline bool TFuture<T>::Wait(TInstant deadline) const { - EnsureInitialized(); - return State->Wait(deadline); - } - - template <typename T> - template <typename F> - inline const TFuture<T>& TFuture<T>::Subscribe(F&& func) const { - EnsureInitialized(); - if (!State->Subscribe(std::forward<F>(func))) { - func(*this); - } - return *this; - } - - template <typename T> - template <typename F> + inline bool TFuture<T>::HasException() const { + return State && State->HasException(); + } + + template <typename T> + inline void TFuture<T>::Wait() const { + EnsureInitialized(); + return State->Wait(); + } + + template <typename T> + inline bool TFuture<T>::Wait(TDuration timeout) const { + EnsureInitialized(); + return State->Wait(timeout); + } + + template <typename T> + inline bool TFuture<T>::Wait(TInstant deadline) const { + EnsureInitialized(); + return State->Wait(deadline); + } + + template <typename T> + template <typename F> + inline const TFuture<T>& TFuture<T>::Subscribe(F&& func) const { + EnsureInitialized(); + if (!State->Subscribe(std::forward<F>(func))) { + func(*this); + } + return *this; + } + + template <typename T> + template <typename F> inline const TFuture<T>& TFuture<T>::NoexceptSubscribe(F&& func) const noexcept { return Subscribe(std::forward<F>(func)); } @@ -623,59 +623,59 @@ namespace NThreading { auto promise = NewPromise<TFutureType<TFutureCallResult<F, T>>>(); Subscribe([promise, func = std::forward<F>(func)](const TFuture<T>& future) mutable { NImpl::SetValue(promise, [&]() { return func(future); }); - }); - return promise; - } - - template <typename T> - inline TFuture<void> TFuture<T>::IgnoreResult() const { - auto promise = NewPromise(); - Subscribe([=](const TFuture<T>& future) mutable { + }); + return promise; + } + + template <typename T> + inline TFuture<void> TFuture<T>::IgnoreResult() const { + auto promise = NewPromise(); + Subscribe([=](const TFuture<T>& future) mutable { NImpl::SetValueImpl(promise, future); - }); - return promise; - } + }); + return promise; + } - template <typename T> - inline bool TFuture<T>::Initialized() const { - return bool(State); + template <typename T> + inline bool TFuture<T>::Initialized() const { + return bool(State); } - template <typename T> + template <typename T> inline TMaybe<TFutureStateId> TFuture<T>::StateId() const noexcept { return State != nullptr ? MakeMaybe<TFutureStateId>(*State) : Nothing(); } template <typename T> - inline void TFuture<T>::EnsureInitialized() const { - if (!State) { - ythrow TFutureException() << "state not initialized"; + inline void TFuture<T>::EnsureInitialized() const { + if (!State) { + ythrow TFutureException() << "state not initialized"; } - } + } - //////////////////////////////////////////////////////////////////////////////// + //////////////////////////////////////////////////////////////////////////////// inline TFuture<void>::TFuture(const TIntrusivePtr<TFutureState>& state) noexcept - : State(state) - { - } + : State(state) + { + } - inline void TFuture<void>::Swap(TFuture<void>& other) { - State.Swap(other.State); - } + inline void TFuture<void>::Swap(TFuture<void>& other) { + State.Swap(other.State); + } - inline bool TFuture<void>::HasValue() const { - return State && State->HasValue(); - } + inline bool TFuture<void>::HasValue() const { + return State && State->HasValue(); + } - inline void TFuture<void>::GetValue(TDuration timeout) const { - EnsureInitialized(); - State->GetValue(timeout); - } + inline void TFuture<void>::GetValue(TDuration timeout) const { + EnsureInitialized(); + State->GetValue(timeout); + } - inline void TFuture<void>::GetValueSync() const { - GetValue(TDuration::Max()); - } + inline void TFuture<void>::GetValueSync() const { + GetValue(TDuration::Max()); + } inline void TFuture<void>::TryRethrow() const { if (State) { @@ -683,35 +683,35 @@ namespace NThreading { } } - inline bool TFuture<void>::HasException() const { - return State && State->HasException(); - } + inline bool TFuture<void>::HasException() const { + return State && State->HasException(); + } - inline void TFuture<void>::Wait() const { - EnsureInitialized(); - return State->Wait(); - } + inline void TFuture<void>::Wait() const { + EnsureInitialized(); + return State->Wait(); + } - inline bool TFuture<void>::Wait(TDuration timeout) const { - EnsureInitialized(); - return State->Wait(timeout); - } + inline bool TFuture<void>::Wait(TDuration timeout) const { + EnsureInitialized(); + return State->Wait(timeout); + } - inline bool TFuture<void>::Wait(TInstant deadline) const { - EnsureInitialized(); - return State->Wait(deadline); - } + inline bool TFuture<void>::Wait(TInstant deadline) const { + EnsureInitialized(); + return State->Wait(deadline); + } - template <typename F> - inline const TFuture<void>& TFuture<void>::Subscribe(F&& func) const { - EnsureInitialized(); - if (!State->Subscribe(std::forward<F>(func))) { - func(*this); - } - return *this; - } + template <typename F> + inline const TFuture<void>& TFuture<void>::Subscribe(F&& func) const { + EnsureInitialized(); + if (!State->Subscribe(std::forward<F>(func))) { + func(*this); + } + return *this; + } - template <typename F> + template <typename F> inline const TFuture<void>& TFuture<void>::NoexceptSubscribe(F&& func) const noexcept { return Subscribe(std::forward<F>(func)); } @@ -722,82 +722,82 @@ namespace NThreading { auto promise = NewPromise<TFutureType<TFutureCallResult<F, void>>>(); Subscribe([promise, func = std::forward<F>(func)](const TFuture<void>& future) mutable { NImpl::SetValue(promise, [&]() { return func(future); }); - }); - return promise; - } - - template <typename R> - inline TFuture<R> TFuture<void>::Return(const R& value) const { - auto promise = NewPromise<R>(); - Subscribe([=](const TFuture<void>& future) mutable { - try { + }); + return promise; + } + + template <typename R> + inline TFuture<R> TFuture<void>::Return(const R& value) const { + auto promise = NewPromise<R>(); + Subscribe([=](const TFuture<void>& future) mutable { + try { future.TryRethrow(); - } catch (...) { - promise.SetException(std::current_exception()); + } catch (...) { + promise.SetException(std::current_exception()); return; - } + } promise.SetValue(value); - }); - return promise; + }); + return promise; } - inline bool TFuture<void>::Initialized() const { - return bool(State); - } + inline bool TFuture<void>::Initialized() const { + return bool(State); + } inline TMaybe<TFutureStateId> TFuture<void>::StateId() const noexcept { return State != nullptr ? MakeMaybe<TFutureStateId>(*State) : Nothing(); } - inline void TFuture<void>::EnsureInitialized() const { - if (!State) { - ythrow TFutureException() << "state not initialized"; + inline void TFuture<void>::EnsureInitialized() const { + if (!State) { + ythrow TFutureException() << "state not initialized"; } - } + } - //////////////////////////////////////////////////////////////////////////////// + //////////////////////////////////////////////////////////////////////////////// - template <typename T> + template <typename T> inline TPromise<T>::TPromise(const TIntrusivePtr<TFutureState>& state) noexcept - : State(state) - { - } - - template <typename T> - inline void TPromise<T>::Swap(TPromise<T>& other) { - State.Swap(other.State); - } - - template <typename T> - inline const T& TPromise<T>::GetValue() const { - EnsureInitialized(); - return State->GetValue(); - } - - template <typename T> - inline T TPromise<T>::ExtractValue() { - EnsureInitialized(); - return State->ExtractValue(); - } - - template <typename T> - inline bool TPromise<T>::HasValue() const { - return State && State->HasValue(); - } - - template <typename T> - inline void TPromise<T>::SetValue(const T& value) { - EnsureInitialized(); - State->SetValue(value); - } - - template <typename T> - inline void TPromise<T>::SetValue(T&& value) { - EnsureInitialized(); - State->SetValue(std::move(value)); - } - - template <typename T> + : State(state) + { + } + + template <typename T> + inline void TPromise<T>::Swap(TPromise<T>& other) { + State.Swap(other.State); + } + + template <typename T> + inline const T& TPromise<T>::GetValue() const { + EnsureInitialized(); + return State->GetValue(); + } + + template <typename T> + inline T TPromise<T>::ExtractValue() { + EnsureInitialized(); + return State->ExtractValue(); + } + + template <typename T> + inline bool TPromise<T>::HasValue() const { + return State && State->HasValue(); + } + + template <typename T> + inline void TPromise<T>::SetValue(const T& value) { + EnsureInitialized(); + State->SetValue(value); + } + + template <typename T> + inline void TPromise<T>::SetValue(T&& value) { + EnsureInitialized(); + State->SetValue(std::move(value)); + } + + template <typename T> inline bool TPromise<T>::TrySetValue(const T& value) { EnsureInitialized(); return State->TrySetValue(value); @@ -817,75 +817,75 @@ namespace NThreading { } template <typename T> - inline bool TPromise<T>::HasException() const { - return State && State->HasException(); - } + inline bool TPromise<T>::HasException() const { + return State && State->HasException(); + } - template <typename T> - inline void TPromise<T>::SetException(const TString& e) { - EnsureInitialized(); - State->SetException(std::make_exception_ptr(yexception() << e)); - } + template <typename T> + inline void TPromise<T>::SetException(const TString& e) { + EnsureInitialized(); + State->SetException(std::make_exception_ptr(yexception() << e)); + } - template <typename T> - inline void TPromise<T>::SetException(std::exception_ptr e) { - EnsureInitialized(); - State->SetException(std::move(e)); - } + template <typename T> + inline void TPromise<T>::SetException(std::exception_ptr e) { + EnsureInitialized(); + State->SetException(std::move(e)); + } - template <typename T> + template <typename T> inline bool TPromise<T>::TrySetException(std::exception_ptr e) { EnsureInitialized(); return State->TrySetException(std::move(e)); } template <typename T> - inline TFuture<T> TPromise<T>::GetFuture() const { - EnsureInitialized(); - return TFuture<T>(State); - } + inline TFuture<T> TPromise<T>::GetFuture() const { + EnsureInitialized(); + return TFuture<T>(State); + } - template <typename T> - inline TPromise<T>::operator TFuture<T>() const { - return GetFuture(); - } + template <typename T> + inline TPromise<T>::operator TFuture<T>() const { + return GetFuture(); + } - template <typename T> - inline bool TPromise<T>::Initialized() const { - return bool(State); - } + template <typename T> + inline bool TPromise<T>::Initialized() const { + return bool(State); + } - template <typename T> - inline void TPromise<T>::EnsureInitialized() const { - if (!State) { - ythrow TFutureException() << "state not initialized"; - } - } + template <typename T> + inline void TPromise<T>::EnsureInitialized() const { + if (!State) { + ythrow TFutureException() << "state not initialized"; + } + } - //////////////////////////////////////////////////////////////////////////////// + //////////////////////////////////////////////////////////////////////////////// inline TPromise<void>::TPromise(const TIntrusivePtr<TFutureState>& state) noexcept - : State(state) - { - } + : State(state) + { + } - inline void TPromise<void>::Swap(TPromise<void>& other) { - State.Swap(other.State); - } + inline void TPromise<void>::Swap(TPromise<void>& other) { + State.Swap(other.State); + } - inline void TPromise<void>::GetValue() const { - EnsureInitialized(); - State->GetValue(); - } + inline void TPromise<void>::GetValue() const { + EnsureInitialized(); + State->GetValue(); + } - inline bool TPromise<void>::HasValue() const { - return State && State->HasValue(); - } + inline bool TPromise<void>::HasValue() const { + return State && State->HasValue(); + } - inline void TPromise<void>::SetValue() { - EnsureInitialized(); - State->SetValue(); - } + inline void TPromise<void>::SetValue() { + EnsureInitialized(); + State->SetValue(); + } inline bool TPromise<void>::TrySetValue() { EnsureInitialized(); @@ -898,78 +898,78 @@ namespace NThreading { } } - inline bool TPromise<void>::HasException() const { - return State && State->HasException(); - } + inline bool TPromise<void>::HasException() const { + return State && State->HasException(); + } - inline void TPromise<void>::SetException(const TString& e) { - EnsureInitialized(); - State->SetException(std::make_exception_ptr(yexception() << e)); - } + inline void TPromise<void>::SetException(const TString& e) { + EnsureInitialized(); + State->SetException(std::make_exception_ptr(yexception() << e)); + } - inline void TPromise<void>::SetException(std::exception_ptr e) { - EnsureInitialized(); - State->SetException(std::move(e)); - } + inline void TPromise<void>::SetException(std::exception_ptr e) { + EnsureInitialized(); + State->SetException(std::move(e)); + } inline bool TPromise<void>::TrySetException(std::exception_ptr e) { EnsureInitialized(); return State->TrySetException(std::move(e)); } - inline TFuture<void> TPromise<void>::GetFuture() const { - EnsureInitialized(); - return TFuture<void>(State); - } + inline TFuture<void> TPromise<void>::GetFuture() const { + EnsureInitialized(); + return TFuture<void>(State); + } - inline TPromise<void>::operator TFuture<void>() const { - return GetFuture(); - } + inline TPromise<void>::operator TFuture<void>() const { + return GetFuture(); + } - inline bool TPromise<void>::Initialized() const { - return bool(State); - } + inline bool TPromise<void>::Initialized() const { + return bool(State); + } - inline void TPromise<void>::EnsureInitialized() const { - if (!State) { - ythrow TFutureException() << "state not initialized"; - } - } + inline void TPromise<void>::EnsureInitialized() const { + if (!State) { + ythrow TFutureException() << "state not initialized"; + } + } - //////////////////////////////////////////////////////////////////////////////// + //////////////////////////////////////////////////////////////////////////////// - template <typename T> - inline TPromise<T> NewPromise() { - return {new NImpl::TFutureState<T>()}; + template <typename T> + inline TPromise<T> NewPromise() { + return {new NImpl::TFutureState<T>()}; } - inline TPromise<void> NewPromise() { - return {new NImpl::TFutureState<void>()}; - } + inline TPromise<void> NewPromise() { + return {new NImpl::TFutureState<void>()}; + } - template <typename T> - inline TFuture<T> MakeFuture(const T& value) { - return {new NImpl::TFutureState<T>(value)}; - } + template <typename T> + inline TFuture<T> MakeFuture(const T& value) { + return {new NImpl::TFutureState<T>(value)}; + } - template <typename T> - inline TFuture<std::remove_reference_t<T>> MakeFuture(T&& value) { - return {new NImpl::TFutureState<std::remove_reference_t<T>>(std::forward<T>(value))}; - } + template <typename T> + inline TFuture<std::remove_reference_t<T>> MakeFuture(T&& value) { + return {new NImpl::TFutureState<std::remove_reference_t<T>>(std::forward<T>(value))}; + } - template <typename T> - inline TFuture<T> MakeFuture() { - struct TCache { - TFuture<T> Instance{new NImpl::TFutureState<T>(Default<T>())}; + template <typename T> + inline TFuture<T> MakeFuture() { + struct TCache { + TFuture<T> Instance{new NImpl::TFutureState<T>(Default<T>())}; TCache() { // Immediately advance state from ValueSet to ValueRead. // This should prevent corrupting shared value with an ExtractValue() call. Y_UNUSED(Instance.GetValue()); } - }; - return Singleton<TCache>()->Instance; - } + }; + return Singleton<TCache>()->Instance; + } template <typename T> inline TFuture<T> MakeErrorFuture(std::exception_ptr exception) @@ -977,10 +977,10 @@ namespace NThreading { return {new NImpl::TFutureState<T>(std::move(exception), NImpl::TError::Error)}; } - inline TFuture<void> MakeFuture() { - struct TCache { - TFuture<void> Instance{new NImpl::TFutureState<void>(true)}; - }; - return Singleton<TCache>()->Instance; - } + inline TFuture<void> MakeFuture() { + struct TCache { + TFuture<void> Instance{new NImpl::TFutureState<void>(true)}; + }; + return Singleton<TCache>()->Instance; + } } diff --git a/library/cpp/threading/future/core/future.h b/library/cpp/threading/future/core/future.h index 12623389ca..2e82bb953e 100644 --- a/library/cpp/threading/future/core/future.h +++ b/library/cpp/threading/future/core/future.h @@ -12,51 +12,51 @@ #include <util/system/spinlock.h> namespace NThreading { - //////////////////////////////////////////////////////////////////////////////// - - struct TFutureException: public yexception {}; - - // creates unset promise - template <typename T> - TPromise<T> NewPromise(); - TPromise<void> NewPromise(); - - // creates preset future - template <typename T> - TFuture<T> MakeFuture(const T& value); - template <typename T> - TFuture<std::remove_reference_t<T>> MakeFuture(T&& value); - template <typename T> - TFuture<T> MakeFuture(); + //////////////////////////////////////////////////////////////////////////////// + + struct TFutureException: public yexception {}; + + // creates unset promise + template <typename T> + TPromise<T> NewPromise(); + TPromise<void> NewPromise(); + + // creates preset future + template <typename T> + TFuture<T> MakeFuture(const T& value); + template <typename T> + TFuture<std::remove_reference_t<T>> MakeFuture(T&& value); + template <typename T> + TFuture<T> MakeFuture(); template <typename T> TFuture<T> MakeErrorFuture(std::exception_ptr exception); - TFuture<void> MakeFuture(); + TFuture<void> MakeFuture(); - //////////////////////////////////////////////////////////////////////////////// + //////////////////////////////////////////////////////////////////////////////// - namespace NImpl { - template <typename T> - class TFutureState; + namespace NImpl { + template <typename T> + class TFutureState; - template <typename T> - struct TFutureType { - using TType = T; - }; + template <typename T> + struct TFutureType { + using TType = T; + }; - template <typename T> - struct TFutureType<TFuture<T>> { - using TType = typename TFutureType<T>::TType; - }; + template <typename T> + struct TFutureType<TFuture<T>> { + using TType = typename TFutureType<T>::TType; + }; template <typename F, typename T> struct TFutureCallResult { // NOTE: separate class for msvc compatibility using TType = decltype(std::declval<F&>()(std::declval<const TFuture<T>&>())); }; - } + } - template <typename F> - using TFutureType = typename NImpl::TFutureType<F>::TType; + template <typename F> + using TFutureType = typename NImpl::TFutureType<F>::TType; template <typename F, typename T> using TFutureCallResult = typename NImpl::TFutureCallResult<F, T>::TType; @@ -64,16 +64,16 @@ namespace NThreading { //! Type of the future/promise state identifier class TFutureStateId; - //////////////////////////////////////////////////////////////////////////////// + //////////////////////////////////////////////////////////////////////////////// - template <typename T> - class TFuture { - using TFutureState = NImpl::TFutureState<T>; + template <typename T> + class TFuture { + using TFutureState = NImpl::TFutureState<T>; - private: - TIntrusivePtr<TFutureState> State; + private: + TIntrusivePtr<TFutureState> State; - public: + public: using value_type = T; TFuture() noexcept = default; @@ -83,54 +83,54 @@ namespace NThreading { TFuture<T>& operator=(const TFuture<T>& other) noexcept = default; TFuture<T>& operator=(TFuture<T>&& other) noexcept = default; - void Swap(TFuture<T>& other); + void Swap(TFuture<T>& other); - bool Initialized() const; + bool Initialized() const; - bool HasValue() const; - const T& GetValue(TDuration timeout = TDuration::Zero()) const; - const T& GetValueSync() const; - T ExtractValue(TDuration timeout = TDuration::Zero()); - T ExtractValueSync(); + bool HasValue() const; + const T& GetValue(TDuration timeout = TDuration::Zero()) const; + const T& GetValueSync() const; + T ExtractValue(TDuration timeout = TDuration::Zero()); + T ExtractValueSync(); void TryRethrow() const; - bool HasException() const; + bool HasException() const; - void Wait() const; - bool Wait(TDuration timeout) const; - bool Wait(TInstant deadline) const; + void Wait() const; + bool Wait(TDuration timeout) const; + bool Wait(TInstant deadline) const; - template <typename F> - const TFuture<T>& Subscribe(F&& callback) const; + template <typename F> + const TFuture<T>& Subscribe(F&& callback) const; // precondition: EnsureInitialized() passes // postcondition: std::terminate is highly unlikely - template <typename F> + template <typename F> const TFuture<T>& NoexceptSubscribe(F&& callback) const noexcept; template <typename F> TFuture<TFutureType<TFutureCallResult<F, T>>> Apply(F&& func) const; - TFuture<void> IgnoreResult() const; + TFuture<void> IgnoreResult() const; //! If the future is initialized returns the future state identifier. Otherwise returns an empty optional /** The state identifier is guaranteed to be unique during the future state lifetime and could be reused after its death **/ TMaybe<TFutureStateId> StateId() const noexcept; - void EnsureInitialized() const; - }; + void EnsureInitialized() const; + }; - //////////////////////////////////////////////////////////////////////////////// + //////////////////////////////////////////////////////////////////////////////// - template <> - class TFuture<void> { - using TFutureState = NImpl::TFutureState<void>; + template <> + class TFuture<void> { + using TFutureState = NImpl::TFutureState<void>; - private: + private: TIntrusivePtr<TFutureState> State = nullptr; - public: + public: using value_type = void; TFuture() noexcept = default; @@ -140,34 +140,34 @@ namespace NThreading { TFuture<void>& operator=(const TFuture<void>& other) noexcept = default; TFuture<void>& operator=(TFuture<void>&& other) noexcept = default; - void Swap(TFuture<void>& other); + void Swap(TFuture<void>& other); - bool Initialized() const; + bool Initialized() const; - bool HasValue() const; - void GetValue(TDuration timeout = TDuration::Zero()) const; - void GetValueSync() const; + bool HasValue() const; + void GetValue(TDuration timeout = TDuration::Zero()) const; + void GetValueSync() const; void TryRethrow() const; - bool HasException() const; + bool HasException() const; - void Wait() const; - bool Wait(TDuration timeout) const; - bool Wait(TInstant deadline) const; + void Wait() const; + bool Wait(TDuration timeout) const; + bool Wait(TInstant deadline) const; - template <typename F> - const TFuture<void>& Subscribe(F&& callback) const; + template <typename F> + const TFuture<void>& Subscribe(F&& callback) const; // precondition: EnsureInitialized() passes // postcondition: std::terminate is highly unlikely - template <typename F> + template <typename F> const TFuture<void>& NoexceptSubscribe(F&& callback) const noexcept; template <typename F> TFuture<TFutureType<TFutureCallResult<F, void>>> Apply(F&& func) const; - template <typename R> - TFuture<R> Return(const R& value) const; + template <typename R> + TFuture<R> Return(const R& value) const; TFuture<void> IgnoreResult() const { return *this; @@ -178,19 +178,19 @@ namespace NThreading { **/ TMaybe<TFutureStateId> StateId() const noexcept; - void EnsureInitialized() const; - }; + void EnsureInitialized() const; + }; - //////////////////////////////////////////////////////////////////////////////// + //////////////////////////////////////////////////////////////////////////////// - template <typename T> - class TPromise { - using TFutureState = NImpl::TFutureState<T>; + template <typename T> + class TPromise { + using TFutureState = NImpl::TFutureState<T>; - private: + private: TIntrusivePtr<TFutureState> State = nullptr; - public: + public: TPromise() noexcept = default; TPromise(const TPromise<T>& other) noexcept = default; TPromise(TPromise<T>&& other) noexcept = default; @@ -198,43 +198,43 @@ namespace NThreading { TPromise<T>& operator=(const TPromise<T>& other) noexcept = default; TPromise<T>& operator=(TPromise<T>&& other) noexcept = default; - void Swap(TPromise<T>& other); + void Swap(TPromise<T>& other); - bool Initialized() const; + bool Initialized() const; - bool HasValue() const; - const T& GetValue() const; - T ExtractValue(); + bool HasValue() const; + const T& GetValue() const; + T ExtractValue(); - void SetValue(const T& value); - void SetValue(T&& value); + void SetValue(const T& value); + void SetValue(T&& value); bool TrySetValue(const T& value); bool TrySetValue(T&& value); void TryRethrow() const; - bool HasException() const; - void SetException(const TString& e); - void SetException(std::exception_ptr e); + bool HasException() const; + void SetException(const TString& e); + void SetException(std::exception_ptr e); bool TrySetException(std::exception_ptr e); - TFuture<T> GetFuture() const; - operator TFuture<T>() const; + TFuture<T> GetFuture() const; + operator TFuture<T>() const; - private: - void EnsureInitialized() const; - }; + private: + void EnsureInitialized() const; + }; - //////////////////////////////////////////////////////////////////////////////// + //////////////////////////////////////////////////////////////////////////////// - template <> - class TPromise<void> { - using TFutureState = NImpl::TFutureState<void>; + template <> + class TPromise<void> { + using TFutureState = NImpl::TFutureState<void>; - private: - TIntrusivePtr<TFutureState> State; + private: + TIntrusivePtr<TFutureState> State; - public: + public: TPromise() noexcept = default; TPromise(const TPromise<void>& other) noexcept = default; TPromise(TPromise<void>&& other) noexcept = default; @@ -242,30 +242,30 @@ namespace NThreading { TPromise<void>& operator=(const TPromise<void>& other) noexcept = default; TPromise<void>& operator=(TPromise<void>&& other) noexcept = default; - void Swap(TPromise<void>& other); + void Swap(TPromise<void>& other); - bool Initialized() const; + bool Initialized() const; - bool HasValue() const; - void GetValue() const; + bool HasValue() const; + void GetValue() const; - void SetValue(); + void SetValue(); bool TrySetValue(); void TryRethrow() const; - bool HasException() const; - void SetException(const TString& e); - void SetException(std::exception_ptr e); + bool HasException() const; + void SetException(const TString& e); + void SetException(std::exception_ptr e); bool TrySetException(std::exception_ptr e); - TFuture<void> GetFuture() const; - operator TFuture<void>() const; + TFuture<void> GetFuture() const; + operator TFuture<void>() const; - private: - void EnsureInitialized() const; - }; + private: + void EnsureInitialized() const; + }; -} +} #define INCLUDE_FUTURE_INL_H #include "future-inl.h" diff --git a/library/cpp/threading/future/future_ut.cpp b/library/cpp/threading/future/future_ut.cpp index 636b113f2f..05950a568d 100644 --- a/library/cpp/threading/future/future_ut.cpp +++ b/library/cpp/threading/future/future_ut.cpp @@ -62,180 +62,180 @@ namespace { } - //////////////////////////////////////////////////////////////////////////////// + //////////////////////////////////////////////////////////////////////////////// Y_UNIT_TEST_SUITE(TFutureTest) { Y_UNIT_TEST(ShouldInitiallyHasNoValue) { - TPromise<int> promise; - UNIT_ASSERT(!promise.HasValue()); + TPromise<int> promise; + UNIT_ASSERT(!promise.HasValue()); - promise = NewPromise<int>(); - UNIT_ASSERT(!promise.HasValue()); + promise = NewPromise<int>(); + UNIT_ASSERT(!promise.HasValue()); - TFuture<int> future; - UNIT_ASSERT(!future.HasValue()); + TFuture<int> future; + UNIT_ASSERT(!future.HasValue()); - future = promise.GetFuture(); - UNIT_ASSERT(!future.HasValue()); - } + future = promise.GetFuture(); + UNIT_ASSERT(!future.HasValue()); + } Y_UNIT_TEST(ShouldInitiallyHasNoValueVoid) { - TPromise<void> promise; - UNIT_ASSERT(!promise.HasValue()); + TPromise<void> promise; + UNIT_ASSERT(!promise.HasValue()); - promise = NewPromise(); - UNIT_ASSERT(!promise.HasValue()); + promise = NewPromise(); + UNIT_ASSERT(!promise.HasValue()); - TFuture<void> future; - UNIT_ASSERT(!future.HasValue()); + TFuture<void> future; + UNIT_ASSERT(!future.HasValue()); - future = promise.GetFuture(); - UNIT_ASSERT(!future.HasValue()); - } + future = promise.GetFuture(); + UNIT_ASSERT(!future.HasValue()); + } Y_UNIT_TEST(ShouldStoreValue) { - TPromise<int> promise = NewPromise<int>(); - promise.SetValue(123); - UNIT_ASSERT(promise.HasValue()); - UNIT_ASSERT_EQUAL(promise.GetValue(), 123); + TPromise<int> promise = NewPromise<int>(); + promise.SetValue(123); + UNIT_ASSERT(promise.HasValue()); + UNIT_ASSERT_EQUAL(promise.GetValue(), 123); - TFuture<int> future = promise.GetFuture(); - UNIT_ASSERT(future.HasValue()); - UNIT_ASSERT_EQUAL(future.GetValue(), 123); + TFuture<int> future = promise.GetFuture(); + UNIT_ASSERT(future.HasValue()); + UNIT_ASSERT_EQUAL(future.GetValue(), 123); - future = MakeFuture(345); - UNIT_ASSERT(future.HasValue()); - UNIT_ASSERT_EQUAL(future.GetValue(), 345); - } + future = MakeFuture(345); + UNIT_ASSERT(future.HasValue()); + UNIT_ASSERT_EQUAL(future.GetValue(), 345); + } Y_UNIT_TEST(ShouldStoreValueVoid) { - TPromise<void> promise = NewPromise(); - promise.SetValue(); - UNIT_ASSERT(promise.HasValue()); + TPromise<void> promise = NewPromise(); + promise.SetValue(); + UNIT_ASSERT(promise.HasValue()); - TFuture<void> future = promise.GetFuture(); - UNIT_ASSERT(future.HasValue()); + TFuture<void> future = promise.GetFuture(); + UNIT_ASSERT(future.HasValue()); - future = MakeFuture(); - UNIT_ASSERT(future.HasValue()); - } + future = MakeFuture(); + UNIT_ASSERT(future.HasValue()); + } - struct TTestCallback { - int Value; + struct TTestCallback { + int Value; - TTestCallback(int value) - : Value(value) - { - } + TTestCallback(int value) + : Value(value) + { + } - void Callback(const TFuture<int>& future) { - Value += future.GetValue(); - } + void Callback(const TFuture<int>& future) { + Value += future.GetValue(); + } - int Func(const TFuture<int>& future) { - return (Value += future.GetValue()); - } + int Func(const TFuture<int>& future) { + return (Value += future.GetValue()); + } - void VoidFunc(const TFuture<int>& future) { - future.GetValue(); - } + void VoidFunc(const TFuture<int>& future) { + future.GetValue(); + } - TFuture<int> FutureFunc(const TFuture<int>& future) { - return MakeFuture(Value += future.GetValue()); - } + TFuture<int> FutureFunc(const TFuture<int>& future) { + return MakeFuture(Value += future.GetValue()); + } - TPromise<void> Signal = NewPromise(); - TFuture<void> FutureVoidFunc(const TFuture<int>& future) { - future.GetValue(); - return Signal; - } - }; + TPromise<void> Signal = NewPromise(); + TFuture<void> FutureVoidFunc(const TFuture<int>& future) { + future.GetValue(); + return Signal; + } + }; Y_UNIT_TEST(ShouldInvokeCallback) { - TPromise<int> promise = NewPromise<int>(); + TPromise<int> promise = NewPromise<int>(); - TTestCallback callback(123); - TFuture<int> future = promise.GetFuture() - .Subscribe([&](const TFuture<int>& theFuture) { return callback.Callback(theFuture); }); + TTestCallback callback(123); + TFuture<int> future = promise.GetFuture() + .Subscribe([&](const TFuture<int>& theFuture) { return callback.Callback(theFuture); }); - promise.SetValue(456); - UNIT_ASSERT_EQUAL(future.GetValue(), 456); - UNIT_ASSERT_EQUAL(callback.Value, 123 + 456); - } + promise.SetValue(456); + UNIT_ASSERT_EQUAL(future.GetValue(), 456); + UNIT_ASSERT_EQUAL(callback.Value, 123 + 456); + } Y_UNIT_TEST(ShouldApplyFunc) { - TPromise<int> promise = NewPromise<int>(); + TPromise<int> promise = NewPromise<int>(); - TTestCallback callback(123); - TFuture<int> future = promise.GetFuture() + TTestCallback callback(123); + TFuture<int> future = promise.GetFuture() .Apply([&](const auto& theFuture) { return callback.Func(theFuture); }); - promise.SetValue(456); - UNIT_ASSERT_EQUAL(future.GetValue(), 123 + 456); - UNIT_ASSERT_EQUAL(callback.Value, 123 + 456); - } + promise.SetValue(456); + UNIT_ASSERT_EQUAL(future.GetValue(), 123 + 456); + UNIT_ASSERT_EQUAL(callback.Value, 123 + 456); + } Y_UNIT_TEST(ShouldApplyVoidFunc) { - TPromise<int> promise = NewPromise<int>(); + TPromise<int> promise = NewPromise<int>(); - TTestCallback callback(123); - TFuture<void> future = promise.GetFuture() + TTestCallback callback(123); + TFuture<void> future = promise.GetFuture() .Apply([&](const auto& theFuture) { return callback.VoidFunc(theFuture); }); - promise.SetValue(456); - UNIT_ASSERT(future.HasValue()); - } + promise.SetValue(456); + UNIT_ASSERT(future.HasValue()); + } Y_UNIT_TEST(ShouldApplyFutureFunc) { - TPromise<int> promise = NewPromise<int>(); + TPromise<int> promise = NewPromise<int>(); - TTestCallback callback(123); - TFuture<int> future = promise.GetFuture() + TTestCallback callback(123); + TFuture<int> future = promise.GetFuture() .Apply([&](const auto& theFuture) { return callback.FutureFunc(theFuture); }); - promise.SetValue(456); - UNIT_ASSERT_EQUAL(future.GetValue(), 123 + 456); - UNIT_ASSERT_EQUAL(callback.Value, 123 + 456); - } + promise.SetValue(456); + UNIT_ASSERT_EQUAL(future.GetValue(), 123 + 456); + UNIT_ASSERT_EQUAL(callback.Value, 123 + 456); + } Y_UNIT_TEST(ShouldApplyFutureVoidFunc) { - TPromise<int> promise = NewPromise<int>(); + TPromise<int> promise = NewPromise<int>(); - TTestCallback callback(123); - TFuture<void> future = promise.GetFuture() + TTestCallback callback(123); + TFuture<void> future = promise.GetFuture() .Apply([&](const auto& theFuture) { return callback.FutureVoidFunc(theFuture); }); - promise.SetValue(456); - UNIT_ASSERT(!future.HasValue()); + promise.SetValue(456); + UNIT_ASSERT(!future.HasValue()); - callback.Signal.SetValue(); - UNIT_ASSERT(future.HasValue()); - } + callback.Signal.SetValue(); + UNIT_ASSERT(future.HasValue()); + } Y_UNIT_TEST(ShouldIgnoreResultIfAsked) { - TPromise<int> promise = NewPromise<int>(); + TPromise<int> promise = NewPromise<int>(); - TTestCallback callback(123); - TFuture<int> future = promise.GetFuture().IgnoreResult().Return(42); + TTestCallback callback(123); + TFuture<int> future = promise.GetFuture().IgnoreResult().Return(42); - promise.SetValue(456); - UNIT_ASSERT_EQUAL(future.GetValue(), 42); - } + promise.SetValue(456); + UNIT_ASSERT_EQUAL(future.GetValue(), 42); + } - class TCustomException: public yexception { - }; + class TCustomException: public yexception { + }; Y_UNIT_TEST(ShouldRethrowException) { - TPromise<int> promise = NewPromise<int>(); - try { - ythrow TCustomException(); - } catch (...) { - promise.SetException(std::current_exception()); - } - - UNIT_ASSERT(!promise.HasValue()); - UNIT_ASSERT(promise.HasException()); - UNIT_ASSERT_EXCEPTION(promise.GetValue(), TCustomException); + TPromise<int> promise = NewPromise<int>(); + try { + ythrow TCustomException(); + } catch (...) { + promise.SetException(std::current_exception()); + } + + UNIT_ASSERT(!promise.HasValue()); + UNIT_ASSERT(promise.HasException()); + UNIT_ASSERT_EXCEPTION(promise.GetValue(), TCustomException); UNIT_ASSERT_EXCEPTION(promise.TryRethrow(), TCustomException); } @@ -261,36 +261,36 @@ namespace { Y_UNIT_TEST(ShouldWaitExceptionOrAll) { - TPromise<void> promise1 = NewPromise(); - TPromise<void> promise2 = NewPromise(); + TPromise<void> promise1 = NewPromise(); + TPromise<void> promise2 = NewPromise(); TFuture<void> future = WaitExceptionOrAll(promise1, promise2); - UNIT_ASSERT(!future.HasValue()); + UNIT_ASSERT(!future.HasValue()); - promise1.SetValue(); - UNIT_ASSERT(!future.HasValue()); + promise1.SetValue(); + UNIT_ASSERT(!future.HasValue()); - promise2.SetValue(); - UNIT_ASSERT(future.HasValue()); - } + promise2.SetValue(); + UNIT_ASSERT(future.HasValue()); + } Y_UNIT_TEST(ShouldWaitExceptionOrAllVector) { - TPromise<void> promise1 = NewPromise(); - TPromise<void> promise2 = NewPromise(); + TPromise<void> promise1 = NewPromise(); + TPromise<void> promise2 = NewPromise(); - TVector<TFuture<void>> promises; - promises.push_back(promise1); - promises.push_back(promise2); + TVector<TFuture<void>> promises; + promises.push_back(promise1); + promises.push_back(promise2); TFuture<void> future = WaitExceptionOrAll(promises); - UNIT_ASSERT(!future.HasValue()); + UNIT_ASSERT(!future.HasValue()); - promise1.SetValue(); - UNIT_ASSERT(!future.HasValue()); + promise1.SetValue(); + UNIT_ASSERT(!future.HasValue()); - promise2.SetValue(); - UNIT_ASSERT(future.HasValue()); - } + promise2.SetValue(); + UNIT_ASSERT(future.HasValue()); + } Y_UNIT_TEST(ShouldWaitExceptionOrAllVectorWithValueType) { TPromise<int> promise1 = NewPromise<int>(); @@ -311,47 +311,47 @@ namespace { } Y_UNIT_TEST(ShouldWaitExceptionOrAllList) { - TPromise<void> promise1 = NewPromise(); - TPromise<void> promise2 = NewPromise(); + TPromise<void> promise1 = NewPromise(); + TPromise<void> promise2 = NewPromise(); - std::list<TFuture<void>> promises; - promises.push_back(promise1); - promises.push_back(promise2); + std::list<TFuture<void>> promises; + promises.push_back(promise1); + promises.push_back(promise2); TFuture<void> future = WaitExceptionOrAll(promises); - UNIT_ASSERT(!future.HasValue()); + UNIT_ASSERT(!future.HasValue()); - promise1.SetValue(); - UNIT_ASSERT(!future.HasValue()); + promise1.SetValue(); + UNIT_ASSERT(!future.HasValue()); - promise2.SetValue(); - UNIT_ASSERT(future.HasValue()); - } + promise2.SetValue(); + UNIT_ASSERT(future.HasValue()); + } Y_UNIT_TEST(ShouldWaitExceptionOrAllVectorEmpty) { - TVector<TFuture<void>> promises; + TVector<TFuture<void>> promises; TFuture<void> future = WaitExceptionOrAll(promises); - UNIT_ASSERT(future.HasValue()); - } + UNIT_ASSERT(future.HasValue()); + } Y_UNIT_TEST(ShouldWaitAnyVector) { - TPromise<void> promise1 = NewPromise(); - TPromise<void> promise2 = NewPromise(); + TPromise<void> promise1 = NewPromise(); + TPromise<void> promise2 = NewPromise(); - TVector<TFuture<void>> promises; - promises.push_back(promise1); - promises.push_back(promise2); + TVector<TFuture<void>> promises; + promises.push_back(promise1); + promises.push_back(promise2); - TFuture<void> future = WaitAny(promises); - UNIT_ASSERT(!future.HasValue()); + TFuture<void> future = WaitAny(promises); + UNIT_ASSERT(!future.HasValue()); - promise1.SetValue(); - UNIT_ASSERT(future.HasValue()); + promise1.SetValue(); + UNIT_ASSERT(future.HasValue()); - promise2.SetValue(); - UNIT_ASSERT(future.HasValue()); - } + promise2.SetValue(); + UNIT_ASSERT(future.HasValue()); + } Y_UNIT_TEST(ShouldWaitAnyVectorWithValueType) { @@ -373,112 +373,112 @@ namespace { } Y_UNIT_TEST(ShouldWaitAnyList) { - TPromise<void> promise1 = NewPromise(); - TPromise<void> promise2 = NewPromise(); + TPromise<void> promise1 = NewPromise(); + TPromise<void> promise2 = NewPromise(); - std::list<TFuture<void>> promises; - promises.push_back(promise1); - promises.push_back(promise2); + std::list<TFuture<void>> promises; + promises.push_back(promise1); + promises.push_back(promise2); - TFuture<void> future = WaitAny(promises); - UNIT_ASSERT(!future.HasValue()); + TFuture<void> future = WaitAny(promises); + UNIT_ASSERT(!future.HasValue()); - promise1.SetValue(); - UNIT_ASSERT(future.HasValue()); + promise1.SetValue(); + UNIT_ASSERT(future.HasValue()); - promise2.SetValue(); - UNIT_ASSERT(future.HasValue()); - } + promise2.SetValue(); + UNIT_ASSERT(future.HasValue()); + } Y_UNIT_TEST(ShouldWaitAnyVectorEmpty) { - TVector<TFuture<void>> promises; + TVector<TFuture<void>> promises; - TFuture<void> future = WaitAny(promises); - UNIT_ASSERT(future.HasValue()); - } + TFuture<void> future = WaitAny(promises); + UNIT_ASSERT(future.HasValue()); + } Y_UNIT_TEST(ShouldWaitAny) { - TPromise<void> promise1 = NewPromise(); - TPromise<void> promise2 = NewPromise(); + TPromise<void> promise1 = NewPromise(); + TPromise<void> promise2 = NewPromise(); - TFuture<void> future = WaitAny(promise1, promise2); - UNIT_ASSERT(!future.HasValue()); + TFuture<void> future = WaitAny(promise1, promise2); + UNIT_ASSERT(!future.HasValue()); - promise1.SetValue(); - UNIT_ASSERT(future.HasValue()); + promise1.SetValue(); + UNIT_ASSERT(future.HasValue()); - promise2.SetValue(); - UNIT_ASSERT(future.HasValue()); - } + promise2.SetValue(); + UNIT_ASSERT(future.HasValue()); + } Y_UNIT_TEST(ShouldStoreTypesWithoutDefaultConstructor) { - // compileability test - struct TRec { - explicit TRec(int) { - } - }; + // compileability test + struct TRec { + explicit TRec(int) { + } + }; - auto promise = NewPromise<TRec>(); - promise.SetValue(TRec(1)); + auto promise = NewPromise<TRec>(); + promise.SetValue(TRec(1)); - auto future = MakeFuture(TRec(1)); - const auto& rec = future.GetValue(); - Y_UNUSED(rec); - } + auto future = MakeFuture(TRec(1)); + const auto& rec = future.GetValue(); + Y_UNUSED(rec); + } Y_UNIT_TEST(ShouldStoreMovableTypes) { - // compileability test - struct TRec : TMoveOnly { - explicit TRec(int) { - } - }; + // compileability test + struct TRec : TMoveOnly { + explicit TRec(int) { + } + }; - auto promise = NewPromise<TRec>(); - promise.SetValue(TRec(1)); + auto promise = NewPromise<TRec>(); + promise.SetValue(TRec(1)); - auto future = MakeFuture(TRec(1)); - const auto& rec = future.GetValue(); - Y_UNUSED(rec); - } + auto future = MakeFuture(TRec(1)); + const auto& rec = future.GetValue(); + Y_UNUSED(rec); + } Y_UNIT_TEST(ShouldMoveMovableTypes) { - // compileability test - struct TRec : TMoveOnly { - explicit TRec(int) { - } - }; + // compileability test + struct TRec : TMoveOnly { + explicit TRec(int) { + } + }; - auto promise = NewPromise<TRec>(); - promise.SetValue(TRec(1)); + auto promise = NewPromise<TRec>(); + promise.SetValue(TRec(1)); - auto future = MakeFuture(TRec(1)); - auto rec = future.ExtractValue(); - Y_UNUSED(rec); - } + auto future = MakeFuture(TRec(1)); + auto rec = future.ExtractValue(); + Y_UNUSED(rec); + } Y_UNIT_TEST(ShouldNotExtractAfterGet) { - TPromise<int> promise = NewPromise<int>(); - promise.SetValue(123); - UNIT_ASSERT(promise.HasValue()); - UNIT_ASSERT_EQUAL(promise.GetValue(), 123); - UNIT_CHECK_GENERATED_EXCEPTION(promise.ExtractValue(), TFutureException); - } + TPromise<int> promise = NewPromise<int>(); + promise.SetValue(123); + UNIT_ASSERT(promise.HasValue()); + UNIT_ASSERT_EQUAL(promise.GetValue(), 123); + UNIT_CHECK_GENERATED_EXCEPTION(promise.ExtractValue(), TFutureException); + } Y_UNIT_TEST(ShouldNotGetAfterExtract) { - TPromise<int> promise = NewPromise<int>(); - promise.SetValue(123); - UNIT_ASSERT(promise.HasValue()); - UNIT_ASSERT_EQUAL(promise.ExtractValue(), 123); - UNIT_CHECK_GENERATED_EXCEPTION(promise.GetValue(), TFutureException); - } + TPromise<int> promise = NewPromise<int>(); + promise.SetValue(123); + UNIT_ASSERT(promise.HasValue()); + UNIT_ASSERT_EQUAL(promise.ExtractValue(), 123); + UNIT_CHECK_GENERATED_EXCEPTION(promise.GetValue(), TFutureException); + } Y_UNIT_TEST(ShouldNotExtractAfterExtract) { - TPromise<int> promise = NewPromise<int>(); - promise.SetValue(123); - UNIT_ASSERT(promise.HasValue()); - UNIT_ASSERT_EQUAL(promise.ExtractValue(), 123); - UNIT_CHECK_GENERATED_EXCEPTION(promise.ExtractValue(), TFutureException); - } + TPromise<int> promise = NewPromise<int>(); + promise.SetValue(123); + UNIT_ASSERT(promise.HasValue()); + UNIT_ASSERT_EQUAL(promise.ExtractValue(), 123); + UNIT_CHECK_GENERATED_EXCEPTION(promise.ExtractValue(), TFutureException); + } Y_UNIT_TEST(ShouldNotExtractFromSharedDefault) { UNIT_CHECK_GENERATED_EXCEPTION(MakeFuture<int>().ExtractValue(), TFutureException); diff --git a/library/cpp/threading/future/legacy_future.h b/library/cpp/threading/future/legacy_future.h index c699aadf5c..6f1eabad73 100644 --- a/library/cpp/threading/future/legacy_future.h +++ b/library/cpp/threading/future/legacy_future.h @@ -4,79 +4,79 @@ #include "future.h" #include <util/thread/factory.h> - + #include <functional> - + namespace NThreading { template <typename TR, bool IgnoreException> class TLegacyFuture: public IThreadFactory::IThreadAble, TNonCopyable { - public: - typedef TR(TFunctionSignature)(); - using TFunctionObjectType = std::function<TFunctionSignature>; - using TResult = typename TFunctionObjectType::result_type; + public: + typedef TR(TFunctionSignature)(); + using TFunctionObjectType = std::function<TFunctionSignature>; + using TResult = typename TFunctionObjectType::result_type; - private: - TFunctionObjectType Func_; - TPromise<TResult> Result_; + private: + TFunctionObjectType Func_; + TPromise<TResult> Result_; THolder<IThreadFactory::IThread> Thread_; - public: + public: inline TLegacyFuture(const TFunctionObjectType func, IThreadFactory* pool = SystemThreadFactory()) - : Func_(func) - , Result_(NewPromise<TResult>()) - , Thread_(pool->Run(this)) - { - } + : Func_(func) + , Result_(NewPromise<TResult>()) + , Thread_(pool->Run(this)) + { + } - inline ~TLegacyFuture() override { - this->Join(); - } + inline ~TLegacyFuture() override { + this->Join(); + } - inline TResult Get() { - this->Join(); - return Result_.GetValue(); - } + inline TResult Get() { + this->Join(); + return Result_.GetValue(); + } - private: - inline void Join() { - if (Thread_) { - Thread_->Join(); - Thread_.Destroy(); - } + private: + inline void Join() { + if (Thread_) { + Thread_->Join(); + Thread_.Destroy(); + } } - template <typename Result, bool IgnoreException_> - struct TExecutor { - static void SetPromise(TPromise<Result>& promise, const TFunctionObjectType& func) { - if (IgnoreException_) { - try { - promise.SetValue(func()); - } catch (...) { - } - } else { + template <typename Result, bool IgnoreException_> + struct TExecutor { + static void SetPromise(TPromise<Result>& promise, const TFunctionObjectType& func) { + if (IgnoreException_) { + try { + promise.SetValue(func()); + } catch (...) { + } + } else { promise.SetValue(func()); } } - }; + }; - template <bool IgnoreException_> - struct TExecutor<void, IgnoreException_> { - static void SetPromise(TPromise<void>& promise, const TFunctionObjectType& func) { - if (IgnoreException_) { - try { - func(); - promise.SetValue(); - } catch (...) { - } - } else { + template <bool IgnoreException_> + struct TExecutor<void, IgnoreException_> { + static void SetPromise(TPromise<void>& promise, const TFunctionObjectType& func) { + if (IgnoreException_) { + try { + func(); + promise.SetValue(); + } catch (...) { + } + } else { func(); promise.SetValue(); } } - }; - - void DoExecute() override { - TExecutor<TResult, IgnoreException>::SetPromise(Result_, Func_); + }; + + void DoExecute() override { + TExecutor<TResult, IgnoreException>::SetPromise(Result_, Func_); } }; diff --git a/library/cpp/threading/future/legacy_future_ut.cpp b/library/cpp/threading/future/legacy_future_ut.cpp index 96b46ccebf..ff63db1725 100644 --- a/library/cpp/threading/future/legacy_future_ut.cpp +++ b/library/cpp/threading/future/legacy_future_ut.cpp @@ -4,69 +4,69 @@ namespace NThreading { Y_UNIT_TEST_SUITE(TLegacyFutureTest) { - int intf() { - return 17; - } + int intf() { + return 17; + } Y_UNIT_TEST(TestIntFunction) { - TLegacyFuture<int> f((&intf)); - UNIT_ASSERT_VALUES_EQUAL(17, f.Get()); - } + TLegacyFuture<int> f((&intf)); + UNIT_ASSERT_VALUES_EQUAL(17, f.Get()); + } - static int r; + static int r; - void voidf() { - r = 18; - } + void voidf() { + r = 18; + } Y_UNIT_TEST(TestVoidFunction) { - r = 0; - TLegacyFuture<> f((&voidf)); - f.Get(); - UNIT_ASSERT_VALUES_EQUAL(18, r); - } + r = 0; + TLegacyFuture<> f((&voidf)); + f.Get(); + UNIT_ASSERT_VALUES_EQUAL(18, r); + } - struct TSampleClass { - int mValue; + struct TSampleClass { + int mValue; - TSampleClass(int value) - : mValue(value) - { - } + TSampleClass(int value) + : mValue(value) + { + } - int Calc() { - return mValue + 1; - } - }; + int Calc() { + return mValue + 1; + } + }; Y_UNIT_TEST(TestMethod) { - TLegacyFuture<int> f11(std::bind(&TSampleClass::Calc, TSampleClass(3))); - UNIT_ASSERT_VALUES_EQUAL(4, f11.Get()); + TLegacyFuture<int> f11(std::bind(&TSampleClass::Calc, TSampleClass(3))); + UNIT_ASSERT_VALUES_EQUAL(4, f11.Get()); TLegacyFuture<int> f12(std::bind(&TSampleClass::Calc, TSampleClass(3)), SystemThreadFactory()); - UNIT_ASSERT_VALUES_EQUAL(4, f12.Get()); + UNIT_ASSERT_VALUES_EQUAL(4, f12.Get()); - TSampleClass c(5); + TSampleClass c(5); - TLegacyFuture<int> f21(std::bind(&TSampleClass::Calc, std::ref(c))); - UNIT_ASSERT_VALUES_EQUAL(6, f21.Get()); + TLegacyFuture<int> f21(std::bind(&TSampleClass::Calc, std::ref(c))); + UNIT_ASSERT_VALUES_EQUAL(6, f21.Get()); TLegacyFuture<int> f22(std::bind(&TSampleClass::Calc, std::ref(c)), SystemThreadFactory()); - UNIT_ASSERT_VALUES_EQUAL(6, f22.Get()); - } + UNIT_ASSERT_VALUES_EQUAL(6, f22.Get()); + } struct TSomeThreadPool: public IThreadFactory {}; Y_UNIT_TEST(TestFunction) { - std::function<int()> f((&intf)); + std::function<int()> f((&intf)); - UNIT_ASSERT_VALUES_EQUAL(17, TLegacyFuture<int>(f).Get()); + UNIT_ASSERT_VALUES_EQUAL(17, TLegacyFuture<int>(f).Get()); UNIT_ASSERT_VALUES_EQUAL(17, TLegacyFuture<int>(f, SystemThreadFactory()).Get()); - if (false) { - TSomeThreadPool* q = nullptr; - TLegacyFuture<int>(f, q); // just check compiles, do not start - } + if (false) { + TSomeThreadPool* q = nullptr; + TLegacyFuture<int>(f, q); // just check compiles, do not start + } } } diff --git a/library/cpp/threading/future/perf/main.cpp b/library/cpp/threading/future/perf/main.cpp index c7da5a51f3..5a0690af47 100644 --- a/library/cpp/threading/future/perf/main.cpp +++ b/library/cpp/threading/future/perf/main.cpp @@ -7,44 +7,44 @@ using namespace NThreading; template <typename T> -void TestAllocPromise(const NBench::NCpu::TParams& iface) { - for (const auto it : xrange(iface.Iterations())) { +void TestAllocPromise(const NBench::NCpu::TParams& iface) { + for (const auto it : xrange(iface.Iterations())) { Y_UNUSED(it); Y_DO_NOT_OPTIMIZE_AWAY(NewPromise<T>()); } } template <typename T> -TPromise<T> SetPromise(T value) { +TPromise<T> SetPromise(T value) { auto promise = NewPromise<T>(); promise.SetValue(value); return promise; } template <typename T> -void TestSetPromise(const NBench::NCpu::TParams& iface, T value) { - for (const auto it : xrange(iface.Iterations())) { +void TestSetPromise(const NBench::NCpu::TParams& iface, T value) { + for (const auto it : xrange(iface.Iterations())) { Y_UNUSED(it); Y_DO_NOT_OPTIMIZE_AWAY(SetPromise(value)); } } -Y_CPU_BENCHMARK(AllocPromiseVoid, iface) { +Y_CPU_BENCHMARK(AllocPromiseVoid, iface) { TestAllocPromise<void>(iface); } -Y_CPU_BENCHMARK(AllocPromiseUI64, iface) { +Y_CPU_BENCHMARK(AllocPromiseUI64, iface) { TestAllocPromise<ui64>(iface); } -Y_CPU_BENCHMARK(AllocPromiseStroka, iface) { +Y_CPU_BENCHMARK(AllocPromiseStroka, iface) { TestAllocPromise<TString>(iface); } -Y_CPU_BENCHMARK(SetPromiseUI64, iface) { +Y_CPU_BENCHMARK(SetPromiseUI64, iface) { TestSetPromise<ui64>(iface, 1234567890ull); } -Y_CPU_BENCHMARK(SetPromiseStroka, iface) { +Y_CPU_BENCHMARK(SetPromiseStroka, iface) { TestSetPromise<TString>(iface, "test test test"); } diff --git a/library/cpp/threading/future/wait/wait-inl.h b/library/cpp/threading/future/wait/wait-inl.h index 9d056ff777..2753d5446c 100644 --- a/library/cpp/threading/future/wait/wait-inl.h +++ b/library/cpp/threading/future/wait/wait-inl.h @@ -2,10 +2,10 @@ #if !defined(INCLUDE_FUTURE_INL_H) #error "you should never include wait-inl.h directly" -#endif // INCLUDE_FUTURE_INL_H +#endif // INCLUDE_FUTURE_INL_H namespace NThreading { - namespace NImpl { + namespace NImpl { template <typename TContainer> TVector<TFuture<void>> ToVoidFutures(const TContainer& futures) { TVector<TFuture<void>> voidFutures; @@ -17,19 +17,19 @@ namespace NThreading { return voidFutures; } - } + } template <typename TContainer> [[nodiscard]] NImpl::EnableGenericWait<TContainer> WaitAll(const TContainer& futures) { return WaitAll(NImpl::ToVoidFutures(futures)); } - template <typename TContainer> + template <typename TContainer> [[nodiscard]] NImpl::EnableGenericWait<TContainer> WaitExceptionOrAll(const TContainer& futures) { return WaitExceptionOrAll(NImpl::ToVoidFutures(futures)); } - template <typename TContainer> + template <typename TContainer> [[nodiscard]] NImpl::EnableGenericWait<TContainer> WaitAny(const TContainer& futures) { return WaitAny(NImpl::ToVoidFutures(futures)); } diff --git a/library/cpp/threading/future/wait/wait.cpp b/library/cpp/threading/future/wait/wait.cpp index e0a1c3bbd3..a173833a7f 100644 --- a/library/cpp/threading/future/wait/wait.cpp +++ b/library/cpp/threading/future/wait/wait.cpp @@ -31,13 +31,13 @@ namespace NThreading { TWaitGroup<WaitPolicy> wg; for (const auto& fut : futures) { wg.Add(fut); - } + } return std::move(wg).Finish(); } - } + } - //////////////////////////////////////////////////////////////////////////////// + //////////////////////////////////////////////////////////////////////////////// TFuture<void> WaitAll(const TFuture<void>& f1) { return WaitGeneric<TWaitPolicy::TAll>(f1); @@ -56,25 +56,25 @@ namespace NThreading { TFuture<void> WaitExceptionOrAll(const TFuture<void>& f1) { return WaitGeneric<TWaitPolicy::TExceptionOrAll>(f1); - } + } TFuture<void> WaitExceptionOrAll(const TFuture<void>& f1, const TFuture<void>& f2) { return WaitGeneric<TWaitPolicy::TExceptionOrAll>(f1, f2); - } + } TFuture<void> WaitExceptionOrAll(TArrayRef<const TFuture<void>> futures) { return WaitGeneric<TWaitPolicy::TExceptionOrAll>(futures); } - //////////////////////////////////////////////////////////////////////////////// + //////////////////////////////////////////////////////////////////////////////// TFuture<void> WaitAny(const TFuture<void>& f1) { return WaitGeneric<TWaitPolicy::TAny>(f1); - } + } TFuture<void> WaitAny(const TFuture<void>& f1, const TFuture<void>& f2) { return WaitGeneric<TWaitPolicy::TAny>(f1, f2); - } + } TFuture<void> WaitAny(TArrayRef<const TFuture<void>> futures) { return WaitGeneric<TWaitPolicy::TAny>(futures); diff --git a/library/cpp/threading/future/wait/wait.h b/library/cpp/threading/future/wait/wait.h index 60ec5b6a63..6ff7d57baa 100644 --- a/library/cpp/threading/future/wait/wait.h +++ b/library/cpp/threading/future/wait/wait.h @@ -25,16 +25,16 @@ namespace NThreading { [[nodiscard]] TFuture<void> WaitExceptionOrAll(const TFuture<void>& f1); [[nodiscard]] TFuture<void> WaitExceptionOrAll(const TFuture<void>& f1, const TFuture<void>& f2); [[nodiscard]] TFuture<void> WaitExceptionOrAll(TArrayRef<const TFuture<void>> futures); - template <typename TContainer> + template <typename TContainer> [[nodiscard]] NImpl::EnableGenericWait<TContainer> WaitExceptionOrAll(const TContainer& futures); - // waits for any future + // waits for any future [[nodiscard]] TFuture<void> WaitAny(const TFuture<void>& f1); [[nodiscard]] TFuture<void> WaitAny(const TFuture<void>& f1, const TFuture<void>& f2); [[nodiscard]] TFuture<void> WaitAny(TArrayRef<const TFuture<void>> futures); - template <typename TContainer> + template <typename TContainer> [[nodiscard]] NImpl::EnableGenericWait<TContainer> WaitAny(const TContainer& futures); -} +} #define INCLUDE_FUTURE_INL_H #include "wait-inl.h" diff --git a/library/cpp/threading/light_rw_lock/bench/lightrwlock_test.cpp b/library/cpp/threading/light_rw_lock/bench/lightrwlock_test.cpp index 4faa53fc2a..c3027ea544 100644 --- a/library/cpp/threading/light_rw_lock/bench/lightrwlock_test.cpp +++ b/library/cpp/threading/light_rw_lock/bench/lightrwlock_test.cpp @@ -28,7 +28,7 @@ static int Y_FORCE_INLINE AtomicFetchAdd(volatile int& item, int value) { } #elif defined(__GNUC__) #else -#error unsupported platform +#error unsupported platform #endif class TPosixRWLock { diff --git a/library/cpp/threading/light_rw_lock/lightrwlock.cpp b/library/cpp/threading/light_rw_lock/lightrwlock.cpp index 107c8ec3bf..fbb63fd47f 100644 --- a/library/cpp/threading/light_rw_lock/lightrwlock.cpp +++ b/library/cpp/threading/light_rw_lock/lightrwlock.cpp @@ -79,7 +79,7 @@ void TLightRWLock::WaitForUntrappedAndAcquireRead() { } skip_lock_try: - if (AtomicLoad(UnshareFutex_) && (AtomicLoad(Counter_) & 0x7FFFFFFF) == 0) { + if (AtomicLoad(UnshareFutex_) && (AtomicLoad(Counter_) & 0x7FFFFFFF) == 0) { SequenceStore(UnshareFutex_, 0); FutexWake(UnshareFutex_, 1); } diff --git a/library/cpp/threading/light_rw_lock/lightrwlock.h b/library/cpp/threading/light_rw_lock/lightrwlock.h index 4c648fc2a3..931a1817bc 100644 --- a/library/cpp/threading/light_rw_lock/lightrwlock.h +++ b/library/cpp/threading/light_rw_lock/lightrwlock.h @@ -1,5 +1,5 @@ #pragma once - + #include <util/system/rwlock.h> #include <util/system/sanitizers.h> @@ -34,55 +34,55 @@ #include <errno.h> namespace NS_LightRWLock { - static int Y_FORCE_INLINE AtomicFetchAdd(volatile int& item, int value) { - return __atomic_fetch_add(&item, value, __ATOMIC_SEQ_CST); - } + static int Y_FORCE_INLINE AtomicFetchAdd(volatile int& item, int value) { + return __atomic_fetch_add(&item, value, __ATOMIC_SEQ_CST); + } #if defined(_x86_64_) || defined(_i386_) - static char Y_FORCE_INLINE AtomicSetBit(volatile int& item, unsigned bit) { - char ret; - __asm__ __volatile__( - "lock bts %2,%0\n" - "setc %1\n" - : "+m"(item), "=rm"(ret) - : "r"(bit) - : "cc"); + static char Y_FORCE_INLINE AtomicSetBit(volatile int& item, unsigned bit) { + char ret; + __asm__ __volatile__( + "lock bts %2,%0\n" + "setc %1\n" + : "+m"(item), "=rm"(ret) + : "r"(bit) + : "cc"); // msan doesn't treat ret as initialized NSan::Unpoison(&ret, sizeof(ret)); - return ret; - } + return ret; + } - static char Y_FORCE_INLINE AtomicClearBit(volatile int& item, unsigned bit) { - char ret; - __asm__ __volatile__( - "lock btc %2,%0\n" - "setc %1\n" - : "+m"(item), "=rm"(ret) - : "r"(bit) - : "cc"); + static char Y_FORCE_INLINE AtomicClearBit(volatile int& item, unsigned bit) { + char ret; + __asm__ __volatile__( + "lock btc %2,%0\n" + "setc %1\n" + : "+m"(item), "=rm"(ret) + : "r"(bit) + : "cc"); // msan doesn't treat ret as initialized NSan::Unpoison(&ret, sizeof(ret)); - return ret; - } + return ret; + } #else - static char Y_FORCE_INLINE AtomicSetBit(volatile int& item, unsigned bit) { - int prev = __atomic_fetch_or(&item, 1 << bit, __ATOMIC_SEQ_CST); - return (prev & (1 << bit)) != 0 ? 1 : 0; - } + static char Y_FORCE_INLINE AtomicSetBit(volatile int& item, unsigned bit) { + int prev = __atomic_fetch_or(&item, 1 << bit, __ATOMIC_SEQ_CST); + return (prev & (1 << bit)) != 0 ? 1 : 0; + } - static char Y_FORCE_INLINE - AtomicClearBit(volatile int& item, unsigned bit) { - int prev = __atomic_fetch_and(&item, ~(1 << bit), __ATOMIC_SEQ_CST); - return (prev & (1 << bit)) != 0 ? 1 : 0; - } + static char Y_FORCE_INLINE + AtomicClearBit(volatile int& item, unsigned bit) { + int prev = __atomic_fetch_and(&item, ~(1 << bit), __ATOMIC_SEQ_CST); + return (prev & (1 << bit)) != 0 ? 1 : 0; + } #endif #if defined(_x86_64_) || defined(_i386_) || defined (__aarch64__) || defined (__powerpc64__) @@ -100,42 +100,42 @@ namespace NS_LightRWLock { #endif - template <typename TInt> - static void Y_FORCE_INLINE AtomicStore(volatile TInt& var, TInt value) { - __atomic_store_n(&var, value, __ATOMIC_RELEASE); - } - - template <typename TInt> - static void Y_FORCE_INLINE SequenceStore(volatile TInt& var, TInt value) { - __atomic_store_n(&var, value, __ATOMIC_SEQ_CST); - } - - template <typename TInt> - static TInt Y_FORCE_INLINE AtomicLoad(const volatile TInt& var) { - return __atomic_load_n(&var, __ATOMIC_ACQUIRE); - } - - static void Y_FORCE_INLINE FutexWait(volatile int& fvar, int value) { - for (;;) { - int result = - syscall(SYS_futex, &fvar, FUTEX_WAIT_PRIVATE, value, NULL, NULL, 0); - if (Y_UNLIKELY(result == -1)) { - if (errno == EWOULDBLOCK) - return; - if (errno == EINTR) - continue; - Y_FAIL("futex error"); - } + template <typename TInt> + static void Y_FORCE_INLINE AtomicStore(volatile TInt& var, TInt value) { + __atomic_store_n(&var, value, __ATOMIC_RELEASE); + } + + template <typename TInt> + static void Y_FORCE_INLINE SequenceStore(volatile TInt& var, TInt value) { + __atomic_store_n(&var, value, __ATOMIC_SEQ_CST); + } + + template <typename TInt> + static TInt Y_FORCE_INLINE AtomicLoad(const volatile TInt& var) { + return __atomic_load_n(&var, __ATOMIC_ACQUIRE); + } + + static void Y_FORCE_INLINE FutexWait(volatile int& fvar, int value) { + for (;;) { + int result = + syscall(SYS_futex, &fvar, FUTEX_WAIT_PRIVATE, value, NULL, NULL, 0); + if (Y_UNLIKELY(result == -1)) { + if (errno == EWOULDBLOCK) + return; + if (errno == EINTR) + continue; + Y_FAIL("futex error"); + } } } - static void Y_FORCE_INLINE FutexWake(volatile int& fvar, int amount) { - const int result = - syscall(SYS_futex, &fvar, FUTEX_WAKE_PRIVATE, amount, NULL, NULL, 0); - if (Y_UNLIKELY(result == -1)) - Y_FAIL("futex error"); - } - + static void Y_FORCE_INLINE FutexWake(volatile int& fvar, int amount) { + const int result = + syscall(SYS_futex, &fvar, FUTEX_WAKE_PRIVATE, amount, NULL, NULL, 0); + if (Y_UNLIKELY(result == -1)) + Y_FAIL("futex error"); + } + } class alignas(64) TLightRWLock { @@ -145,8 +145,8 @@ public: , TrappedFutex_(0) , UnshareFutex_(0) , SpinCount_(spinCount) - { - } + { + } TLightRWLock(const TLightRWLock&) = delete; void operator=(const TLightRWLock&) = delete; @@ -208,10 +208,10 @@ private: class TLightRWLock: public TRWMutex { public: - TLightRWLock() { - } - TLightRWLock(ui32) { - } + TLightRWLock() { + } + TLightRWLock(ui32) { + } }; #endif diff --git a/library/cpp/threading/local_executor/local_executor.cpp b/library/cpp/threading/local_executor/local_executor.cpp index f2b825681f..1d3fbb4bf4 100644 --- a/library/cpp/threading/local_executor/local_executor.cpp +++ b/library/cpp/threading/local_executor/local_executor.cpp @@ -40,7 +40,7 @@ namespace { NPar::TLocallyExecutableFunction Exec; int FirstId, LastId; TVector<NThreading::TPromise<void>> Promises; - + public: TFunctionWrapperWithPromise(NPar::TLocallyExecutableFunction exec, int firstId, int lastId) : Exec(std::move(exec)) diff --git a/library/cpp/threading/local_executor/local_executor.h b/library/cpp/threading/local_executor/local_executor.h index 33476722b8..c1c824f67c 100644 --- a/library/cpp/threading/local_executor/local_executor.h +++ b/library/cpp/threading/local_executor/local_executor.h @@ -5,7 +5,7 @@ #include <util/generic/cast.h> #include <util/generic/fwd.h> #include <util/generic/noncopyable.h> -#include <util/generic/ptr.h> +#include <util/generic/ptr.h> #include <util/generic/singleton.h> #include <util/generic/ymath.h> @@ -135,9 +135,9 @@ namespace NPar { // TVector<NThreading::TFuture<void>> ExecRangeWithFutures(TLocallyExecutableFunction exec, int firstId, int lastId, int flags); - template <typename TBody> + template <typename TBody> static inline auto BlockedLoopBody(const TExecRangeParams& params, const TBody& body) { - return [=](int blockId) { + return [=](int blockId) { const int blockFirstId = params.FirstId + blockId * params.GetBlockSize(); const int blockLastId = Min(params.LastId, blockFirstId + params.GetBlockSize()); for (int i = blockFirstId; i < blockLastId; ++i) { diff --git a/library/cpp/threading/local_executor/ut/local_executor_ut.cpp b/library/cpp/threading/local_executor/ut/local_executor_ut.cpp index ccc833c1b9..ac5737717c 100644 --- a/library/cpp/threading/local_executor/ut/local_executor_ut.cpp +++ b/library/cpp/threading/local_executor/ut/local_executor_ut.cpp @@ -15,315 +15,315 @@ static const int DefaultThreadsCount = 41; static const int DefaultRangeSize = 999; Y_UNIT_TEST_SUITE(ExecRangeWithFutures){ - bool AllOf(const TVector<int>& vec, int value){ + bool AllOf(const TVector<int>& vec, int value){ return AllOf(vec, [value](int element) { return value == element; }); -} - -void AsyncRunAndWaitFuturesReady(int rangeSize, int threads) { - TLocalExecutor localExecutor; - localExecutor.RunAdditionalThreads(threads); - TAtomic signal = 0; - TVector<int> data(rangeSize, 0); - TVector<NThreading::TFuture<void>> futures = localExecutor.ExecRangeWithFutures([&signal, &data](int i) { - UNIT_ASSERT(data[i] == 0); - while (AtomicGet(signal) == 0) - ; - data[i] += 1; - }, - 0, rangeSize, TLocalExecutor::HIGH_PRIORITY); - UNIT_ASSERT(AllOf(data, 0)); - for (auto& future : futures) - UNIT_ASSERT(!future.HasValue()); - AtomicSet(signal, 1); - for (auto& future : futures) { - future.GetValueSync(); +} + +void AsyncRunAndWaitFuturesReady(int rangeSize, int threads) { + TLocalExecutor localExecutor; + localExecutor.RunAdditionalThreads(threads); + TAtomic signal = 0; + TVector<int> data(rangeSize, 0); + TVector<NThreading::TFuture<void>> futures = localExecutor.ExecRangeWithFutures([&signal, &data](int i) { + UNIT_ASSERT(data[i] == 0); + while (AtomicGet(signal) == 0) + ; + data[i] += 1; + }, + 0, rangeSize, TLocalExecutor::HIGH_PRIORITY); + UNIT_ASSERT(AllOf(data, 0)); + for (auto& future : futures) + UNIT_ASSERT(!future.HasValue()); + AtomicSet(signal, 1); + for (auto& future : futures) { + future.GetValueSync(); } - UNIT_ASSERT(AllOf(data, 1)); -} + UNIT_ASSERT(AllOf(data, 1)); +} Y_UNIT_TEST(AsyncRunRangeAndWaitFuturesReady) { - AsyncRunAndWaitFuturesReady(DefaultRangeSize, DefaultThreadsCount); -} + AsyncRunAndWaitFuturesReady(DefaultRangeSize, DefaultThreadsCount); +} Y_UNIT_TEST(AsyncRunOneTaskAndWaitFuturesReady) { - AsyncRunAndWaitFuturesReady(1, DefaultThreadsCount); -} + AsyncRunAndWaitFuturesReady(1, DefaultThreadsCount); +} Y_UNIT_TEST(AsyncRunRangeAndWaitFuturesReadyOneExtraThread) { - AsyncRunAndWaitFuturesReady(DefaultRangeSize, 1); -} + AsyncRunAndWaitFuturesReady(DefaultRangeSize, 1); +} Y_UNIT_TEST(AsyncRunOneThreadAndWaitFuturesReadyOneExtraThread) { - AsyncRunAndWaitFuturesReady(1, 1); -} + AsyncRunAndWaitFuturesReady(1, 1); +} Y_UNIT_TEST(AsyncRunTwoRangesAndWaitFuturesReady) { - TLocalExecutor localExecutor; - localExecutor.RunAdditionalThreads(DefaultThreadsCount); - TAtomic signal = 0; - TVector<int> data1(DefaultRangeSize, 0); - TVector<NThreading::TFuture<void>> futures1 = localExecutor.ExecRangeWithFutures([&signal, &data1](int i) { - UNIT_ASSERT(data1[i] == 0); - while (AtomicGet(signal) == 0) - ; - data1[i] += 1; - }, - 0, DefaultRangeSize, TLocalExecutor::HIGH_PRIORITY); - TVector<int> data2(DefaultRangeSize, 0); - TVector<NThreading::TFuture<void>> futures2 = localExecutor.ExecRangeWithFutures([&signal, &data2](int i) { - UNIT_ASSERT(data2[i] == 0); - while (AtomicGet(signal) == 0) - ; - data2[i] += 2; - }, - 0, DefaultRangeSize, TLocalExecutor::HIGH_PRIORITY); - UNIT_ASSERT(AllOf(data1, 0)); - UNIT_ASSERT(AllOf(data2, 0)); - AtomicSet(signal, 1); - for (int i = 0; i < DefaultRangeSize; ++i) { - futures1[i].GetValueSync(); - futures2[i].GetValueSync(); + TLocalExecutor localExecutor; + localExecutor.RunAdditionalThreads(DefaultThreadsCount); + TAtomic signal = 0; + TVector<int> data1(DefaultRangeSize, 0); + TVector<NThreading::TFuture<void>> futures1 = localExecutor.ExecRangeWithFutures([&signal, &data1](int i) { + UNIT_ASSERT(data1[i] == 0); + while (AtomicGet(signal) == 0) + ; + data1[i] += 1; + }, + 0, DefaultRangeSize, TLocalExecutor::HIGH_PRIORITY); + TVector<int> data2(DefaultRangeSize, 0); + TVector<NThreading::TFuture<void>> futures2 = localExecutor.ExecRangeWithFutures([&signal, &data2](int i) { + UNIT_ASSERT(data2[i] == 0); + while (AtomicGet(signal) == 0) + ; + data2[i] += 2; + }, + 0, DefaultRangeSize, TLocalExecutor::HIGH_PRIORITY); + UNIT_ASSERT(AllOf(data1, 0)); + UNIT_ASSERT(AllOf(data2, 0)); + AtomicSet(signal, 1); + for (int i = 0; i < DefaultRangeSize; ++i) { + futures1[i].GetValueSync(); + futures2[i].GetValueSync(); } - UNIT_ASSERT(AllOf(data1, 1)); - UNIT_ASSERT(AllOf(data2, 2)); -} - -void AsyncRunRangeAndWaitExceptions(int rangeSize, int threadsCount) { - TLocalExecutor localExecutor; - localExecutor.RunAdditionalThreads(threadsCount); - TAtomic signal = 0; - TVector<int> data(rangeSize, 0); - TVector<NThreading::TFuture<void>> futures = localExecutor.ExecRangeWithFutures([&signal, &data](int i) { - UNIT_ASSERT(data[i] == 0); - while (AtomicGet(signal) == 0) - ; - data[i] += 1; - throw 10000 + i; - }, - 0, rangeSize, TLocalExecutor::HIGH_PRIORITY); - UNIT_ASSERT(AllOf(data, 0)); - UNIT_ASSERT(futures.ysize() == rangeSize); - AtomicSet(signal, 1); - int exceptionsCaught = 0; - for (int i = 0; i < rangeSize; ++i) { - try { - futures[i].GetValueSync(); - } catch (int& e) { - if (e == 10000 + i) { - ++exceptionsCaught; + UNIT_ASSERT(AllOf(data1, 1)); + UNIT_ASSERT(AllOf(data2, 2)); +} + +void AsyncRunRangeAndWaitExceptions(int rangeSize, int threadsCount) { + TLocalExecutor localExecutor; + localExecutor.RunAdditionalThreads(threadsCount); + TAtomic signal = 0; + TVector<int> data(rangeSize, 0); + TVector<NThreading::TFuture<void>> futures = localExecutor.ExecRangeWithFutures([&signal, &data](int i) { + UNIT_ASSERT(data[i] == 0); + while (AtomicGet(signal) == 0) + ; + data[i] += 1; + throw 10000 + i; + }, + 0, rangeSize, TLocalExecutor::HIGH_PRIORITY); + UNIT_ASSERT(AllOf(data, 0)); + UNIT_ASSERT(futures.ysize() == rangeSize); + AtomicSet(signal, 1); + int exceptionsCaught = 0; + for (int i = 0; i < rangeSize; ++i) { + try { + futures[i].GetValueSync(); + } catch (int& e) { + if (e == 10000 + i) { + ++exceptionsCaught; } } } - UNIT_ASSERT(exceptionsCaught == rangeSize); - UNIT_ASSERT(AllOf(data, 1)); -} + UNIT_ASSERT(exceptionsCaught == rangeSize); + UNIT_ASSERT(AllOf(data, 1)); +} Y_UNIT_TEST(AsyncRunRangeAndWaitExceptions) { - AsyncRunRangeAndWaitExceptions(DefaultRangeSize, DefaultThreadsCount); -} + AsyncRunRangeAndWaitExceptions(DefaultRangeSize, DefaultThreadsCount); +} Y_UNIT_TEST(AsyncRunOneTaskAndWaitExceptions) { - AsyncRunRangeAndWaitExceptions(1, DefaultThreadsCount); -} + AsyncRunRangeAndWaitExceptions(1, DefaultThreadsCount); +} Y_UNIT_TEST(AsyncRunRangeAndWaitExceptionsOneExtraThread) { - AsyncRunRangeAndWaitExceptions(DefaultRangeSize, 1); -} + AsyncRunRangeAndWaitExceptions(DefaultRangeSize, 1); +} Y_UNIT_TEST(AsyncRunOneTaskAndWaitExceptionsOneExtraThread) { - AsyncRunRangeAndWaitExceptions(1, 1); -} + AsyncRunRangeAndWaitExceptions(1, 1); +} Y_UNIT_TEST(AsyncRunTwoRangesAndWaitExceptions) { - TLocalExecutor localExecutor; - localExecutor.RunAdditionalThreads(DefaultThreadsCount); - TAtomic signal = 0; - TVector<int> data1(DefaultRangeSize, 0); - TVector<NThreading::TFuture<void>> futures1 = localExecutor.ExecRangeWithFutures([&signal, &data1](int i) { - UNIT_ASSERT(data1[i] == 0); - while (AtomicGet(signal) == 0) - ; - data1[i] += 1; - throw 15000 + i; - }, - 0, DefaultRangeSize, TLocalExecutor::LOW_PRIORITY); - TVector<int> data2(DefaultRangeSize, 0); - TVector<NThreading::TFuture<void>> futures2 = localExecutor.ExecRangeWithFutures([&signal, &data2](int i) { - UNIT_ASSERT(data2[i] == 0); - while (AtomicGet(signal) == 0) - ; - data2[i] += 2; - throw 16000 + i; - }, - 0, DefaultRangeSize, TLocalExecutor::HIGH_PRIORITY); - - UNIT_ASSERT(AllOf(data1, 0)); - UNIT_ASSERT(AllOf(data2, 0)); - UNIT_ASSERT(futures1.size() == DefaultRangeSize); - UNIT_ASSERT(futures2.size() == DefaultRangeSize); - AtomicSet(signal, 1); - int exceptionsCaught = 0; - for (int i = 0; i < DefaultRangeSize; ++i) { - try { - futures1[i].GetValueSync(); - } catch (int& e) { - if (e == 15000 + i) { - ++exceptionsCaught; + TLocalExecutor localExecutor; + localExecutor.RunAdditionalThreads(DefaultThreadsCount); + TAtomic signal = 0; + TVector<int> data1(DefaultRangeSize, 0); + TVector<NThreading::TFuture<void>> futures1 = localExecutor.ExecRangeWithFutures([&signal, &data1](int i) { + UNIT_ASSERT(data1[i] == 0); + while (AtomicGet(signal) == 0) + ; + data1[i] += 1; + throw 15000 + i; + }, + 0, DefaultRangeSize, TLocalExecutor::LOW_PRIORITY); + TVector<int> data2(DefaultRangeSize, 0); + TVector<NThreading::TFuture<void>> futures2 = localExecutor.ExecRangeWithFutures([&signal, &data2](int i) { + UNIT_ASSERT(data2[i] == 0); + while (AtomicGet(signal) == 0) + ; + data2[i] += 2; + throw 16000 + i; + }, + 0, DefaultRangeSize, TLocalExecutor::HIGH_PRIORITY); + + UNIT_ASSERT(AllOf(data1, 0)); + UNIT_ASSERT(AllOf(data2, 0)); + UNIT_ASSERT(futures1.size() == DefaultRangeSize); + UNIT_ASSERT(futures2.size() == DefaultRangeSize); + AtomicSet(signal, 1); + int exceptionsCaught = 0; + for (int i = 0; i < DefaultRangeSize; ++i) { + try { + futures1[i].GetValueSync(); + } catch (int& e) { + if (e == 15000 + i) { + ++exceptionsCaught; } - } - try { - futures2[i].GetValueSync(); - } catch (int& e) { - if (e == 16000 + i) { - ++exceptionsCaught; + } + try { + futures2[i].GetValueSync(); + } catch (int& e) { + if (e == 16000 + i) { + ++exceptionsCaught; } } } - UNIT_ASSERT(exceptionsCaught == 2 * DefaultRangeSize); - UNIT_ASSERT(AllOf(data1, 1)); - UNIT_ASSERT(AllOf(data2, 2)); -} - -void RunRangeAndCheckExceptionsWithWaitComplete(int rangeSize, int threadsCount) { - TLocalExecutor localExecutor; - localExecutor.RunAdditionalThreads(threadsCount); - TVector<int> data(rangeSize, 0); - TVector<NThreading::TFuture<void>> futures = localExecutor.ExecRangeWithFutures([&data](int i) { - UNIT_ASSERT(data[i] == 0); - data[i] += 1; - throw 30000 + i; - }, - 0, rangeSize, TLocalExecutor::EFlags::WAIT_COMPLETE); - UNIT_ASSERT(AllOf(data, 1)); - int exceptionsCaught = 0; - for (int i = 0; i < rangeSize; ++i) { - try { - futures[i].GetValueSync(); - } catch (int& e) { - if (e == 30000 + i) { - ++exceptionsCaught; + UNIT_ASSERT(exceptionsCaught == 2 * DefaultRangeSize); + UNIT_ASSERT(AllOf(data1, 1)); + UNIT_ASSERT(AllOf(data2, 2)); +} + +void RunRangeAndCheckExceptionsWithWaitComplete(int rangeSize, int threadsCount) { + TLocalExecutor localExecutor; + localExecutor.RunAdditionalThreads(threadsCount); + TVector<int> data(rangeSize, 0); + TVector<NThreading::TFuture<void>> futures = localExecutor.ExecRangeWithFutures([&data](int i) { + UNIT_ASSERT(data[i] == 0); + data[i] += 1; + throw 30000 + i; + }, + 0, rangeSize, TLocalExecutor::EFlags::WAIT_COMPLETE); + UNIT_ASSERT(AllOf(data, 1)); + int exceptionsCaught = 0; + for (int i = 0; i < rangeSize; ++i) { + try { + futures[i].GetValueSync(); + } catch (int& e) { + if (e == 30000 + i) { + ++exceptionsCaught; } } } - UNIT_ASSERT(exceptionsCaught == rangeSize); - UNIT_ASSERT(AllOf(data, 1)); -} + UNIT_ASSERT(exceptionsCaught == rangeSize); + UNIT_ASSERT(AllOf(data, 1)); +} Y_UNIT_TEST(RunRangeAndCheckExceptionsWithWaitComplete) { - RunRangeAndCheckExceptionsWithWaitComplete(DefaultRangeSize, DefaultThreadsCount); -} + RunRangeAndCheckExceptionsWithWaitComplete(DefaultRangeSize, DefaultThreadsCount); +} Y_UNIT_TEST(RunOneAndCheckExceptionsWithWaitComplete) { - RunRangeAndCheckExceptionsWithWaitComplete(1, DefaultThreadsCount); -} + RunRangeAndCheckExceptionsWithWaitComplete(1, DefaultThreadsCount); +} Y_UNIT_TEST(RunRangeAndCheckExceptionsWithWaitCompleteOneExtraThread) { - RunRangeAndCheckExceptionsWithWaitComplete(DefaultRangeSize, 1); -} + RunRangeAndCheckExceptionsWithWaitComplete(DefaultRangeSize, 1); +} Y_UNIT_TEST(RunOneAndCheckExceptionsWithWaitCompleteOneExtraThread) { - RunRangeAndCheckExceptionsWithWaitComplete(1, 1); -} + RunRangeAndCheckExceptionsWithWaitComplete(1, 1); +} Y_UNIT_TEST(RunRangeAndCheckExceptionsWithWaitCompleteZeroExtraThreads) { - RunRangeAndCheckExceptionsWithWaitComplete(DefaultRangeSize, 0); -} + RunRangeAndCheckExceptionsWithWaitComplete(DefaultRangeSize, 0); +} Y_UNIT_TEST(RunOneAndCheckExceptionsWithWaitCompleteZeroExtraThreads) { - RunRangeAndCheckExceptionsWithWaitComplete(1, 0); -} -} -; + RunRangeAndCheckExceptionsWithWaitComplete(1, 0); +} +} +; Y_UNIT_TEST_SUITE(ExecRangeWithThrow){ - void RunParallelWhichThrowsTTestException(int rangeStart, int rangeSize, int threadsCount, int flags, TAtomic& processed){ - AtomicSet(processed, 0); -TLocalExecutor localExecutor; -localExecutor.RunAdditionalThreads(threadsCount); -localExecutor.ExecRangeWithThrow([&processed](int) { - AtomicAdd(processed, 1); - throw TTestException(); -}, - rangeStart, rangeStart + rangeSize, flags); -} + void RunParallelWhichThrowsTTestException(int rangeStart, int rangeSize, int threadsCount, int flags, TAtomic& processed){ + AtomicSet(processed, 0); +TLocalExecutor localExecutor; +localExecutor.RunAdditionalThreads(threadsCount); +localExecutor.ExecRangeWithThrow([&processed](int) { + AtomicAdd(processed, 1); + throw TTestException(); +}, + rangeStart, rangeStart + rangeSize, flags); +} Y_UNIT_TEST(RunParallelWhichThrowsTTestException) { - TAtomic processed = 0; - UNIT_ASSERT_EXCEPTION( - RunParallelWhichThrowsTTestException(10, 40, DefaultThreadsCount, - TLocalExecutor::EFlags::WAIT_COMPLETE, processed), - TTestException); - UNIT_ASSERT(AtomicGet(processed) == 40); -} - -void ThrowAndCatchTTestException(int rangeSize, int threadsCount, int flags) { - TAtomic processed = 0; - UNIT_ASSERT_EXCEPTION( - RunParallelWhichThrowsTTestException(0, rangeSize, threadsCount, flags, processed), - TTestException); - UNIT_ASSERT(AtomicGet(processed) == rangeSize); -} + TAtomic processed = 0; + UNIT_ASSERT_EXCEPTION( + RunParallelWhichThrowsTTestException(10, 40, DefaultThreadsCount, + TLocalExecutor::EFlags::WAIT_COMPLETE, processed), + TTestException); + UNIT_ASSERT(AtomicGet(processed) == 40); +} + +void ThrowAndCatchTTestException(int rangeSize, int threadsCount, int flags) { + TAtomic processed = 0; + UNIT_ASSERT_EXCEPTION( + RunParallelWhichThrowsTTestException(0, rangeSize, threadsCount, flags, processed), + TTestException); + UNIT_ASSERT(AtomicGet(processed) == rangeSize); +} Y_UNIT_TEST(ThrowAndCatchTTestExceptionLowPriority) { - ThrowAndCatchTTestException(DefaultRangeSize, DefaultThreadsCount, - TLocalExecutor::EFlags::WAIT_COMPLETE | TLocalExecutor::EFlags::LOW_PRIORITY); -} + ThrowAndCatchTTestException(DefaultRangeSize, DefaultThreadsCount, + TLocalExecutor::EFlags::WAIT_COMPLETE | TLocalExecutor::EFlags::LOW_PRIORITY); +} Y_UNIT_TEST(ThrowAndCatchTTestExceptionMedPriority) { - ThrowAndCatchTTestException(DefaultRangeSize, DefaultThreadsCount, - TLocalExecutor::EFlags::WAIT_COMPLETE | TLocalExecutor::EFlags::MED_PRIORITY); -} + ThrowAndCatchTTestException(DefaultRangeSize, DefaultThreadsCount, + TLocalExecutor::EFlags::WAIT_COMPLETE | TLocalExecutor::EFlags::MED_PRIORITY); +} Y_UNIT_TEST(ThrowAndCatchTTestExceptionHighPriority) { - ThrowAndCatchTTestException(DefaultRangeSize, DefaultThreadsCount, - TLocalExecutor::EFlags::WAIT_COMPLETE | TLocalExecutor::EFlags::HIGH_PRIORITY); -} + ThrowAndCatchTTestException(DefaultRangeSize, DefaultThreadsCount, + TLocalExecutor::EFlags::WAIT_COMPLETE | TLocalExecutor::EFlags::HIGH_PRIORITY); +} Y_UNIT_TEST(ThrowAndCatchTTestExceptionWaitComplete) { - ThrowAndCatchTTestException(DefaultRangeSize, DefaultThreadsCount, - TLocalExecutor::EFlags::WAIT_COMPLETE); -} + ThrowAndCatchTTestException(DefaultRangeSize, DefaultThreadsCount, + TLocalExecutor::EFlags::WAIT_COMPLETE); +} Y_UNIT_TEST(RethrowExeptionSequentialWaitComplete) { - ThrowAndCatchTTestException(DefaultRangeSize, 0, - TLocalExecutor::EFlags::WAIT_COMPLETE); -} + ThrowAndCatchTTestException(DefaultRangeSize, 0, + TLocalExecutor::EFlags::WAIT_COMPLETE); +} Y_UNIT_TEST(RethrowExeptionOneExtraThreadWaitComplete) { - ThrowAndCatchTTestException(DefaultRangeSize, 1, - TLocalExecutor::EFlags::WAIT_COMPLETE); -} - -void ThrowsTTestExceptionFromNested(TLocalExecutor& localExecutor) { - localExecutor.ExecRangeWithThrow([](int) { - throw TTestException(); - }, - 0, 10, TLocalExecutor::EFlags::WAIT_COMPLETE); -} - -void CatchTTestExceptionFromNested(TAtomic& processed1, TAtomic& processed2) { - TLocalExecutor localExecutor; - localExecutor.RunAdditionalThreads(DefaultThreadsCount); - localExecutor.ExecRangeWithThrow([&processed1, &processed2, &localExecutor](int) { - AtomicAdd(processed1, 1); - UNIT_ASSERT_EXCEPTION( - ThrowsTTestExceptionFromNested(localExecutor), - TTestException); - AtomicAdd(processed2, 1); - }, - 0, DefaultRangeSize, TLocalExecutor::EFlags::WAIT_COMPLETE); -} + ThrowAndCatchTTestException(DefaultRangeSize, 1, + TLocalExecutor::EFlags::WAIT_COMPLETE); +} + +void ThrowsTTestExceptionFromNested(TLocalExecutor& localExecutor) { + localExecutor.ExecRangeWithThrow([](int) { + throw TTestException(); + }, + 0, 10, TLocalExecutor::EFlags::WAIT_COMPLETE); +} + +void CatchTTestExceptionFromNested(TAtomic& processed1, TAtomic& processed2) { + TLocalExecutor localExecutor; + localExecutor.RunAdditionalThreads(DefaultThreadsCount); + localExecutor.ExecRangeWithThrow([&processed1, &processed2, &localExecutor](int) { + AtomicAdd(processed1, 1); + UNIT_ASSERT_EXCEPTION( + ThrowsTTestExceptionFromNested(localExecutor), + TTestException); + AtomicAdd(processed2, 1); + }, + 0, DefaultRangeSize, TLocalExecutor::EFlags::WAIT_COMPLETE); +} Y_UNIT_TEST(NestedParallelExceptionsDoNotLeak) { - TAtomic processed1 = 0; - TAtomic processed2 = 0; - UNIT_ASSERT_NO_EXCEPTION( - CatchTTestExceptionFromNested(processed1, processed2)); - UNIT_ASSERT_EQUAL(AtomicGet(processed1), DefaultRangeSize); - UNIT_ASSERT_EQUAL(AtomicGet(processed2), DefaultRangeSize); -} -} -; + TAtomic processed1 = 0; + TAtomic processed2 = 0; + UNIT_ASSERT_NO_EXCEPTION( + CatchTTestExceptionFromNested(processed1, processed2)); + UNIT_ASSERT_EQUAL(AtomicGet(processed1), DefaultRangeSize); + UNIT_ASSERT_EQUAL(AtomicGet(processed2), DefaultRangeSize); +} +} +; Y_UNIT_TEST_SUITE(ExecLargeRangeWithThrow){ diff --git a/library/cpp/threading/poor_man_openmp/thread_helper.cpp b/library/cpp/threading/poor_man_openmp/thread_helper.cpp index b4ec5c7879..34cb6507b9 100644 --- a/library/cpp/threading/poor_man_openmp/thread_helper.cpp +++ b/library/cpp/threading/poor_man_openmp/thread_helper.cpp @@ -1,7 +1,7 @@ #include "thread_helper.h" - -#include <util/generic/singleton.h> - -TMtpQueueHelper& TMtpQueueHelper::Instance() { - return *Singleton<TMtpQueueHelper>(); -} + +#include <util/generic/singleton.h> + +TMtpQueueHelper& TMtpQueueHelper::Instance() { + return *Singleton<TMtpQueueHelper>(); +} diff --git a/library/cpp/threading/poor_man_openmp/thread_helper.h b/library/cpp/threading/poor_man_openmp/thread_helper.h index 1536c186cb..0ecee0590b 100644 --- a/library/cpp/threading/poor_man_openmp/thread_helper.h +++ b/library/cpp/threading/poor_man_openmp/thread_helper.h @@ -2,17 +2,17 @@ #include <util/thread/pool.h> #include <util/generic/utility.h> -#include <util/generic/yexception.h> +#include <util/generic/yexception.h> #include <util/system/info.h> #include <util/system/atomic.h> #include <util/system/condvar.h> #include <util/system/mutex.h> -#include <util/stream/output.h> +#include <util/stream/output.h> #include <functional> -#include <cstdlib> +#include <cstdlib> -class TMtpQueueHelper { +class TMtpQueueHelper { public: TMtpQueueHelper() { SetThreadCount(NSystemInfo::CachedNumberOfCpus()); @@ -27,79 +27,79 @@ public: ThreadCount = threads; q = CreateThreadPool(ThreadCount); } - - static TMtpQueueHelper& Instance(); - + + static TMtpQueueHelper& Instance(); + private: size_t ThreadCount; TAutoPtr<IThreadPool> q; }; -namespace NYmp { +namespace NYmp { inline void SetThreadCount(size_t threads) { - TMtpQueueHelper::Instance().SetThreadCount(threads); + TMtpQueueHelper::Instance().SetThreadCount(threads); } inline size_t GetThreadCount() { - return TMtpQueueHelper::Instance().GetThreadCount(); + return TMtpQueueHelper::Instance().GetThreadCount(); } - template <typename T> + template <typename T> inline void ParallelForStaticChunk(T begin, T end, size_t chunkSize, std::function<void(T)> func) { - chunkSize = Max<size_t>(chunkSize, 1); - - size_t threadCount = TMtpQueueHelper::Instance().GetThreadCount(); + chunkSize = Max<size_t>(chunkSize, 1); + + size_t threadCount = TMtpQueueHelper::Instance().GetThreadCount(); IThreadPool* queue = TMtpQueueHelper::Instance().Get(); TCondVar cv; TMutex mutex; TAtomic counter = threadCount; - std::exception_ptr err; - - for (size_t i = 0; i < threadCount; ++i) { - queue->SafeAddFunc([&cv, &counter, &mutex, &func, i, begin, end, chunkSize, threadCount, &err]() { - try { - T currentChunkStart = begin + static_cast<decltype(T() - T())>(i * chunkSize); - - while (currentChunkStart < end) { - T currentChunkEnd = Min<T>(end, currentChunkStart + chunkSize); - - for (T val = currentChunkStart; val < currentChunkEnd; ++val) { - func(val); - } - - currentChunkStart += chunkSize * threadCount; + std::exception_ptr err; + + for (size_t i = 0; i < threadCount; ++i) { + queue->SafeAddFunc([&cv, &counter, &mutex, &func, i, begin, end, chunkSize, threadCount, &err]() { + try { + T currentChunkStart = begin + static_cast<decltype(T() - T())>(i * chunkSize); + + while (currentChunkStart < end) { + T currentChunkEnd = Min<T>(end, currentChunkStart + chunkSize); + + for (T val = currentChunkStart; val < currentChunkEnd; ++val) { + func(val); + } + + currentChunkStart += chunkSize * threadCount; + } + } catch (...) { + with_lock (mutex) { + err = std::current_exception(); + } + } + + with_lock (mutex) { + if (AtomicDecrement(counter) == 0) { + //last one + cv.Signal(); } - } catch (...) { - with_lock (mutex) { - err = std::current_exception(); - } } - - with_lock (mutex) { - if (AtomicDecrement(counter) == 0) { - //last one - cv.Signal(); - } - } }); } - - with_lock (mutex) { - while (AtomicGet(counter) > 0) { - cv.WaitI(mutex); - } + + with_lock (mutex) { + while (AtomicGet(counter) > 0) { + cv.WaitI(mutex); + } + } + + if (err) { + std::rethrow_exception(err); } - - if (err) { - std::rethrow_exception(err); - } } - template <typename T> + template <typename T> inline void ParallelForStaticAutoChunk(T begin, T end, std::function<void(T)> func) { - const size_t taskSize = end - begin; - const size_t threadCount = TMtpQueueHelper::Instance().GetThreadCount(); - + const size_t taskSize = end - begin; + const size_t threadCount = TMtpQueueHelper::Instance().GetThreadCount(); + ParallelForStaticChunk(begin, end, (taskSize + threadCount - 1) / threadCount, func); } -} +} diff --git a/library/cpp/threading/poor_man_openmp/thread_helper_ut.cpp b/library/cpp/threading/poor_man_openmp/thread_helper_ut.cpp index 79c7a14b5e..7417636864 100644 --- a/library/cpp/threading/poor_man_openmp/thread_helper_ut.cpp +++ b/library/cpp/threading/poor_man_openmp/thread_helper_ut.cpp @@ -1,26 +1,26 @@ -#include "thread_helper.h" - +#include "thread_helper.h" + #include <library/cpp/testing/unittest/registar.h> - + #include <util/generic/string.h> -#include <util/generic/yexception.h> - +#include <util/generic/yexception.h> + Y_UNIT_TEST_SUITE(TestMP) { Y_UNIT_TEST(TestErr) { - std::function<void(int)> f = [](int x) { - if (x == 5) { - ythrow yexception() << "oops"; - } - }; - + std::function<void(int)> f = [](int x) { + if (x == 5) { + ythrow yexception() << "oops"; + } + }; + TString s; - - try { - NYmp::ParallelForStaticAutoChunk(0, 10, f); - } catch (...) { - s = CurrentExceptionMessage(); - } - - UNIT_ASSERT(s.find("oops") > 0); - } -} + + try { + NYmp::ParallelForStaticAutoChunk(0, 10, f); + } catch (...) { + s = CurrentExceptionMessage(); + } + + UNIT_ASSERT(s.find("oops") > 0); + } +} diff --git a/library/cpp/threading/poor_man_openmp/ut/ya.make b/library/cpp/threading/poor_man_openmp/ut/ya.make index 7305d14b99..6d7aa123ed 100644 --- a/library/cpp/threading/poor_man_openmp/ut/ya.make +++ b/library/cpp/threading/poor_man_openmp/ut/ya.make @@ -1,12 +1,12 @@ UNITTEST_FOR(library/cpp/threading/poor_man_openmp) - + OWNER( pg agorodilov ) - -SRCS( - thread_helper_ut.cpp -) - -END() + +SRCS( + thread_helper_ut.cpp +) + +END() diff --git a/library/cpp/threading/queue/mpsc_htswap.h b/library/cpp/threading/queue/mpsc_htswap.h index 420b1e8829..c42caa7ac0 100644 --- a/library/cpp/threading/queue/mpsc_htswap.h +++ b/library/cpp/threading/queue/mpsc_htswap.h @@ -28,8 +28,8 @@ namespace NThreading { namespace NHTSwapPrivate { template <typename T, typename TTuneup> struct TNode - : public TTuneup::TNodeBase, - public TTuneup::template TNodeLayout<TNode<T, TTuneup>, T> { + : public TTuneup::TNodeBase, + public TTuneup::template TNodeLayout<TNode<T, TTuneup>, T> { TNode(const T& item) { this->Next = nullptr; this->Item = item; @@ -60,7 +60,7 @@ namespace NThreading { template <typename T, typename TTuneup> class THTSwapQueueImpl - : protected TTuneup::template TQueueLayout<TNode<T, TTuneup>> { + : protected TTuneup::template TQueueLayout<TNode<T, TTuneup>> { protected: using TTunedNode = TNode<T, TTuneup>; @@ -124,9 +124,9 @@ namespace NThreading { DeclareTuneTypeParam(THTSwapNodeLayout, TNodeLayout); DeclareTuneTypeParam(THTSwapQueueLayout, TQueueLayout); - template <typename T = void*, typename... TParams> + template <typename T = void*, typename... TParams> class THTSwapQueue - : public NHTSwapPrivate::THTSwapQueueImpl<T, - TTune<NHTSwapPrivate::TDefaultTuneup, TParams...>> { + : public NHTSwapPrivate::THTSwapQueueImpl<T, + TTune<NHTSwapPrivate::TDefaultTuneup, TParams...>> { }; } diff --git a/library/cpp/threading/queue/mpsc_intrusive_unordered.h b/library/cpp/threading/queue/mpsc_intrusive_unordered.h index 97e6131dd4..6ac7537ae9 100644 --- a/library/cpp/threading/queue/mpsc_intrusive_unordered.h +++ b/library/cpp/threading/queue/mpsc_intrusive_unordered.h @@ -25,7 +25,7 @@ namespace NThreading { void Push(void* node) noexcept { Push(reinterpret_cast<TIntrusiveNode*>(node)); } - + private: TIntrusiveNode* HeadForCaS = nullptr; TIntrusiveNode* HeadForSwap = nullptr; diff --git a/library/cpp/threading/queue/mpsc_read_as_filled.h b/library/cpp/threading/queue/mpsc_read_as_filled.h index 621517328e..be33ba5a58 100644 --- a/library/cpp/threading/queue/mpsc_read_as_filled.h +++ b/library/cpp/threading/queue/mpsc_read_as_filled.h @@ -132,7 +132,7 @@ namespace NThreading { TMsgBunch* volatile NextToken; /* this push can return PUSH_RESULT_BLOCKED */ - inline TPushResult Push(TMsgLink msg, ui64 slot, TAux auxiliary) { + inline TPushResult Push(TMsgLink msg, ui64 slot, TAux auxiliary) { if (Y_UNLIKELY(slot < FirstSlot)) { return PUSH_RESULT_BACKWARD; } @@ -194,7 +194,7 @@ namespace NThreading { // the object could be destroyed after this method inline void SetNextToken(TMsgBunch* next) { AtomicSet(NextToken, next); - if (Y_UNLIKELY(AtomicAdd(Token, RELEASE_SIZE) == RELEASE_SIZE)) { + if (Y_UNLIKELY(AtomicAdd(Token, RELEASE_SIZE) == RELEASE_SIZE)) { Release(this); next->DecrementToken(); } @@ -317,8 +317,8 @@ namespace NThreading { } }; - template <typename TWBucket = TWriteBucket<>, - template <typename, typename...> class TContainer = TDeque> + template <typename TWBucket = TWriteBucket<>, + template <typename, typename...> class TContainer = TDeque> class TReadBucket { public: using TAux = typename TWBucket::TUsingAux; @@ -543,7 +543,7 @@ namespace NThreading { static constexpr ui32 BUNCH_SIZE = DEFAULT_BUNCH_SIZE; using TBunchBase = TEmpty; - template <typename TElem, typename... TRest> + template <typename TElem, typename... TRest> using TContainer = TDeque<TElem, TRest...>; static constexpr bool DeleteItems = true; @@ -556,7 +556,7 @@ namespace NThreading { DeclareTuneContainer(TRaFQueueSkipContainer, TContainer); DeclareTuneValueParam(TRaFQueueDeleteItems, bool, DeleteItems); - template <typename TItem = void, typename... TParams> + template <typename TItem = void, typename... TParams> class TReadAsFilledQueue { private: using TTuned = TTune<NReadAsFilledPrivate::TDefaultParams, TParams...>; @@ -565,7 +565,7 @@ namespace NThreading { using TBunchBase = typename TTuned::TBunchBase; - template <typename TElem, typename... TRest> + template <typename TElem, typename... TRest> using TContainer = typename TTuned::template TContainer<TElem, TRest...>; diff --git a/library/cpp/threading/queue/mpsc_vinfarr_obstructive.h b/library/cpp/threading/queue/mpsc_vinfarr_obstructive.h index 4c85bef6ec..5f91f1b5a8 100644 --- a/library/cpp/threading/queue/mpsc_vinfarr_obstructive.h +++ b/library/cpp/threading/queue/mpsc_vinfarr_obstructive.h @@ -469,7 +469,7 @@ namespace NThreading { DeclareTuneTypeParam(TObstructiveQueueBunchBase, TBunchBase); DeclareTuneTypeParam(TObstructiveQueueAux, TAux); - template <typename TItem = void, typename... TParams> + template <typename TItem = void, typename... TParams> class TObstructiveConsumerAuxQueue { private: using TTuned = @@ -522,7 +522,7 @@ namespace NThreading { template <typename TItem = void, bool DeleteItems = true> class TObstructiveConsumerQueue - : public TObstructiveConsumerAuxQueue<TItem, - TObstructiveQueueDeleteItems<DeleteItems>> { + : public TObstructiveConsumerAuxQueue<TItem, + TObstructiveQueueDeleteItems<DeleteItems>> { }; -} +} diff --git a/library/cpp/threading/queue/queue_ut.cpp b/library/cpp/threading/queue/queue_ut.cpp index a55f952cbc..80eca147da 100644 --- a/library/cpp/threading/queue/queue_ut.cpp +++ b/library/cpp/threading/queue/queue_ut.cpp @@ -12,7 +12,7 @@ private: UNIT_TEST(Threads2_Push1M_Threads1_Pop2M) UNIT_TEST(Threads4_Push1M_Threads1_Pop4M) UNIT_TEST(Threads8_RndPush100K_Threads8_Queues) - /* + /* UNIT_TEST(Threads24_RndPush100K_Threads24_Queues) UNIT_TEST(Threads24_RndPush100K_Threads8_Queues) UNIT_TEST(Threads24_RndPush100K_Threads4_Queues) diff --git a/library/cpp/threading/queue/tune.h b/library/cpp/threading/queue/tune.h index 1072342620..50fc3dc17c 100644 --- a/library/cpp/threading/queue/tune.h +++ b/library/cpp/threading/queue/tune.h @@ -96,14 +96,14 @@ }; \ } -#define DeclareTuneContainer(TParamName, InternalName) \ - template <template <typename, typename...> class TNewContainer> \ - struct TParamName { \ - template <typename TBase> \ - struct TApply: public TBase { \ - template <typename TElem, typename... TRest> \ - using InternalName = TNewContainer<TElem, TRest...>; \ - }; \ +#define DeclareTuneContainer(TParamName, InternalName) \ + template <template <typename, typename...> class TNewContainer> \ + struct TParamName { \ + template <typename TBase> \ + struct TApply: public TBase { \ + template <typename TElem, typename... TRest> \ + using InternalName = TNewContainer<TElem, TRest...>; \ + }; \ } namespace NTunePrivate { diff --git a/library/cpp/threading/queue/unordered_ut.cpp b/library/cpp/threading/queue/unordered_ut.cpp index 49ebd4a1cf..a43b7f520e 100644 --- a/library/cpp/threading/queue/unordered_ut.cpp +++ b/library/cpp/threading/queue/unordered_ut.cpp @@ -59,9 +59,9 @@ public: class TWorker: public ISimpleThread { public: TWorker( - TQueueType* queues_, - ui16 mine, - TAtomic* pushDone) + TQueueType* queues_, + ui16 mine, + TAtomic* pushDone) : Queues(queues_) , MineQueue(mine) , PushDone(pushDone) @@ -132,7 +132,7 @@ public: for (ui32 i = 0; i < COUNT; ++i) { workers[i]->Join(); all.insert(all.begin(), - workers[i]->Received.begin(), workers[i]->Received.end()); + workers[i]->Received.begin(), workers[i]->Received.end()); } std::sort(all.begin(), all.end()); diff --git a/library/cpp/threading/queue/ut_helpers.h b/library/cpp/threading/queue/ut_helpers.h index b017763794..2756b52601 100644 --- a/library/cpp/threading/queue/ut_helpers.h +++ b/library/cpp/threading/queue/ut_helpers.h @@ -13,11 +13,11 @@ struct TBasicReadAsFilled: public NThreading::TReadAsFilledQueue<> { }; struct TBasicObstructiveConsumer - : public NThreading::TObstructiveConsumerQueue<> { + : public NThreading::TObstructiveConsumerQueue<> { }; struct TBasicMPSCIntrusiveUnordered - : public NThreading::TMPSCIntrusiveUnordered { + : public NThreading::TMPSCIntrusiveUnordered { }; struct TIntrusiveLink: public NThreading::TIntrusiveNode { @@ -30,11 +30,11 @@ struct TMPMCUnorderedRing: public NThreading::TMPMCUnorderedRing { } }; -#define REGISTER_TESTS_FOR_ALL_ORDERED_QUEUES(TestTemplate) \ - UNIT_TEST_SUITE_REGISTRATION(TestTemplate<TBasicHTSwap>); \ - UNIT_TEST_SUITE_REGISTRATION(TestTemplate<TBasicReadAsFilled>); \ +#define REGISTER_TESTS_FOR_ALL_ORDERED_QUEUES(TestTemplate) \ + UNIT_TEST_SUITE_REGISTRATION(TestTemplate<TBasicHTSwap>); \ + UNIT_TEST_SUITE_REGISTRATION(TestTemplate<TBasicReadAsFilled>); \ UNIT_TEST_SUITE_REGISTRATION(TestTemplate<TBasicObstructiveConsumer>) -#define REGISTER_TESTS_FOR_ALL_UNORDERED_QUEUES(TestTemplate) \ +#define REGISTER_TESTS_FOR_ALL_UNORDERED_QUEUES(TestTemplate) \ UNIT_TEST_SUITE_REGISTRATION(TestTemplate<TBasicMPSCIntrusiveUnordered>); \ UNIT_TEST_SUITE_REGISTRATION(TestTemplate<TMPMCUnorderedRing>); diff --git a/library/cpp/threading/skip_list/compare.h b/library/cpp/threading/skip_list/compare.h index 336582a1b8..ac98b3e1ce 100644 --- a/library/cpp/threading/skip_list/compare.h +++ b/library/cpp/threading/skip_list/compare.h @@ -4,74 +4,74 @@ #include <util/str_stl.h> namespace NThreading { - namespace NImpl { - Y_HAS_MEMBER(compare); - Y_HAS_MEMBER(Compare); + namespace NImpl { + Y_HAS_MEMBER(compare); + Y_HAS_MEMBER(Compare); template <typename T> - inline int CompareImpl(const T& l, const T& r) { - if (l < r) { - return -1; - } else if (r < l) { - return +1; - } else { - return 0; - } + inline int CompareImpl(const T& l, const T& r) { + if (l < r) { + return -1; + } else if (r < l) { + return +1; + } else { + return 0; + } } - template <bool val> - struct TSmallCompareSelector { - template <typename T> - static inline int Compare(const T& l, const T& r) { - return CompareImpl(l, r); - } - }; + template <bool val> + struct TSmallCompareSelector { + template <typename T> + static inline int Compare(const T& l, const T& r) { + return CompareImpl(l, r); + } + }; - template <> - struct TSmallCompareSelector<true> { - template <typename T> - static inline int Compare(const T& l, const T& r) { - return l.compare(r); - } - }; + template <> + struct TSmallCompareSelector<true> { + template <typename T> + static inline int Compare(const T& l, const T& r) { + return l.compare(r); + } + }; - template <bool val> - struct TBigCompareSelector { - template <typename T> - static inline int Compare(const T& l, const T& r) { + template <bool val> + struct TBigCompareSelector { + template <typename T> + static inline int Compare(const T& l, const T& r) { return TSmallCompareSelector<THascompare<T>::value>::Compare(l, r); - } - }; - - template <> - struct TBigCompareSelector<true> { - template <typename T> - static inline int Compare(const T& l, const T& r) { - return l.Compare(r); - } - }; - + } + }; + + template <> + struct TBigCompareSelector<true> { + template <typename T> + static inline int Compare(const T& l, const T& r) { + return l.Compare(r); + } + }; + template <typename T> struct TCompareSelector: public TBigCompareSelector<THasCompare<T>::value> { - }; - } + }; + } + + //////////////////////////////////////////////////////////////////////////////// + // Generic compare function - //////////////////////////////////////////////////////////////////////////////// - // Generic compare function - template <typename T> - inline int Compare(const T& l, const T& r) { - return NImpl::TCompareSelector<T>::Compare(l, r); - } - - //////////////////////////////////////////////////////////////////////////////// - // Generic compare functor - - template <typename T> - struct TCompare { - inline int operator()(const T& l, const T& r) const { - return Compare(l, r); - } + inline int Compare(const T& l, const T& r) { + return NImpl::TCompareSelector<T>::Compare(l, r); + } + + //////////////////////////////////////////////////////////////////////////////// + // Generic compare functor + + template <typename T> + struct TCompare { + inline int operator()(const T& l, const T& r) const { + return Compare(l, r); + } }; } diff --git a/library/cpp/threading/skip_list/perf/main.cpp b/library/cpp/threading/skip_list/perf/main.cpp index d722d43436..4ad52049e7 100644 --- a/library/cpp/threading/skip_list/perf/main.cpp +++ b/library/cpp/threading/skip_list/perf/main.cpp @@ -14,345 +14,345 @@ #include <util/system/thread.h> namespace { - using namespace NThreading; + using namespace NThreading; - //////////////////////////////////////////////////////////////////////////////// + //////////////////////////////////////////////////////////////////////////////// IOutputStream& LogInfo() { - return Cerr << TInstant::Now() << " INFO: "; - } + return Cerr << TInstant::Now() << " INFO: "; + } IOutputStream& LogError() { - return Cerr << TInstant::Now() << " ERROR: "; - } - - //////////////////////////////////////////////////////////////////////////////// - - struct TListItem { - TStringBuf Key; - TStringBuf Value; - - TListItem(const TStringBuf& key, const TStringBuf& value) - : Key(key) - , Value(value) - { - } - - int Compare(const TListItem& other) const { - return Key.compare(other.Key); - } - }; - - using TListType = TSkipList<TListItem>; - - //////////////////////////////////////////////////////////////////////////////// - - class TRandomData { - private: - TVector<char> Buffer; - - public: - TRandomData() - : Buffer(1024 * 1024) - { - for (size_t i = 0; i < Buffer.size(); ++i) { - Buffer[i] = RandomNumber<char>(); - } - } - - TStringBuf GetString(size_t len) const { - size_t start = RandomNumber(Buffer.size() - len); - return TStringBuf(&Buffer[start], len); + return Cerr << TInstant::Now() << " ERROR: "; + } + + //////////////////////////////////////////////////////////////////////////////// + + struct TListItem { + TStringBuf Key; + TStringBuf Value; + + TListItem(const TStringBuf& key, const TStringBuf& value) + : Key(key) + , Value(value) + { + } + + int Compare(const TListItem& other) const { + return Key.compare(other.Key); + } + }; + + using TListType = TSkipList<TListItem>; + + //////////////////////////////////////////////////////////////////////////////// + + class TRandomData { + private: + TVector<char> Buffer; + + public: + TRandomData() + : Buffer(1024 * 1024) + { + for (size_t i = 0; i < Buffer.size(); ++i) { + Buffer[i] = RandomNumber<char>(); + } + } + + TStringBuf GetString(size_t len) const { + size_t start = RandomNumber(Buffer.size() - len); + return TStringBuf(&Buffer[start], len); + } + + TStringBuf GetString(size_t min, size_t max) const { + return GetString(min + RandomNumber(max - min)); } + }; - TStringBuf GetString(size_t min, size_t max) const { - return GetString(min + RandomNumber(max - min)); - } - }; - - //////////////////////////////////////////////////////////////////////////////// - - class TWorkerThread: public ISimpleThread { - private: - std::function<void()> Func; - TDuration Time; - - public: - TWorkerThread(std::function<void()> func) - : Func(func) - { - } - - TDuration GetTime() const { - return Time; - } - - private: - void* ThreadProc() noexcept override { - TInstant started = TInstant::Now(); - Func(); - Time = TInstant::Now() - started; - return nullptr; - } - }; - - inline TAutoPtr<TWorkerThread> StartThread(std::function<void()> func) { - TAutoPtr<TWorkerThread> thread = new TWorkerThread(func); - thread->Start(); - return thread; + //////////////////////////////////////////////////////////////////////////////// + + class TWorkerThread: public ISimpleThread { + private: + std::function<void()> Func; + TDuration Time; + + public: + TWorkerThread(std::function<void()> func) + : Func(func) + { + } + + TDuration GetTime() const { + return Time; + } + + private: + void* ThreadProc() noexcept override { + TInstant started = TInstant::Now(); + Func(); + Time = TInstant::Now() - started; + return nullptr; + } + }; + + inline TAutoPtr<TWorkerThread> StartThread(std::function<void()> func) { + TAutoPtr<TWorkerThread> thread = new TWorkerThread(func); + thread->Start(); + return thread; } - //////////////////////////////////////////////////////////////////////////////// - - typedef std::function<void()> TTestFunc; - - struct TTest { - TString Name; - TTestFunc Func; - - TTest() { - } - - TTest(const TString& name, const TTestFunc& func) - : Name(name) - , Func(func) - { - } - }; - - //////////////////////////////////////////////////////////////////////////////// - - class TTestSuite { - private: - size_t Iterations = 1000000; - size_t KeyLen = 10; - size_t ValueLen = 100; - size_t NumReaders = 4; - size_t NumWriters = 1; - size_t BatchSize = 20; - - TMemoryPool MemoryPool; - TListType List; - TMutex Mutex; - TRandomData Random; - - TMap<TCiString, TTest> AllTests; - TVector<TTest> Tests; - - public: - TTestSuite() - : MemoryPool(64 * 1024) - , List(MemoryPool) - { - } - - bool Init(int argc, const char* argv[]) { - TVector<TString> tests; - try { - NLastGetopt::TOpts opts; - opts.AddHelpOption(); - -#define OPTION(opt, x) \ - opts.AddLongOption(opt, #x) \ - .Optional() \ - .DefaultValue(ToString(x)) \ - .StoreResult(&x) // end of OPTION - - OPTION('i', Iterations); - OPTION('k', KeyLen); - OPTION('v', ValueLen); - OPTION('r', NumReaders); - OPTION('w', NumWriters); - OPTION('b', BatchSize); + //////////////////////////////////////////////////////////////////////////////// + + typedef std::function<void()> TTestFunc; + + struct TTest { + TString Name; + TTestFunc Func; + + TTest() { + } + + TTest(const TString& name, const TTestFunc& func) + : Name(name) + , Func(func) + { + } + }; + + //////////////////////////////////////////////////////////////////////////////// + + class TTestSuite { + private: + size_t Iterations = 1000000; + size_t KeyLen = 10; + size_t ValueLen = 100; + size_t NumReaders = 4; + size_t NumWriters = 1; + size_t BatchSize = 20; + + TMemoryPool MemoryPool; + TListType List; + TMutex Mutex; + TRandomData Random; + + TMap<TCiString, TTest> AllTests; + TVector<TTest> Tests; + + public: + TTestSuite() + : MemoryPool(64 * 1024) + , List(MemoryPool) + { + } + + bool Init(int argc, const char* argv[]) { + TVector<TString> tests; + try { + NLastGetopt::TOpts opts; + opts.AddHelpOption(); + +#define OPTION(opt, x) \ + opts.AddLongOption(opt, #x) \ + .Optional() \ + .DefaultValue(ToString(x)) \ + .StoreResult(&x) // end of OPTION + + OPTION('i', Iterations); + OPTION('k', KeyLen); + OPTION('v', ValueLen); + OPTION('r', NumReaders); + OPTION('w', NumWriters); + OPTION('b', BatchSize); #undef OPTION - NLastGetopt::TOptsParseResultException optsRes(&opts, argc, argv); - for (const auto& opt : opts.Opts_) { - const NLastGetopt::TOptParseResult* r = optsRes.FindOptParseResult(opt.Get(), true); - if (r) { - LogInfo() << "[-" << opt->GetChar() << "] " << opt->GetName() << ": " << r->Back() << Endl; - } + NLastGetopt::TOptsParseResultException optsRes(&opts, argc, argv); + for (const auto& opt : opts.Opts_) { + const NLastGetopt::TOptParseResult* r = optsRes.FindOptParseResult(opt.Get(), true); + if (r) { + LogInfo() << "[-" << opt->GetChar() << "] " << opt->GetName() << ": " << r->Back() << Endl; + } } - tests = optsRes.GetFreeArgs(); - } catch (...) { - LogError() << CurrentExceptionMessage() << Endl; - return false; + tests = optsRes.GetFreeArgs(); + } catch (...) { + LogError() << CurrentExceptionMessage() << Endl; + return false; } #define TEST(type) \ - AddTest(#type, std::bind(&TTestSuite::Y_CAT(TEST_, type), this)) // end of TEST + AddTest(#type, std::bind(&TTestSuite::Y_CAT(TEST_, type), this)) // end of TEST - TEST(Clear); - TEST(InsertRandom); - TEST(InsertSequential); - TEST(InsertSequentialSimple); - TEST(LookupRandom); - TEST(Concurrent); + TEST(Clear); + TEST(InsertRandom); + TEST(InsertSequential); + TEST(InsertSequentialSimple); + TEST(LookupRandom); + TEST(Concurrent); #undef TEST - if (tests.empty()) { - LogError() << "no tests specified, choose from: " << PrintTests() << Endl; + if (tests.empty()) { + LogError() << "no tests specified, choose from: " << PrintTests() << Endl; return false; } - - for (size_t i = 0; i < tests.size(); ++i) { + + for (size_t i = 0; i < tests.size(); ++i) { if (!AllTests.contains(tests[i])) { - LogError() << "unknown test name: " << tests[i] << Endl; - return false; - } - Tests.push_back(AllTests[tests[i]]); - } - - return true; + LogError() << "unknown test name: " << tests[i] << Endl; + return false; + } + Tests.push_back(AllTests[tests[i]]); + } + + return true; } - void Run() { + void Run() { #if !defined(NDEBUG) - LogInfo() << "*** DEBUG build! ***" << Endl; + LogInfo() << "*** DEBUG build! ***" << Endl; #endif - for (const TTest& test : Tests) { - LogInfo() << "Starting test " << test.Name << Endl; - - TInstant started = TInstant::Now(); - try { - test.Func(); - } catch (...) { - LogError() << "test " << test.Name - << " failed: " << CurrentExceptionMessage() - << Endl; - } - - LogInfo() << "List size = " << List.GetSize() << Endl; - - TDuration duration = TInstant::Now() - started; - LogInfo() << "test " << test.Name - << " duration: " << duration - << " (" << (double)duration.MicroSeconds() / (Iterations * NumWriters) << "us per iteration)" - << Endl; - LogInfo() << "Finished test " << test.Name << Endl; - } + for (const TTest& test : Tests) { + LogInfo() << "Starting test " << test.Name << Endl; + + TInstant started = TInstant::Now(); + try { + test.Func(); + } catch (...) { + LogError() << "test " << test.Name + << " failed: " << CurrentExceptionMessage() + << Endl; + } + + LogInfo() << "List size = " << List.GetSize() << Endl; + + TDuration duration = TInstant::Now() - started; + LogInfo() << "test " << test.Name + << " duration: " << duration + << " (" << (double)duration.MicroSeconds() / (Iterations * NumWriters) << "us per iteration)" + << Endl; + LogInfo() << "Finished test " << test.Name << Endl; + } + } + + private: + void AddTest(const char* name, TTestFunc func) { + AllTests[name] = TTest(name, func); } - private: - void AddTest(const char* name, TTestFunc func) { - AllTests[name] = TTest(name, func); - } - - TString PrintTests() const { - TVector<TString> names; - for (const auto& it : AllTests) { - names.push_back(it.first); - } - return JoinSeq(", ", names); + TString PrintTests() const { + TVector<TString> names; + for (const auto& it : AllTests) { + names.push_back(it.first); + } + return JoinSeq(", ", names); } - void TEST_Clear() { - List.Clear(); - } + void TEST_Clear() { + List.Clear(); + } - void TEST_InsertRandom() { - for (size_t i = 0; i < Iterations; ++i) { - List.Insert(TListItem(Random.GetString(KeyLen), Random.GetString(ValueLen))); - } + void TEST_InsertRandom() { + for (size_t i = 0; i < Iterations; ++i) { + List.Insert(TListItem(Random.GetString(KeyLen), Random.GetString(ValueLen))); + } } - void TEST_InsertSequential() { - TString key; - for (size_t i = 0; i < Iterations;) { - key.assign(Random.GetString(KeyLen)); - size_t batch = BatchSize / 2 + RandomNumber(BatchSize); - for (size_t j = 0; j < batch; ++j, ++i) { - key.resize(KeyLen - 1); - key.append((char)j); - List.Insert(TListItem(key, Random.GetString(ValueLen))); - } + void TEST_InsertSequential() { + TString key; + for (size_t i = 0; i < Iterations;) { + key.assign(Random.GetString(KeyLen)); + size_t batch = BatchSize / 2 + RandomNumber(BatchSize); + for (size_t j = 0; j < batch; ++j, ++i) { + key.resize(KeyLen - 1); + key.append((char)j); + List.Insert(TListItem(key, Random.GetString(ValueLen))); + } } } - void TEST_InsertSequentialSimple() { - for (size_t i = 0; i < Iterations; ++i) { - List.Insert(TListItem(Random.GetString(KeyLen), Random.GetString(ValueLen))); - } + void TEST_InsertSequentialSimple() { + for (size_t i = 0; i < Iterations; ++i) { + List.Insert(TListItem(Random.GetString(KeyLen), Random.GetString(ValueLen))); + } } - void TEST_LookupRandom() { - for (size_t i = 0; i < Iterations; ++i) { - List.SeekTo(TListItem(Random.GetString(KeyLen), TStringBuf())); - } + void TEST_LookupRandom() { + for (size_t i = 0; i < Iterations; ++i) { + List.SeekTo(TListItem(Random.GetString(KeyLen), TStringBuf())); + } } - void TEST_Concurrent() { - LogInfo() << "starting producers..." << Endl; - - TVector<TAutoPtr<TWorkerThread>> producers(NumWriters); - for (size_t i1 = 0; i1 < producers.size(); ++i1) { - producers[i1] = StartThread([&] { - TInstant started = TInstant::Now(); - for (size_t i2 = 0; i2 < Iterations; ++i2) { - { - TGuard<TMutex> guard(Mutex); - List.Insert(TListItem(Random.GetString(KeyLen), Random.GetString(ValueLen))); - } + void TEST_Concurrent() { + LogInfo() << "starting producers..." << Endl; + + TVector<TAutoPtr<TWorkerThread>> producers(NumWriters); + for (size_t i1 = 0; i1 < producers.size(); ++i1) { + producers[i1] = StartThread([&] { + TInstant started = TInstant::Now(); + for (size_t i2 = 0; i2 < Iterations; ++i2) { + { + TGuard<TMutex> guard(Mutex); + List.Insert(TListItem(Random.GetString(KeyLen), Random.GetString(ValueLen))); + } + } + TDuration duration = TInstant::Now() - started; + LogInfo() + << "Average time for producer = " + << (double)duration.MicroSeconds() / Iterations << "us per iteration" + << Endl; + }); + } + + LogInfo() << "starting consumers..." << Endl; + + TVector<TAutoPtr<TWorkerThread>> consumers(NumReaders); + for (size_t i1 = 0; i1 < consumers.size(); ++i1) { + consumers[i1] = StartThread([&] { + TInstant started = TInstant::Now(); + for (size_t i2 = 0; i2 < Iterations; ++i2) { + List.SeekTo(TListItem(Random.GetString(KeyLen), TStringBuf())); } - TDuration duration = TInstant::Now() - started; - LogInfo() - << "Average time for producer = " - << (double)duration.MicroSeconds() / Iterations << "us per iteration" - << Endl; - }); - } - - LogInfo() << "starting consumers..." << Endl; - - TVector<TAutoPtr<TWorkerThread>> consumers(NumReaders); - for (size_t i1 = 0; i1 < consumers.size(); ++i1) { - consumers[i1] = StartThread([&] { - TInstant started = TInstant::Now(); - for (size_t i2 = 0; i2 < Iterations; ++i2) { - List.SeekTo(TListItem(Random.GetString(KeyLen), TStringBuf())); - } - TDuration duration = TInstant::Now() - started; - LogInfo() - << "Average time for consumer = " - << (double)duration.MicroSeconds() / Iterations << "us per iteration" - << Endl; - }); - } - - LogInfo() << "wait for producers..." << Endl; - - TDuration producerTime; - for (size_t i = 0; i < producers.size(); ++i) { - producers[i]->Join(); - producerTime += producers[i]->GetTime(); - } - - LogInfo() << "wait for consumers..." << Endl; - - TDuration consumerTime; - for (size_t i = 0; i < consumers.size(); ++i) { - consumers[i]->Join(); - consumerTime += consumers[i]->GetTime(); - } - - LogInfo() << "average producer time: " - << producerTime.SecondsFloat() / producers.size() << " seconds" - << Endl; - - LogInfo() << "average consumer time: " - << consumerTime.SecondsFloat() / consumers.size() << " seconds" - << Endl; - } - }; - -} + TDuration duration = TInstant::Now() - started; + LogInfo() + << "Average time for consumer = " + << (double)duration.MicroSeconds() / Iterations << "us per iteration" + << Endl; + }); + } + + LogInfo() << "wait for producers..." << Endl; + + TDuration producerTime; + for (size_t i = 0; i < producers.size(); ++i) { + producers[i]->Join(); + producerTime += producers[i]->GetTime(); + } + + LogInfo() << "wait for consumers..." << Endl; + + TDuration consumerTime; + for (size_t i = 0; i < consumers.size(); ++i) { + consumers[i]->Join(); + consumerTime += consumers[i]->GetTime(); + } + + LogInfo() << "average producer time: " + << producerTime.SecondsFloat() / producers.size() << " seconds" + << Endl; + + LogInfo() << "average consumer time: " + << consumerTime.SecondsFloat() / consumers.size() << " seconds" + << Endl; + } + }; + +} //////////////////////////////////////////////////////////////////////////////// -int main(int argc, const char* argv[]) { +int main(int argc, const char* argv[]) { TTestSuite suite; if (!suite.Init(argc, argv)) { return -1; diff --git a/library/cpp/threading/skip_list/skiplist.h b/library/cpp/threading/skip_list/skiplist.h index c1ed46c4aa..914a7c6ee7 100644 --- a/library/cpp/threading/skip_list/skiplist.h +++ b/library/cpp/threading/skip_list/skiplist.h @@ -10,399 +10,399 @@ #include <util/system/atomic.h> namespace NThreading { - //////////////////////////////////////////////////////////////////////////////// + //////////////////////////////////////////////////////////////////////////////// - class TNopCounter { - protected: - template <typename T> - void OnInsert(const T&) { - } + class TNopCounter { + protected: + template <typename T> + void OnInsert(const T&) { + } - template <typename T> - void OnUpdate(const T&) { - } + template <typename T> + void OnUpdate(const T&) { + } - void Reset() { - } - }; + void Reset() { + } + }; - //////////////////////////////////////////////////////////////////////////////// + //////////////////////////////////////////////////////////////////////////////// - class TSizeCounter { + class TSizeCounter { private: - size_t Size; + size_t Size; public: - TSizeCounter() - : Size(0) + TSizeCounter() + : Size(0) { } - size_t GetSize() const { - return Size; + size_t GetSize() const { + return Size; } - protected: - template <typename T> - void OnInsert(const T&) { - ++Size; + protected: + template <typename T> + void OnInsert(const T&) { + ++Size; } - template <typename T> - void OnUpdate(const T&) { + template <typename T> + void OnUpdate(const T&) { } - void Reset() { - Size = 0; + void Reset() { + Size = 0; } }; - //////////////////////////////////////////////////////////////////////////////// - // Append-only concurrent skip-list - // - // Readers do not require any synchronization. - // Writers should be externally synchronized. - // Nodes will be allocated using TMemoryPool instance. - - template < - typename T, - typename TComparer = TCompare<T>, - typename TAllocator = TMemoryPool, - typename TCounter = TSizeCounter, - int MaxHeight = 12, - int Branching = 4> - class TSkipList: public TCounter, private TNonCopyable { - class TNode { - private: - T Value; // should be immutable after insert - TNode* Next[]; // variable-size array maximum of MaxHeight values - - public: + //////////////////////////////////////////////////////////////////////////////// + // Append-only concurrent skip-list + // + // Readers do not require any synchronization. + // Writers should be externally synchronized. + // Nodes will be allocated using TMemoryPool instance. + + template < + typename T, + typename TComparer = TCompare<T>, + typename TAllocator = TMemoryPool, + typename TCounter = TSizeCounter, + int MaxHeight = 12, + int Branching = 4> + class TSkipList: public TCounter, private TNonCopyable { + class TNode { + private: + T Value; // should be immutable after insert + TNode* Next[]; // variable-size array maximum of MaxHeight values + + public: TNode(T&& value) : Value(std::move(value)) - { - Y_UNUSED(Next); - } - - const T& GetValue() const { - return Value; - } - - T& GetValue() { - return Value; - } - - TNode* GetNext(int height) const { - return AtomicGet(Next[height]); - } - - void Link(int height, TNode** prev) { - for (int i = 0; i < height; ++i) { - Next[i] = prev[i]->Next[i]; - AtomicSet(prev[i]->Next[i], this); - } - } - }; - + { + Y_UNUSED(Next); + } + + const T& GetValue() const { + return Value; + } + + T& GetValue() { + return Value; + } + + TNode* GetNext(int height) const { + return AtomicGet(Next[height]); + } + + void Link(int height, TNode** prev) { + for (int i = 0; i < height; ++i) { + Next[i] = prev[i]->Next[i]; + AtomicSet(prev[i]->Next[i], this); + } + } + }; + public: - class TIterator { - private: - const TSkipList* List; - const TNode* Node; - - public: - TIterator() - : List(nullptr) - , Node(nullptr) - { - } - - TIterator(const TSkipList* list, const TNode* node) - : List(list) - , Node(node) - { - } - - TIterator(const TIterator& other) - : List(other.List) - , Node(other.Node) - { - } - - TIterator& operator=(const TIterator& other) { - List = other.List; - Node = other.Node; - return *this; - } - - void Next() { - Node = Node ? Node->GetNext(0) : nullptr; + class TIterator { + private: + const TSkipList* List; + const TNode* Node; + + public: + TIterator() + : List(nullptr) + , Node(nullptr) + { + } + + TIterator(const TSkipList* list, const TNode* node) + : List(list) + , Node(node) + { + } + + TIterator(const TIterator& other) + : List(other.List) + , Node(other.Node) + { } - // much less efficient than Next as our list is single-linked - void Prev() { - if (Node) { - TNode* node = List->FindLessThan(Node->GetValue(), nullptr); - Node = (node != List->Head ? node : nullptr); - } - } + TIterator& operator=(const TIterator& other) { + List = other.List; + Node = other.Node; + return *this; + } - void Reset() { - Node = nullptr; - } + void Next() { + Node = Node ? Node->GetNext(0) : nullptr; + } - bool IsValid() const { - return Node != nullptr; - } + // much less efficient than Next as our list is single-linked + void Prev() { + if (Node) { + TNode* node = List->FindLessThan(Node->GetValue(), nullptr); + Node = (node != List->Head ? node : nullptr); + } + } + + void Reset() { + Node = nullptr; + } + + bool IsValid() const { + return Node != nullptr; + } - const T& GetValue() const { - Y_ASSERT(IsValid()); - return Node->GetValue(); - } - }; + const T& GetValue() const { + Y_ASSERT(IsValid()); + return Node->GetValue(); + } + }; - private: - TAllocator& Allocator; - TComparer Comparer; + private: + TAllocator& Allocator; + TComparer Comparer; - TNode* Head; - TAtomic Height; - TCounter Counter; + TNode* Head; + TAtomic Height; + TCounter Counter; - TNode* Prev[MaxHeight]; + TNode* Prev[MaxHeight]; - template <typename TValue> + template <typename TValue> using TComparerReturnType = std::invoke_result_t<TComparer, const T&, const TValue&>; - public: - TSkipList(TAllocator& allocator, const TComparer& comparer = TComparer()) - : Allocator(allocator) - , Comparer(comparer) - { - Init(); - } - - ~TSkipList() { - CallDtors(); - } - - void Clear() { - CallDtors(); - Allocator.ClearKeepFirstChunk(); - Init(); + public: + TSkipList(TAllocator& allocator, const TComparer& comparer = TComparer()) + : Allocator(allocator) + , Comparer(comparer) + { + Init(); + } + + ~TSkipList() { + CallDtors(); + } + + void Clear() { + CallDtors(); + Allocator.ClearKeepFirstChunk(); + Init(); } bool Insert(T value) { - TNode* node = PrepareInsert(value); - if (Y_UNLIKELY(node && Compare(node, value) == 0)) { - // we do not allow duplicates - return false; + TNode* node = PrepareInsert(value); + if (Y_UNLIKELY(node && Compare(node, value) == 0)) { + // we do not allow duplicates + return false; } node = DoInsert(std::move(value)); - TCounter::OnInsert(node->GetValue()); - return true; + TCounter::OnInsert(node->GetValue()); + return true; } - template <typename TInsertAction, typename TUpdateAction> - bool Insert(const T& value, TInsertAction insert, TUpdateAction update) { - TNode* node = PrepareInsert(value); - if (Y_UNLIKELY(node && Compare(node, value) == 0)) { - if (update(node->GetValue())) { - TCounter::OnUpdate(node->GetValue()); - return true; - } - // we do not allow duplicates - return false; - } - node = DoInsert(insert(value)); - TCounter::OnInsert(node->GetValue()); - return true; - } - - template <typename TValue> - bool Contains(const TValue& value) const { - TNode* node = FindGreaterThanOrEqual(value); - return node && Compare(node, value) == 0; - } - - TIterator SeekToFirst() const { - return TIterator(this, FindFirst()); - } - - TIterator SeekToLast() const { - TNode* last = FindLast(); - return TIterator(this, last != Head ? last : nullptr); - } - - template <typename TValue> - TIterator SeekTo(const TValue& value) const { - return TIterator(this, FindGreaterThanOrEqual(value)); + template <typename TInsertAction, typename TUpdateAction> + bool Insert(const T& value, TInsertAction insert, TUpdateAction update) { + TNode* node = PrepareInsert(value); + if (Y_UNLIKELY(node && Compare(node, value) == 0)) { + if (update(node->GetValue())) { + TCounter::OnUpdate(node->GetValue()); + return true; + } + // we do not allow duplicates + return false; + } + node = DoInsert(insert(value)); + TCounter::OnInsert(node->GetValue()); + return true; + } + + template <typename TValue> + bool Contains(const TValue& value) const { + TNode* node = FindGreaterThanOrEqual(value); + return node && Compare(node, value) == 0; + } + + TIterator SeekToFirst() const { + return TIterator(this, FindFirst()); + } + + TIterator SeekToLast() const { + TNode* last = FindLast(); + return TIterator(this, last != Head ? last : nullptr); + } + + template <typename TValue> + TIterator SeekTo(const TValue& value) const { + return TIterator(this, FindGreaterThanOrEqual(value)); } - private: - static int RandomHeight() { - int height = 1; - while (height < MaxHeight && (RandomNumber<unsigned int>() % Branching) == 0) { - ++height; - } - return height; - } - - void Init() { - Head = AllocateRootNode(); - Height = 1; - TCounter::Reset(); - - for (int i = 0; i < MaxHeight; ++i) { - Prev[i] = Head; - } + private: + static int RandomHeight() { + int height = 1; + while (height < MaxHeight && (RandomNumber<unsigned int>() % Branching) == 0) { + ++height; + } + return height; } - void CallDtors() { - if (!TTypeTraits<T>::IsPod) { - // we should explicitly call destructors for our nodes - TNode* node = Head->GetNext(0); - while (node) { - TNode* next = node->GetNext(0); - node->~TNode(); - node = next; - } + void Init() { + Head = AllocateRootNode(); + Height = 1; + TCounter::Reset(); + + for (int i = 0; i < MaxHeight; ++i) { + Prev[i] = Head; } } - TNode* AllocateRootNode() { - size_t size = sizeof(TNode) + sizeof(TNode*) * MaxHeight; - void* buffer = Allocator.Allocate(size); - memset(buffer, 0, size); - return static_cast<TNode*>(buffer); - } + void CallDtors() { + if (!TTypeTraits<T>::IsPod) { + // we should explicitly call destructors for our nodes + TNode* node = Head->GetNext(0); + while (node) { + TNode* next = node->GetNext(0); + node->~TNode(); + node = next; + } + } + } + + TNode* AllocateRootNode() { + size_t size = sizeof(TNode) + sizeof(TNode*) * MaxHeight; + void* buffer = Allocator.Allocate(size); + memset(buffer, 0, size); + return static_cast<TNode*>(buffer); + } TNode* AllocateNode(T&& value, int height) { - size_t size = sizeof(TNode) + sizeof(TNode*) * height; - void* buffer = Allocator.Allocate(size); - memset(buffer, 0, size); + size_t size = sizeof(TNode) + sizeof(TNode*) * height; + void* buffer = Allocator.Allocate(size); + memset(buffer, 0, size); return new (buffer) TNode(std::move(value)); - } - - TNode* FindFirst() const { - return Head->GetNext(0); - } - - TNode* FindLast() const { - TNode* node = Head; - int height = AtomicGet(Height) - 1; - - while (true) { - TNode* next = node->GetNext(height); - if (next) { - node = next; - continue; - } - - if (height) { - --height; - } else { - return node; - } + } + + TNode* FindFirst() const { + return Head->GetNext(0); + } + + TNode* FindLast() const { + TNode* node = Head; + int height = AtomicGet(Height) - 1; + + while (true) { + TNode* next = node->GetNext(height); + if (next) { + node = next; + continue; + } + + if (height) { + --height; + } else { + return node; + } } } - template <typename TValue> - TComparerReturnType<TValue> Compare(const TNode* node, const TValue& value) const { - return Comparer(node->GetValue(), value); - } - - template <typename TValue> - TNode* FindLessThan(const TValue& value, TNode** links) const { - TNode* node = Head; - int height = AtomicGet(Height) - 1; - - TNode* prev = nullptr; - while (true) { - TNode* next = node->GetNext(height); - if (next && next != prev) { - TComparerReturnType<TValue> cmp = Compare(next, value); - if (cmp < 0) { - node = next; - continue; - } + template <typename TValue> + TComparerReturnType<TValue> Compare(const TNode* node, const TValue& value) const { + return Comparer(node->GetValue(), value); + } + + template <typename TValue> + TNode* FindLessThan(const TValue& value, TNode** links) const { + TNode* node = Head; + int height = AtomicGet(Height) - 1; + + TNode* prev = nullptr; + while (true) { + TNode* next = node->GetNext(height); + if (next && next != prev) { + TComparerReturnType<TValue> cmp = Compare(next, value); + if (cmp < 0) { + node = next; + continue; + } } - if (links) { - // collect links from upper levels - links[height] = node; - } - - if (height) { - prev = next; - --height; - } else { - return node; - } + if (links) { + // collect links from upper levels + links[height] = node; + } + + if (height) { + prev = next; + --height; + } else { + return node; + } } } - template <typename TValue> - TNode* FindGreaterThanOrEqual(const TValue& value) const { - TNode* node = Head; - int height = AtomicGet(Height) - 1; - - TNode* prev = nullptr; - while (true) { - TNode* next = node->GetNext(height); - if (next && next != prev) { - TComparerReturnType<TValue> cmp = Compare(next, value); - if (cmp < 0) { - node = next; - continue; - } - if (cmp == 0) { - return next; - } + template <typename TValue> + TNode* FindGreaterThanOrEqual(const TValue& value) const { + TNode* node = Head; + int height = AtomicGet(Height) - 1; + + TNode* prev = nullptr; + while (true) { + TNode* next = node->GetNext(height); + if (next && next != prev) { + TComparerReturnType<TValue> cmp = Compare(next, value); + if (cmp < 0) { + node = next; + continue; + } + if (cmp == 0) { + return next; + } } - - if (height) { - prev = next; - --height; - } else { + + if (height) { + prev = next; + --height; + } else { return next; } } - } + } - TNode* PrepareInsert(const T& value) { - TNode* prev = Prev[0]; - TNode* next = prev->GetNext(0); - if ((prev == Head || Compare(prev, value) < 0) && (next == nullptr || Compare(next, value) >= 0)) { - // avoid seek in case of sequential insert + TNode* PrepareInsert(const T& value) { + TNode* prev = Prev[0]; + TNode* next = prev->GetNext(0); + if ((prev == Head || Compare(prev, value) < 0) && (next == nullptr || Compare(next, value) >= 0)) { + // avoid seek in case of sequential insert } else { - prev = FindLessThan(value, Prev); - next = prev->GetNext(0); + prev = FindLessThan(value, Prev); + next = prev->GetNext(0); } - return next; + return next; } TNode* DoInsert(T&& value) { - // choose level to place new node - int currentHeight = AtomicGet(Height); - int height = RandomHeight(); - if (height > currentHeight) { - for (int i = currentHeight; i < height; ++i) { - // head should link to all levels - Prev[i] = Head; - } - AtomicSet(Height, height); + // choose level to place new node + int currentHeight = AtomicGet(Height); + int height = RandomHeight(); + if (height > currentHeight) { + for (int i = currentHeight; i < height; ++i) { + // head should link to all levels + Prev[i] = Head; + } + AtomicSet(Height, height); } TNode* node = AllocateNode(std::move(value), height); - node->Link(height, Prev); + node->Link(height, Prev); - // keep last inserted node to optimize sequential inserts - for (int i = 0; i < height; i++) { - Prev[i] = node; - } - return node; + // keep last inserted node to optimize sequential inserts + for (int i = 0; i < height; i++) { + Prev[i] = node; + } + return node; } - }; + }; -} +} diff --git a/library/cpp/threading/skip_list/skiplist_ut.cpp b/library/cpp/threading/skip_list/skiplist_ut.cpp index fdc831dffd..52fcffda66 100644 --- a/library/cpp/threading/skip_list/skiplist_ut.cpp +++ b/library/cpp/threading/skip_list/skiplist_ut.cpp @@ -3,41 +3,41 @@ #include <library/cpp/testing/unittest/registar.h> namespace NThreading { - namespace { - struct TTestObject { - static size_t Count; - int Tag; - - TTestObject(int tag) - : Tag(tag) - { - ++Count; - } - - TTestObject(const TTestObject& other) - : Tag(other.Tag) - { - ++Count; - } - - ~TTestObject() { - --Count; - } - - bool operator<(const TTestObject& other) const { - return Tag < other.Tag; - } - }; - - size_t TTestObject::Count = 0; + namespace { + struct TTestObject { + static size_t Count; + int Tag; + + TTestObject(int tag) + : Tag(tag) + { + ++Count; + } + + TTestObject(const TTestObject& other) + : Tag(other.Tag) + { + ++Count; + } + + ~TTestObject() { + --Count; + } + + bool operator<(const TTestObject& other) const { + return Tag < other.Tag; + } + }; + + size_t TTestObject::Count = 0; } - //////////////////////////////////////////////////////////////////////////////// + //////////////////////////////////////////////////////////////////////////////// Y_UNIT_TEST_SUITE(TSkipListTest) { Y_UNIT_TEST(ShouldBeEmptyAfterCreation) { - TMemoryPool pool(1024); + TMemoryPool pool(1024); TSkipList<int> list(pool); UNIT_ASSERT_EQUAL(list.GetSize(), 0); @@ -182,4 +182,4 @@ namespace NThreading { } } -} +} diff --git a/library/cpp/threading/task_scheduler/task_scheduler_ut.cpp b/library/cpp/threading/task_scheduler/task_scheduler_ut.cpp index 2223dce650..3b5203194a 100644 --- a/library/cpp/threading/task_scheduler/task_scheduler_ut.cpp +++ b/library/cpp/threading/task_scheduler/task_scheduler_ut.cpp @@ -14,9 +14,9 @@ class TTaskSchedulerTest: public TTestBase { class TCheckTask: public TTaskScheduler::IRepeatedTask { public: - TCheckTask(const TDuration& delay) - : Start_(Now()) - , Delay_(delay) + TCheckTask(const TDuration& delay) + : Start_(Now()) + , Delay_(delay) { AtomicIncrement(ScheduledTaskCounter_); } @@ -25,15 +25,15 @@ class TTaskSchedulerTest: public TTestBase { } bool Process() override { - const TDuration delay = Now() - Start_; + const TDuration delay = Now() - Start_; - if (delay < Delay_) { + if (delay < Delay_) { AtomicIncrement(BadTimeoutCounter_); } AtomicIncrement(ExecutedTaskCounter_); - - return false; + + return false; } static bool AllTaskExecuted() { @@ -45,8 +45,8 @@ class TTaskSchedulerTest: public TTestBase { } private: - TInstant Start_; - TDuration Delay_; + TInstant Start_; + TDuration Delay_; static TAtomic BadTimeoutCounter_; static TAtomic ScheduledTaskCounter_; static TAtomic ExecutedTaskCounter_; @@ -60,7 +60,7 @@ class TTaskSchedulerTest: public TTestBase { ScheduleCheckTask(10000); ScheduleCheckTask(5000); - Scheduler_.Start(); + Scheduler_.Start(); usleep(1000000); @@ -70,8 +70,8 @@ class TTaskSchedulerTest: public TTestBase { private: void ScheduleCheckTask(size_t delay) { - TDuration d = TDuration::MicroSeconds(delay); - + TDuration d = TDuration::MicroSeconds(delay); + Scheduler_.Add(new TCheckTask(d), d); } |