aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/threading/chunk_queue
diff options
context:
space:
mode:
authorDevtools Arcadia <arcadia-devtools@yandex-team.ru>2022-02-07 18:08:42 +0300
committerDevtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net>2022-02-07 18:08:42 +0300
commit1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch)
treee26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/threading/chunk_queue
downloadydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/threading/chunk_queue')
-rw-r--r--library/cpp/threading/chunk_queue/queue.cpp1
-rw-r--r--library/cpp/threading/chunk_queue/queue.h568
-rw-r--r--library/cpp/threading/chunk_queue/queue_ut.cpp205
-rw-r--r--library/cpp/threading/chunk_queue/readme.txt60
-rw-r--r--library/cpp/threading/chunk_queue/ut/ya.make9
-rw-r--r--library/cpp/threading/chunk_queue/ya.make9
6 files changed, 852 insertions, 0 deletions
diff --git a/library/cpp/threading/chunk_queue/queue.cpp b/library/cpp/threading/chunk_queue/queue.cpp
new file mode 100644
index 0000000000..4ebd3f3205
--- /dev/null
+++ b/library/cpp/threading/chunk_queue/queue.cpp
@@ -0,0 +1 @@
+#include "queue.h"
diff --git a/library/cpp/threading/chunk_queue/queue.h b/library/cpp/threading/chunk_queue/queue.h
new file mode 100644
index 0000000000..55859601a1
--- /dev/null
+++ b/library/cpp/threading/chunk_queue/queue.h
@@ -0,0 +1,568 @@
+#pragma once
+
+#include <util/datetime/base.h>
+#include <util/generic/noncopyable.h>
+#include <util/generic/ptr.h>
+#include <util/generic/typetraits.h>
+#include <util/generic/vector.h>
+#include <util/generic/ylimits.h>
+#include <util/system/atomic.h>
+#include <util/system/guard.h>
+#include <util/system/spinlock.h>
+#include <util/system/yassert.h>
+
+#include <type_traits>
+#include <utility>
+
+namespace NThreading {
+////////////////////////////////////////////////////////////////////////////////
+// Platform helpers
+
+// Fallback platform constants; build system may predefine them.
+#if !defined(PLATFORM_CACHE_LINE)
+#define PLATFORM_CACHE_LINE 64
+#endif
+
+#if !defined(PLATFORM_PAGE_SIZE)
+// Parenthesized so the macro expands safely inside arbitrary expressions:
+// e.g. `x % PLATFORM_PAGE_SIZE` would otherwise parse as `x % 4 * 1024`.
+#define PLATFORM_PAGE_SIZE (4 * 1024)
+#endif
+
+    // Pads T up to the next multiple of PadSize (one cache line by default) so
+    // that adjacent instances never share a cache line (avoids false sharing).
+    template <typename T, size_t PadSize = PLATFORM_CACHE_LINE>
+    struct TPadded: public T {
+        // NOTE: when sizeof(T) is already a multiple of PadSize this still adds
+        // a full PadSize bytes (the array size is never zero).
+        char Pad[PadSize - sizeof(T) % PadSize];
+
+        TPadded() {
+            static_assert(sizeof(*this) % PadSize == 0, "padding does not work");
+            Y_UNUSED(Pad);
+        }
+
+        // Forwards any constructor arguments to T.
+        template<typename... Args>
+        TPadded(Args&&... args)
+            : T(std::forward<Args>(args)...)
+        {
+            static_assert(sizeof(*this) % PadSize == 0, "padding does not work");
+            Y_UNUSED(Pad);
+        }
+    };
+
+ ////////////////////////////////////////////////////////////////////////////////
+ // Type helpers
+
+    namespace NImpl {
+        // Storage helpers used by the queues below: they read/write values in
+        // raw (char) chunk storage, dispatching on POD-ness so PODs are copied
+        // directly while non-PODs get proper construct/move/destroy semantics.
+        template <typename T>
+        struct TPodTypeHelper {
+            // Plain assignment into the slot; PODs need no construction.
+            template <typename TT>
+            static void Write(T* ptr, TT&& value) {
+                *ptr = value;
+            }
+
+            // Returns a copy; the slot needs no cleanup afterwards.
+            static T Read(T* ptr) {
+                return *ptr;
+            }
+
+            // Trivial destructor - nothing to do.
+            static void Destroy(T* ptr) {
+                Y_UNUSED(ptr);
+            }
+        };
+
+        template <typename T>
+        struct TNonPodTypeHelper {
+            // Placement-constructs into the raw slot, forwarding the argument.
+            template <typename TT>
+            static void Write(T* ptr, TT&& value) {
+                new (ptr) T(std::forward<TT>(value));
+            }
+
+            // Moves the value out; the slot object itself is destroyed later
+            // via Destroy() (when its chunk is retired).
+            static T Read(T* ptr) {
+                return std::move(*ptr);
+            }
+
+            static void Destroy(T* ptr) {
+                (void)ptr; /* Make MSVC happy. */
+                ptr->~T();
+            }
+        };
+
+        // Selects the appropriate helper for T at compile time.
+        template <typename T>
+        using TTypeHelper = std::conditional_t<
+            TTypeTraits<T>::IsPod,
+            TPodTypeHelper<T>,
+            TNonPodTypeHelper<T>>;
+
+    }
+
+ ////////////////////////////////////////////////////////////////////////////////
+ // One producer/one consumer chunked queue.
+
+    // Single-producer/single-consumer unbounded queue built from a linked list
+    // of fixed-size chunks. Lock-free: the writer publishes entries by bumping
+    // chunk->Count; the reader polls Count/Next. Thread-safe only for exactly
+    // one writer thread and one reader thread.
+    template <typename T, size_t ChunkSize = PLATFORM_PAGE_SIZE>
+    class TOneOneQueue: private TNonCopyable {
+        using TTypeHelper = NImpl::TTypeHelper<T>;
+
+        struct TChunk;
+
+        // Per-chunk bookkeeping: number of published entries and the link to
+        // the next chunk (set once by the writer when this chunk fills up).
+        struct TChunkHeader {
+            size_t Count = 0;
+            TChunk* Next = nullptr;
+        };
+
+        struct TChunk: public TChunkHeader {
+            // Entries per chunk: whatever fits in ChunkSize after the header.
+            static constexpr size_t MaxCount = (ChunkSize - sizeof(TChunkHeader)) / sizeof(T);
+
+            // Raw storage; slots are constructed lazily by TTypeHelper::Write.
+            char Entries[MaxCount * sizeof(T)];
+
+            TChunk() {
+                Y_UNUSED(Entries); // uninitialized
+            }
+
+            // Destroys every slot that was ever published ([0, Count)).
+            // Already-dequeued slots are moved-from but still need destruction.
+            ~TChunk() {
+                for (size_t i = 0; i < this->Count; ++i) {
+                    TTypeHelper::Destroy(GetPtr(i));
+                }
+            }
+
+            T* GetPtr(size_t i) {
+                return (T*)Entries + i;
+            }
+        };
+
+        struct TWriterState {
+            TChunk* Chunk = nullptr; // tail chunk the writer appends to
+        };
+
+        struct TReaderState {
+            TChunk* Chunk = nullptr; // head chunk the reader consumes from
+            size_t Count = 0;        // entries already consumed in Chunk
+        };
+
+    private:
+        // Writer and reader state live on separate cache lines; they are
+        // touched by different threads and must not false-share.
+        TPadded<TWriterState> Writer;
+        TPadded<TReaderState> Reader;
+
+    public:
+        using TItem = T;
+
+        TOneOneQueue() {
+            Writer.Chunk = Reader.Chunk = new TChunk();
+        }
+
+        ~TOneOneQueue() {
+            DeleteChunks(Reader.Chunk);
+        }
+
+        // Appends a value. Must be called by the single producer only.
+        template <typename TT>
+        void Enqueue(TT&& value) {
+            T* ptr = PrepareWrite();
+            Y_ASSERT(ptr);
+            TTypeHelper::Write(ptr, std::forward<TT>(value));
+            CompleteWrite();
+        }
+
+        // Pops the oldest value into `value`; returns false when empty.
+        // Must be called by the single consumer only.
+        bool Dequeue(T& value) {
+            if (T* ptr = PrepareRead()) {
+                value = TTypeHelper::Read(ptr);
+                CompleteRead();
+                return true;
+            }
+            return false;
+        }
+
+        // Consumer-side emptiness probe (may retire consumed chunks).
+        bool IsEmpty() {
+            return !PrepareRead();
+        }
+
+    protected:
+        // Returns the slot for the next write, allocating a new chunk when the
+        // current one is full. The new chunk is linked in with an atomic store
+        // to Next so the reader never observes a half-initialized chunk.
+        T* PrepareWrite() {
+            TChunk* chunk = Writer.Chunk;
+            Y_ASSERT(chunk && !chunk->Next);
+
+            if (chunk->Count != TChunk::MaxCount) {
+                return chunk->GetPtr(chunk->Count);
+            }
+
+            chunk = new TChunk();
+            AtomicSet(Writer.Chunk->Next, chunk);
+            Writer.Chunk = chunk;
+            return chunk->GetPtr(0);
+        }
+
+        // Publishes the slot prepared above: the atomic bump of Count is what
+        // makes the new entry visible to the reader.
+        void CompleteWrite() {
+            AtomicSet(Writer.Chunk->Count, Writer.Chunk->Count + 1);
+        }
+
+        // Returns the next unread slot, or nullptr if nothing is published.
+        // Retires fully-consumed chunks along the way; deleting them is safe
+        // because the writer never touches a chunk again once Next is set.
+        T* PrepareRead() {
+            TChunk* chunk = Reader.Chunk;
+            Y_ASSERT(chunk);
+
+            for (;;) {
+                size_t writerCount = AtomicGet(chunk->Count);
+                if (Reader.Count != writerCount) {
+                    return chunk->GetPtr(Reader.Count);
+                }
+
+                // Reader caught up. If the chunk is not full the writer is
+                // still appending here, so the queue is currently empty.
+                if (writerCount != TChunk::MaxCount) {
+                    return nullptr;
+                }
+
+                chunk = AtomicGet(chunk->Next);
+                if (!chunk) {
+                    return nullptr;
+                }
+
+                delete Reader.Chunk;
+                Reader.Chunk = chunk;
+                Reader.Count = 0;
+            }
+        }
+
+        void CompleteRead() {
+            ++Reader.Count;
+        }
+
+    private:
+        // Frees the whole chunk list starting at `chunk` (destructor only).
+        static void DeleteChunks(TChunk* chunk) {
+            while (chunk) {
+                TChunk* next = chunk->Next;
+                delete chunk;
+                chunk = next;
+            }
+        }
+    };
+
+ ////////////////////////////////////////////////////////////////////////////////
+ // Multiple producers/single consumer partitioned queue.
+    // Provides FIFO guarantees for each producer.
+
+    // Many producers / one consumer. Each producer enqueues into one of
+    // `Concurrency` SPSC sub-queues (guarded by try-locks); a global tag
+    // restores per-producer FIFO order on the consumer side.
+    template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE>
+    class TManyOneQueue: private TNonCopyable {
+        using TTypeHelper = NImpl::TTypeHelper<T>;
+
+        // Value plus a monotonically increasing tag used by the consumer to
+        // merge the sub-queues in global enqueue order.
+        struct TEntry {
+            T Value;
+            ui64 Tag;
+        };
+
+        struct TQueueType: public TOneOneQueue<TEntry, ChunkSize> {
+            TAtomic WriteLock = 0; // spin try-lock serializing producers on this sub-queue
+
+            using TOneOneQueue<TEntry, ChunkSize>::PrepareWrite;
+            using TOneOneQueue<TEntry, ChunkSize>::CompleteWrite;
+
+            using TOneOneQueue<TEntry, ChunkSize>::PrepareRead;
+            using TOneOneQueue<TEntry, ChunkSize>::CompleteRead;
+        };
+
+    private:
+        // Anonymous union pads the shared tag counter to a full cache line so
+        // it does not false-share with the sub-queues below.
+        union {
+            TAtomic WriteTag = 0;
+            char Pad[PLATFORM_CACHE_LINE];
+        };
+
+        TQueueType Queues[Concurrency];
+
+    public:
+        using TItem = T;
+
+        // Appends a value; spins until some sub-queue's write lock is free.
+        template <typename TT>
+        void Enqueue(TT&& value) {
+            ui64 tag = NextTag();
+            while (!TryEnqueue(std::forward<TT>(value), tag)) {
+                SpinLockPause();
+            }
+        }
+
+        // Pops the entry with the smallest tag across all sub-queues.
+        // Must be called by the single consumer only.
+        bool Dequeue(T& value) {
+            size_t index = 0;
+            if (TEntry* entry = PrepareRead(index)) {
+                value = TTypeHelper::Read(&entry->Value);
+                Queues[index].CompleteRead();
+                return true;
+            }
+            return false;
+        }
+
+        bool IsEmpty() {
+            for (size_t i = 0; i < Concurrency; ++i) {
+                if (!Queues[i].IsEmpty()) {
+                    return false;
+                }
+            }
+            return true;
+        }
+
+    private:
+        ui64 NextTag() {
+            // TODO: can we avoid synchronization here? it costs 1.5x performance penalty
+            // return GetCycleCount();
+            return AtomicIncrement(WriteTag);
+        }
+
+        // Tries each sub-queue once; fails only if every write lock is held.
+        // `value` is moved from only on success, so retrying is safe.
+        template <typename TT>
+        bool TryEnqueue(TT&& value, ui64 tag) {
+            for (size_t i = 0; i < Concurrency; ++i) {
+                TQueueType& queue = Queues[i];
+                if (AtomicTryAndTryLock(&queue.WriteLock)) {
+                    TEntry* entry = queue.PrepareWrite();
+                    Y_ASSERT(entry);
+                    TTypeHelper::Write(&entry->Value, std::forward<TT>(value));
+                    entry->Tag = tag;
+                    queue.CompleteWrite();
+                    AtomicUnlock(&queue.WriteLock);
+                    return true;
+                }
+            }
+            return false;
+        }
+
+        // Finds the sub-queue whose head entry carries the smallest tag and
+        // reports its index via `index`.
+        TEntry* PrepareRead(size_t& index) {
+            TEntry* entry = nullptr;
+            ui64 tag = Max();
+
+            for (size_t i = 0; i < Concurrency; ++i) {
+                TEntry* e = Queues[i].PrepareRead();
+                if (e && e->Tag < tag) {
+                    index = i;
+                    entry = e;
+                    tag = e->Tag;
+                }
+            }
+
+            if (entry) {
+                // need second pass to catch updates within already scanned range
+                size_t candidate = index;
+                for (size_t i = 0; i < candidate; ++i) {
+                    TEntry* e = Queues[i].PrepareRead();
+                    if (e && e->Tag < tag) {
+                        index = i;
+                        entry = e;
+                        tag = e->Tag;
+                    }
+                }
+            }
+
+            return entry;
+        }
+    };
+
+ ////////////////////////////////////////////////////////////////////////////////
+    // Concurrent many-many queue with strong FIFO guarantees.
+ // Writers will not block readers (and vice versa), but will block each other.
+
+    // Many producers / many consumers over a single SPSC queue: one lock
+    // serializes all writers, another serializes all readers. Writers never
+    // contend with readers (the underlying queue is safe for one of each).
+    template <typename T, size_t ChunkSize = PLATFORM_PAGE_SIZE, typename TLock = TAdaptiveLock>
+    class TManyManyQueue: private TNonCopyable {
+    private:
+        // Padded to separate cache lines so writer-side and reader-side lock
+        // traffic do not false-share.
+        TPadded<TLock> WriteLock;
+        TPadded<TLock> ReadLock;
+
+        TOneOneQueue<T, ChunkSize> Queue;
+
+    public:
+        using TItem = T;
+
+        template <typename TT>
+        void Enqueue(TT&& value) {
+            with_lock (WriteLock) {
+                Queue.Enqueue(std::forward<TT>(value));
+            }
+        }
+
+        bool Dequeue(T& value) {
+            with_lock (ReadLock) {
+                return Queue.Dequeue(value);
+            }
+        }
+
+        // Taken under ReadLock because IsEmpty may advance the reader state.
+        bool IsEmpty() {
+            with_lock (ReadLock) {
+                return Queue.IsEmpty();
+            }
+        }
+    };
+
+ ////////////////////////////////////////////////////////////////////////////////
+ // Multiple producers/single consumer partitioned queue.
+    // Because of random partitioning, reordering is possible - FIFO is not guaranteed!
+
+    // Many producers / one consumer without ordering tags: producers pick a
+    // sub-queue pseudo-randomly, so entries may be reordered between producers.
+    template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE>
+    class TRelaxedManyOneQueue: private TNonCopyable {
+        struct TQueueType: public TOneOneQueue<T, ChunkSize> {
+            TAtomic WriteLock = 0; // spin try-lock serializing producers on this sub-queue
+        };
+
+    private:
+        // Anonymous union pads the consumer-only cursor to a full cache line
+        // so it does not false-share with the sub-queues below.
+        union {
+            size_t ReadPos = 0;
+            char Pad[PLATFORM_CACHE_LINE];
+        };
+
+        TQueueType Queues[Concurrency];
+
+    public:
+        using TItem = T;
+
+        // Appends a value; spins until some sub-queue's write lock is free.
+        template <typename TT>
+        void Enqueue(TT&& value) {
+            while (!TryEnqueue(std::forward<TT>(value))) {
+                SpinLockPause();
+            }
+        }
+
+        // Round-robin scan over the sub-queues starting at ReadPos.
+        // Must be called by the single consumer only (ReadPos is unguarded).
+        bool Dequeue(T& value) {
+            for (size_t i = 0; i < Concurrency; ++i) {
+                TQueueType& queue = Queues[ReadPos++ % Concurrency];
+                if (queue.Dequeue(value)) {
+                    return true;
+                }
+            }
+            return false;
+        }
+
+        bool IsEmpty() {
+            for (size_t i = 0; i < Concurrency; ++i) {
+                if (!Queues[i].IsEmpty()) {
+                    return false;
+                }
+            }
+            return true;
+        }
+
+    private:
+        // Starts at a pseudo-random sub-queue (cycle counter) to spread the
+        // producers; fails only if every write lock is contended.
+        // `value` is moved from only on success, so retrying is safe.
+        template <typename TT>
+        bool TryEnqueue(TT&& value) {
+            size_t writePos = GetCycleCount();
+            for (size_t i = 0; i < Concurrency; ++i) {
+                TQueueType& queue = Queues[writePos++ % Concurrency];
+                if (AtomicTryAndTryLock(&queue.WriteLock)) {
+                    queue.Enqueue(std::forward<TT>(value));
+                    AtomicUnlock(&queue.WriteLock);
+                    return true;
+                }
+            }
+            return false;
+        }
+    };
+
+ ////////////////////////////////////////////////////////////////////////////////
+ // Concurrent many-many partitioned queue.
+    // Because of random partitioning, reordering is possible - FIFO is not guaranteed!
+
+    // Many producers / many consumers, partitioned: each sub-queue has its own
+    // writer and reader try-locks. Random partitioning means no FIFO order.
+    template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE>
+    class TRelaxedManyManyQueue: private TNonCopyable {
+        struct TQueueType: public TOneOneQueue<T, ChunkSize> {
+            // Each lock is padded to its own cache line via an anonymous union
+            // so writer-side and reader-side traffic do not false-share.
+            union {
+                TAtomic WriteLock = 0;
+                char Pad1[PLATFORM_CACHE_LINE];
+            };
+            union {
+                TAtomic ReadLock = 0;
+                char Pad2[PLATFORM_CACHE_LINE];
+            };
+        };
+
+    private:
+        TQueueType Queues[Concurrency];
+
+    public:
+        using TItem = T;
+
+        // Appends a value; spins until some sub-queue's write lock is free.
+        template <typename TT>
+        void Enqueue(TT&& value) {
+            while (!TryEnqueue(std::forward<TT>(value))) {
+                SpinLockPause();
+            }
+        }
+
+        // Scans sub-queues from a pseudo-random start; sub-queues whose read
+        // lock is held by another consumer are simply skipped.
+        bool Dequeue(T& value) {
+            size_t readPos = GetCycleCount();
+            for (size_t i = 0; i < Concurrency; ++i) {
+                TQueueType& queue = Queues[readPos++ % Concurrency];
+                if (AtomicTryAndTryLock(&queue.ReadLock)) {
+                    bool dequeued = queue.Dequeue(value);
+                    AtomicUnlock(&queue.ReadLock);
+                    if (dequeued) {
+                        return true;
+                    }
+                }
+            }
+            return false;
+        }
+
+        // Best effort: a sub-queue whose read lock is contended is skipped, so
+        // this can report empty while a locked sub-queue still holds items.
+        bool IsEmpty() {
+            for (size_t i = 0; i < Concurrency; ++i) {
+                TQueueType& queue = Queues[i];
+                if (AtomicTryAndTryLock(&queue.ReadLock)) {
+                    bool empty = queue.IsEmpty();
+                    AtomicUnlock(&queue.ReadLock);
+                    if (!empty) {
+                        return false;
+                    }
+                }
+            }
+            return true;
+        }
+
+    private:
+        // Starts at a pseudo-random sub-queue (cycle counter) to spread the
+        // producers; fails only if every write lock is contended.
+        // `value` is moved from only on success, so retrying is safe.
+        template <typename TT>
+        bool TryEnqueue(TT&& value) {
+            size_t writePos = GetCycleCount();
+            for (size_t i = 0; i < Concurrency; ++i) {
+                TQueueType& queue = Queues[writePos++ % Concurrency];
+                if (AtomicTryAndTryLock(&queue.WriteLock)) {
+                    queue.Enqueue(std::forward<TT>(value));
+                    AtomicUnlock(&queue.WriteLock);
+                    return true;
+                }
+            }
+            return false;
+        }
+    };
+
+ ////////////////////////////////////////////////////////////////////////////////
+ // Simple wrapper to deal with AutoPtrs
+
+    // Adapts a raw-pointer queue (TImpl over T*) into a queue of owned
+    // TAutoPtr<T> values, transferring ownership across the queue boundary.
+    template <typename T, typename TImpl>
+    class TAutoQueueBase: private TNonCopyable {
+    private:
+        TImpl Impl; // underlying queue of raw T* pointers
+
+    public:
+        using TItem = TAutoPtr<T>;
+
+        // Drains remaining items so queued pointers are not leaked.
+        ~TAutoQueueBase() {
+            TAutoPtr<T> value;
+            while (Dequeue(value)) {
+                // do nothing
+            }
+        }
+
+        // Takes ownership: the pointer is released from the TAutoPtr only
+        // after the underlying Enqueue succeeded (exception safety).
+        void Enqueue(TAutoPtr<T> value) {
+            Impl.Enqueue(value.Get());
+            Y_UNUSED(value.Release());
+        }
+
+        // On success the caller's TAutoPtr assumes ownership of the pointer.
+        bool Dequeue(TAutoPtr<T>& value) {
+            T* ptr = nullptr;
+            if (Impl.Dequeue(ptr)) {
+                value.Reset(ptr);
+                return true;
+            }
+            return false;
+        }
+
+        bool IsEmpty() {
+            return Impl.IsEmpty();
+        }
+    };
+
+    // Convenience aliases: queues of owned pointers (TAutoPtr<T>) built on the
+    // corresponding raw-pointer queue implementations above.
+    template <typename T, size_t ChunkSize = PLATFORM_PAGE_SIZE>
+    using TAutoOneOneQueue = TAutoQueueBase<T, TOneOneQueue<T*, ChunkSize>>;
+
+    template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE>
+    using TAutoManyOneQueue = TAutoQueueBase<T, TManyOneQueue<T*, Concurrency, ChunkSize>>;
+
+    template <typename T, size_t ChunkSize = PLATFORM_PAGE_SIZE, typename TLock = TAdaptiveLock>
+    using TAutoManyManyQueue = TAutoQueueBase<T, TManyManyQueue<T*, ChunkSize, TLock>>;
+
+    template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE>
+    using TAutoRelaxedManyOneQueue = TAutoQueueBase<T, TRelaxedManyOneQueue<T*, Concurrency, ChunkSize>>;
+
+    template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE>
+    using TAutoRelaxedManyManyQueue = TAutoQueueBase<T, TRelaxedManyManyQueue<T*, Concurrency, ChunkSize>>;
+}
diff --git a/library/cpp/threading/chunk_queue/queue_ut.cpp b/library/cpp/threading/chunk_queue/queue_ut.cpp
new file mode 100644
index 0000000000..8cb36d8dd1
--- /dev/null
+++ b/library/cpp/threading/chunk_queue/queue_ut.cpp
@@ -0,0 +1,205 @@
+#include "queue.h"
+
+#include <library/cpp/testing/unittest/registar.h>
+
+#include <util/generic/set.h>
+
+namespace NThreading {
+ ////////////////////////////////////////////////////////////////////////////////
+
+    // Tests for the single-producer/single-consumer queue.
+    Y_UNIT_TEST_SUITE(TOneOneQueueTest) {
+        Y_UNIT_TEST(ShouldBeEmptyAtStart) {
+            TOneOneQueue<int> queue;
+
+            int value = 0;
+            UNIT_ASSERT(queue.IsEmpty());
+            UNIT_ASSERT(!queue.Dequeue(value));
+        }
+
+        Y_UNIT_TEST(ShouldReturnEntries) {
+            // FIFO order must hold for a single producer/consumer pair.
+            TOneOneQueue<int> queue;
+            for (int item = 1; item <= 3; ++item) {
+                queue.Enqueue(item);
+            }
+
+            int value = 0;
+            for (int expected = 1; expected <= 3; ++expected) {
+                UNIT_ASSERT(!queue.IsEmpty());
+                UNIT_ASSERT(queue.Dequeue(value));
+                UNIT_ASSERT_EQUAL(value, expected);
+            }
+
+            UNIT_ASSERT(queue.IsEmpty());
+            UNIT_ASSERT(!queue.Dequeue(value));
+        }
+
+        Y_UNIT_TEST(ShouldStoreMultipleChunks) {
+            // A tiny 100-byte chunk forces many chunk transitions.
+            TOneOneQueue<int, 100> queue;
+            for (int i = 0; i < 1000; ++i) {
+                queue.Enqueue(i);
+            }
+
+            for (int i = 0; i < 1000; ++i) {
+                int value = 0;
+                UNIT_ASSERT(!queue.IsEmpty());
+                UNIT_ASSERT(queue.Dequeue(value));
+                UNIT_ASSERT_EQUAL(value, i);
+            }
+        }
+    };
+
+////////////////////////////////////////////////////////////////////////////////
+
+    // Tests for the tagged many-producer/one-consumer queue.
+    Y_UNIT_TEST_SUITE(TManyOneQueueTest) {
+        Y_UNIT_TEST(ShouldBeEmptyAtStart) {
+            TManyOneQueue<int> queue;
+
+            int value;
+            UNIT_ASSERT(queue.IsEmpty());
+            UNIT_ASSERT(!queue.Dequeue(value));
+        }
+
+        Y_UNIT_TEST(ShouldReturnEntries) {
+            // Tags restore global FIFO order even across sub-queues.
+            TManyOneQueue<int> queue;
+            for (int item = 1; item <= 3; ++item) {
+                queue.Enqueue(item);
+            }
+
+            int value = 0;
+            for (int expected = 1; expected <= 3; ++expected) {
+                UNIT_ASSERT(!queue.IsEmpty());
+                UNIT_ASSERT(queue.Dequeue(value));
+                UNIT_ASSERT_EQUAL(value, expected);
+            }
+
+            UNIT_ASSERT(queue.IsEmpty());
+            UNIT_ASSERT(!queue.Dequeue(value));
+        }
+    };
+
+////////////////////////////////////////////////////////////////////////////////
+
+    // Tests for the lock-guarded many-many queue.
+    Y_UNIT_TEST_SUITE(TManyManyQueueTest) {
+        Y_UNIT_TEST(ShouldBeEmptyAtStart) {
+            TManyManyQueue<int> queue;
+
+            int value = 0;
+            UNIT_ASSERT(queue.IsEmpty());
+            UNIT_ASSERT(!queue.Dequeue(value));
+        }
+
+        Y_UNIT_TEST(ShouldReturnEntries) {
+            // Strong FIFO is guaranteed by the writer/reader locks.
+            TManyManyQueue<int> queue;
+            for (int item = 1; item <= 3; ++item) {
+                queue.Enqueue(item);
+            }
+
+            int value = 0;
+            for (int expected = 1; expected <= 3; ++expected) {
+                UNIT_ASSERT(!queue.IsEmpty());
+                UNIT_ASSERT(queue.Dequeue(value));
+                UNIT_ASSERT_EQUAL(value, expected);
+            }
+
+            UNIT_ASSERT(queue.IsEmpty());
+            UNIT_ASSERT(!queue.Dequeue(value));
+        }
+    };
+
+////////////////////////////////////////////////////////////////////////////////
+
+    // Tests for the unordered many-producer/one-consumer queue.
+    Y_UNIT_TEST_SUITE(TRelaxedManyOneQueueTest) {
+        Y_UNIT_TEST(ShouldBeEmptyAtStart) {
+            TRelaxedManyOneQueue<int> queue;
+
+            int value;
+            UNIT_ASSERT(queue.IsEmpty());
+            UNIT_ASSERT(!queue.Dequeue(value));
+        }
+
+        Y_UNIT_TEST(ShouldReturnEntries) {
+            TSet<int> items = {1, 2, 3};
+
+            TRelaxedManyOneQueue<int> queue;
+            for (int item : items) {
+                queue.Enqueue(item);
+            }
+
+            // Partitioning may reorder entries, so only the set is checked.
+            int value = 0;
+            for (size_t i = 0; i < 3; ++i) {
+                UNIT_ASSERT(!queue.IsEmpty());
+                UNIT_ASSERT(queue.Dequeue(value));
+                UNIT_ASSERT(items.erase(value));
+            }
+
+            UNIT_ASSERT(queue.IsEmpty());
+            UNIT_ASSERT(!queue.Dequeue(value));
+        }
+    };
+
+////////////////////////////////////////////////////////////////////////////////
+
+    // Tests for the unordered many-many queue.
+    Y_UNIT_TEST_SUITE(TRelaxedManyManyQueueTest) {
+        Y_UNIT_TEST(ShouldBeEmptyAtStart) {
+            TRelaxedManyManyQueue<int> queue;
+
+            int value = 0;
+            UNIT_ASSERT(queue.IsEmpty());
+            UNIT_ASSERT(!queue.Dequeue(value));
+        }
+
+        Y_UNIT_TEST(ShouldReturnEntries) {
+            TSet<int> items = {1, 2, 3};
+
+            TRelaxedManyManyQueue<int> queue;
+            for (int item : items) {
+                queue.Enqueue(item);
+            }
+
+            // Partitioning may reorder entries, so only the set is checked.
+            int value = 0;
+            for (size_t i = 0; i < 3; ++i) {
+                UNIT_ASSERT(!queue.IsEmpty());
+                UNIT_ASSERT(queue.Dequeue(value));
+                UNIT_ASSERT(items.erase(value));
+            }
+
+            UNIT_ASSERT(queue.IsEmpty());
+            UNIT_ASSERT(!queue.Dequeue(value));
+        }
+    };
+}
diff --git a/library/cpp/threading/chunk_queue/readme.txt b/library/cpp/threading/chunk_queue/readme.txt
new file mode 100644
index 0000000000..7c9f046a86
--- /dev/null
+++ b/library/cpp/threading/chunk_queue/readme.txt
@@ -0,0 +1,60 @@
+vskipin@dev-kiwi09:~$ ./rtmr-queue-perf -w 4 -r 4 AdaptiveLock64 Mutex64 LFManyMany64 FastLFManyMany64 LFManyOne64 FastLFManyOne64 ManyMany64 ManyOne64
+2016-05-08T11:49:56.729254Z INFO: [-i] Iterations: 10000000
+2016-05-08T11:49:56.729319Z INFO: [-r] NumReaders: 4
+2016-05-08T11:49:56.729355Z INFO: [-w] NumWriters: 4
+2016-05-08T11:49:56.729502Z INFO: starting consumers...
+2016-05-08T11:49:56.729621Z INFO: starting producers...
+2016-05-08T11:49:56.729711Z INFO: wait for producers...
+2016-05-08T11:50:14.650803Z INFO: wait for consumers...
+2016-05-08T11:50:14.650859Z INFO: average producer time: 15.96846675 seconds
+2016-05-08T11:50:14.650885Z INFO: average consumer time: 17.9209995 seconds
+2016-05-08T11:50:14.650897Z INFO: test AdaptiveLock64 duration: 17.921395s (0.448034875us per iteration)
+2016-05-08T11:50:14.650913Z INFO: starting consumers...
+2016-05-08T11:50:14.651028Z INFO: starting producers...
+2016-05-08T11:50:14.651122Z INFO: wait for producers...
+2016-05-08T11:50:31.426378Z INFO: wait for consumers...
+2016-05-08T11:50:31.426447Z INFO: average producer time: 15.58770475 seconds
+2016-05-08T11:50:31.426491Z INFO: average consumer time: 16.775301 seconds
+2016-05-08T11:50:31.426527Z INFO: test Mutex64 duration: 16.775614s (0.41939035us per iteration)
+2016-05-08T11:50:31.426584Z INFO: starting consumers...
+2016-05-08T11:50:31.426655Z INFO: starting producers...
+2016-05-08T11:50:31.426749Z INFO: wait for producers...
+2016-05-08T11:50:40.578425Z INFO: wait for consumers...
+2016-05-08T11:50:40.578523Z INFO: average producer time: 8.69236075 seconds
+2016-05-08T11:50:40.578577Z INFO: average consumer time: 9.15165125 seconds
+2016-05-08T11:50:40.578617Z INFO: test LFManyMany64 duration: 9.152033s (0.228800825us per iteration)
+2016-05-08T11:50:40.578670Z INFO: starting consumers...
+2016-05-08T11:50:40.578742Z INFO: starting producers...
+2016-05-08T11:50:40.578893Z INFO: wait for producers...
+2016-05-08T11:50:47.447686Z INFO: wait for consumers...
+2016-05-08T11:50:47.447758Z INFO: average producer time: 6.81136025 seconds
+2016-05-08T11:50:47.447793Z INFO: average consumer time: 6.86875825 seconds
+2016-05-08T11:50:47.447834Z INFO: test FastLFManyMany64 duration: 6.869165s (0.171729125us per iteration)
+2016-05-08T11:50:47.447901Z INFO: starting consumers...
+2016-05-08T11:50:47.447967Z INFO: starting producers...
+2016-05-08T11:50:47.448058Z INFO: wait for producers...
+2016-05-08T11:50:50.469710Z INFO: wait for consumers...
+2016-05-08T11:50:50.469798Z INFO: average producer time: 2.9915505 seconds
+2016-05-08T11:50:50.469848Z INFO: average consumer time: 3.02161675 seconds
+2016-05-08T11:50:50.469883Z INFO: test LFManyOne64 duration: 3.021983s (0.075549575us per iteration)
+2016-05-08T11:50:50.469947Z INFO: starting consumers...
+2016-05-08T11:50:50.470012Z INFO: starting producers...
+2016-05-08T11:50:50.470104Z INFO: wait for producers...
+2016-05-08T11:50:53.139964Z INFO: wait for consumers...
+2016-05-08T11:50:53.140050Z INFO: average producer time: 2.5656465 seconds
+2016-05-08T11:50:53.140102Z INFO: average consumer time: 2.6697755 seconds
+2016-05-08T11:50:53.140149Z INFO: test FastLFManyOne64 duration: 2.670202s (0.06675505us per iteration)
+2016-05-08T11:50:53.140206Z INFO: starting consumers...
+2016-05-08T11:50:53.140281Z INFO: starting producers...
+2016-05-08T11:50:53.140371Z INFO: wait for producers...
+2016-05-08T11:50:59.067812Z INFO: wait for consumers...
+2016-05-08T11:50:59.067895Z INFO: average producer time: 5.8925505 seconds
+2016-05-08T11:50:59.067946Z INFO: average consumer time: 5.9273365 seconds
+2016-05-08T11:50:59.067978Z INFO: test ManyMany64 duration: 5.927773s (0.148194325us per iteration)
+2016-05-08T11:50:59.068068Z INFO: starting consumers...
+2016-05-08T11:50:59.068179Z INFO: starting producers...
+2016-05-08T11:50:59.068288Z INFO: wait for producers...
+2016-05-08T11:51:03.427416Z INFO: wait for consumers...
+2016-05-08T11:51:03.427514Z INFO: average producer time: 4.1055505 seconds
+2016-05-08T11:51:03.427560Z INFO: average consumer time: 4.35914975 seconds
+2016-05-08T11:51:03.427596Z INFO: test ManyOne64 duration: 4.359529s (0.108988225us per iteration)
diff --git a/library/cpp/threading/chunk_queue/ut/ya.make b/library/cpp/threading/chunk_queue/ut/ya.make
new file mode 100644
index 0000000000..a35ed6bc4b
--- /dev/null
+++ b/library/cpp/threading/chunk_queue/ut/ya.make
@@ -0,0 +1,9 @@
+UNITTEST_FOR(library/cpp/threading/chunk_queue)
+
+OWNER(g:rtmr)
+
+SRCS(
+ queue_ut.cpp
+)
+
+END()
diff --git a/library/cpp/threading/chunk_queue/ya.make b/library/cpp/threading/chunk_queue/ya.make
new file mode 100644
index 0000000000..2f883140ba
--- /dev/null
+++ b/library/cpp/threading/chunk_queue/ya.make
@@ -0,0 +1,9 @@
+LIBRARY()
+
+OWNER(g:rtmr)
+
+SRCS(
+ queue.cpp
+)
+
+END()