diff options
author | Devtools Arcadia <arcadia-devtools@yandex-team.ru> | 2022-02-07 18:08:42 +0300 |
---|---|---|
committer | Devtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net> | 2022-02-07 18:08:42 +0300 |
commit | 1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch) | |
tree | e26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/threading/chunk_queue | |
download | ydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz |
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/threading/chunk_queue')
-rw-r--r-- | library/cpp/threading/chunk_queue/queue.cpp | 1 | ||||
-rw-r--r-- | library/cpp/threading/chunk_queue/queue.h | 568 | ||||
-rw-r--r-- | library/cpp/threading/chunk_queue/queue_ut.cpp | 205 | ||||
-rw-r--r-- | library/cpp/threading/chunk_queue/readme.txt | 60 | ||||
-rw-r--r-- | library/cpp/threading/chunk_queue/ut/ya.make | 9 | ||||
-rw-r--r-- | library/cpp/threading/chunk_queue/ya.make | 9 |
6 files changed, 852 insertions, 0 deletions
diff --git a/library/cpp/threading/chunk_queue/queue.cpp b/library/cpp/threading/chunk_queue/queue.cpp new file mode 100644 index 0000000000..4ebd3f3205 --- /dev/null +++ b/library/cpp/threading/chunk_queue/queue.cpp @@ -0,0 +1 @@ +#include "queue.h" diff --git a/library/cpp/threading/chunk_queue/queue.h b/library/cpp/threading/chunk_queue/queue.h new file mode 100644 index 0000000000..55859601a1 --- /dev/null +++ b/library/cpp/threading/chunk_queue/queue.h @@ -0,0 +1,568 @@ +#pragma once + +#include <util/datetime/base.h> +#include <util/generic/noncopyable.h> +#include <util/generic/ptr.h> +#include <util/generic/typetraits.h> +#include <util/generic/vector.h> +#include <util/generic/ylimits.h> +#include <util/system/atomic.h> +#include <util/system/guard.h> +#include <util/system/spinlock.h> +#include <util/system/yassert.h> + +#include <type_traits> +#include <utility> + +namespace NThreading { +//////////////////////////////////////////////////////////////////////////////// +// Platform helpers + +#if !defined(PLATFORM_CACHE_LINE) +#define PLATFORM_CACHE_LINE 64 +#endif + +#if !defined(PLATFORM_PAGE_SIZE) +#define PLATFORM_PAGE_SIZE 4 * 1024 +#endif + + template <typename T, size_t PadSize = PLATFORM_CACHE_LINE> + struct TPadded: public T { + char Pad[PadSize - sizeof(T) % PadSize]; + + TPadded() { + static_assert(sizeof(*this) % PadSize == 0, "padding does not work"); + Y_UNUSED(Pad); + } + + template<typename... Args> + TPadded(Args&&... args) + : T(std::forward<Args>(args)...) + { + static_assert(sizeof(*this) % PadSize == 0, "padding does not work"); + Y_UNUSED(Pad); + } + }; + + //////////////////////////////////////////////////////////////////////////////// + // Type helpers + + namespace NImpl { + template <typename T> + struct TPodTypeHelper { + template <typename TT> + static void Write(T* ptr, TT&& value) { + *ptr = value; + } + + static T Read(T* ptr) { + return *ptr; + } + + static void Destroy(T* ptr) { + Y_UNUSED(ptr); + } + }; + + template <typename T> + struct TNonPodTypeHelper { + template <typename TT> + static void Write(T* ptr, TT&& value) { + new (ptr) T(std::forward<TT>(value)); + } + + static T Read(T* ptr) { + return std::move(*ptr); + } + + static void Destroy(T* ptr) { + (void)ptr; /* Make MSVC happy. */ + ptr->~T(); + } + }; + + template <typename T> + using TTypeHelper = std::conditional_t< + TTypeTraits<T>::IsPod, + TPodTypeHelper<T>, + TNonPodTypeHelper<T>>; + + } + + //////////////////////////////////////////////////////////////////////////////// + // One producer/one consumer chunked queue. + + template <typename T, size_t ChunkSize = PLATFORM_PAGE_SIZE> + class TOneOneQueue: private TNonCopyable { + using TTypeHelper = NImpl::TTypeHelper<T>; + + struct TChunk; + + struct TChunkHeader { + size_t Count = 0; + TChunk* Next = nullptr; + }; + + struct TChunk: public TChunkHeader { + static constexpr size_t MaxCount = (ChunkSize - sizeof(TChunkHeader)) / sizeof(T); + + char Entries[MaxCount * sizeof(T)]; + + TChunk() { + Y_UNUSED(Entries); // uninitialized + } + + ~TChunk() { + for (size_t i = 0; i < this->Count; ++i) { + TTypeHelper::Destroy(GetPtr(i)); + } + } + + T* GetPtr(size_t i) { + return (T*)Entries + i; + } + }; + + struct TWriterState { + TChunk* Chunk = nullptr; + }; + + struct TReaderState { + TChunk* Chunk = nullptr; + size_t Count = 0; + }; + + private: + TPadded<TWriterState> Writer; + TPadded<TReaderState> Reader; + + public: + using TItem = T; + + TOneOneQueue() { + Writer.Chunk = Reader.Chunk = new TChunk(); + } + + ~TOneOneQueue() { + DeleteChunks(Reader.Chunk); + } + + template <typename TT> + void Enqueue(TT&& value) { + T* ptr = PrepareWrite(); + Y_ASSERT(ptr); + TTypeHelper::Write(ptr, std::forward<TT>(value)); + CompleteWrite(); + } + + bool Dequeue(T& value) { + if (T* ptr = PrepareRead()) { + value = TTypeHelper::Read(ptr); + CompleteRead(); + return true; + } + return false; + } + + bool IsEmpty() { + return !PrepareRead(); + } + + protected: + T* PrepareWrite() { + TChunk* chunk = Writer.Chunk; + Y_ASSERT(chunk && !chunk->Next); + + if (chunk->Count != TChunk::MaxCount) { + return chunk->GetPtr(chunk->Count); + } + + chunk = new TChunk(); + AtomicSet(Writer.Chunk->Next, chunk); + Writer.Chunk = chunk; + return chunk->GetPtr(0); + } + + void CompleteWrite() { + AtomicSet(Writer.Chunk->Count, Writer.Chunk->Count + 1); + } + + T* PrepareRead() { + TChunk* chunk = Reader.Chunk; + Y_ASSERT(chunk); + + for (;;) { + size_t writerCount = AtomicGet(chunk->Count); + if (Reader.Count != writerCount) { + return chunk->GetPtr(Reader.Count); + } + + if (writerCount != TChunk::MaxCount) { + return nullptr; + } + + chunk = AtomicGet(chunk->Next); + if (!chunk) { + return nullptr; + } + + delete Reader.Chunk; + Reader.Chunk = chunk; + Reader.Count = 0; + } + } + + void CompleteRead() { + ++Reader.Count; + } + + private: + static void DeleteChunks(TChunk* chunk) { + while (chunk) { + TChunk* next = chunk->Next; + delete chunk; + chunk = next; + } + } + }; + + //////////////////////////////////////////////////////////////////////////////// + // Multiple producers/single consumer partitioned queue. + // Provides FIFO guaranties for each producer. + + template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE> + class TManyOneQueue: private TNonCopyable { + using TTypeHelper = NImpl::TTypeHelper<T>; + + struct TEntry { + T Value; + ui64 Tag; + }; + + struct TQueueType: public TOneOneQueue<TEntry, ChunkSize> { + TAtomic WriteLock = 0; + + using TOneOneQueue<TEntry, ChunkSize>::PrepareWrite; + using TOneOneQueue<TEntry, ChunkSize>::CompleteWrite; + + using TOneOneQueue<TEntry, ChunkSize>::PrepareRead; + using TOneOneQueue<TEntry, ChunkSize>::CompleteRead; + }; + + private: + union { + TAtomic WriteTag = 0; + char Pad[PLATFORM_CACHE_LINE]; + }; + + TQueueType Queues[Concurrency]; + + public: + using TItem = T; + + template <typename TT> + void Enqueue(TT&& value) { + ui64 tag = NextTag(); + while (!TryEnqueue(std::forward<TT>(value), tag)) { + SpinLockPause(); + } + } + + bool Dequeue(T& value) { + size_t index = 0; + if (TEntry* entry = PrepareRead(index)) { + value = TTypeHelper::Read(&entry->Value); + Queues[index].CompleteRead(); + return true; + } + return false; + } + + bool IsEmpty() { + for (size_t i = 0; i < Concurrency; ++i) { + if (!Queues[i].IsEmpty()) { + return false; + } + } + return true; + } + + private: + ui64 NextTag() { + // TODO: can we avoid synchronization here? it costs 1.5x performance penalty + // return GetCycleCount(); + return AtomicIncrement(WriteTag); + } + + template <typename TT> + bool TryEnqueue(TT&& value, ui64 tag) { + for (size_t i = 0; i < Concurrency; ++i) { + TQueueType& queue = Queues[i]; + if (AtomicTryAndTryLock(&queue.WriteLock)) { + TEntry* entry = queue.PrepareWrite(); + Y_ASSERT(entry); + TTypeHelper::Write(&entry->Value, std::forward<TT>(value)); + entry->Tag = tag; + queue.CompleteWrite(); + AtomicUnlock(&queue.WriteLock); + return true; + } + } + return false; + } + + TEntry* PrepareRead(size_t& index) { + TEntry* entry = nullptr; + ui64 tag = Max(); + + for (size_t i = 0; i < Concurrency; ++i) { + TEntry* e = Queues[i].PrepareRead(); + if (e && e->Tag < tag) { + index = i; + entry = e; + tag = e->Tag; + } + } + + if (entry) { + // need second pass to catch updates within already scanned range + size_t candidate = index; + for (size_t i = 0; i < candidate; ++i) { + TEntry* e = Queues[i].PrepareRead(); + if (e && e->Tag < tag) { + index = i; + entry = e; + tag = e->Tag; + } + } + } + + return entry; + } + }; + + //////////////////////////////////////////////////////////////////////////////// + // Concurrent many-many queue with strong FIFO guaranties. + // Writers will not block readers (and vice versa), but will block each other. + + template <typename T, size_t ChunkSize = PLATFORM_PAGE_SIZE, typename TLock = TAdaptiveLock> + class TManyManyQueue: private TNonCopyable { + private: + TPadded<TLock> WriteLock; + TPadded<TLock> ReadLock; + + TOneOneQueue<T, ChunkSize> Queue; + + public: + using TItem = T; + + template <typename TT> + void Enqueue(TT&& value) { + with_lock (WriteLock) { + Queue.Enqueue(std::forward<TT>(value)); + } + } + + bool Dequeue(T& value) { + with_lock (ReadLock) { + return Queue.Dequeue(value); + } + } + + bool IsEmpty() { + with_lock (ReadLock) { + return Queue.IsEmpty(); + } + } + }; + + //////////////////////////////////////////////////////////////////////////////// + // Multiple producers/single consumer partitioned queue. + // Because of random partitioning reordering possible - FIFO not guaranteed! + + template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE> + class TRelaxedManyOneQueue: private TNonCopyable { + struct TQueueType: public TOneOneQueue<T, ChunkSize> { + TAtomic WriteLock = 0; + }; + + private: + union { + size_t ReadPos = 0; + char Pad[PLATFORM_CACHE_LINE]; + }; + + TQueueType Queues[Concurrency]; + + public: + using TItem = T; + + template <typename TT> + void Enqueue(TT&& value) { + while (!TryEnqueue(std::forward<TT>(value))) { + SpinLockPause(); + } + } + + bool Dequeue(T& value) { + for (size_t i = 0; i < Concurrency; ++i) { + TQueueType& queue = Queues[ReadPos++ % Concurrency]; + if (queue.Dequeue(value)) { + return true; + } + } + return false; + } + + bool IsEmpty() { + for (size_t i = 0; i < Concurrency; ++i) { + if (!Queues[i].IsEmpty()) { + return false; + } + } + return true; + } + + private: + template <typename TT> + bool TryEnqueue(TT&& value) { + size_t writePos = GetCycleCount(); + for (size_t i = 0; i < Concurrency; ++i) { + TQueueType& queue = Queues[writePos++ % Concurrency]; + if (AtomicTryAndTryLock(&queue.WriteLock)) { + queue.Enqueue(std::forward<TT>(value)); + AtomicUnlock(&queue.WriteLock); + return true; + } + } + return false; + } + }; + + //////////////////////////////////////////////////////////////////////////////// + // Concurrent many-many partitioned queue. + // Because of random partitioning reordering possible - FIFO not guaranteed! + + template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE> + class TRelaxedManyManyQueue: private TNonCopyable { + struct TQueueType: public TOneOneQueue<T, ChunkSize> { + union { + TAtomic WriteLock = 0; + char Pad1[PLATFORM_CACHE_LINE]; + }; + union { + TAtomic ReadLock = 0; + char Pad2[PLATFORM_CACHE_LINE]; + }; + }; + + private: + TQueueType Queues[Concurrency]; + + public: + using TItem = T; + + template <typename TT> + void Enqueue(TT&& value) { + while (!TryEnqueue(std::forward<TT>(value))) { + SpinLockPause(); + } + } + + bool Dequeue(T& value) { + size_t readPos = GetCycleCount(); + for (size_t i = 0; i < Concurrency; ++i) { + TQueueType& queue = Queues[readPos++ % Concurrency]; + if (AtomicTryAndTryLock(&queue.ReadLock)) { + bool dequeued = queue.Dequeue(value); + AtomicUnlock(&queue.ReadLock); + if (dequeued) { + return true; + } + } + } + return false; + } + + bool IsEmpty() { + for (size_t i = 0; i < Concurrency; ++i) { + TQueueType& queue = Queues[i]; + if (AtomicTryAndTryLock(&queue.ReadLock)) { + bool empty = queue.IsEmpty(); + AtomicUnlock(&queue.ReadLock); + if (!empty) { + return false; + } + } + } + return true; + } + + private: + template <typename TT> + bool TryEnqueue(TT&& value) { + size_t writePos = GetCycleCount(); + for (size_t i = 0; i < Concurrency; ++i) { + TQueueType& queue = Queues[writePos++ % Concurrency]; + if (AtomicTryAndTryLock(&queue.WriteLock)) { + queue.Enqueue(std::forward<TT>(value)); + AtomicUnlock(&queue.WriteLock); + return true; + } + } + return false; + } + }; + + //////////////////////////////////////////////////////////////////////////////// + // Simple wrapper to deal with AutoPtrs + + template <typename T, typename TImpl> + class TAutoQueueBase: private TNonCopyable { + private: + TImpl Impl; + + public: + using TItem = TAutoPtr<T>; + + ~TAutoQueueBase() { + TAutoPtr<T> value; + while (Dequeue(value)) { + // do nothing + } + } + + void Enqueue(TAutoPtr<T> value) { + Impl.Enqueue(value.Get()); + Y_UNUSED(value.Release()); + } + + bool Dequeue(TAutoPtr<T>& value) { + T* ptr = nullptr; + if (Impl.Dequeue(ptr)) { + value.Reset(ptr); + return true; + } + return false; + } + + bool IsEmpty() { + return Impl.IsEmpty(); + } + }; + + template <typename T, size_t ChunkSize = PLATFORM_PAGE_SIZE> + using TAutoOneOneQueue = TAutoQueueBase<T, TOneOneQueue<T*, ChunkSize>>; + + template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE> + using TAutoManyOneQueue = TAutoQueueBase<T, TManyOneQueue<T*, Concurrency, ChunkSize>>; + + template <typename T, size_t ChunkSize = PLATFORM_PAGE_SIZE, typename TLock = TAdaptiveLock> + using TAutoManyManyQueue = TAutoQueueBase<T, TManyManyQueue<T*, ChunkSize, TLock>>; + + template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE> + using TAutoRelaxedManyOneQueue = TAutoQueueBase<T, TRelaxedManyOneQueue<T*, Concurrency, ChunkSize>>; + + template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE> + using TAutoRelaxedManyManyQueue = TAutoQueueBase<T, TRelaxedManyManyQueue<T*, Concurrency, ChunkSize>>; +} diff --git a/library/cpp/threading/chunk_queue/queue_ut.cpp b/library/cpp/threading/chunk_queue/queue_ut.cpp new file mode 100644 index 0000000000..8cb36d8dd1 --- /dev/null +++ b/library/cpp/threading/chunk_queue/queue_ut.cpp @@ -0,0 +1,205 @@ +#include "queue.h" + +#include <library/cpp/testing/unittest/registar.h> + +#include <util/generic/set.h> + +namespace NThreading { + //////////////////////////////////////////////////////////////////////////////// + + Y_UNIT_TEST_SUITE(TOneOneQueueTest){ + Y_UNIT_TEST(ShouldBeEmptyAtStart){ + TOneOneQueue<int> queue; + + int result = 0; + UNIT_ASSERT(queue.IsEmpty()); + UNIT_ASSERT(!queue.Dequeue(result)); +} + +Y_UNIT_TEST(ShouldReturnEntries) { + TOneOneQueue<int> queue; + queue.Enqueue(1); + queue.Enqueue(2); + queue.Enqueue(3); + + int result = 0; + UNIT_ASSERT(!queue.IsEmpty()); + UNIT_ASSERT(queue.Dequeue(result)); + UNIT_ASSERT_EQUAL(result, 1); + + UNIT_ASSERT(!queue.IsEmpty()); + UNIT_ASSERT(queue.Dequeue(result)); + UNIT_ASSERT_EQUAL(result, 2); + + UNIT_ASSERT(!queue.IsEmpty()); + UNIT_ASSERT(queue.Dequeue(result)); + UNIT_ASSERT_EQUAL(result, 3); + + UNIT_ASSERT(queue.IsEmpty()); + UNIT_ASSERT(!queue.Dequeue(result)); +} + +Y_UNIT_TEST(ShouldStoreMultipleChunks) { + TOneOneQueue<int, 100> queue; + for (int i = 0; i < 1000; ++i) { + queue.Enqueue(i); + } + + for (int i = 0; i < 1000; ++i) { + int result = 0; + UNIT_ASSERT(!queue.IsEmpty()); + UNIT_ASSERT(queue.Dequeue(result)); + UNIT_ASSERT_EQUAL(result, i); + } +} +} +; + +//////////////////////////////////////////////////////////////////////////////// + +Y_UNIT_TEST_SUITE(TManyOneQueueTest){ + Y_UNIT_TEST(ShouldBeEmptyAtStart){ + TManyOneQueue<int> queue; + +int result; +UNIT_ASSERT(queue.IsEmpty()); +UNIT_ASSERT(!queue.Dequeue(result)); +} + +Y_UNIT_TEST(ShouldReturnEntries) { + TManyOneQueue<int> queue; + queue.Enqueue(1); + queue.Enqueue(2); + queue.Enqueue(3); + + int result = 0; + UNIT_ASSERT(!queue.IsEmpty()); + UNIT_ASSERT(queue.Dequeue(result)); + UNIT_ASSERT_EQUAL(result, 1); + + UNIT_ASSERT(!queue.IsEmpty()); + UNIT_ASSERT(queue.Dequeue(result)); + UNIT_ASSERT_EQUAL(result, 2); + + UNIT_ASSERT(!queue.IsEmpty()); + UNIT_ASSERT(queue.Dequeue(result)); + UNIT_ASSERT_EQUAL(result, 3); + + UNIT_ASSERT(queue.IsEmpty()); + UNIT_ASSERT(!queue.Dequeue(result)); +} +} +; + +//////////////////////////////////////////////////////////////////////////////// + +Y_UNIT_TEST_SUITE(TManyManyQueueTest){ + Y_UNIT_TEST(ShouldBeEmptyAtStart){ + TManyManyQueue<int> queue; + +int result = 0; +UNIT_ASSERT(queue.IsEmpty()); +UNIT_ASSERT(!queue.Dequeue(result)); +} + +Y_UNIT_TEST(ShouldReturnEntries) { + TManyManyQueue<int> queue; + queue.Enqueue(1); + queue.Enqueue(2); + queue.Enqueue(3); + + int result = 0; + UNIT_ASSERT(!queue.IsEmpty()); + UNIT_ASSERT(queue.Dequeue(result)); + UNIT_ASSERT_EQUAL(result, 1); + + UNIT_ASSERT(!queue.IsEmpty()); + UNIT_ASSERT(queue.Dequeue(result)); + UNIT_ASSERT_EQUAL(result, 2); + + UNIT_ASSERT(!queue.IsEmpty()); + UNIT_ASSERT(queue.Dequeue(result)); + UNIT_ASSERT_EQUAL(result, 3); + + UNIT_ASSERT(queue.IsEmpty()); + UNIT_ASSERT(!queue.Dequeue(result)); +} +} +; + +//////////////////////////////////////////////////////////////////////////////// + +Y_UNIT_TEST_SUITE(TRelaxedManyOneQueueTest){ + Y_UNIT_TEST(ShouldBeEmptyAtStart){ + TRelaxedManyOneQueue<int> queue; + +int result; +UNIT_ASSERT(queue.IsEmpty()); +UNIT_ASSERT(!queue.Dequeue(result)); +} + +Y_UNIT_TEST(ShouldReturnEntries) { + TSet<int> items = {1, 2, 3}; + + TRelaxedManyOneQueue<int> queue; + for (int item : items) { + queue.Enqueue(item); + } + + int result = 0; + UNIT_ASSERT(!queue.IsEmpty()); + UNIT_ASSERT(queue.Dequeue(result)); + UNIT_ASSERT(items.erase(result)); + + UNIT_ASSERT(!queue.IsEmpty()); + UNIT_ASSERT(queue.Dequeue(result)); + UNIT_ASSERT(items.erase(result)); + + UNIT_ASSERT(!queue.IsEmpty()); + UNIT_ASSERT(queue.Dequeue(result)); + UNIT_ASSERT(items.erase(result)); + + UNIT_ASSERT(queue.IsEmpty()); + UNIT_ASSERT(!queue.Dequeue(result)); +} +} +; + +//////////////////////////////////////////////////////////////////////////////// + +Y_UNIT_TEST_SUITE(TRelaxedManyManyQueueTest){ + Y_UNIT_TEST(ShouldBeEmptyAtStart){ + TRelaxedManyManyQueue<int> queue; + +int result = 0; +UNIT_ASSERT(queue.IsEmpty()); +UNIT_ASSERT(!queue.Dequeue(result)); +} + +Y_UNIT_TEST(ShouldReturnEntries) { + TSet<int> items = {1, 2, 3}; + + TRelaxedManyManyQueue<int> queue; + for (int item : items) { + queue.Enqueue(item); + } + + int result = 0; + UNIT_ASSERT(!queue.IsEmpty()); + UNIT_ASSERT(queue.Dequeue(result)); + UNIT_ASSERT(items.erase(result)); + + UNIT_ASSERT(!queue.IsEmpty()); + UNIT_ASSERT(queue.Dequeue(result)); + UNIT_ASSERT(items.erase(result)); + + UNIT_ASSERT(!queue.IsEmpty()); + UNIT_ASSERT(queue.Dequeue(result)); + UNIT_ASSERT(items.erase(result)); + + UNIT_ASSERT(queue.IsEmpty()); + UNIT_ASSERT(!queue.Dequeue(result)); +} +} +; +} diff --git a/library/cpp/threading/chunk_queue/readme.txt b/library/cpp/threading/chunk_queue/readme.txt new file mode 100644 index 0000000000..7c9f046a86 --- /dev/null +++ b/library/cpp/threading/chunk_queue/readme.txt @@ -0,0 +1,60 @@ +vskipin@dev-kiwi09:~$ ./rtmr-queue-perf -w 4 -r 4 AdaptiveLock64 Mutex64 LFManyMany64 FastLFManyMany64 LFManyOne64 FastLFManyOne64 ManyMany64 ManyOne64 +2016-05-08T11:49:56.729254Z INFO: [-i] Iterations: 10000000 +2016-05-08T11:49:56.729319Z INFO: [-r] NumReaders: 4 +2016-05-08T11:49:56.729355Z INFO: [-w] NumWriters: 4 +2016-05-08T11:49:56.729502Z INFO: starting consumers... +2016-05-08T11:49:56.729621Z INFO: starting producers... +2016-05-08T11:49:56.729711Z INFO: wait for producers... +2016-05-08T11:50:14.650803Z INFO: wait for consumers... +2016-05-08T11:50:14.650859Z INFO: average producer time: 15.96846675 seconds +2016-05-08T11:50:14.650885Z INFO: average consumer time: 17.9209995 seconds +2016-05-08T11:50:14.650897Z INFO: test AdaptiveLock64 duration: 17.921395s (0.448034875us per iteration) +2016-05-08T11:50:14.650913Z INFO: starting consumers... +2016-05-08T11:50:14.651028Z INFO: starting producers... +2016-05-08T11:50:14.651122Z INFO: wait for producers... +2016-05-08T11:50:31.426378Z INFO: wait for consumers... +2016-05-08T11:50:31.426447Z INFO: average producer time: 15.58770475 seconds +2016-05-08T11:50:31.426491Z INFO: average consumer time: 16.775301 seconds +2016-05-08T11:50:31.426527Z INFO: test Mutex64 duration: 16.775614s (0.41939035us per iteration) +2016-05-08T11:50:31.426584Z INFO: starting consumers... +2016-05-08T11:50:31.426655Z INFO: starting producers... +2016-05-08T11:50:31.426749Z INFO: wait for producers... +2016-05-08T11:50:40.578425Z INFO: wait for consumers... +2016-05-08T11:50:40.578523Z INFO: average producer time: 8.69236075 seconds +2016-05-08T11:50:40.578577Z INFO: average consumer time: 9.15165125 seconds +2016-05-08T11:50:40.578617Z INFO: test LFManyMany64 duration: 9.152033s (0.228800825us per iteration) +2016-05-08T11:50:40.578670Z INFO: starting consumers... +2016-05-08T11:50:40.578742Z INFO: starting producers... +2016-05-08T11:50:40.578893Z INFO: wait for producers... +2016-05-08T11:50:47.447686Z INFO: wait for consumers... +2016-05-08T11:50:47.447758Z INFO: average producer time: 6.81136025 seconds +2016-05-08T11:50:47.447793Z INFO: average consumer time: 6.86875825 seconds +2016-05-08T11:50:47.447834Z INFO: test FastLFManyMany64 duration: 6.869165s (0.171729125us per iteration) +2016-05-08T11:50:47.447901Z INFO: starting consumers... +2016-05-08T11:50:47.447967Z INFO: starting producers... +2016-05-08T11:50:47.448058Z INFO: wait for producers... +2016-05-08T11:50:50.469710Z INFO: wait for consumers... +2016-05-08T11:50:50.469798Z INFO: average producer time: 2.9915505 seconds +2016-05-08T11:50:50.469848Z INFO: average consumer time: 3.02161675 seconds +2016-05-08T11:50:50.469883Z INFO: test LFManyOne64 duration: 3.021983s (0.075549575us per iteration) +2016-05-08T11:50:50.469947Z INFO: starting consumers... +2016-05-08T11:50:50.470012Z INFO: starting producers... +2016-05-08T11:50:50.470104Z INFO: wait for producers... +2016-05-08T11:50:53.139964Z INFO: wait for consumers... +2016-05-08T11:50:53.140050Z INFO: average producer time: 2.5656465 seconds +2016-05-08T11:50:53.140102Z INFO: average consumer time: 2.6697755 seconds +2016-05-08T11:50:53.140149Z INFO: test FastLFManyOne64 duration: 2.670202s (0.06675505us per iteration) +2016-05-08T11:50:53.140206Z INFO: starting consumers... +2016-05-08T11:50:53.140281Z INFO: starting producers... +2016-05-08T11:50:53.140371Z INFO: wait for producers... +2016-05-08T11:50:59.067812Z INFO: wait for consumers... +2016-05-08T11:50:59.067895Z INFO: average producer time: 5.8925505 seconds +2016-05-08T11:50:59.067946Z INFO: average consumer time: 5.9273365 seconds +2016-05-08T11:50:59.067978Z INFO: test ManyMany64 duration: 5.927773s (0.148194325us per iteration) +2016-05-08T11:50:59.068068Z INFO: starting consumers... +2016-05-08T11:50:59.068179Z INFO: starting producers... +2016-05-08T11:50:59.068288Z INFO: wait for producers... +2016-05-08T11:51:03.427416Z INFO: wait for consumers... +2016-05-08T11:51:03.427514Z INFO: average producer time: 4.1055505 seconds +2016-05-08T11:51:03.427560Z INFO: average consumer time: 4.35914975 seconds +2016-05-08T11:51:03.427596Z INFO: test ManyOne64 duration: 4.359529s (0.108988225us per iteration) diff --git a/library/cpp/threading/chunk_queue/ut/ya.make b/library/cpp/threading/chunk_queue/ut/ya.make new file mode 100644 index 0000000000..a35ed6bc4b --- /dev/null +++ b/library/cpp/threading/chunk_queue/ut/ya.make @@ -0,0 +1,9 @@ +UNITTEST_FOR(library/cpp/threading/chunk_queue) + +OWNER(g:rtmr) + +SRCS( + queue_ut.cpp +) + +END() diff --git a/library/cpp/threading/chunk_queue/ya.make b/library/cpp/threading/chunk_queue/ya.make new file mode 100644 index 0000000000..2f883140ba --- /dev/null +++ b/library/cpp/threading/chunk_queue/ya.make @@ -0,0 +1,9 @@ +LIBRARY() + +OWNER(g:rtmr) + +SRCS( + queue.cpp +) + +END() |