diff options
author | vskipin <vskipin@yandex-team.ru> | 2022-02-10 16:46:00 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:46:00 +0300 |
commit | 4d8b546b89b5afc08cf3667e176271c7ba935f33 (patch) | |
tree | 1a2c5ffcf89eb53ecd79dbc9bc0a195c27404d0c /library/cpp/threading | |
parent | 4e4b78bd7b67e2533da4dbb9696374a6d6068e32 (diff) | |
download | ydb-4d8b546b89b5afc08cf3667e176271c7ba935f33.tar.gz |
Restoring authorship annotation for <vskipin@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/threading')
26 files changed, 953 insertions, 953 deletions
diff --git a/library/cpp/threading/chunk_queue/queue.cpp b/library/cpp/threading/chunk_queue/queue.cpp index 52dd119921..4ebd3f3205 100644 --- a/library/cpp/threading/chunk_queue/queue.cpp +++ b/library/cpp/threading/chunk_queue/queue.cpp @@ -1 +1 @@ -#include "queue.h" +#include "queue.h" diff --git a/library/cpp/threading/chunk_queue/queue.h b/library/cpp/threading/chunk_queue/queue.h index fdf4c93f92..55859601a1 100644 --- a/library/cpp/threading/chunk_queue/queue.h +++ b/library/cpp/threading/chunk_queue/queue.h @@ -1,35 +1,35 @@ -#pragma once - -#include <util/datetime/base.h> -#include <util/generic/noncopyable.h> -#include <util/generic/ptr.h> -#include <util/generic/typetraits.h> -#include <util/generic/vector.h> -#include <util/generic/ylimits.h> -#include <util/system/atomic.h> -#include <util/system/guard.h> -#include <util/system/spinlock.h> -#include <util/system/yassert.h> - -#include <type_traits> -#include <utility> - -namespace NThreading { -//////////////////////////////////////////////////////////////////////////////// -// Platform helpers - -#if !defined(PLATFORM_CACHE_LINE) +#pragma once + +#include <util/datetime/base.h> +#include <util/generic/noncopyable.h> +#include <util/generic/ptr.h> +#include <util/generic/typetraits.h> +#include <util/generic/vector.h> +#include <util/generic/ylimits.h> +#include <util/system/atomic.h> +#include <util/system/guard.h> +#include <util/system/spinlock.h> +#include <util/system/yassert.h> + +#include <type_traits> +#include <utility> + +namespace NThreading { +//////////////////////////////////////////////////////////////////////////////// +// Platform helpers + +#if !defined(PLATFORM_CACHE_LINE) #define PLATFORM_CACHE_LINE 64 -#endif - -#if !defined(PLATFORM_PAGE_SIZE) +#endif + +#if !defined(PLATFORM_PAGE_SIZE) #define PLATFORM_PAGE_SIZE 4 * 1024 -#endif - +#endif + template <typename T, size_t PadSize = PLATFORM_CACHE_LINE> struct TPadded: public T { char Pad[PadSize - sizeof(T) % PadSize]; - + TPadded() { static_assert(sizeof(*this) % PadSize == 0, "padding does not work"); Y_UNUSED(Pad); @@ -43,10 +43,10 @@ namespace NThreading { Y_UNUSED(Pad); } }; - + //////////////////////////////////////////////////////////////////////////////// // Type helpers - + namespace NImpl { template <typename T> struct TPodTypeHelper { @@ -54,99 +54,99 @@ namespace NThreading { static void Write(T* ptr, TT&& value) { *ptr = value; } - + static T Read(T* ptr) { return *ptr; } - + static void Destroy(T* ptr) { Y_UNUSED(ptr); } }; - + template <typename T> struct TNonPodTypeHelper { template <typename TT> static void Write(T* ptr, TT&& value) { new (ptr) T(std::forward<TT>(value)); } - + static T Read(T* ptr) { return std::move(*ptr); } - + static void Destroy(T* ptr) { (void)ptr; /* Make MSVC happy. */ ptr->~T(); } }; - + template <typename T> using TTypeHelper = std::conditional_t< TTypeTraits<T>::IsPod, TPodTypeHelper<T>, TNonPodTypeHelper<T>>; - + } - + //////////////////////////////////////////////////////////////////////////////// // One producer/one consumer chunked queue. - + template <typename T, size_t ChunkSize = PLATFORM_PAGE_SIZE> class TOneOneQueue: private TNonCopyable { using TTypeHelper = NImpl::TTypeHelper<T>; - + struct TChunk; - + struct TChunkHeader { size_t Count = 0; TChunk* Next = nullptr; }; - + struct TChunk: public TChunkHeader { static constexpr size_t MaxCount = (ChunkSize - sizeof(TChunkHeader)) / sizeof(T); - + char Entries[MaxCount * sizeof(T)]; - + TChunk() { Y_UNUSED(Entries); // uninitialized } - + ~TChunk() { for (size_t i = 0; i < this->Count; ++i) { TTypeHelper::Destroy(GetPtr(i)); } - } - + } + T* GetPtr(size_t i) { return (T*)Entries + i; } }; - + struct TWriterState { TChunk* Chunk = nullptr; }; - + struct TReaderState { TChunk* Chunk = nullptr; size_t Count = 0; }; - + private: TPadded<TWriterState> Writer; TPadded<TReaderState> Reader; - + public: using TItem = T; - + TOneOneQueue() { Writer.Chunk = Reader.Chunk = new TChunk(); } - + ~TOneOneQueue() { DeleteChunks(Reader.Chunk); } - + template <typename TT> void Enqueue(TT&& value) { T* ptr = PrepareWrite(); @@ -154,7 +154,7 @@ namespace NThreading { TTypeHelper::Write(ptr, std::forward<TT>(value)); CompleteWrite(); } - + bool Dequeue(T& value) { if (T* ptr = PrepareRead()) { value = TTypeHelper::Read(ptr); @@ -162,17 +162,17 @@ namespace NThreading { return true; } return false; - } - + } + bool IsEmpty() { return !PrepareRead(); } - + protected: T* PrepareWrite() { TChunk* chunk = Writer.Chunk; Y_ASSERT(chunk && !chunk->Next); - + if (chunk->Count != TChunk::MaxCount) { return chunk->GetPtr(chunk->Count); } @@ -181,41 +181,41 @@ namespace NThreading { AtomicSet(Writer.Chunk->Next, chunk); Writer.Chunk = chunk; return chunk->GetPtr(0); - } - + } + void CompleteWrite() { AtomicSet(Writer.Chunk->Count, Writer.Chunk->Count + 1); } - + T* PrepareRead() { TChunk* chunk = Reader.Chunk; Y_ASSERT(chunk); - + for (;;) { size_t writerCount = AtomicGet(chunk->Count); if (Reader.Count != writerCount) { return chunk->GetPtr(Reader.Count); } - + if (writerCount != TChunk::MaxCount) { return nullptr; } - + chunk = AtomicGet(chunk->Next); if (!chunk) { return nullptr; } - + delete Reader.Chunk; Reader.Chunk = chunk; Reader.Count = 0; - } + } } - + void CompleteRead() { ++Reader.Count; - } - + } + private: static void DeleteChunks(TChunk* chunk) { while (chunk) { @@ -223,51 +223,51 @@ namespace NThreading { delete chunk; chunk = next; } - } + } }; - + //////////////////////////////////////////////////////////////////////////////// // Multiple producers/single consumer partitioned queue. // Provides FIFO guaranties for each producer. - + template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE> class TManyOneQueue: private TNonCopyable { using TTypeHelper = NImpl::TTypeHelper<T>; - + struct TEntry { T Value; ui64 Tag; }; - + struct TQueueType: public TOneOneQueue<TEntry, ChunkSize> { TAtomic WriteLock = 0; - + using TOneOneQueue<TEntry, ChunkSize>::PrepareWrite; using TOneOneQueue<TEntry, ChunkSize>::CompleteWrite; - + using TOneOneQueue<TEntry, ChunkSize>::PrepareRead; using TOneOneQueue<TEntry, ChunkSize>::CompleteRead; }; - + private: union { TAtomic WriteTag = 0; char Pad[PLATFORM_CACHE_LINE]; }; - + TQueueType Queues[Concurrency]; - + public: using TItem = T; - + template <typename TT> void Enqueue(TT&& value) { ui64 tag = NextTag(); while (!TryEnqueue(std::forward<TT>(value), tag)) { SpinLockPause(); } - } - + } + bool Dequeue(T& value) { size_t index = 0; if (TEntry* entry = PrepareRead(index)) { @@ -276,24 +276,24 @@ namespace NThreading { return true; } return false; - } - + } + bool IsEmpty() { for (size_t i = 0; i < Concurrency; ++i) { if (!Queues[i].IsEmpty()) { return false; } - } + } return true; - } - + } + private: ui64 NextTag() { // TODO: can we avoid synchronization here? it costs 1.5x performance penalty // return GetCycleCount(); return AtomicIncrement(WriteTag); } - + template <typename TT> bool TryEnqueue(TT&& value, ui64 tag) { for (size_t i = 0; i < Concurrency; ++i) { @@ -307,22 +307,22 @@ namespace NThreading { AtomicUnlock(&queue.WriteLock); return true; } - } + } return false; - } - + } + TEntry* PrepareRead(size_t& index) { TEntry* entry = nullptr; ui64 tag = Max(); - + for (size_t i = 0; i < Concurrency; ++i) { - TEntry* e = Queues[i].PrepareRead(); - if (e && e->Tag < tag) { - index = i; - entry = e; - tag = e->Tag; - } - } + TEntry* e = Queues[i].PrepareRead(); + if (e && e->Tag < tag) { + index = i; + entry = e; + tag = e->Tag; + } + } if (entry) { // need second pass to catch updates within already scanned range @@ -338,91 +338,91 @@ namespace NThreading { } return entry; - } + } }; - + //////////////////////////////////////////////////////////////////////////////// // Concurrent many-many queue with strong FIFO guaranties. // Writers will not block readers (and vice versa), but will block each other. - + template <typename T, size_t ChunkSize = PLATFORM_PAGE_SIZE, typename TLock = TAdaptiveLock> class TManyManyQueue: private TNonCopyable { private: TPadded<TLock> WriteLock; TPadded<TLock> ReadLock; - + TOneOneQueue<T, ChunkSize> Queue; - + public: using TItem = T; - + template <typename TT> void Enqueue(TT&& value) { with_lock (WriteLock) { Queue.Enqueue(std::forward<TT>(value)); } - } - + } + bool Dequeue(T& value) { with_lock (ReadLock) { return Queue.Dequeue(value); } - } - + } + bool IsEmpty() { with_lock (ReadLock) { return Queue.IsEmpty(); } - } + } }; - + //////////////////////////////////////////////////////////////////////////////// // Multiple producers/single consumer partitioned queue. // Because of random partitioning reordering possible - FIFO not guaranteed! - + template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE> class TRelaxedManyOneQueue: private TNonCopyable { struct TQueueType: public TOneOneQueue<T, ChunkSize> { TAtomic WriteLock = 0; }; - + private: union { size_t ReadPos = 0; char Pad[PLATFORM_CACHE_LINE]; }; - + TQueueType Queues[Concurrency]; - + public: using TItem = T; - + template <typename TT> void Enqueue(TT&& value) { while (!TryEnqueue(std::forward<TT>(value))) { SpinLockPause(); } - } - + } + bool Dequeue(T& value) { for (size_t i = 0; i < Concurrency; ++i) { TQueueType& queue = Queues[ReadPos++ % Concurrency]; if (queue.Dequeue(value)) { return true; } - } + } return false; - } - + } + bool IsEmpty() { for (size_t i = 0; i < Concurrency; ++i) { if (!Queues[i].IsEmpty()) { return false; } - } + } return true; - } - + } + private: template <typename TT> bool TryEnqueue(TT&& value) { @@ -434,15 +434,15 @@ namespace NThreading { AtomicUnlock(&queue.WriteLock); return true; } - } + } return false; - } + } }; - + //////////////////////////////////////////////////////////////////////////////// // Concurrent many-many partitioned queue. // Because of random partitioning reordering possible - FIFO not guaranteed! - + template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE> class TRelaxedManyManyQueue: private TNonCopyable { struct TQueueType: public TOneOneQueue<T, ChunkSize> { @@ -454,21 +454,21 @@ namespace NThreading { TAtomic ReadLock = 0; char Pad2[PLATFORM_CACHE_LINE]; }; - }; - + }; + private: TQueueType Queues[Concurrency]; - + public: using TItem = T; - + template <typename TT> void Enqueue(TT&& value) { while (!TryEnqueue(std::forward<TT>(value))) { SpinLockPause(); } - } - + } + bool Dequeue(T& value) { size_t readPos = GetCycleCount(); for (size_t i = 0; i < Concurrency; ++i) { @@ -479,11 +479,11 @@ namespace NThreading { if (dequeued) { return true; } - } - } + } + } return false; - } - + } + bool IsEmpty() { for (size_t i = 0; i < Concurrency; ++i) { TQueueType& queue = Queues[i]; @@ -493,11 +493,11 @@ namespace NThreading { if (!empty) { return false; } - } - } + } + } return true; - } - + } + private: template <typename TT> bool TryEnqueue(TT&& value) { @@ -509,34 +509,34 @@ namespace NThreading { AtomicUnlock(&queue.WriteLock); return true; } - } + } return false; - } + } }; - + //////////////////////////////////////////////////////////////////////////////// // Simple wrapper to deal with AutoPtrs - + template <typename T, typename TImpl> class TAutoQueueBase: private TNonCopyable { private: TImpl Impl; - + public: using TItem = TAutoPtr<T>; - + ~TAutoQueueBase() { TAutoPtr<T> value; while (Dequeue(value)) { // do nothing } - } - + } + void Enqueue(TAutoPtr<T> value) { Impl.Enqueue(value.Get()); Y_UNUSED(value.Release()); } - + bool Dequeue(TAutoPtr<T>& value) { T* ptr = nullptr; if (Impl.Dequeue(ptr)) { @@ -544,25 +544,25 @@ namespace NThreading { return true; } return false; - } - + } + bool IsEmpty() { return Impl.IsEmpty(); } }; - + template <typename T, size_t ChunkSize = PLATFORM_PAGE_SIZE> using TAutoOneOneQueue = TAutoQueueBase<T, TOneOneQueue<T*, ChunkSize>>; - + template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE> using TAutoManyOneQueue = TAutoQueueBase<T, TManyOneQueue<T*, Concurrency, ChunkSize>>; - + template <typename T, size_t ChunkSize = PLATFORM_PAGE_SIZE, typename TLock = TAdaptiveLock> using TAutoManyManyQueue = TAutoQueueBase<T, TManyManyQueue<T*, ChunkSize, TLock>>; - + template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE> using TAutoRelaxedManyOneQueue = TAutoQueueBase<T, TRelaxedManyOneQueue<T*, Concurrency, ChunkSize>>; - + template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE> using TAutoRelaxedManyManyQueue = TAutoQueueBase<T, TRelaxedManyManyQueue<T*, Concurrency, ChunkSize>>; } diff --git a/library/cpp/threading/chunk_queue/queue_ut.cpp b/library/cpp/threading/chunk_queue/queue_ut.cpp index dc103202e8..8cb36d8dd1 100644 --- a/library/cpp/threading/chunk_queue/queue_ut.cpp +++ b/library/cpp/threading/chunk_queue/queue_ut.cpp @@ -1,202 +1,202 @@ -#include "queue.h" - +#include "queue.h" + #include <library/cpp/testing/unittest/registar.h> - -#include <util/generic/set.h> - -namespace NThreading { + +#include <util/generic/set.h> + +namespace NThreading { //////////////////////////////////////////////////////////////////////////////// - + Y_UNIT_TEST_SUITE(TOneOneQueueTest){ Y_UNIT_TEST(ShouldBeEmptyAtStart){ TOneOneQueue<int> queue; - + int result = 0; UNIT_ASSERT(queue.IsEmpty()); UNIT_ASSERT(!queue.Dequeue(result)); } - + Y_UNIT_TEST(ShouldReturnEntries) { TOneOneQueue<int> queue; queue.Enqueue(1); queue.Enqueue(2); queue.Enqueue(3); - + int result = 0; UNIT_ASSERT(!queue.IsEmpty()); UNIT_ASSERT(queue.Dequeue(result)); UNIT_ASSERT_EQUAL(result, 1); - + UNIT_ASSERT(!queue.IsEmpty()); UNIT_ASSERT(queue.Dequeue(result)); UNIT_ASSERT_EQUAL(result, 2); - + UNIT_ASSERT(!queue.IsEmpty()); UNIT_ASSERT(queue.Dequeue(result)); UNIT_ASSERT_EQUAL(result, 3); - + UNIT_ASSERT(queue.IsEmpty()); UNIT_ASSERT(!queue.Dequeue(result)); } - + Y_UNIT_TEST(ShouldStoreMultipleChunks) { TOneOneQueue<int, 100> queue; for (int i = 0; i < 1000; ++i) { queue.Enqueue(i); - } - + } + for (int i = 0; i < 1000; ++i) { int result = 0; UNIT_ASSERT(!queue.IsEmpty()); UNIT_ASSERT(queue.Dequeue(result)); UNIT_ASSERT_EQUAL(result, i); - } + } } } ; - -//////////////////////////////////////////////////////////////////////////////// - + +//////////////////////////////////////////////////////////////////////////////// + Y_UNIT_TEST_SUITE(TManyOneQueueTest){ Y_UNIT_TEST(ShouldBeEmptyAtStart){ - TManyOneQueue<int> queue; - + TManyOneQueue<int> queue; + int result; UNIT_ASSERT(queue.IsEmpty()); UNIT_ASSERT(!queue.Dequeue(result)); } - + Y_UNIT_TEST(ShouldReturnEntries) { TManyOneQueue<int> queue; queue.Enqueue(1); queue.Enqueue(2); queue.Enqueue(3); - + int result = 0; UNIT_ASSERT(!queue.IsEmpty()); UNIT_ASSERT(queue.Dequeue(result)); UNIT_ASSERT_EQUAL(result, 1); - + UNIT_ASSERT(!queue.IsEmpty()); UNIT_ASSERT(queue.Dequeue(result)); UNIT_ASSERT_EQUAL(result, 2); - + UNIT_ASSERT(!queue.IsEmpty()); UNIT_ASSERT(queue.Dequeue(result)); UNIT_ASSERT_EQUAL(result, 3); - + UNIT_ASSERT(queue.IsEmpty()); UNIT_ASSERT(!queue.Dequeue(result)); } } ; - -//////////////////////////////////////////////////////////////////////////////// - + +//////////////////////////////////////////////////////////////////////////////// + Y_UNIT_TEST_SUITE(TManyManyQueueTest){ Y_UNIT_TEST(ShouldBeEmptyAtStart){ - TManyManyQueue<int> queue; - + TManyManyQueue<int> queue; + int result = 0; UNIT_ASSERT(queue.IsEmpty()); UNIT_ASSERT(!queue.Dequeue(result)); } - + Y_UNIT_TEST(ShouldReturnEntries) { TManyManyQueue<int> queue; queue.Enqueue(1); queue.Enqueue(2); queue.Enqueue(3); - + int result = 0; UNIT_ASSERT(!queue.IsEmpty()); UNIT_ASSERT(queue.Dequeue(result)); UNIT_ASSERT_EQUAL(result, 1); - + UNIT_ASSERT(!queue.IsEmpty()); UNIT_ASSERT(queue.Dequeue(result)); UNIT_ASSERT_EQUAL(result, 2); - + UNIT_ASSERT(!queue.IsEmpty()); UNIT_ASSERT(queue.Dequeue(result)); UNIT_ASSERT_EQUAL(result, 3); - + UNIT_ASSERT(queue.IsEmpty()); UNIT_ASSERT(!queue.Dequeue(result)); } } ; - -//////////////////////////////////////////////////////////////////////////////// - + +//////////////////////////////////////////////////////////////////////////////// + Y_UNIT_TEST_SUITE(TRelaxedManyOneQueueTest){ Y_UNIT_TEST(ShouldBeEmptyAtStart){ - TRelaxedManyOneQueue<int> queue; - + TRelaxedManyOneQueue<int> queue; + int result; UNIT_ASSERT(queue.IsEmpty()); UNIT_ASSERT(!queue.Dequeue(result)); } - + Y_UNIT_TEST(ShouldReturnEntries) { TSet<int> items = {1, 2, 3}; - + TRelaxedManyOneQueue<int> queue; for (int item : items) { queue.Enqueue(item); } - + int result = 0; UNIT_ASSERT(!queue.IsEmpty()); UNIT_ASSERT(queue.Dequeue(result)); UNIT_ASSERT(items.erase(result)); - + UNIT_ASSERT(!queue.IsEmpty()); UNIT_ASSERT(queue.Dequeue(result)); UNIT_ASSERT(items.erase(result)); - + UNIT_ASSERT(!queue.IsEmpty()); UNIT_ASSERT(queue.Dequeue(result)); UNIT_ASSERT(items.erase(result)); - + UNIT_ASSERT(queue.IsEmpty()); UNIT_ASSERT(!queue.Dequeue(result)); } } ; - -//////////////////////////////////////////////////////////////////////////////// - + +//////////////////////////////////////////////////////////////////////////////// + Y_UNIT_TEST_SUITE(TRelaxedManyManyQueueTest){ Y_UNIT_TEST(ShouldBeEmptyAtStart){ - TRelaxedManyManyQueue<int> queue; - + TRelaxedManyManyQueue<int> queue; + int result = 0; UNIT_ASSERT(queue.IsEmpty()); UNIT_ASSERT(!queue.Dequeue(result)); } - + Y_UNIT_TEST(ShouldReturnEntries) { TSet<int> items = {1, 2, 3}; - + TRelaxedManyManyQueue<int> queue; for (int item : items) { queue.Enqueue(item); } - + int result = 0; UNIT_ASSERT(!queue.IsEmpty()); UNIT_ASSERT(queue.Dequeue(result)); UNIT_ASSERT(items.erase(result)); - + UNIT_ASSERT(!queue.IsEmpty()); UNIT_ASSERT(queue.Dequeue(result)); UNIT_ASSERT(items.erase(result)); - + UNIT_ASSERT(!queue.IsEmpty()); UNIT_ASSERT(queue.Dequeue(result)); UNIT_ASSERT(items.erase(result)); - + UNIT_ASSERT(queue.IsEmpty()); UNIT_ASSERT(!queue.Dequeue(result)); } diff --git a/library/cpp/threading/chunk_queue/readme.txt b/library/cpp/threading/chunk_queue/readme.txt index 104a8ec744..7c9f046a86 100644 --- a/library/cpp/threading/chunk_queue/readme.txt +++ b/library/cpp/threading/chunk_queue/readme.txt @@ -1,60 +1,60 @@ -vskipin@dev-kiwi09:~$ ./rtmr-queue-perf -w 4 -r 4 AdaptiveLock64 Mutex64 LFManyMany64 FastLFManyMany64 LFManyOne64 FastLFManyOne64 ManyMany64 ManyOne64 -2016-05-08T11:49:56.729254Z INFO: [-i] Iterations: 10000000 -2016-05-08T11:49:56.729319Z INFO: [-r] NumReaders: 4 -2016-05-08T11:49:56.729355Z INFO: [-w] NumWriters: 4 -2016-05-08T11:49:56.729502Z INFO: starting consumers... -2016-05-08T11:49:56.729621Z INFO: starting producers... -2016-05-08T11:49:56.729711Z INFO: wait for producers... -2016-05-08T11:50:14.650803Z INFO: wait for consumers... -2016-05-08T11:50:14.650859Z INFO: average producer time: 15.96846675 seconds -2016-05-08T11:50:14.650885Z INFO: average consumer time: 17.9209995 seconds -2016-05-08T11:50:14.650897Z INFO: test AdaptiveLock64 duration: 17.921395s (0.448034875us per iteration) -2016-05-08T11:50:14.650913Z INFO: starting consumers... -2016-05-08T11:50:14.651028Z INFO: starting producers... -2016-05-08T11:50:14.651122Z INFO: wait for producers... -2016-05-08T11:50:31.426378Z INFO: wait for consumers... -2016-05-08T11:50:31.426447Z INFO: average producer time: 15.58770475 seconds -2016-05-08T11:50:31.426491Z INFO: average consumer time: 16.775301 seconds -2016-05-08T11:50:31.426527Z INFO: test Mutex64 duration: 16.775614s (0.41939035us per iteration) -2016-05-08T11:50:31.426584Z INFO: starting consumers... -2016-05-08T11:50:31.426655Z INFO: starting producers... -2016-05-08T11:50:31.426749Z INFO: wait for producers... -2016-05-08T11:50:40.578425Z INFO: wait for consumers... -2016-05-08T11:50:40.578523Z INFO: average producer time: 8.69236075 seconds -2016-05-08T11:50:40.578577Z INFO: average consumer time: 9.15165125 seconds -2016-05-08T11:50:40.578617Z INFO: test LFManyMany64 duration: 9.152033s (0.228800825us per iteration) -2016-05-08T11:50:40.578670Z INFO: starting consumers... -2016-05-08T11:50:40.578742Z INFO: starting producers... -2016-05-08T11:50:40.578893Z INFO: wait for producers... -2016-05-08T11:50:47.447686Z INFO: wait for consumers... -2016-05-08T11:50:47.447758Z INFO: average producer time: 6.81136025 seconds -2016-05-08T11:50:47.447793Z INFO: average consumer time: 6.86875825 seconds -2016-05-08T11:50:47.447834Z INFO: test FastLFManyMany64 duration: 6.869165s (0.171729125us per iteration) -2016-05-08T11:50:47.447901Z INFO: starting consumers... -2016-05-08T11:50:47.447967Z INFO: starting producers... -2016-05-08T11:50:47.448058Z INFO: wait for producers... -2016-05-08T11:50:50.469710Z INFO: wait for consumers... -2016-05-08T11:50:50.469798Z INFO: average producer time: 2.9915505 seconds -2016-05-08T11:50:50.469848Z INFO: average consumer time: 3.02161675 seconds -2016-05-08T11:50:50.469883Z INFO: test LFManyOne64 duration: 3.021983s (0.075549575us per iteration) -2016-05-08T11:50:50.469947Z INFO: starting consumers... -2016-05-08T11:50:50.470012Z INFO: starting producers... -2016-05-08T11:50:50.470104Z INFO: wait for producers... -2016-05-08T11:50:53.139964Z INFO: wait for consumers... -2016-05-08T11:50:53.140050Z INFO: average producer time: 2.5656465 seconds -2016-05-08T11:50:53.140102Z INFO: average consumer time: 2.6697755 seconds -2016-05-08T11:50:53.140149Z INFO: test FastLFManyOne64 duration: 2.670202s (0.06675505us per iteration) -2016-05-08T11:50:53.140206Z INFO: starting consumers... -2016-05-08T11:50:53.140281Z INFO: starting producers... -2016-05-08T11:50:53.140371Z INFO: wait for producers... -2016-05-08T11:50:59.067812Z INFO: wait for consumers... -2016-05-08T11:50:59.067895Z INFO: average producer time: 5.8925505 seconds -2016-05-08T11:50:59.067946Z INFO: average consumer time: 5.9273365 seconds -2016-05-08T11:50:59.067978Z INFO: test ManyMany64 duration: 5.927773s (0.148194325us per iteration) -2016-05-08T11:50:59.068068Z INFO: starting consumers... -2016-05-08T11:50:59.068179Z INFO: starting producers... -2016-05-08T11:50:59.068288Z INFO: wait for producers... -2016-05-08T11:51:03.427416Z INFO: wait for consumers... -2016-05-08T11:51:03.427514Z INFO: average producer time: 4.1055505 seconds -2016-05-08T11:51:03.427560Z INFO: average consumer time: 4.35914975 seconds -2016-05-08T11:51:03.427596Z INFO: test ManyOne64 duration: 4.359529s (0.108988225us per iteration) +vskipin@dev-kiwi09:~$ ./rtmr-queue-perf -w 4 -r 4 AdaptiveLock64 Mutex64 LFManyMany64 FastLFManyMany64 LFManyOne64 FastLFManyOne64 ManyMany64 ManyOne64 +2016-05-08T11:49:56.729254Z INFO: [-i] Iterations: 10000000 +2016-05-08T11:49:56.729319Z INFO: [-r] NumReaders: 4 +2016-05-08T11:49:56.729355Z INFO: [-w] NumWriters: 4 +2016-05-08T11:49:56.729502Z INFO: starting consumers... +2016-05-08T11:49:56.729621Z INFO: starting producers... +2016-05-08T11:49:56.729711Z INFO: wait for producers... +2016-05-08T11:50:14.650803Z INFO: wait for consumers... +2016-05-08T11:50:14.650859Z INFO: average producer time: 15.96846675 seconds +2016-05-08T11:50:14.650885Z INFO: average consumer time: 17.9209995 seconds +2016-05-08T11:50:14.650897Z INFO: test AdaptiveLock64 duration: 17.921395s (0.448034875us per iteration) +2016-05-08T11:50:14.650913Z INFO: starting consumers... +2016-05-08T11:50:14.651028Z INFO: starting producers... +2016-05-08T11:50:14.651122Z INFO: wait for producers... +2016-05-08T11:50:31.426378Z INFO: wait for consumers... +2016-05-08T11:50:31.426447Z INFO: average producer time: 15.58770475 seconds +2016-05-08T11:50:31.426491Z INFO: average consumer time: 16.775301 seconds +2016-05-08T11:50:31.426527Z INFO: test Mutex64 duration: 16.775614s (0.41939035us per iteration) +2016-05-08T11:50:31.426584Z INFO: starting consumers... +2016-05-08T11:50:31.426655Z INFO: starting producers... +2016-05-08T11:50:31.426749Z INFO: wait for producers... +2016-05-08T11:50:40.578425Z INFO: wait for consumers... +2016-05-08T11:50:40.578523Z INFO: average producer time: 8.69236075 seconds +2016-05-08T11:50:40.578577Z INFO: average consumer time: 9.15165125 seconds +2016-05-08T11:50:40.578617Z INFO: test LFManyMany64 duration: 9.152033s (0.228800825us per iteration) +2016-05-08T11:50:40.578670Z INFO: starting consumers... +2016-05-08T11:50:40.578742Z INFO: starting producers... +2016-05-08T11:50:40.578893Z INFO: wait for producers... +2016-05-08T11:50:47.447686Z INFO: wait for consumers... +2016-05-08T11:50:47.447758Z INFO: average producer time: 6.81136025 seconds +2016-05-08T11:50:47.447793Z INFO: average consumer time: 6.86875825 seconds +2016-05-08T11:50:47.447834Z INFO: test FastLFManyMany64 duration: 6.869165s (0.171729125us per iteration) +2016-05-08T11:50:47.447901Z INFO: starting consumers... +2016-05-08T11:50:47.447967Z INFO: starting producers... +2016-05-08T11:50:47.448058Z INFO: wait for producers... +2016-05-08T11:50:50.469710Z INFO: wait for consumers... +2016-05-08T11:50:50.469798Z INFO: average producer time: 2.9915505 seconds +2016-05-08T11:50:50.469848Z INFO: average consumer time: 3.02161675 seconds +2016-05-08T11:50:50.469883Z INFO: test LFManyOne64 duration: 3.021983s (0.075549575us per iteration) +2016-05-08T11:50:50.469947Z INFO: starting consumers... +2016-05-08T11:50:50.470012Z INFO: starting producers... +2016-05-08T11:50:50.470104Z INFO: wait for producers... +2016-05-08T11:50:53.139964Z INFO: wait for consumers... +2016-05-08T11:50:53.140050Z INFO: average producer time: 2.5656465 seconds +2016-05-08T11:50:53.140102Z INFO: average consumer time: 2.6697755 seconds +2016-05-08T11:50:53.140149Z INFO: test FastLFManyOne64 duration: 2.670202s (0.06675505us per iteration) +2016-05-08T11:50:53.140206Z INFO: starting consumers... +2016-05-08T11:50:53.140281Z INFO: starting producers... +2016-05-08T11:50:53.140371Z INFO: wait for producers... +2016-05-08T11:50:59.067812Z INFO: wait for consumers... +2016-05-08T11:50:59.067895Z INFO: average producer time: 5.8925505 seconds +2016-05-08T11:50:59.067946Z INFO: average consumer time: 5.9273365 seconds +2016-05-08T11:50:59.067978Z INFO: test ManyMany64 duration: 5.927773s (0.148194325us per iteration) +2016-05-08T11:50:59.068068Z INFO: starting consumers... +2016-05-08T11:50:59.068179Z INFO: starting producers... +2016-05-08T11:50:59.068288Z INFO: wait for producers... +2016-05-08T11:51:03.427416Z INFO: wait for consumers... +2016-05-08T11:51:03.427514Z INFO: average producer time: 4.1055505 seconds +2016-05-08T11:51:03.427560Z INFO: average consumer time: 4.35914975 seconds +2016-05-08T11:51:03.427596Z INFO: test ManyOne64 duration: 4.359529s (0.108988225us per iteration) diff --git a/library/cpp/threading/chunk_queue/ut/ya.make b/library/cpp/threading/chunk_queue/ut/ya.make index d69e219f66..a35ed6bc4b 100644 --- a/library/cpp/threading/chunk_queue/ut/ya.make +++ b/library/cpp/threading/chunk_queue/ut/ya.make @@ -1,9 +1,9 @@ UNITTEST_FOR(library/cpp/threading/chunk_queue) - + OWNER(g:rtmr) - -SRCS( - queue_ut.cpp -) - -END() + +SRCS( + queue_ut.cpp +) + +END() diff --git a/library/cpp/threading/chunk_queue/ya.make b/library/cpp/threading/chunk_queue/ya.make index 7e6ead7b36..2f883140ba 100644 --- a/library/cpp/threading/chunk_queue/ya.make +++ b/library/cpp/threading/chunk_queue/ya.make @@ -1,9 +1,9 @@ -LIBRARY() - +LIBRARY() + OWNER(g:rtmr) - -SRCS( - queue.cpp -) - -END() + +SRCS( + queue.cpp +) + +END() diff --git a/library/cpp/threading/future/core/future-inl.h b/library/cpp/threading/future/core/future-inl.h index a72985ec47..5fd4296a93 100644 --- a/library/cpp/threading/future/core/future-inl.h +++ b/library/cpp/threading/future/core/future-inl.h @@ -1,21 +1,21 @@ -#pragma once - -#if !defined(INCLUDE_FUTURE_INL_H) -#error "you should never include future-inl.h directly" +#pragma once + +#if !defined(INCLUDE_FUTURE_INL_H) +#error "you should never include future-inl.h directly" #endif // INCLUDE_FUTURE_INL_H - + namespace NThreading { namespace NImpl { //////////////////////////////////////////////////////////////////////////////// - + template <typename T> using TCallback = std::function<void(const TFuture<T>&)>; - + template <typename T> using TCallbackList = TVector<TCallback<T>>; // TODO: small vector - + //////////////////////////////////////////////////////////////////////////////// - + enum class TError { Error }; @@ -29,28 +29,28 @@ namespace NThreading { ValueSet, ValueRead, }; - + private: mutable TAtomic State; TAdaptiveLock StateLock; - + TCallbackList<T> Callbacks; mutable THolder<TSystemEvent> ReadyEvent; - + std::exception_ptr Exception; - + union { char NullValue; T Value; }; - + void AccessValue(TDuration timeout, int acquireState) const { int state = AtomicGet(State); if (Y_UNLIKELY(state == NotReady)) { if (timeout == TDuration::Zero()) { ythrow TFutureException() << "value not set"; } - + if (!Wait(timeout)) { ythrow TFutureException() << "wait timeout"; } @@ -114,17 +114,17 @@ namespace NThreading { bool HasException() const { return AtomicGet(State) == ExceptionSet; } - + const T& GetValue(TDuration timeout = TDuration::Zero()) const { AccessValue(timeout, ValueRead); return Value; } - + T ExtractValue(TDuration timeout = TDuration::Zero()) { AccessValue(timeout, ValueMoved); return std::move(Value); } - + template <typename TT> void SetValue(TT&& value) { bool success = TrySetValue(std::forward<TT>(value)); @@ -137,21 +137,21 @@ namespace NThreading { bool TrySetValue(TT&& value) { TSystemEvent* readyEvent = nullptr; TCallbackList<T> callbacks; - + with_lock (StateLock) { int state = AtomicGet(State); if (Y_UNLIKELY(state != NotReady)) { return false; } - + new (&Value) T(std::forward<TT>(value)); - + readyEvent = ReadyEvent.Get(); callbacks = std::move(Callbacks); AtomicSet(State, ValueSet); } - + if (readyEvent) { readyEvent->Signal(); } @@ -164,8 +164,8 @@ namespace NThreading { } return true; - } - + } + void SetException(std::exception_ptr e) { bool success = TrySetException(std::move(e)); if (Y_UNLIKELY(!success)) { @@ -176,18 +176,18 @@ namespace NThreading { bool TrySetException(std::exception_ptr e) { TSystemEvent* readyEvent; TCallbackList<T> callbacks; - + with_lock (StateLock) { int state = AtomicGet(State); if (Y_UNLIKELY(state != NotReady)) { return false; } - + Exception = std::move(e); - + readyEvent = ReadyEvent.Get(); callbacks = std::move(Callbacks); - + AtomicSet(State, ExceptionSet); } @@ -203,8 +203,8 @@ namespace NThreading { } return true; - } - + } + template <typename F> bool Subscribe(F&& func) { with_lock (StateLock) { @@ -216,33 +216,33 @@ namespace NThreading { } return false; } - + void Wait() const { Wait(TInstant::Max()); - } - + } + bool Wait(TDuration timeout) const { return Wait(timeout.ToDeadLine()); } - + bool Wait(TInstant deadline) const { TSystemEvent* readyEvent = nullptr; - + with_lock (StateLock) { int state = AtomicGet(State); if (state != NotReady) { return true; } - + if (!ReadyEvent) { ReadyEvent.Reset(new TSystemEvent()); } readyEvent = ReadyEvent.Get(); } - + Y_ASSERT(readyEvent); return readyEvent->WaitD(deadline); - } + } void TryRethrowWithState(int state) const { if (Y_UNLIKELY(state == ExceptionSet)) { @@ -251,9 +251,9 @@ namespace NThreading { } } }; - + //////////////////////////////////////////////////////////////////////////////// - + template <> class TFutureState<void>: public TAtomicRefCount<TFutureState<void>> { enum { @@ -261,22 +261,22 @@ namespace NThreading { ValueSet, ExceptionSet, }; - + private: TAtomic State; TAdaptiveLock StateLock; - + TCallbackList<void> Callbacks; mutable THolder<TSystemEvent> ReadyEvent; - + std::exception_ptr Exception; public: TFutureState(bool valueSet = false) : State(valueSet ? ValueSet : NotReady) { - } - + } + TFutureState(std::exception_ptr exception, TError) : State(ExceptionSet) , Exception(std::move(exception)) @@ -285,8 +285,8 @@ namespace NThreading { bool HasValue() const { return AtomicGet(State) == ValueSet; - } - + } + void TryRethrow() const { int state = AtomicGet(State); TryRethrowWithState(state); @@ -295,26 +295,26 @@ namespace NThreading { bool HasException() const { return AtomicGet(State) == ExceptionSet; } - + void GetValue(TDuration timeout = TDuration::Zero()) const { int state = AtomicGet(State); if (Y_UNLIKELY(state == NotReady)) { if (timeout == TDuration::Zero()) { ythrow TFutureException() << "value not set"; } - + if (!Wait(timeout)) { ythrow TFutureException() << "wait timeout"; } - + state = AtomicGet(State); } - + TryRethrowWithState(state); - + Y_ASSERT(state == ValueSet); } - + void SetValue() { bool success = TrySetValue(); if (Y_UNLIKELY(!success)) { @@ -325,19 +325,19 @@ namespace NThreading { bool TrySetValue() { TSystemEvent* readyEvent = nullptr; TCallbackList<void> callbacks; - + with_lock (StateLock) { int state = AtomicGet(State); if (Y_UNLIKELY(state != NotReady)) { return false; } - + readyEvent = ReadyEvent.Get(); callbacks = std::move(Callbacks); - + AtomicSet(State, ValueSet); } - + if (readyEvent) { readyEvent->Signal(); } @@ -350,8 +350,8 @@ namespace NThreading { } return true; - } - + } + void SetException(std::exception_ptr e) { bool success = TrySetException(std::move(e)); if (Y_UNLIKELY(!success)) { @@ -362,25 +362,25 @@ namespace NThreading { bool TrySetException(std::exception_ptr e) { TSystemEvent* readyEvent = nullptr; TCallbackList<void> callbacks; - + with_lock (StateLock) { int state = AtomicGet(State); if (Y_UNLIKELY(state != NotReady)) { return false; } - + Exception = std::move(e); - + readyEvent = ReadyEvent.Get(); callbacks = std::move(Callbacks); - + AtomicSet(State, ExceptionSet); } - + if (readyEvent) { readyEvent->Signal(); } - + if (callbacks) { TFuture<void> temp(this); for (auto& callback : callbacks) { @@ -390,7 +390,7 @@ namespace NThreading { return true; } - + template <typename F> bool Subscribe(F&& func) { with_lock (StateLock) { @@ -402,15 +402,15 @@ namespace NThreading { } return false; } - + void Wait() const { Wait(TInstant::Max()); - } - + } + bool Wait(TDuration timeout) const { return Wait(timeout.ToDeadLine()); } - + bool Wait(TInstant deadline) const { TSystemEvent* readyEvent = nullptr; @@ -428,7 +428,7 @@ namespace NThreading { Y_ASSERT(readyEvent); return readyEvent->WaitD(deadline); - } + } void TryRethrowWithState(int state) const { if (Y_UNLIKELY(state == ExceptionSet)) { @@ -437,19 +437,19 @@ namespace NThreading { } } }; - + //////////////////////////////////////////////////////////////////////////////// - + template <typename T> inline void SetValueImpl(TPromise<T>& promise, const T& value) { promise.SetValue(value); } - + template <typename T> inline void SetValueImpl(TPromise<T>& promise, T&& value) { promise.SetValue(std::move(value)); - } - + } + template <typename T> inline void SetValueImpl(TPromise<T>& promise, const TFuture<T>& future, std::enable_if_t<!std::is_void<T>::value, bool> = false) { @@ -463,8 +463,8 @@ namespace NThreading { } promise.SetValue(*value); }); - } - + } + template <typename T> inline void SetValueImpl(TPromise<void>& promise, const TFuture<T>& future) { future.Subscribe([=](const TFuture<T>& f) mutable { @@ -487,9 +487,9 @@ namespace NThreading { if (Y_UNLIKELY(!success)) { throw; } - } - } - + } + } + template <typename F> inline void SetValue(TPromise<void>& promise, F&& func, std::enable_if_t<std::is_void<TFunctionResult<F>>::value, bool> = false) { @@ -498,14 +498,14 @@ namespace NThreading { } catch (...) { promise.SetException(std::current_exception()); return; - } + } promise.SetValue(); - } - + } + } - + //////////////////////////////////////////////////////////////////////////////// - + class TFutureStateId { private: const void* Id; @@ -535,41 +535,41 @@ namespace NThreading { template <typename T> inline TFuture<T>::TFuture(const TIntrusivePtr<TFutureState>& state) noexcept : State(state) - { - } - + { + } + template <typename T> inline void TFuture<T>::Swap(TFuture<T>& other) { State.Swap(other.State); } - + template <typename T> inline bool TFuture<T>::HasValue() const { return State && State->HasValue(); } - + template <typename T> inline const T& TFuture<T>::GetValue(TDuration timeout) const { EnsureInitialized(); return State->GetValue(timeout); - } - + } + template <typename T> inline T TFuture<T>::ExtractValue(TDuration timeout) { EnsureInitialized(); return State->ExtractValue(timeout); } - + template <typename T> inline const T& TFuture<T>::GetValueSync() const { return GetValue(TDuration::Max()); } - + template <typename T> inline T TFuture<T>::ExtractValueSync() { return ExtractValue(TDuration::Max()); } - + template <typename T> inline void TFuture<T>::TryRethrow() const { if (State) { @@ -581,25 +581,25 @@ namespace NThreading { inline bool TFuture<T>::HasException() const { return State && State->HasException(); } - + template <typename T> inline void TFuture<T>::Wait() const { EnsureInitialized(); return State->Wait(); } - + template <typename T> inline bool TFuture<T>::Wait(TDuration timeout) const { EnsureInitialized(); return State->Wait(timeout); } - + template <typename T> inline bool TFuture<T>::Wait(TInstant deadline) const { EnsureInitialized(); return State->Wait(deadline); } - + template <typename T> template <typename F> inline const TFuture<T>& TFuture<T>::Subscribe(F&& func) const { @@ -609,7 +609,7 @@ namespace NThreading { } return *this; } - + template <typename T> template <typename F> inline const TFuture<T>& TFuture<T>::NoexceptSubscribe(F&& func) const noexcept { @@ -626,7 +626,7 @@ namespace NThreading { }); return promise; } - + template <typename T> inline TFuture<void> TFuture<T>::IgnoreResult() const { auto promise = NewPromise(); @@ -639,8 +639,8 @@ namespace NThreading { template <typename T> inline bool TFuture<T>::Initialized() const { return bool(State); - } - + } + template <typename T> inline TMaybe<TFutureStateId> TFuture<T>::StateId() const noexcept { return State != nullptr ? MakeMaybe<TFutureStateId>(*State) : Nothing(); @@ -650,33 +650,33 @@ namespace NThreading { inline void TFuture<T>::EnsureInitialized() const { if (!State) { ythrow TFutureException() << "state not initialized"; - } + } } - + //////////////////////////////////////////////////////////////////////////////// - + inline TFuture<void>::TFuture(const TIntrusivePtr<TFutureState>& state) noexcept : State(state) { } - + inline void TFuture<void>::Swap(TFuture<void>& other) { State.Swap(other.State); } - + inline bool TFuture<void>::HasValue() const { return State && State->HasValue(); } - + inline void TFuture<void>::GetValue(TDuration timeout) const { EnsureInitialized(); State->GetValue(timeout); } - + inline void TFuture<void>::GetValueSync() const { GetValue(TDuration::Max()); } - + inline void TFuture<void>::TryRethrow() const { if (State) { State->TryRethrow(); @@ -686,7 +686,7 @@ namespace NThreading { inline bool TFuture<void>::HasException() const { return State && State->HasException(); } - + inline void TFuture<void>::Wait() const { EnsureInitialized(); return State->Wait(); @@ -696,12 +696,12 @@ namespace NThreading { EnsureInitialized(); return State->Wait(timeout); } - + inline bool TFuture<void>::Wait(TInstant deadline) const { EnsureInitialized(); return State->Wait(deadline); } - + template <typename F> inline const TFuture<void>& TFuture<void>::Subscribe(F&& func) const { EnsureInitialized(); @@ -710,7 +710,7 @@ namespace NThreading { } return *this; } - + template <typename F> inline const TFuture<void>& TFuture<void>::NoexceptSubscribe(F&& func) const noexcept { return Subscribe(std::forward<F>(func)); @@ -725,7 +725,7 @@ namespace NThreading { }); return promise; } - + template <typename R> inline TFuture<R> TFuture<void>::Return(const R& value) const { auto promise = NewPromise<R>(); @@ -739,12 +739,12 @@ namespace NThreading { promise.SetValue(value); }); return promise; - } - + } + inline bool TFuture<void>::Initialized() const { return bool(State); } - + inline TMaybe<TFutureStateId> TFuture<void>::StateId() const noexcept { return State != nullptr ? MakeMaybe<TFutureStateId>(*State) : Nothing(); } @@ -752,39 +752,39 @@ namespace NThreading { inline void TFuture<void>::EnsureInitialized() const { if (!State) { ythrow TFutureException() << "state not initialized"; - } + } } - + //////////////////////////////////////////////////////////////////////////////// - + template <typename T> inline TPromise<T>::TPromise(const TIntrusivePtr<TFutureState>& state) noexcept : State(state) { } - + template <typename T> inline void TPromise<T>::Swap(TPromise<T>& other) { State.Swap(other.State); } - + template <typename T> inline const T& TPromise<T>::GetValue() const { EnsureInitialized(); return State->GetValue(); } - + template <typename T> inline T TPromise<T>::ExtractValue() { EnsureInitialized(); return State->ExtractValue(); } - + template <typename T> inline bool TPromise<T>::HasValue() const { return State && State->HasValue(); } - + template <typename T> inline void TPromise<T>::SetValue(const T& value) { EnsureInitialized(); @@ -796,7 +796,7 @@ namespace NThreading { EnsureInitialized(); State->SetValue(std::move(value)); } - + template <typename T> inline bool TPromise<T>::TrySetValue(const T& value) { EnsureInitialized(); @@ -820,19 +820,19 @@ namespace NThreading { inline bool TPromise<T>::HasException() const { return State && State->HasException(); } - + template <typename T> inline void TPromise<T>::SetException(const TString& e) { EnsureInitialized(); State->SetException(std::make_exception_ptr(yexception() << e)); } - + template <typename T> inline void TPromise<T>::SetException(std::exception_ptr e) { EnsureInitialized(); State->SetException(std::move(e)); } - + template <typename T> inline bool TPromise<T>::TrySetException(std::exception_ptr e) { EnsureInitialized(); @@ -844,49 +844,49 @@ namespace NThreading { EnsureInitialized(); return TFuture<T>(State); } - + template <typename T> inline TPromise<T>::operator TFuture<T>() const { return GetFuture(); } - + template <typename T> inline bool TPromise<T>::Initialized() const { return bool(State); } - + template <typename T> inline void TPromise<T>::EnsureInitialized() const { if (!State) { ythrow TFutureException() << "state not initialized"; } } - + //////////////////////////////////////////////////////////////////////////////// - + inline TPromise<void>::TPromise(const TIntrusivePtr<TFutureState>& state) noexcept : State(state) { } - + inline void TPromise<void>::Swap(TPromise<void>& other) { State.Swap(other.State); } - + inline void TPromise<void>::GetValue() const { EnsureInitialized(); State->GetValue(); } - + inline bool TPromise<void>::HasValue() const { return State && State->HasValue(); } - + inline void TPromise<void>::SetValue() { EnsureInitialized(); State->SetValue(); } - + inline bool TPromise<void>::TrySetValue() { EnsureInitialized(); return State->TrySetValue(); @@ -901,17 +901,17 @@ namespace NThreading { inline bool TPromise<void>::HasException() const { return State && State->HasException(); } - + inline void TPromise<void>::SetException(const TString& e) { EnsureInitialized(); State->SetException(std::make_exception_ptr(yexception() << e)); } - + inline void TPromise<void>::SetException(std::exception_ptr e) { EnsureInitialized(); State->SetException(std::move(e)); } - + inline bool TPromise<void>::TrySetException(std::exception_ptr e) { EnsureInitialized(); return State->TrySetException(std::move(e)); @@ -921,42 +921,42 @@ namespace NThreading { EnsureInitialized(); return TFuture<void>(State); } - + inline TPromise<void>::operator TFuture<void>() const { return GetFuture(); } - + inline bool TPromise<void>::Initialized() const { return bool(State); } - + inline void TPromise<void>::EnsureInitialized() const { if (!State) { ythrow TFutureException() << "state not initialized"; } } - + //////////////////////////////////////////////////////////////////////////////// - + template <typename T> inline TPromise<T> NewPromise() { return {new NImpl::TFutureState<T>()}; - } - + } + inline TPromise<void> NewPromise() { return {new NImpl::TFutureState<void>()}; } - + template <typename T> inline TFuture<T> MakeFuture(const T& value) { return {new NImpl::TFutureState<T>(value)}; } - + template <typename T> inline TFuture<std::remove_reference_t<T>> MakeFuture(T&& value) { return {new NImpl::TFutureState<std::remove_reference_t<T>>(std::forward<T>(value))}; } - + template <typename T> inline TFuture<T> MakeFuture() { struct TCache { @@ -970,7 +970,7 @@ namespace NThreading { }; return Singleton<TCache>()->Instance; } - + template <typename T> inline TFuture<T> MakeErrorFuture(std::exception_ptr exception) { @@ -983,4 +983,4 @@ namespace NThreading { }; return Singleton<TCache>()->Instance; } -} +} diff --git a/library/cpp/threading/future/core/future.cpp b/library/cpp/threading/future/core/future.cpp index 257a2a218f..3243afcb40 100644 --- a/library/cpp/threading/future/core/future.cpp +++ b/library/cpp/threading/future/core/future.cpp @@ -1 +1 @@ -#include "future.h" +#include "future.h" diff --git a/library/cpp/threading/future/core/future.h b/library/cpp/threading/future/core/future.h index 2dfc4e0f25..2e82bb953e 100644 --- a/library/cpp/threading/future/core/future.h +++ b/library/cpp/threading/future/core/future.h @@ -1,26 +1,26 @@ -#pragma once - +#pragma once + #include "fwd.h" -#include <util/datetime/base.h> -#include <util/generic/function.h> +#include <util/datetime/base.h> +#include <util/generic/function.h> #include <util/generic/maybe.h> -#include <util/generic/ptr.h> -#include <util/generic/vector.h> -#include <util/generic/yexception.h> -#include <util/system/event.h> -#include <util/system/spinlock.h> - +#include <util/generic/ptr.h> +#include <util/generic/vector.h> +#include <util/generic/yexception.h> +#include <util/system/event.h> +#include <util/system/spinlock.h> + namespace NThreading { //////////////////////////////////////////////////////////////////////////////// - + struct TFutureException: public yexception {}; - + // creates unset promise template <typename T> TPromise<T> NewPromise(); TPromise<void> NewPromise(); - + // creates preset future template <typename T> TFuture<T> MakeFuture(const T& value); @@ -31,18 +31,18 @@ namespace NThreading { template <typename T> TFuture<T> MakeErrorFuture(std::exception_ptr exception); TFuture<void> MakeFuture(); - + //////////////////////////////////////////////////////////////////////////////// - + namespace NImpl { template <typename T> class TFutureState; - + template <typename T> struct TFutureType { using TType = T; }; - + template <typename T> struct TFutureType<TFuture<T>> { using TType = typename TFutureType<T>::TType; @@ -54,10 +54,10 @@ namespace NThreading { using TType = decltype(std::declval<F&>()(std::declval<const TFuture<T>&>())); }; } - + template <typename F> using TFutureType = typename NImpl::TFutureType<F>::TType; - + template <typename F, typename T> using TFutureCallResult = typename NImpl::TFutureCallResult<F, T>::TType; @@ -65,14 +65,14 @@ namespace NThreading { class TFutureStateId; //////////////////////////////////////////////////////////////////////////////// - + template <typename T> class TFuture { using TFutureState = NImpl::TFutureState<T>; - + private: TIntrusivePtr<TFutureState> State; - + public: using value_type = T; @@ -80,29 +80,29 @@ namespace NThreading { TFuture(const TFuture<T>& other) noexcept = default; TFuture(TFuture<T>&& other) noexcept = default; TFuture(const TIntrusivePtr<TFutureState>& state) noexcept; - + TFuture<T>& operator=(const TFuture<T>& other) noexcept = default; TFuture<T>& operator=(TFuture<T>&& other) noexcept = default; void Swap(TFuture<T>& other); - + bool Initialized() const; - + bool HasValue() const; const T& GetValue(TDuration timeout = TDuration::Zero()) const; const T& GetValueSync() const; T ExtractValue(TDuration timeout = TDuration::Zero()); T ExtractValueSync(); - + void TryRethrow() const; bool HasException() const; - + void Wait() const; bool Wait(TDuration timeout) const; bool Wait(TInstant deadline) const; - + template <typename F> const TFuture<T>& Subscribe(F&& callback) const; - + // precondition: EnsureInitialized() passes // postcondition: std::terminate is highly unlikely template <typename F> @@ -110,9 +110,9 @@ namespace NThreading { template <typename F> TFuture<TFutureType<TFutureCallResult<F, T>>> Apply(F&& func) const; - + TFuture<void> IgnoreResult() const; - + //! If the future is initialized returns the future state identifier. Otherwise returns an empty optional /** The state identifier is guaranteed to be unique during the future state lifetime and could be reused after its death **/ @@ -120,16 +120,16 @@ namespace NThreading { void EnsureInitialized() const; }; - + //////////////////////////////////////////////////////////////////////////////// - + template <> class TFuture<void> { using TFutureState = NImpl::TFutureState<void>; - + private: TIntrusivePtr<TFutureState> State = nullptr; - + public: using value_type = void; @@ -137,27 +137,27 @@ namespace NThreading { TFuture(const TFuture<void>& other) noexcept = default; TFuture(TFuture<void>&& other) noexcept = default; TFuture(const TIntrusivePtr<TFutureState>& state) noexcept; - + TFuture<void>& operator=(const TFuture<void>& other) noexcept = default; TFuture<void>& operator=(TFuture<void>&& other) noexcept = default; void Swap(TFuture<void>& other); - + bool Initialized() const; - + bool HasValue() const; void GetValue(TDuration timeout = TDuration::Zero()) const; void GetValueSync() const; - + void TryRethrow() const; bool HasException() const; - + void Wait() const; bool Wait(TDuration timeout) const; bool Wait(TInstant deadline) const; - + template <typename F> const TFuture<void>& Subscribe(F&& callback) const; - + // precondition: EnsureInitialized() passes // postcondition: std::terminate is highly unlikely template <typename F> @@ -165,10 +165,10 @@ namespace NThreading { template <typename F> TFuture<TFutureType<TFutureCallResult<F, void>>> Apply(F&& func) const; - + template <typename R> TFuture<R> Return(const R& value) const; - + TFuture<void> IgnoreResult() const { return *this; } @@ -180,35 +180,35 @@ namespace NThreading { void EnsureInitialized() const; }; - + //////////////////////////////////////////////////////////////////////////////// - + template <typename T> class TPromise { using TFutureState = NImpl::TFutureState<T>; - + private: TIntrusivePtr<TFutureState> State = nullptr; - + public: TPromise() noexcept = default; TPromise(const TPromise<T>& other) noexcept = default; TPromise(TPromise<T>&& other) noexcept = default; TPromise(const TIntrusivePtr<TFutureState>& state) noexcept; - + TPromise<T>& operator=(const TPromise<T>& other) noexcept = default; TPromise<T>& operator=(TPromise<T>&& other) noexcept = default; void Swap(TPromise<T>& other); - + bool Initialized() const; - + bool HasValue() const; const T& GetValue() const; T ExtractValue(); - + void SetValue(const T& value); void SetValue(T&& value); - + bool TrySetValue(const T& value); bool TrySetValue(T&& value); @@ -217,56 +217,56 @@ namespace NThreading { void SetException(const TString& e); void SetException(std::exception_ptr e); bool TrySetException(std::exception_ptr e); - + TFuture<T> GetFuture() const; operator TFuture<T>() const; - + private: void EnsureInitialized() const; }; - + //////////////////////////////////////////////////////////////////////////////// - + template <> class TPromise<void> { using TFutureState = NImpl::TFutureState<void>; - + private: TIntrusivePtr<TFutureState> State; - + public: TPromise() noexcept = default; TPromise(const TPromise<void>& other) noexcept = default; TPromise(TPromise<void>&& other) noexcept = default; TPromise(const TIntrusivePtr<TFutureState>& state) noexcept; - + TPromise<void>& operator=(const TPromise<void>& other) noexcept = default; TPromise<void>& operator=(TPromise<void>&& other) noexcept = default; void Swap(TPromise<void>& other); - + bool Initialized() const; - + bool HasValue() const; void GetValue() const; - + void SetValue(); bool TrySetValue(); - + void TryRethrow() const; bool HasException() const; void SetException(const TString& e); void SetException(std::exception_ptr e); bool TrySetException(std::exception_ptr e); - + TFuture<void> GetFuture() const; operator TFuture<void>() const; - + private: void EnsureInitialized() const; }; - + } - -#define INCLUDE_FUTURE_INL_H -#include "future-inl.h" -#undef INCLUDE_FUTURE_INL_H + +#define INCLUDE_FUTURE_INL_H +#include "future-inl.h" +#undef INCLUDE_FUTURE_INL_H diff --git a/library/cpp/threading/future/future.h b/library/cpp/threading/future/future.h index 6b138a3583..35db9abbe2 100644 --- a/library/cpp/threading/future/future.h +++ b/library/cpp/threading/future/future.h @@ -1,4 +1,4 @@ -#pragma once - +#pragma once + #include "core/future.h" #include "wait/wait.h" diff --git a/library/cpp/threading/future/future_ut.cpp b/library/cpp/threading/future/future_ut.cpp index a9d5a6cfbd..05950a568d 100644 --- a/library/cpp/threading/future/future_ut.cpp +++ b/library/cpp/threading/future/future_ut.cpp @@ -1,7 +1,7 @@ -#include "future.h" - +#include "future.h" + #include <library/cpp/testing/unittest/registar.h> - + #include <list> #include <type_traits> @@ -63,168 +63,168 @@ namespace { } //////////////////////////////////////////////////////////////////////////////// - + Y_UNIT_TEST_SUITE(TFutureTest) { Y_UNIT_TEST(ShouldInitiallyHasNoValue) { TPromise<int> promise; UNIT_ASSERT(!promise.HasValue()); - + promise = NewPromise<int>(); UNIT_ASSERT(!promise.HasValue()); - + TFuture<int> future; UNIT_ASSERT(!future.HasValue()); - + future = promise.GetFuture(); UNIT_ASSERT(!future.HasValue()); } - + Y_UNIT_TEST(ShouldInitiallyHasNoValueVoid) { TPromise<void> promise; UNIT_ASSERT(!promise.HasValue()); - + promise = NewPromise(); UNIT_ASSERT(!promise.HasValue()); - + TFuture<void> future; UNIT_ASSERT(!future.HasValue()); - + future = promise.GetFuture(); UNIT_ASSERT(!future.HasValue()); } - + Y_UNIT_TEST(ShouldStoreValue) { TPromise<int> promise = NewPromise<int>(); promise.SetValue(123); UNIT_ASSERT(promise.HasValue()); UNIT_ASSERT_EQUAL(promise.GetValue(), 123); - + TFuture<int> future = promise.GetFuture(); UNIT_ASSERT(future.HasValue()); UNIT_ASSERT_EQUAL(future.GetValue(), 123); - + future = MakeFuture(345); UNIT_ASSERT(future.HasValue()); UNIT_ASSERT_EQUAL(future.GetValue(), 345); } - + Y_UNIT_TEST(ShouldStoreValueVoid) { TPromise<void> promise = NewPromise(); promise.SetValue(); UNIT_ASSERT(promise.HasValue()); - + TFuture<void> future = promise.GetFuture(); UNIT_ASSERT(future.HasValue()); - + future = MakeFuture(); UNIT_ASSERT(future.HasValue()); } - + struct TTestCallback { int Value; - + TTestCallback(int value) : Value(value) { } - + void Callback(const TFuture<int>& future) { Value += future.GetValue(); } - + int Func(const TFuture<int>& future) { return (Value += future.GetValue()); } - + void VoidFunc(const TFuture<int>& future) { future.GetValue(); } - + TFuture<int> FutureFunc(const TFuture<int>& future) { return MakeFuture(Value += future.GetValue()); } - + TPromise<void> Signal = NewPromise(); TFuture<void> FutureVoidFunc(const TFuture<int>& future) { future.GetValue(); return Signal; } }; - + Y_UNIT_TEST(ShouldInvokeCallback) { TPromise<int> promise = NewPromise<int>(); - + TTestCallback callback(123); TFuture<int> future = promise.GetFuture() .Subscribe([&](const TFuture<int>& theFuture) { return callback.Callback(theFuture); }); - + promise.SetValue(456); UNIT_ASSERT_EQUAL(future.GetValue(), 456); UNIT_ASSERT_EQUAL(callback.Value, 123 + 456); } - + Y_UNIT_TEST(ShouldApplyFunc) { TPromise<int> promise = NewPromise<int>(); - + TTestCallback callback(123); TFuture<int> future = promise.GetFuture() .Apply([&](const auto& theFuture) { return callback.Func(theFuture); }); - + promise.SetValue(456); UNIT_ASSERT_EQUAL(future.GetValue(), 123 + 456); UNIT_ASSERT_EQUAL(callback.Value, 123 + 456); } - + Y_UNIT_TEST(ShouldApplyVoidFunc) { TPromise<int> promise = NewPromise<int>(); - + TTestCallback callback(123); TFuture<void> future = promise.GetFuture() .Apply([&](const auto& theFuture) { return callback.VoidFunc(theFuture); }); - + promise.SetValue(456); UNIT_ASSERT(future.HasValue()); } - + Y_UNIT_TEST(ShouldApplyFutureFunc) { TPromise<int> promise = NewPromise<int>(); - + TTestCallback callback(123); TFuture<int> future = promise.GetFuture() .Apply([&](const auto& theFuture) { return callback.FutureFunc(theFuture); }); - + promise.SetValue(456); UNIT_ASSERT_EQUAL(future.GetValue(), 123 + 456); UNIT_ASSERT_EQUAL(callback.Value, 123 + 456); } - + Y_UNIT_TEST(ShouldApplyFutureVoidFunc) { TPromise<int> promise = NewPromise<int>(); - + TTestCallback callback(123); TFuture<void> future = promise.GetFuture() .Apply([&](const auto& theFuture) { return callback.FutureVoidFunc(theFuture); }); - + promise.SetValue(456); UNIT_ASSERT(!future.HasValue()); - + callback.Signal.SetValue(); UNIT_ASSERT(future.HasValue()); } - + Y_UNIT_TEST(ShouldIgnoreResultIfAsked) { TPromise<int> promise = NewPromise<int>(); - + TTestCallback callback(123); TFuture<int> future = promise.GetFuture().IgnoreResult().Return(42); - + promise.SetValue(456); UNIT_ASSERT_EQUAL(future.GetValue(), 42); } - + class TCustomException: public yexception { }; - + Y_UNIT_TEST(ShouldRethrowException) { TPromise<int> promise = NewPromise<int>(); try { @@ -238,7 +238,7 @@ namespace { UNIT_ASSERT_EXCEPTION(promise.GetValue(), TCustomException); UNIT_ASSERT_EXCEPTION(promise.TryRethrow(), TCustomException); } - + Y_UNIT_TEST(ShouldRethrowCallbackException) { TPromise<int> promise = NewPromise<int>(); TFuture<int> future = promise.GetFuture(); @@ -263,21 +263,21 @@ namespace { Y_UNIT_TEST(ShouldWaitExceptionOrAll) { TPromise<void> promise1 = NewPromise(); TPromise<void> promise2 = NewPromise(); - + TFuture<void> future = WaitExceptionOrAll(promise1, promise2); UNIT_ASSERT(!future.HasValue()); - + promise1.SetValue(); UNIT_ASSERT(!future.HasValue()); - + promise2.SetValue(); UNIT_ASSERT(future.HasValue()); } - + Y_UNIT_TEST(ShouldWaitExceptionOrAllVector) { TPromise<void> promise1 = NewPromise(); TPromise<void> promise2 = NewPromise(); - + TVector<TFuture<void>> promises; promises.push_back(promise1); promises.push_back(promise2); @@ -403,21 +403,21 @@ namespace { TFuture<void> future = WaitAny(promise1, promise2); UNIT_ASSERT(!future.HasValue()); - + promise1.SetValue(); UNIT_ASSERT(future.HasValue()); - + promise2.SetValue(); UNIT_ASSERT(future.HasValue()); } - + Y_UNIT_TEST(ShouldStoreTypesWithoutDefaultConstructor) { // compileability test struct TRec { explicit TRec(int) { } }; - + auto promise = NewPromise<TRec>(); promise.SetValue(TRec(1)); @@ -425,22 +425,22 @@ namespace { const auto& rec = future.GetValue(); Y_UNUSED(rec); } - + Y_UNIT_TEST(ShouldStoreMovableTypes) { // compileability test struct TRec : TMoveOnly { explicit TRec(int) { } }; - + auto promise = NewPromise<TRec>(); promise.SetValue(TRec(1)); - + auto future = MakeFuture(TRec(1)); const auto& rec = future.GetValue(); Y_UNUSED(rec); } - + Y_UNIT_TEST(ShouldMoveMovableTypes) { // compileability test struct TRec : TMoveOnly { diff --git a/library/cpp/threading/future/perf/main.cpp b/library/cpp/threading/future/perf/main.cpp index 71e9e293de..5a0690af47 100644 --- a/library/cpp/threading/future/perf/main.cpp +++ b/library/cpp/threading/future/perf/main.cpp @@ -1,50 +1,50 @@ #include <library/cpp/testing/benchmark/bench.h> #include <library/cpp/threading/future/future.h> - + #include <util/generic/string.h> -#include <util/generic/xrange.h> - -using namespace NThreading; - -template <typename T> +#include <util/generic/xrange.h> + +using namespace NThreading; + +template <typename T> void TestAllocPromise(const NBench::NCpu::TParams& iface) { for (const auto it : xrange(iface.Iterations())) { - Y_UNUSED(it); - Y_DO_NOT_OPTIMIZE_AWAY(NewPromise<T>()); - } -} - -template <typename T> + Y_UNUSED(it); + Y_DO_NOT_OPTIMIZE_AWAY(NewPromise<T>()); + } +} + +template <typename T> TPromise<T> SetPromise(T value) { - auto promise = NewPromise<T>(); - promise.SetValue(value); - return promise; -} - -template <typename T> + auto promise = NewPromise<T>(); + promise.SetValue(value); + return promise; +} + +template <typename T> void TestSetPromise(const NBench::NCpu::TParams& iface, T value) { for (const auto it : xrange(iface.Iterations())) { - Y_UNUSED(it); - Y_DO_NOT_OPTIMIZE_AWAY(SetPromise(value)); - } -} - + Y_UNUSED(it); + Y_DO_NOT_OPTIMIZE_AWAY(SetPromise(value)); + } +} + Y_CPU_BENCHMARK(AllocPromiseVoid, iface) { - TestAllocPromise<void>(iface); -} - + TestAllocPromise<void>(iface); +} + Y_CPU_BENCHMARK(AllocPromiseUI64, iface) { - TestAllocPromise<ui64>(iface); -} - + TestAllocPromise<ui64>(iface); +} + Y_CPU_BENCHMARK(AllocPromiseStroka, iface) { TestAllocPromise<TString>(iface); -} - +} + Y_CPU_BENCHMARK(SetPromiseUI64, iface) { - TestSetPromise<ui64>(iface, 1234567890ull); -} - + TestSetPromise<ui64>(iface, 1234567890ull); +} + Y_CPU_BENCHMARK(SetPromiseStroka, iface) { TestSetPromise<TString>(iface, "test test test"); -} +} diff --git a/library/cpp/threading/future/perf/ya.make b/library/cpp/threading/future/perf/ya.make index b56e66a838..943d585d4b 100644 --- a/library/cpp/threading/future/perf/ya.make +++ b/library/cpp/threading/future/perf/ya.make @@ -1,16 +1,16 @@ Y_BENCHMARK(library-threading-future-perf) - + OWNER( g:rtmr ishfb ) - -SRCS( - main.cpp -) - -PEERDIR( + +SRCS( + main.cpp +) + +PEERDIR( library/cpp/threading/future -) - -END() +) + +END() diff --git a/library/cpp/threading/future/ut/ya.make b/library/cpp/threading/future/ut/ya.make index c4d5a7e1d2..566b622370 100644 --- a/library/cpp/threading/future/ut/ya.make +++ b/library/cpp/threading/future/ut/ya.make @@ -6,7 +6,7 @@ OWNER( ) SRCS( - async_ut.cpp + async_ut.cpp future_ut.cpp legacy_future_ut.cpp ) diff --git a/library/cpp/threading/future/wait/wait-inl.h b/library/cpp/threading/future/wait/wait-inl.h index f778cf7fd5..2753d5446c 100644 --- a/library/cpp/threading/future/wait/wait-inl.h +++ b/library/cpp/threading/future/wait/wait-inl.h @@ -1,16 +1,16 @@ -#pragma once - -#if !defined(INCLUDE_FUTURE_INL_H) +#pragma once + +#if !defined(INCLUDE_FUTURE_INL_H) #error "you should never include wait-inl.h directly" #endif // INCLUDE_FUTURE_INL_H - + namespace NThreading { namespace NImpl { template <typename TContainer> TVector<TFuture<void>> ToVoidFutures(const TContainer& futures) { TVector<TFuture<void>> voidFutures; voidFutures.reserve(futures.size()); - + for (const auto& future: futures) { voidFutures.push_back(future.IgnoreResult()); } @@ -18,7 +18,7 @@ namespace NThreading { return voidFutures; } } - + template <typename TContainer> [[nodiscard]] NImpl::EnableGenericWait<TContainer> WaitAll(const TContainer& futures) { return WaitAll(NImpl::ToVoidFutures(futures)); @@ -27,10 +27,10 @@ namespace NThreading { template <typename TContainer> [[nodiscard]] NImpl::EnableGenericWait<TContainer> WaitExceptionOrAll(const TContainer& futures) { return WaitExceptionOrAll(NImpl::ToVoidFutures(futures)); - } + } template <typename TContainer> [[nodiscard]] NImpl::EnableGenericWait<TContainer> WaitAny(const TContainer& futures) { return WaitAny(NImpl::ToVoidFutures(futures)); - } -} + } +} diff --git a/library/cpp/threading/future/wait/wait.cpp b/library/cpp/threading/future/wait/wait.cpp index 5d040985f2..a173833a7f 100644 --- a/library/cpp/threading/future/wait/wait.cpp +++ b/library/cpp/threading/future/wait/wait.cpp @@ -1,5 +1,5 @@ #include "wait.h" - + #include "wait_group.h" #include "wait_policy.h" @@ -9,16 +9,16 @@ namespace NThreading { TFuture<void> WaitGeneric(const TFuture<void>& f1) { return f1; } - + template <class WaitPolicy> TFuture<void> WaitGeneric(const TFuture<void>& f1, const TFuture<void>& f2) { TWaitGroup<WaitPolicy> wg; - + wg.Add(f1).Add(f2); - + return std::move(wg).Finish(); } - + template <class WaitPolicy> TFuture<void> WaitGeneric(TArrayRef<const TFuture<void>> futures) { if (futures.empty()) { @@ -32,13 +32,13 @@ namespace NThreading { for (const auto& fut : futures) { wg.Add(fut); } - + return std::move(wg).Finish(); } } - + //////////////////////////////////////////////////////////////////////////////// - + TFuture<void> WaitAll(const TFuture<void>& f1) { return WaitGeneric<TWaitPolicy::TAll>(f1); } @@ -57,26 +57,26 @@ namespace NThreading { TFuture<void> WaitExceptionOrAll(const TFuture<void>& f1) { return WaitGeneric<TWaitPolicy::TExceptionOrAll>(f1); } - + TFuture<void> WaitExceptionOrAll(const TFuture<void>& f1, const TFuture<void>& f2) { return WaitGeneric<TWaitPolicy::TExceptionOrAll>(f1, f2); } - + TFuture<void> WaitExceptionOrAll(TArrayRef<const TFuture<void>> futures) { return WaitGeneric<TWaitPolicy::TExceptionOrAll>(futures); - } + } //////////////////////////////////////////////////////////////////////////////// - + TFuture<void> WaitAny(const TFuture<void>& f1) { return WaitGeneric<TWaitPolicy::TAny>(f1); } - + TFuture<void> WaitAny(const TFuture<void>& f1, const TFuture<void>& f2) { return WaitGeneric<TWaitPolicy::TAny>(f1, f2); } - + TFuture<void> WaitAny(TArrayRef<const TFuture<void>> futures) { return WaitGeneric<TWaitPolicy::TAny>(futures); - } -} + } +} diff --git a/library/cpp/threading/future/wait/wait.h b/library/cpp/threading/future/wait/wait.h index bfccede548..6ff7d57baa 100644 --- a/library/cpp/threading/future/wait/wait.h +++ b/library/cpp/threading/future/wait/wait.h @@ -1,10 +1,10 @@ -#pragma once - +#pragma once + #include "fwd.h" #include <library/cpp/threading/future/core/future.h> #include <library/cpp/threading/future/wait/wait_group.h> - + #include <util/generic/array_ref.h> namespace NThreading { @@ -27,7 +27,7 @@ namespace NThreading { [[nodiscard]] TFuture<void> WaitExceptionOrAll(TArrayRef<const TFuture<void>> futures); template <typename TContainer> [[nodiscard]] NImpl::EnableGenericWait<TContainer> WaitExceptionOrAll(const TContainer& futures); - + // waits for any future [[nodiscard]] TFuture<void> WaitAny(const TFuture<void>& f1); [[nodiscard]] TFuture<void> WaitAny(const TFuture<void>& f1, const TFuture<void>& f2); @@ -35,7 +35,7 @@ namespace NThreading { template <typename TContainer> [[nodiscard]] NImpl::EnableGenericWait<TContainer> WaitAny(const TContainer& futures); } - -#define INCLUDE_FUTURE_INL_H + +#define INCLUDE_FUTURE_INL_H #include "wait-inl.h" -#undef INCLUDE_FUTURE_INL_H +#undef INCLUDE_FUTURE_INL_H diff --git a/library/cpp/threading/future/ya.make b/library/cpp/threading/future/ya.make index 3a0db18662..6591031f46 100644 --- a/library/cpp/threading/future/ya.make +++ b/library/cpp/threading/future/ya.make @@ -4,7 +4,7 @@ OWNER( LIBRARY() -SRCS( +SRCS( async.cpp core/future.cpp core/fwd.cpp @@ -13,8 +13,8 @@ SRCS( wait/wait.cpp wait/wait_group.cpp wait/wait_policy.cpp -) - +) + END() RECURSE_FOR_TESTS( diff --git a/library/cpp/threading/skip_list/compare.h b/library/cpp/threading/skip_list/compare.h index c63003d67f..ac98b3e1ce 100644 --- a/library/cpp/threading/skip_list/compare.h +++ b/library/cpp/threading/skip_list/compare.h @@ -1,14 +1,14 @@ -#pragma once - -#include <util/generic/typetraits.h> -#include <util/str_stl.h> - -namespace NThreading { +#pragma once + +#include <util/generic/typetraits.h> +#include <util/str_stl.h> + +namespace NThreading { namespace NImpl { Y_HAS_MEMBER(compare); Y_HAS_MEMBER(Compare); - - template <typename T> + + template <typename T> inline int CompareImpl(const T& l, const T& r) { if (l < r) { return -1; @@ -17,8 +17,8 @@ namespace NThreading { } else { return 0; } - } - + } + template <bool val> struct TSmallCompareSelector { template <typename T> @@ -26,7 +26,7 @@ namespace NThreading { return CompareImpl(l, r); } }; - + template <> struct TSmallCompareSelector<true> { template <typename T> @@ -34,7 +34,7 @@ namespace NThreading { return l.compare(r); } }; - + template <bool val> struct TBigCompareSelector { template <typename T> @@ -51,15 +51,15 @@ namespace NThreading { } }; - template <typename T> + template <typename T> struct TCompareSelector: public TBigCompareSelector<THasCompare<T>::value> { }; } - + //////////////////////////////////////////////////////////////////////////////// // Generic compare function - template <typename T> + template <typename T> inline int Compare(const T& l, const T& r) { return NImpl::TCompareSelector<T>::Compare(l, r); } @@ -72,6 +72,6 @@ namespace NThreading { inline int operator()(const T& l, const T& r) const { return Compare(l, r); } - }; - -} + }; + +} diff --git a/library/cpp/threading/skip_list/perf/main.cpp b/library/cpp/threading/skip_list/perf/main.cpp index 4e8d5b4082..4ad52049e7 100644 --- a/library/cpp/threading/skip_list/perf/main.cpp +++ b/library/cpp/threading/skip_list/perf/main.cpp @@ -1,56 +1,56 @@ #include <library/cpp/threading/skip_list/skiplist.h> - + #include <library/cpp/getopt/small/last_getopt.h> #include <library/cpp/charset/ci_string.h> -#include <util/datetime/base.h> -#include <util/generic/map.h> -#include <util/generic/vector.h> +#include <util/datetime/base.h> +#include <util/generic/map.h> +#include <util/generic/vector.h> #include <functional> -#include <util/memory/pool.h> -#include <util/random/random.h> -#include <util/string/join.h> -#include <util/system/mutex.h> -#include <util/system/thread.h> - -namespace { +#include <util/memory/pool.h> +#include <util/random/random.h> +#include <util/string/join.h> +#include <util/system/mutex.h> +#include <util/system/thread.h> + +namespace { using namespace NThreading; - + //////////////////////////////////////////////////////////////////////////////// - + IOutputStream& LogInfo() { return Cerr << TInstant::Now() << " INFO: "; } - + IOutputStream& LogError() { return Cerr << TInstant::Now() << " ERROR: "; } - + //////////////////////////////////////////////////////////////////////////////// - + struct TListItem { TStringBuf Key; TStringBuf Value; - + TListItem(const TStringBuf& key, const TStringBuf& value) : Key(key) , Value(value) { } - + int Compare(const TListItem& other) const { return Key.compare(other.Key); } }; - + using TListType = TSkipList<TListItem>; - + //////////////////////////////////////////////////////////////////////////////// - + class TRandomData { private: TVector<char> Buffer; - + public: TRandomData() : Buffer(1024 * 1024) @@ -59,34 +59,34 @@ namespace { Buffer[i] = RandomNumber<char>(); } } - + TStringBuf GetString(size_t len) const { size_t start = RandomNumber(Buffer.size() - len); return TStringBuf(&Buffer[start], len); - } - + } + TStringBuf GetString(size_t min, size_t max) const { return GetString(min + RandomNumber(max - min)); } }; - + //////////////////////////////////////////////////////////////////////////////// - + class TWorkerThread: public ISimpleThread { private: std::function<void()> Func; TDuration Time; - + public: TWorkerThread(std::function<void()> func) : Func(func) { } - + TDuration GetTime() const { return Time; } - + private: void* ThreadProc() noexcept override { TInstant started = TInstant::Now(); @@ -95,33 +95,33 @@ namespace { return nullptr; } }; - + inline TAutoPtr<TWorkerThread> StartThread(std::function<void()> func) { TAutoPtr<TWorkerThread> thread = new TWorkerThread(func); thread->Start(); return thread; - } - + } + //////////////////////////////////////////////////////////////////////////////// - + typedef std::function<void()> TTestFunc; - + struct TTest { TString Name; TTestFunc Func; - + TTest() { } - + TTest(const TString& name, const TTestFunc& func) : Name(name) , Func(func) { } }; - + //////////////////////////////////////////////////////////////////////////////// - + class TTestSuite { private: size_t Iterations = 1000000; @@ -130,72 +130,72 @@ namespace { size_t NumReaders = 4; size_t NumWriters = 1; size_t BatchSize = 20; - + TMemoryPool MemoryPool; TListType List; TMutex Mutex; TRandomData Random; - + TMap<TCiString, TTest> AllTests; TVector<TTest> Tests; - + public: TTestSuite() : MemoryPool(64 * 1024) , List(MemoryPool) { } - + bool Init(int argc, const char* argv[]) { TVector<TString> tests; try { NLastGetopt::TOpts opts; opts.AddHelpOption(); - + #define OPTION(opt, x) \ opts.AddLongOption(opt, #x) \ .Optional() \ .DefaultValue(ToString(x)) \ .StoreResult(&x) // end of OPTION - + OPTION('i', Iterations); OPTION('k', KeyLen); OPTION('v', ValueLen); OPTION('r', NumReaders); OPTION('w', NumWriters); OPTION('b', BatchSize); - -#undef OPTION - + +#undef OPTION + NLastGetopt::TOptsParseResultException optsRes(&opts, argc, argv); for (const auto& opt : opts.Opts_) { const NLastGetopt::TOptParseResult* r = optsRes.FindOptParseResult(opt.Get(), true); if (r) { LogInfo() << "[-" << opt->GetChar() << "] " << opt->GetName() << ": " << r->Back() << Endl; } - } + } tests = optsRes.GetFreeArgs(); } catch (...) { LogError() << CurrentExceptionMessage() << Endl; return false; - } - -#define TEST(type) \ + } + +#define TEST(type) \ AddTest(#type, std::bind(&TTestSuite::Y_CAT(TEST_, type), this)) // end of TEST - + TEST(Clear); TEST(InsertRandom); TEST(InsertSequential); TEST(InsertSequentialSimple); TEST(LookupRandom); TEST(Concurrent); - -#undef TEST - + +#undef TEST + if (tests.empty()) { LogError() << "no tests specified, choose from: " << PrintTests() << Endl; - return false; - } + return false; + } for (size_t i = 0; i < tests.size(); ++i) { if (!AllTests.contains(tests[i])) { @@ -206,13 +206,13 @@ namespace { } return true; - } - + } + void Run() { -#if !defined(NDEBUG) +#if !defined(NDEBUG) LogInfo() << "*** DEBUG build! ***" << Endl; -#endif - +#endif + for (const TTest& test : Tests) { LogInfo() << "Starting test " << test.Name << Endl; @@ -224,7 +224,7 @@ namespace { << " failed: " << CurrentExceptionMessage() << Endl; } - + LogInfo() << "List size = " << List.GetSize() << Endl; TDuration duration = TInstant::Now() - started; @@ -234,31 +234,31 @@ namespace { << Endl; LogInfo() << "Finished test " << test.Name << Endl; } - } - + } + private: void AddTest(const char* name, TTestFunc func) { AllTests[name] = TTest(name, func); } - + TString PrintTests() const { TVector<TString> names; for (const auto& it : AllTests) { names.push_back(it.first); } return JoinSeq(", ", names); - } - + } + void TEST_Clear() { List.Clear(); } - + void TEST_InsertRandom() { for (size_t i = 0; i < Iterations; ++i) { List.Insert(TListItem(Random.GetString(KeyLen), Random.GetString(ValueLen))); } - } - + } + void TEST_InsertSequential() { TString key; for (size_t i = 0; i < Iterations;) { @@ -269,9 +269,9 @@ namespace { key.append((char)j); List.Insert(TListItem(key, Random.GetString(ValueLen))); } - } - } - + } + } + void TEST_InsertSequentialSimple() { for (size_t i = 0; i < Iterations; ++i) { List.Insert(TListItem(Random.GetString(KeyLen), Random.GetString(ValueLen))); @@ -282,11 +282,11 @@ namespace { for (size_t i = 0; i < Iterations; ++i) { List.SeekTo(TListItem(Random.GetString(KeyLen), TStringBuf())); } - } - + } + void TEST_Concurrent() { LogInfo() << "starting producers..." << Endl; - + TVector<TAutoPtr<TWorkerThread>> producers(NumWriters); for (size_t i1 = 0; i1 < producers.size(); ++i1) { producers[i1] = StartThread([&] { @@ -304,9 +304,9 @@ namespace { << Endl; }); } - + LogInfo() << "starting consumers..." << Endl; - + TVector<TAutoPtr<TWorkerThread>> consumers(NumReaders); for (size_t i1 = 0; i1 < consumers.size(); ++i1) { consumers[i1] = StartThread([&] { @@ -321,42 +321,42 @@ namespace { << Endl; }); } - + LogInfo() << "wait for producers..." << Endl; - + TDuration producerTime; for (size_t i = 0; i < producers.size(); ++i) { producers[i]->Join(); producerTime += producers[i]->GetTime(); } - + LogInfo() << "wait for consumers..." << Endl; - + TDuration consumerTime; for (size_t i = 0; i < consumers.size(); ++i) { consumers[i]->Join(); consumerTime += consumers[i]->GetTime(); } - + LogInfo() << "average producer time: " << producerTime.SecondsFloat() / producers.size() << " seconds" << Endl; - + LogInfo() << "average consumer time: " << consumerTime.SecondsFloat() / consumers.size() << " seconds" << Endl; } }; - + } - -//////////////////////////////////////////////////////////////////////////////// - + +//////////////////////////////////////////////////////////////////////////////// + int main(int argc, const char* argv[]) { - TTestSuite suite; - if (!suite.Init(argc, argv)) { - return -1; - } - suite.Run(); - return 0; -} + TTestSuite suite; + if (!suite.Init(argc, argv)) { + return -1; + } + suite.Run(); + return 0; +} diff --git a/library/cpp/threading/skip_list/perf/ya.make b/library/cpp/threading/skip_list/perf/ya.make index d64a58a60e..01bfafa404 100644 --- a/library/cpp/threading/skip_list/perf/ya.make +++ b/library/cpp/threading/skip_list/perf/ya.make @@ -1,15 +1,15 @@ -PROGRAM(skiplist-perf) - +PROGRAM(skiplist-perf) + OWNER(g:rtmr) - -PEERDIR( + +PEERDIR( library/cpp/charset library/cpp/getopt/small library/cpp/threading/skip_list -) - -SRCS( - main.cpp -) - -END() +) + +SRCS( + main.cpp +) + +END() diff --git a/library/cpp/threading/skip_list/skiplist.cpp b/library/cpp/threading/skip_list/skiplist.cpp index 386b9546d4..c6e98816fb 100644 --- a/library/cpp/threading/skip_list/skiplist.cpp +++ b/library/cpp/threading/skip_list/skiplist.cpp @@ -1 +1 @@ -#include "skiplist.h" +#include "skiplist.h" diff --git a/library/cpp/threading/skip_list/skiplist.h b/library/cpp/threading/skip_list/skiplist.h index 054a1b10b9..914a7c6ee7 100644 --- a/library/cpp/threading/skip_list/skiplist.h +++ b/library/cpp/threading/skip_list/skiplist.h @@ -1,69 +1,69 @@ -#pragma once - -#include "compare.h" - -#include <util/generic/algorithm.h> -#include <util/generic/noncopyable.h> -#include <util/generic/typetraits.h> -#include <util/memory/pool.h> -#include <util/random/random.h> -#include <util/system/atomic.h> - -namespace NThreading { +#pragma once + +#include "compare.h" + +#include <util/generic/algorithm.h> +#include <util/generic/noncopyable.h> +#include <util/generic/typetraits.h> +#include <util/memory/pool.h> +#include <util/random/random.h> +#include <util/system/atomic.h> + +namespace NThreading { //////////////////////////////////////////////////////////////////////////////// - + class TNopCounter { protected: template <typename T> void OnInsert(const T&) { } - + template <typename T> void OnUpdate(const T&) { } - + void Reset() { } }; - + //////////////////////////////////////////////////////////////////////////////// - + class TSizeCounter { - private: + private: size_t Size; - - public: + + public: TSizeCounter() : Size(0) - { - } - + { + } + size_t GetSize() const { return Size; - } - + } + protected: template <typename T> void OnInsert(const T&) { ++Size; - } - + } + template <typename T> void OnUpdate(const T&) { - } - + } + void Reset() { Size = 0; - } - }; - + } + }; + //////////////////////////////////////////////////////////////////////////////// // Append-only concurrent skip-list // // Readers do not require any synchronization. // Writers should be externally synchronized. // Nodes will be allocated using TMemoryPool instance. - + template < typename T, typename TComparer = TCompare<T>, @@ -104,41 +104,41 @@ namespace NThreading { } }; - public: + public: class TIterator { private: const TSkipList* List; const TNode* Node; - + public: TIterator() : List(nullptr) , Node(nullptr) { } - + TIterator(const TSkipList* list, const TNode* node) : List(list) , Node(node) { } - + TIterator(const TIterator& other) : List(other.List) , Node(other.Node) { } - + TIterator& operator=(const TIterator& other) { List = other.List; Node = other.Node; return *this; } - + void Next() { Node = Node ? Node->GetNext(0) : nullptr; - } - + } + // much less efficient than Next as our list is single-linked void Prev() { if (Node) { @@ -146,34 +146,34 @@ namespace NThreading { Node = (node != List->Head ? node : nullptr); } } - + void Reset() { Node = nullptr; } - + bool IsValid() const { return Node != nullptr; } - + const T& GetValue() const { Y_ASSERT(IsValid()); return Node->GetValue(); } }; - + private: TAllocator& Allocator; TComparer Comparer; - + TNode* Head; TAtomic Height; TCounter Counter; - + TNode* Prev[MaxHeight]; - + template <typename TValue> using TComparerReturnType = std::invoke_result_t<TComparer, const T&, const TValue&>; - + public: TSkipList(TAllocator& allocator, const TComparer& comparer = TComparer()) : Allocator(allocator) @@ -181,28 +181,28 @@ namespace NThreading { { Init(); } - + ~TSkipList() { CallDtors(); } - + void Clear() { CallDtors(); Allocator.ClearKeepFirstChunk(); Init(); - } - + } + bool Insert(T value) { TNode* node = PrepareInsert(value); if (Y_UNLIKELY(node && Compare(node, value) == 0)) { // we do not allow duplicates return false; - } + } node = DoInsert(std::move(value)); TCounter::OnInsert(node->GetValue()); return true; - } - + } + template <typename TInsertAction, typename TUpdateAction> bool Insert(const T& value, TInsertAction insert, TUpdateAction update) { TNode* node = PrepareInsert(value); @@ -218,27 +218,27 @@ namespace NThreading { TCounter::OnInsert(node->GetValue()); return true; } - + template <typename TValue> bool Contains(const TValue& value) const { TNode* node = FindGreaterThanOrEqual(value); return node && Compare(node, value) == 0; } - + TIterator SeekToFirst() const { return TIterator(this, FindFirst()); } - + TIterator SeekToLast() const { TNode* last = FindLast(); return TIterator(this, last != Head ? last : nullptr); } - + template <typename TValue> TIterator SeekTo(const TValue& value) const { return TIterator(this, FindGreaterThanOrEqual(value)); - } - + } + private: static int RandomHeight() { int height = 1; @@ -247,7 +247,7 @@ namespace NThreading { } return height; } - + void Init() { Head = AllocateRootNode(); Height = 1; @@ -256,8 +256,8 @@ namespace NThreading { for (int i = 0; i < MaxHeight; ++i) { Prev[i] = Head; } - } - + } + void CallDtors() { if (!TTypeTraits<T>::IsPod) { // we should explicitly call destructors for our nodes @@ -267,56 +267,56 @@ namespace NThreading { node->~TNode(); node = next; } - } - } - + } + } + TNode* AllocateRootNode() { size_t size = sizeof(TNode) + sizeof(TNode*) * MaxHeight; void* buffer = Allocator.Allocate(size); memset(buffer, 0, size); return static_cast<TNode*>(buffer); } - + TNode* AllocateNode(T&& value, int height) { size_t size = sizeof(TNode) + sizeof(TNode*) * height; void* buffer = Allocator.Allocate(size); memset(buffer, 0, size); return new (buffer) TNode(std::move(value)); } - + TNode* FindFirst() const { return Head->GetNext(0); } - + TNode* FindLast() const { TNode* node = Head; int height = AtomicGet(Height) - 1; - + while (true) { TNode* next = node->GetNext(height); if (next) { node = next; continue; } - + if (height) { --height; } else { return node; } - } - } - + } + } + template <typename TValue> TComparerReturnType<TValue> Compare(const TNode* node, const TValue& value) const { return Comparer(node->GetValue(), value); } - + template <typename TValue> TNode* FindLessThan(const TValue& value, TNode** links) const { TNode* node = Head; int height = AtomicGet(Height) - 1; - + TNode* prev = nullptr; while (true) { TNode* next = node->GetNext(height); @@ -326,27 +326,27 @@ namespace NThreading { node = next; continue; } - } - + } + if (links) { // collect links from upper levels links[height] = node; } - + if (height) { prev = next; --height; } else { return node; } - } - } - + } + } + template <typename TValue> TNode* FindGreaterThanOrEqual(const TValue& value) const { TNode* node = Head; int height = AtomicGet(Height) - 1; - + TNode* prev = nullptr; while (true) { TNode* next = node->GetNext(height); @@ -359,29 +359,29 @@ namespace NThreading { if (cmp == 0) { return next; } - } + } if (height) { prev = next; --height; } else { - return next; - } - } + return next; + } + } } - + TNode* PrepareInsert(const T& value) { TNode* prev = Prev[0]; TNode* next = prev->GetNext(0); if ((prev == Head || Compare(prev, value) < 0) && (next == nullptr || Compare(next, value) >= 0)) { // avoid seek in case of sequential insert - } else { + } else { prev = FindLessThan(value, Prev); next = prev->GetNext(0); - } + } return next; - } - + } + TNode* DoInsert(T&& value) { // choose level to place new node int currentHeight = AtomicGet(Height); @@ -392,17 +392,17 @@ namespace NThreading { Prev[i] = Head; } AtomicSet(Height, height); - } - + } + TNode* node = AllocateNode(std::move(value), height); node->Link(height, Prev); - + // keep last inserted node to optimize sequential inserts for (int i = 0; i < height; i++) { Prev[i] = node; } return node; - } + } }; - + } diff --git a/library/cpp/threading/skip_list/skiplist_ut.cpp b/library/cpp/threading/skip_list/skiplist_ut.cpp index e7d0b62873..52fcffda66 100644 --- a/library/cpp/threading/skip_list/skiplist_ut.cpp +++ b/library/cpp/threading/skip_list/skiplist_ut.cpp @@ -1,91 +1,91 @@ -#include "skiplist.h" - +#include "skiplist.h" + #include <library/cpp/testing/unittest/registar.h> - -namespace NThreading { + +namespace NThreading { namespace { struct TTestObject { static size_t Count; int Tag; - + TTestObject(int tag) : Tag(tag) { ++Count; } - + TTestObject(const TTestObject& other) : Tag(other.Tag) { ++Count; } - + ~TTestObject() { --Count; } - + bool operator<(const TTestObject& other) const { return Tag < other.Tag; } }; - + size_t TTestObject::Count = 0; - - } - + + } + //////////////////////////////////////////////////////////////////////////////// - + Y_UNIT_TEST_SUITE(TSkipListTest) { Y_UNIT_TEST(ShouldBeEmptyAfterCreation) { TMemoryPool pool(1024); TSkipList<int> list(pool); - + UNIT_ASSERT_EQUAL(list.GetSize(), 0); } - + Y_UNIT_TEST(ShouldAllowInsertion) { TMemoryPool pool(1024); TSkipList<int> list(pool); - + UNIT_ASSERT(list.Insert(12345678)); UNIT_ASSERT_EQUAL(list.GetSize(), 1); } - + Y_UNIT_TEST(ShouldNotAllowDuplicates) { TMemoryPool pool(1024); TSkipList<int> list(pool); - + UNIT_ASSERT(list.Insert(12345678)); UNIT_ASSERT_EQUAL(list.GetSize(), 1); - + UNIT_ASSERT(!list.Insert(12345678)); UNIT_ASSERT_EQUAL(list.GetSize(), 1); } - + Y_UNIT_TEST(ShouldContainInsertedItem) { TMemoryPool pool(1024); TSkipList<int> list(pool); - + UNIT_ASSERT(list.Insert(12345678)); UNIT_ASSERT(list.Contains(12345678)); } - + Y_UNIT_TEST(ShouldNotContainNotInsertedItem) { TMemoryPool pool(1024); TSkipList<int> list(pool); - + UNIT_ASSERT(list.Insert(12345678)); UNIT_ASSERT(!list.Contains(87654321)); } - + Y_UNIT_TEST(ShouldIterateAllItems) { TMemoryPool pool(1024); TSkipList<int> list(pool); - + for (int i = 8; i > 0; --i) { UNIT_ASSERT(list.Insert(i)); } - + TSkipList<int>::TIterator it = list.SeekToFirst(); for (int i = 1; i <= 8; ++i) { UNIT_ASSERT(it.IsValid()); @@ -94,15 +94,15 @@ namespace NThreading { } UNIT_ASSERT(!it.IsValid()); } - + Y_UNIT_TEST(ShouldIterateAllItemsInReverseDirection) { TMemoryPool pool(1024); TSkipList<int> list(pool); - + for (int i = 8; i > 0; --i) { UNIT_ASSERT(list.Insert(i)); } - + TSkipList<int>::TIterator it = list.SeekToLast(); for (int i = 8; i > 0; --i) { UNIT_ASSERT(it.IsValid()); @@ -111,75 +111,75 @@ namespace NThreading { } UNIT_ASSERT(!it.IsValid()); } - + Y_UNIT_TEST(ShouldSeekToFirstItem) { TMemoryPool pool(1024); TSkipList<int> list(pool); - + for (int i = 1; i < 10; ++i) { UNIT_ASSERT(list.Insert(i)); } - + TSkipList<int>::TIterator it = list.SeekToFirst(); UNIT_ASSERT(it.IsValid()); UNIT_ASSERT_EQUAL(it.GetValue(), 1); } - + Y_UNIT_TEST(ShouldSeekToLastItem) { TMemoryPool pool(1024); TSkipList<int> list(pool); - + for (int i = 1; i < 10; ++i) { UNIT_ASSERT(list.Insert(i)); } - + TSkipList<int>::TIterator it = list.SeekToLast(); UNIT_ASSERT(it.IsValid()); UNIT_ASSERT_EQUAL(it.GetValue(), 9); } - + Y_UNIT_TEST(ShouldSeekToExistingItem) { TMemoryPool pool(1024); TSkipList<int> list(pool); - + UNIT_ASSERT(list.Insert(12345678)); - + TSkipList<int>::TIterator it = list.SeekTo(12345678); UNIT_ASSERT(it.IsValid()); } - + Y_UNIT_TEST(ShouldSeekAfterMissedItem) { TMemoryPool pool(1024); TSkipList<int> list(pool); - + UNIT_ASSERT(list.Insert(100)); UNIT_ASSERT(list.Insert(300)); - + TSkipList<int>::TIterator it = list.SeekTo(200); UNIT_ASSERT(it.IsValid()); UNIT_ASSERT_EQUAL(it.GetValue(), 300); - + it.Prev(); UNIT_ASSERT(it.IsValid()); UNIT_ASSERT_EQUAL(it.GetValue(), 100); } - + Y_UNIT_TEST(ShouldCallDtorsOfNonPodTypes) { UNIT_ASSERT(!TTypeTraits<TTestObject>::IsPod); UNIT_ASSERT_EQUAL(TTestObject::Count, 0); - + { TMemoryPool pool(1024); TSkipList<TTestObject> list(pool); - + UNIT_ASSERT(list.Insert(TTestObject(1))); UNIT_ASSERT(list.Insert(TTestObject(2))); - + UNIT_ASSERT_EQUAL(TTestObject::Count, 2); } UNIT_ASSERT_EQUAL(TTestObject::Count, 0); } - } - + } + } diff --git a/library/cpp/threading/skip_list/ut/ya.make b/library/cpp/threading/skip_list/ut/ya.make index ae07423e71..704a31e9a2 100644 --- a/library/cpp/threading/skip_list/ut/ya.make +++ b/library/cpp/threading/skip_list/ut/ya.make @@ -1,9 +1,9 @@ UNITTEST_FOR(library/cpp/threading/skip_list) - + OWNER(g:rtmr) - -SRCS( - skiplist_ut.cpp -) - -END() + +SRCS( + skiplist_ut.cpp +) + +END() diff --git a/library/cpp/threading/skip_list/ya.make b/library/cpp/threading/skip_list/ya.make index 923fcb3566..d338aeae2b 100644 --- a/library/cpp/threading/skip_list/ya.make +++ b/library/cpp/threading/skip_list/ya.make @@ -1,9 +1,9 @@ -LIBRARY() - +LIBRARY() + OWNER(g:rtmr) - -SRCS( - skiplist.cpp -) - -END() + +SRCS( + skiplist.cpp +) + +END() |