diff options
author | vskipin <vskipin@yandex-team.ru> | 2022-02-10 16:46:00 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:46:00 +0300 |
commit | 4d8b546b89b5afc08cf3667e176271c7ba935f33 (patch) | |
tree | 1a2c5ffcf89eb53ecd79dbc9bc0a195c27404d0c /library/cpp/threading/chunk_queue | |
parent | 4e4b78bd7b67e2533da4dbb9696374a6d6068e32 (diff) | |
download | ydb-4d8b546b89b5afc08cf3667e176271c7ba935f33.tar.gz |
Restoring authorship annotation for <vskipin@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/threading/chunk_queue')
-rw-r--r-- | library/cpp/threading/chunk_queue/queue.cpp | 2 | ||||
-rw-r--r-- | library/cpp/threading/chunk_queue/queue.h | 316 | ||||
-rw-r--r-- | library/cpp/threading/chunk_queue/queue_ut.cpp | 118 | ||||
-rw-r--r-- | library/cpp/threading/chunk_queue/readme.txt | 120 | ||||
-rw-r--r-- | library/cpp/threading/chunk_queue/ut/ya.make | 14 | ||||
-rw-r--r-- | library/cpp/threading/chunk_queue/ya.make | 16 |
6 files changed, 293 insertions, 293 deletions
diff --git a/library/cpp/threading/chunk_queue/queue.cpp b/library/cpp/threading/chunk_queue/queue.cpp index 52dd119921..4ebd3f3205 100644 --- a/library/cpp/threading/chunk_queue/queue.cpp +++ b/library/cpp/threading/chunk_queue/queue.cpp @@ -1 +1 @@ -#include "queue.h" +#include "queue.h" diff --git a/library/cpp/threading/chunk_queue/queue.h b/library/cpp/threading/chunk_queue/queue.h index fdf4c93f92..55859601a1 100644 --- a/library/cpp/threading/chunk_queue/queue.h +++ b/library/cpp/threading/chunk_queue/queue.h @@ -1,35 +1,35 @@ -#pragma once - -#include <util/datetime/base.h> -#include <util/generic/noncopyable.h> -#include <util/generic/ptr.h> -#include <util/generic/typetraits.h> -#include <util/generic/vector.h> -#include <util/generic/ylimits.h> -#include <util/system/atomic.h> -#include <util/system/guard.h> -#include <util/system/spinlock.h> -#include <util/system/yassert.h> - -#include <type_traits> -#include <utility> - -namespace NThreading { -//////////////////////////////////////////////////////////////////////////////// -// Platform helpers - -#if !defined(PLATFORM_CACHE_LINE) +#pragma once + +#include <util/datetime/base.h> +#include <util/generic/noncopyable.h> +#include <util/generic/ptr.h> +#include <util/generic/typetraits.h> +#include <util/generic/vector.h> +#include <util/generic/ylimits.h> +#include <util/system/atomic.h> +#include <util/system/guard.h> +#include <util/system/spinlock.h> +#include <util/system/yassert.h> + +#include <type_traits> +#include <utility> + +namespace NThreading { +//////////////////////////////////////////////////////////////////////////////// +// Platform helpers + +#if !defined(PLATFORM_CACHE_LINE) #define PLATFORM_CACHE_LINE 64 -#endif - -#if !defined(PLATFORM_PAGE_SIZE) +#endif + +#if !defined(PLATFORM_PAGE_SIZE) #define PLATFORM_PAGE_SIZE 4 * 1024 -#endif - +#endif + template <typename T, size_t PadSize = PLATFORM_CACHE_LINE> struct TPadded: public T { char Pad[PadSize - sizeof(T) % PadSize]; - + TPadded() { static_assert(sizeof(*this) % PadSize == 0, "padding does not work"); Y_UNUSED(Pad); @@ -43,10 +43,10 @@ namespace NThreading { Y_UNUSED(Pad); } }; - + //////////////////////////////////////////////////////////////////////////////// // Type helpers - + namespace NImpl { template <typename T> struct TPodTypeHelper { @@ -54,99 +54,99 @@ namespace NThreading { static void Write(T* ptr, TT&& value) { *ptr = value; } - + static T Read(T* ptr) { return *ptr; } - + static void Destroy(T* ptr) { Y_UNUSED(ptr); } }; - + template <typename T> struct TNonPodTypeHelper { template <typename TT> static void Write(T* ptr, TT&& value) { new (ptr) T(std::forward<TT>(value)); } - + static T Read(T* ptr) { return std::move(*ptr); } - + static void Destroy(T* ptr) { (void)ptr; /* Make MSVC happy. */ ptr->~T(); } }; - + template <typename T> using TTypeHelper = std::conditional_t< TTypeTraits<T>::IsPod, TPodTypeHelper<T>, TNonPodTypeHelper<T>>; - + } - + //////////////////////////////////////////////////////////////////////////////// // One producer/one consumer chunked queue. - + template <typename T, size_t ChunkSize = PLATFORM_PAGE_SIZE> class TOneOneQueue: private TNonCopyable { using TTypeHelper = NImpl::TTypeHelper<T>; - + struct TChunk; - + struct TChunkHeader { size_t Count = 0; TChunk* Next = nullptr; }; - + struct TChunk: public TChunkHeader { static constexpr size_t MaxCount = (ChunkSize - sizeof(TChunkHeader)) / sizeof(T); - + char Entries[MaxCount * sizeof(T)]; - + TChunk() { Y_UNUSED(Entries); // uninitialized } - + ~TChunk() { for (size_t i = 0; i < this->Count; ++i) { TTypeHelper::Destroy(GetPtr(i)); } - } - + } + T* GetPtr(size_t i) { return (T*)Entries + i; } }; - + struct TWriterState { TChunk* Chunk = nullptr; }; - + struct TReaderState { TChunk* Chunk = nullptr; size_t Count = 0; }; - + private: TPadded<TWriterState> Writer; TPadded<TReaderState> Reader; - + public: using TItem = T; - + TOneOneQueue() { Writer.Chunk = Reader.Chunk = new TChunk(); } - + ~TOneOneQueue() { DeleteChunks(Reader.Chunk); } - + template <typename TT> void Enqueue(TT&& value) { T* ptr = PrepareWrite(); @@ -154,7 +154,7 @@ namespace NThreading { TTypeHelper::Write(ptr, std::forward<TT>(value)); CompleteWrite(); } - + bool Dequeue(T& value) { if (T* ptr = PrepareRead()) { value = TTypeHelper::Read(ptr); @@ -162,17 +162,17 @@ namespace NThreading { return true; } return false; - } - + } + bool IsEmpty() { return !PrepareRead(); } - + protected: T* PrepareWrite() { TChunk* chunk = Writer.Chunk; Y_ASSERT(chunk && !chunk->Next); - + if (chunk->Count != TChunk::MaxCount) { return chunk->GetPtr(chunk->Count); } @@ -181,41 +181,41 @@ namespace NThreading { AtomicSet(Writer.Chunk->Next, chunk); Writer.Chunk = chunk; return chunk->GetPtr(0); - } - + } + void CompleteWrite() { AtomicSet(Writer.Chunk->Count, Writer.Chunk->Count + 1); } - + T* PrepareRead() { TChunk* chunk = Reader.Chunk; Y_ASSERT(chunk); - + for (;;) { size_t writerCount = AtomicGet(chunk->Count); if (Reader.Count != writerCount) { return chunk->GetPtr(Reader.Count); } - + if (writerCount != TChunk::MaxCount) { return nullptr; } - + chunk = AtomicGet(chunk->Next); if (!chunk) { return nullptr; } - + delete Reader.Chunk; Reader.Chunk = chunk; Reader.Count = 0; - } + } } - + void CompleteRead() { ++Reader.Count; - } - + } + private: static void DeleteChunks(TChunk* chunk) { while (chunk) { @@ -223,51 +223,51 @@ namespace NThreading { delete chunk; chunk = next; } - } + } }; - + //////////////////////////////////////////////////////////////////////////////// // Multiple producers/single consumer partitioned queue. // Provides FIFO guaranties for each producer. - + template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE> class TManyOneQueue: private TNonCopyable { using TTypeHelper = NImpl::TTypeHelper<T>; - + struct TEntry { T Value; ui64 Tag; }; - + struct TQueueType: public TOneOneQueue<TEntry, ChunkSize> { TAtomic WriteLock = 0; - + using TOneOneQueue<TEntry, ChunkSize>::PrepareWrite; using TOneOneQueue<TEntry, ChunkSize>::CompleteWrite; - + using TOneOneQueue<TEntry, ChunkSize>::PrepareRead; using TOneOneQueue<TEntry, ChunkSize>::CompleteRead; }; - + private: union { TAtomic WriteTag = 0; char Pad[PLATFORM_CACHE_LINE]; }; - + TQueueType Queues[Concurrency]; - + public: using TItem = T; - + template <typename TT> void Enqueue(TT&& value) { ui64 tag = NextTag(); while (!TryEnqueue(std::forward<TT>(value), tag)) { SpinLockPause(); } - } - + } + bool Dequeue(T& value) { size_t index = 0; if (TEntry* entry = PrepareRead(index)) { @@ -276,24 +276,24 @@ namespace NThreading { return true; } return false; - } - + } + bool IsEmpty() { for (size_t i = 0; i < Concurrency; ++i) { if (!Queues[i].IsEmpty()) { return false; } - } + } return true; - } - + } + private: ui64 NextTag() { // TODO: can we avoid synchronization here? it costs 1.5x performance penalty // return GetCycleCount(); return AtomicIncrement(WriteTag); } - + template <typename TT> bool TryEnqueue(TT&& value, ui64 tag) { for (size_t i = 0; i < Concurrency; ++i) { @@ -307,22 +307,22 @@ namespace NThreading { AtomicUnlock(&queue.WriteLock); return true; } - } + } return false; - } - + } + TEntry* PrepareRead(size_t& index) { TEntry* entry = nullptr; ui64 tag = Max(); - + for (size_t i = 0; i < Concurrency; ++i) { - TEntry* e = Queues[i].PrepareRead(); - if (e && e->Tag < tag) { - index = i; - entry = e; - tag = e->Tag; - } - } + TEntry* e = Queues[i].PrepareRead(); + if (e && e->Tag < tag) { + index = i; + entry = e; + tag = e->Tag; + } + } if (entry) { // need second pass to catch updates within already scanned range @@ -338,91 +338,91 @@ namespace NThreading { } return entry; - } + } }; - + //////////////////////////////////////////////////////////////////////////////// // Concurrent many-many queue with strong FIFO guaranties. // Writers will not block readers (and vice versa), but will block each other. - + template <typename T, size_t ChunkSize = PLATFORM_PAGE_SIZE, typename TLock = TAdaptiveLock> class TManyManyQueue: private TNonCopyable { private: TPadded<TLock> WriteLock; TPadded<TLock> ReadLock; - + TOneOneQueue<T, ChunkSize> Queue; - + public: using TItem = T; - + template <typename TT> void Enqueue(TT&& value) { with_lock (WriteLock) { Queue.Enqueue(std::forward<TT>(value)); } - } - + } + bool Dequeue(T& value) { with_lock (ReadLock) { return Queue.Dequeue(value); } - } - + } + bool IsEmpty() { with_lock (ReadLock) { return Queue.IsEmpty(); } - } + } }; - + //////////////////////////////////////////////////////////////////////////////// // Multiple producers/single consumer partitioned queue. // Because of random partitioning reordering possible - FIFO not guaranteed! - + template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE> class TRelaxedManyOneQueue: private TNonCopyable { struct TQueueType: public TOneOneQueue<T, ChunkSize> { TAtomic WriteLock = 0; }; - + private: union { size_t ReadPos = 0; char Pad[PLATFORM_CACHE_LINE]; }; - + TQueueType Queues[Concurrency]; - + public: using TItem = T; - + template <typename TT> void Enqueue(TT&& value) { while (!TryEnqueue(std::forward<TT>(value))) { SpinLockPause(); } - } - + } + bool Dequeue(T& value) { for (size_t i = 0; i < Concurrency; ++i) { TQueueType& queue = Queues[ReadPos++ % Concurrency]; if (queue.Dequeue(value)) { return true; } - } + } return false; - } - + } + bool IsEmpty() { for (size_t i = 0; i < Concurrency; ++i) { if (!Queues[i].IsEmpty()) { return false; } - } + } return true; - } - + } + private: template <typename TT> bool TryEnqueue(TT&& value) { @@ -434,15 +434,15 @@ namespace NThreading { AtomicUnlock(&queue.WriteLock); return true; } - } + } return false; - } + } }; - + //////////////////////////////////////////////////////////////////////////////// // Concurrent many-many partitioned queue. // Because of random partitioning reordering possible - FIFO not guaranteed! - + template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE> class TRelaxedManyManyQueue: private TNonCopyable { struct TQueueType: public TOneOneQueue<T, ChunkSize> { @@ -454,21 +454,21 @@ namespace NThreading { TAtomic ReadLock = 0; char Pad2[PLATFORM_CACHE_LINE]; }; - }; - + }; + private: TQueueType Queues[Concurrency]; - + public: using TItem = T; - + template <typename TT> void Enqueue(TT&& value) { while (!TryEnqueue(std::forward<TT>(value))) { SpinLockPause(); } - } - + } + bool Dequeue(T& value) { size_t readPos = GetCycleCount(); for (size_t i = 0; i < Concurrency; ++i) { @@ -479,11 +479,11 @@ namespace NThreading { if (dequeued) { return true; } - } - } + } + } return false; - } - + } + bool IsEmpty() { for (size_t i = 0; i < Concurrency; ++i) { TQueueType& queue = Queues[i]; @@ -493,11 +493,11 @@ namespace NThreading { if (!empty) { return false; } - } - } + } + } return true; - } - + } + private: template <typename TT> bool TryEnqueue(TT&& value) { @@ -509,34 +509,34 @@ namespace NThreading { AtomicUnlock(&queue.WriteLock); return true; } - } + } return false; - } + } }; - + //////////////////////////////////////////////////////////////////////////////// // Simple wrapper to deal with AutoPtrs - + template <typename T, typename TImpl> class TAutoQueueBase: private TNonCopyable { private: TImpl Impl; - + public: using TItem = TAutoPtr<T>; - + ~TAutoQueueBase() { TAutoPtr<T> value; while (Dequeue(value)) { // do nothing } - } - + } + void Enqueue(TAutoPtr<T> value) { Impl.Enqueue(value.Get()); Y_UNUSED(value.Release()); } - + bool Dequeue(TAutoPtr<T>& value) { T* ptr = nullptr; if (Impl.Dequeue(ptr)) { @@ -544,25 +544,25 @@ namespace NThreading { return true; } return false; - } - + } + bool IsEmpty() { return Impl.IsEmpty(); } }; - + template <typename T, size_t ChunkSize = PLATFORM_PAGE_SIZE> using TAutoOneOneQueue = TAutoQueueBase<T, TOneOneQueue<T*, ChunkSize>>; - + template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE> using TAutoManyOneQueue = TAutoQueueBase<T, TManyOneQueue<T*, Concurrency, ChunkSize>>; - + template <typename T, size_t ChunkSize = PLATFORM_PAGE_SIZE, typename TLock = TAdaptiveLock> using TAutoManyManyQueue = TAutoQueueBase<T, TManyManyQueue<T*, ChunkSize, TLock>>; - + template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE> using TAutoRelaxedManyOneQueue = TAutoQueueBase<T, TRelaxedManyOneQueue<T*, Concurrency, ChunkSize>>; - + template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE> using TAutoRelaxedManyManyQueue = TAutoQueueBase<T, TRelaxedManyManyQueue<T*, Concurrency, ChunkSize>>; } diff --git a/library/cpp/threading/chunk_queue/queue_ut.cpp b/library/cpp/threading/chunk_queue/queue_ut.cpp index dc103202e8..8cb36d8dd1 100644 --- a/library/cpp/threading/chunk_queue/queue_ut.cpp +++ b/library/cpp/threading/chunk_queue/queue_ut.cpp @@ -1,202 +1,202 @@ -#include "queue.h" - +#include "queue.h" + #include <library/cpp/testing/unittest/registar.h> - -#include <util/generic/set.h> - -namespace NThreading { + +#include <util/generic/set.h> + +namespace NThreading { //////////////////////////////////////////////////////////////////////////////// - + Y_UNIT_TEST_SUITE(TOneOneQueueTest){ Y_UNIT_TEST(ShouldBeEmptyAtStart){ TOneOneQueue<int> queue; - + int result = 0; UNIT_ASSERT(queue.IsEmpty()); UNIT_ASSERT(!queue.Dequeue(result)); } - + Y_UNIT_TEST(ShouldReturnEntries) { TOneOneQueue<int> queue; queue.Enqueue(1); queue.Enqueue(2); queue.Enqueue(3); - + int result = 0; UNIT_ASSERT(!queue.IsEmpty()); UNIT_ASSERT(queue.Dequeue(result)); UNIT_ASSERT_EQUAL(result, 1); - + UNIT_ASSERT(!queue.IsEmpty()); UNIT_ASSERT(queue.Dequeue(result)); UNIT_ASSERT_EQUAL(result, 2); - + UNIT_ASSERT(!queue.IsEmpty()); UNIT_ASSERT(queue.Dequeue(result)); UNIT_ASSERT_EQUAL(result, 3); - + UNIT_ASSERT(queue.IsEmpty()); UNIT_ASSERT(!queue.Dequeue(result)); } - + Y_UNIT_TEST(ShouldStoreMultipleChunks) { TOneOneQueue<int, 100> queue; for (int i = 0; i < 1000; ++i) { queue.Enqueue(i); - } - + } + for (int i = 0; i < 1000; ++i) { int result = 0; UNIT_ASSERT(!queue.IsEmpty()); UNIT_ASSERT(queue.Dequeue(result)); UNIT_ASSERT_EQUAL(result, i); - } + } } } ; - -//////////////////////////////////////////////////////////////////////////////// - + +//////////////////////////////////////////////////////////////////////////////// + Y_UNIT_TEST_SUITE(TManyOneQueueTest){ Y_UNIT_TEST(ShouldBeEmptyAtStart){ - TManyOneQueue<int> queue; - + TManyOneQueue<int> queue; + int result; UNIT_ASSERT(queue.IsEmpty()); UNIT_ASSERT(!queue.Dequeue(result)); } - + Y_UNIT_TEST(ShouldReturnEntries) { TManyOneQueue<int> queue; queue.Enqueue(1); queue.Enqueue(2); queue.Enqueue(3); - + int result = 0; UNIT_ASSERT(!queue.IsEmpty()); UNIT_ASSERT(queue.Dequeue(result)); UNIT_ASSERT_EQUAL(result, 1); - + UNIT_ASSERT(!queue.IsEmpty()); UNIT_ASSERT(queue.Dequeue(result)); UNIT_ASSERT_EQUAL(result, 2); - + UNIT_ASSERT(!queue.IsEmpty()); UNIT_ASSERT(queue.Dequeue(result)); UNIT_ASSERT_EQUAL(result, 3); - + UNIT_ASSERT(queue.IsEmpty()); UNIT_ASSERT(!queue.Dequeue(result)); } } ; - -//////////////////////////////////////////////////////////////////////////////// - + +//////////////////////////////////////////////////////////////////////////////// + Y_UNIT_TEST_SUITE(TManyManyQueueTest){ Y_UNIT_TEST(ShouldBeEmptyAtStart){ - TManyManyQueue<int> queue; - + TManyManyQueue<int> queue; + int result = 0; UNIT_ASSERT(queue.IsEmpty()); UNIT_ASSERT(!queue.Dequeue(result)); } - + Y_UNIT_TEST(ShouldReturnEntries) { TManyManyQueue<int> queue; queue.Enqueue(1); queue.Enqueue(2); queue.Enqueue(3); - + int result = 0; UNIT_ASSERT(!queue.IsEmpty()); UNIT_ASSERT(queue.Dequeue(result)); UNIT_ASSERT_EQUAL(result, 1); - + UNIT_ASSERT(!queue.IsEmpty()); UNIT_ASSERT(queue.Dequeue(result)); UNIT_ASSERT_EQUAL(result, 2); - + UNIT_ASSERT(!queue.IsEmpty()); UNIT_ASSERT(queue.Dequeue(result)); UNIT_ASSERT_EQUAL(result, 3); - + UNIT_ASSERT(queue.IsEmpty()); UNIT_ASSERT(!queue.Dequeue(result)); } } ; - -//////////////////////////////////////////////////////////////////////////////// - + +//////////////////////////////////////////////////////////////////////////////// + Y_UNIT_TEST_SUITE(TRelaxedManyOneQueueTest){ Y_UNIT_TEST(ShouldBeEmptyAtStart){ - TRelaxedManyOneQueue<int> queue; - + TRelaxedManyOneQueue<int> queue; + int result; UNIT_ASSERT(queue.IsEmpty()); UNIT_ASSERT(!queue.Dequeue(result)); } - + Y_UNIT_TEST(ShouldReturnEntries) { TSet<int> items = {1, 2, 3}; - + TRelaxedManyOneQueue<int> queue; for (int item : items) { queue.Enqueue(item); } - + int result = 0; UNIT_ASSERT(!queue.IsEmpty()); UNIT_ASSERT(queue.Dequeue(result)); UNIT_ASSERT(items.erase(result)); - + UNIT_ASSERT(!queue.IsEmpty()); UNIT_ASSERT(queue.Dequeue(result)); UNIT_ASSERT(items.erase(result)); - + UNIT_ASSERT(!queue.IsEmpty()); UNIT_ASSERT(queue.Dequeue(result)); UNIT_ASSERT(items.erase(result)); - + UNIT_ASSERT(queue.IsEmpty()); UNIT_ASSERT(!queue.Dequeue(result)); } } ; - -//////////////////////////////////////////////////////////////////////////////// - + +//////////////////////////////////////////////////////////////////////////////// + Y_UNIT_TEST_SUITE(TRelaxedManyManyQueueTest){ Y_UNIT_TEST(ShouldBeEmptyAtStart){ - TRelaxedManyManyQueue<int> queue; - + TRelaxedManyManyQueue<int> queue; + int result = 0; UNIT_ASSERT(queue.IsEmpty()); UNIT_ASSERT(!queue.Dequeue(result)); } - + Y_UNIT_TEST(ShouldReturnEntries) { TSet<int> items = {1, 2, 3}; - + TRelaxedManyManyQueue<int> queue; for (int item : items) { queue.Enqueue(item); } - + int result = 0; UNIT_ASSERT(!queue.IsEmpty()); UNIT_ASSERT(queue.Dequeue(result)); UNIT_ASSERT(items.erase(result)); - + UNIT_ASSERT(!queue.IsEmpty()); UNIT_ASSERT(queue.Dequeue(result)); UNIT_ASSERT(items.erase(result)); - + UNIT_ASSERT(!queue.IsEmpty()); UNIT_ASSERT(queue.Dequeue(result)); UNIT_ASSERT(items.erase(result)); - + UNIT_ASSERT(queue.IsEmpty()); UNIT_ASSERT(!queue.Dequeue(result)); } diff --git a/library/cpp/threading/chunk_queue/readme.txt b/library/cpp/threading/chunk_queue/readme.txt index 104a8ec744..7c9f046a86 100644 --- a/library/cpp/threading/chunk_queue/readme.txt +++ b/library/cpp/threading/chunk_queue/readme.txt @@ -1,60 +1,60 @@ -vskipin@dev-kiwi09:~$ ./rtmr-queue-perf -w 4 -r 4 AdaptiveLock64 Mutex64 LFManyMany64 FastLFManyMany64 LFManyOne64 FastLFManyOne64 ManyMany64 ManyOne64 -2016-05-08T11:49:56.729254Z INFO: [-i] Iterations: 10000000 -2016-05-08T11:49:56.729319Z INFO: [-r] NumReaders: 4 -2016-05-08T11:49:56.729355Z INFO: [-w] NumWriters: 4 -2016-05-08T11:49:56.729502Z INFO: starting consumers... -2016-05-08T11:49:56.729621Z INFO: starting producers... -2016-05-08T11:49:56.729711Z INFO: wait for producers... -2016-05-08T11:50:14.650803Z INFO: wait for consumers... -2016-05-08T11:50:14.650859Z INFO: average producer time: 15.96846675 seconds -2016-05-08T11:50:14.650885Z INFO: average consumer time: 17.9209995 seconds -2016-05-08T11:50:14.650897Z INFO: test AdaptiveLock64 duration: 17.921395s (0.448034875us per iteration) -2016-05-08T11:50:14.650913Z INFO: starting consumers... -2016-05-08T11:50:14.651028Z INFO: starting producers... -2016-05-08T11:50:14.651122Z INFO: wait for producers... -2016-05-08T11:50:31.426378Z INFO: wait for consumers... -2016-05-08T11:50:31.426447Z INFO: average producer time: 15.58770475 seconds -2016-05-08T11:50:31.426491Z INFO: average consumer time: 16.775301 seconds -2016-05-08T11:50:31.426527Z INFO: test Mutex64 duration: 16.775614s (0.41939035us per iteration) -2016-05-08T11:50:31.426584Z INFO: starting consumers... -2016-05-08T11:50:31.426655Z INFO: starting producers... -2016-05-08T11:50:31.426749Z INFO: wait for producers... -2016-05-08T11:50:40.578425Z INFO: wait for consumers... -2016-05-08T11:50:40.578523Z INFO: average producer time: 8.69236075 seconds -2016-05-08T11:50:40.578577Z INFO: average consumer time: 9.15165125 seconds -2016-05-08T11:50:40.578617Z INFO: test LFManyMany64 duration: 9.152033s (0.228800825us per iteration) -2016-05-08T11:50:40.578670Z INFO: starting consumers... -2016-05-08T11:50:40.578742Z INFO: starting producers... -2016-05-08T11:50:40.578893Z INFO: wait for producers... -2016-05-08T11:50:47.447686Z INFO: wait for consumers... -2016-05-08T11:50:47.447758Z INFO: average producer time: 6.81136025 seconds -2016-05-08T11:50:47.447793Z INFO: average consumer time: 6.86875825 seconds -2016-05-08T11:50:47.447834Z INFO: test FastLFManyMany64 duration: 6.869165s (0.171729125us per iteration) -2016-05-08T11:50:47.447901Z INFO: starting consumers... -2016-05-08T11:50:47.447967Z INFO: starting producers... -2016-05-08T11:50:47.448058Z INFO: wait for producers... -2016-05-08T11:50:50.469710Z INFO: wait for consumers... -2016-05-08T11:50:50.469798Z INFO: average producer time: 2.9915505 seconds -2016-05-08T11:50:50.469848Z INFO: average consumer time: 3.02161675 seconds -2016-05-08T11:50:50.469883Z INFO: test LFManyOne64 duration: 3.021983s (0.075549575us per iteration) -2016-05-08T11:50:50.469947Z INFO: starting consumers... -2016-05-08T11:50:50.470012Z INFO: starting producers... -2016-05-08T11:50:50.470104Z INFO: wait for producers... -2016-05-08T11:50:53.139964Z INFO: wait for consumers... -2016-05-08T11:50:53.140050Z INFO: average producer time: 2.5656465 seconds -2016-05-08T11:50:53.140102Z INFO: average consumer time: 2.6697755 seconds -2016-05-08T11:50:53.140149Z INFO: test FastLFManyOne64 duration: 2.670202s (0.06675505us per iteration) -2016-05-08T11:50:53.140206Z INFO: starting consumers... -2016-05-08T11:50:53.140281Z INFO: starting producers... -2016-05-08T11:50:53.140371Z INFO: wait for producers... -2016-05-08T11:50:59.067812Z INFO: wait for consumers... -2016-05-08T11:50:59.067895Z INFO: average producer time: 5.8925505 seconds -2016-05-08T11:50:59.067946Z INFO: average consumer time: 5.9273365 seconds -2016-05-08T11:50:59.067978Z INFO: test ManyMany64 duration: 5.927773s (0.148194325us per iteration) -2016-05-08T11:50:59.068068Z INFO: starting consumers... -2016-05-08T11:50:59.068179Z INFO: starting producers... -2016-05-08T11:50:59.068288Z INFO: wait for producers... -2016-05-08T11:51:03.427416Z INFO: wait for consumers... -2016-05-08T11:51:03.427514Z INFO: average producer time: 4.1055505 seconds -2016-05-08T11:51:03.427560Z INFO: average consumer time: 4.35914975 seconds -2016-05-08T11:51:03.427596Z INFO: test ManyOne64 duration: 4.359529s (0.108988225us per iteration) +vskipin@dev-kiwi09:~$ ./rtmr-queue-perf -w 4 -r 4 AdaptiveLock64 Mutex64 LFManyMany64 FastLFManyMany64 LFManyOne64 FastLFManyOne64 ManyMany64 ManyOne64 +2016-05-08T11:49:56.729254Z INFO: [-i] Iterations: 10000000 +2016-05-08T11:49:56.729319Z INFO: [-r] NumReaders: 4 +2016-05-08T11:49:56.729355Z INFO: [-w] NumWriters: 4 +2016-05-08T11:49:56.729502Z INFO: starting consumers... +2016-05-08T11:49:56.729621Z INFO: starting producers... +2016-05-08T11:49:56.729711Z INFO: wait for producers... +2016-05-08T11:50:14.650803Z INFO: wait for consumers... +2016-05-08T11:50:14.650859Z INFO: average producer time: 15.96846675 seconds +2016-05-08T11:50:14.650885Z INFO: average consumer time: 17.9209995 seconds +2016-05-08T11:50:14.650897Z INFO: test AdaptiveLock64 duration: 17.921395s (0.448034875us per iteration) +2016-05-08T11:50:14.650913Z INFO: starting consumers... +2016-05-08T11:50:14.651028Z INFO: starting producers... +2016-05-08T11:50:14.651122Z INFO: wait for producers... +2016-05-08T11:50:31.426378Z INFO: wait for consumers... +2016-05-08T11:50:31.426447Z INFO: average producer time: 15.58770475 seconds +2016-05-08T11:50:31.426491Z INFO: average consumer time: 16.775301 seconds +2016-05-08T11:50:31.426527Z INFO: test Mutex64 duration: 16.775614s (0.41939035us per iteration) +2016-05-08T11:50:31.426584Z INFO: starting consumers... +2016-05-08T11:50:31.426655Z INFO: starting producers... +2016-05-08T11:50:31.426749Z INFO: wait for producers... +2016-05-08T11:50:40.578425Z INFO: wait for consumers... +2016-05-08T11:50:40.578523Z INFO: average producer time: 8.69236075 seconds +2016-05-08T11:50:40.578577Z INFO: average consumer time: 9.15165125 seconds +2016-05-08T11:50:40.578617Z INFO: test LFManyMany64 duration: 9.152033s (0.228800825us per iteration) +2016-05-08T11:50:40.578670Z INFO: starting consumers... +2016-05-08T11:50:40.578742Z INFO: starting producers... +2016-05-08T11:50:40.578893Z INFO: wait for producers... +2016-05-08T11:50:47.447686Z INFO: wait for consumers... +2016-05-08T11:50:47.447758Z INFO: average producer time: 6.81136025 seconds +2016-05-08T11:50:47.447793Z INFO: average consumer time: 6.86875825 seconds +2016-05-08T11:50:47.447834Z INFO: test FastLFManyMany64 duration: 6.869165s (0.171729125us per iteration) +2016-05-08T11:50:47.447901Z INFO: starting consumers... +2016-05-08T11:50:47.447967Z INFO: starting producers... +2016-05-08T11:50:47.448058Z INFO: wait for producers... +2016-05-08T11:50:50.469710Z INFO: wait for consumers... +2016-05-08T11:50:50.469798Z INFO: average producer time: 2.9915505 seconds +2016-05-08T11:50:50.469848Z INFO: average consumer time: 3.02161675 seconds +2016-05-08T11:50:50.469883Z INFO: test LFManyOne64 duration: 3.021983s (0.075549575us per iteration) +2016-05-08T11:50:50.469947Z INFO: starting consumers... +2016-05-08T11:50:50.470012Z INFO: starting producers... +2016-05-08T11:50:50.470104Z INFO: wait for producers... +2016-05-08T11:50:53.139964Z INFO: wait for consumers... +2016-05-08T11:50:53.140050Z INFO: average producer time: 2.5656465 seconds +2016-05-08T11:50:53.140102Z INFO: average consumer time: 2.6697755 seconds +2016-05-08T11:50:53.140149Z INFO: test FastLFManyOne64 duration: 2.670202s (0.06675505us per iteration) +2016-05-08T11:50:53.140206Z INFO: starting consumers... +2016-05-08T11:50:53.140281Z INFO: starting producers... +2016-05-08T11:50:53.140371Z INFO: wait for producers... +2016-05-08T11:50:59.067812Z INFO: wait for consumers... +2016-05-08T11:50:59.067895Z INFO: average producer time: 5.8925505 seconds +2016-05-08T11:50:59.067946Z INFO: average consumer time: 5.9273365 seconds +2016-05-08T11:50:59.067978Z INFO: test ManyMany64 duration: 5.927773s (0.148194325us per iteration) +2016-05-08T11:50:59.068068Z INFO: starting consumers... +2016-05-08T11:50:59.068179Z INFO: starting producers... +2016-05-08T11:50:59.068288Z INFO: wait for producers... +2016-05-08T11:51:03.427416Z INFO: wait for consumers... +2016-05-08T11:51:03.427514Z INFO: average producer time: 4.1055505 seconds +2016-05-08T11:51:03.427560Z INFO: average consumer time: 4.35914975 seconds +2016-05-08T11:51:03.427596Z INFO: test ManyOne64 duration: 4.359529s (0.108988225us per iteration) diff --git a/library/cpp/threading/chunk_queue/ut/ya.make b/library/cpp/threading/chunk_queue/ut/ya.make index d69e219f66..a35ed6bc4b 100644 --- a/library/cpp/threading/chunk_queue/ut/ya.make +++ b/library/cpp/threading/chunk_queue/ut/ya.make @@ -1,9 +1,9 @@ UNITTEST_FOR(library/cpp/threading/chunk_queue) - + OWNER(g:rtmr) - -SRCS( - queue_ut.cpp -) - -END() + +SRCS( + queue_ut.cpp +) + +END() diff --git a/library/cpp/threading/chunk_queue/ya.make b/library/cpp/threading/chunk_queue/ya.make index 7e6ead7b36..2f883140ba 100644 --- a/library/cpp/threading/chunk_queue/ya.make +++ b/library/cpp/threading/chunk_queue/ya.make @@ -1,9 +1,9 @@ -LIBRARY() - +LIBRARY() + OWNER(g:rtmr) - -SRCS( - queue.cpp -) - -END() + +SRCS( + queue.cpp +) + +END() |