aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/threading/chunk_queue
diff options
context:
space:
mode:
authorvskipin <vskipin@yandex-team.ru>2022-02-10 16:46:00 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:46:00 +0300
commit4d8b546b89b5afc08cf3667e176271c7ba935f33 (patch)
tree1a2c5ffcf89eb53ecd79dbc9bc0a195c27404d0c /library/cpp/threading/chunk_queue
parent4e4b78bd7b67e2533da4dbb9696374a6d6068e32 (diff)
downloadydb-4d8b546b89b5afc08cf3667e176271c7ba935f33.tar.gz
Restoring authorship annotation for <vskipin@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/threading/chunk_queue')
-rw-r--r--library/cpp/threading/chunk_queue/queue.cpp2
-rw-r--r--library/cpp/threading/chunk_queue/queue.h316
-rw-r--r--library/cpp/threading/chunk_queue/queue_ut.cpp118
-rw-r--r--library/cpp/threading/chunk_queue/readme.txt120
-rw-r--r--library/cpp/threading/chunk_queue/ut/ya.make14
-rw-r--r--library/cpp/threading/chunk_queue/ya.make16
6 files changed, 293 insertions, 293 deletions
diff --git a/library/cpp/threading/chunk_queue/queue.cpp b/library/cpp/threading/chunk_queue/queue.cpp
index 52dd119921..4ebd3f3205 100644
--- a/library/cpp/threading/chunk_queue/queue.cpp
+++ b/library/cpp/threading/chunk_queue/queue.cpp
@@ -1 +1 @@
-#include "queue.h"
+#include "queue.h"
diff --git a/library/cpp/threading/chunk_queue/queue.h b/library/cpp/threading/chunk_queue/queue.h
index fdf4c93f92..55859601a1 100644
--- a/library/cpp/threading/chunk_queue/queue.h
+++ b/library/cpp/threading/chunk_queue/queue.h
@@ -1,35 +1,35 @@
-#pragma once
-
-#include <util/datetime/base.h>
-#include <util/generic/noncopyable.h>
-#include <util/generic/ptr.h>
-#include <util/generic/typetraits.h>
-#include <util/generic/vector.h>
-#include <util/generic/ylimits.h>
-#include <util/system/atomic.h>
-#include <util/system/guard.h>
-#include <util/system/spinlock.h>
-#include <util/system/yassert.h>
-
-#include <type_traits>
-#include <utility>
-
-namespace NThreading {
-////////////////////////////////////////////////////////////////////////////////
-// Platform helpers
-
-#if !defined(PLATFORM_CACHE_LINE)
+#pragma once
+
+#include <util/datetime/base.h>
+#include <util/generic/noncopyable.h>
+#include <util/generic/ptr.h>
+#include <util/generic/typetraits.h>
+#include <util/generic/vector.h>
+#include <util/generic/ylimits.h>
+#include <util/system/atomic.h>
+#include <util/system/guard.h>
+#include <util/system/spinlock.h>
+#include <util/system/yassert.h>
+
+#include <type_traits>
+#include <utility>
+
+namespace NThreading {
+////////////////////////////////////////////////////////////////////////////////
+// Platform helpers
+
+#if !defined(PLATFORM_CACHE_LINE)
#define PLATFORM_CACHE_LINE 64
-#endif
-
-#if !defined(PLATFORM_PAGE_SIZE)
+#endif
+
+#if !defined(PLATFORM_PAGE_SIZE)
#define PLATFORM_PAGE_SIZE 4 * 1024
-#endif
-
+#endif
+
template <typename T, size_t PadSize = PLATFORM_CACHE_LINE>
struct TPadded: public T {
char Pad[PadSize - sizeof(T) % PadSize];
-
+
TPadded() {
static_assert(sizeof(*this) % PadSize == 0, "padding does not work");
Y_UNUSED(Pad);
@@ -43,10 +43,10 @@ namespace NThreading {
Y_UNUSED(Pad);
}
};
-
+
////////////////////////////////////////////////////////////////////////////////
// Type helpers
-
+
namespace NImpl {
template <typename T>
struct TPodTypeHelper {
@@ -54,99 +54,99 @@ namespace NThreading {
static void Write(T* ptr, TT&& value) {
*ptr = value;
}
-
+
static T Read(T* ptr) {
return *ptr;
}
-
+
static void Destroy(T* ptr) {
Y_UNUSED(ptr);
}
};
-
+
template <typename T>
struct TNonPodTypeHelper {
template <typename TT>
static void Write(T* ptr, TT&& value) {
new (ptr) T(std::forward<TT>(value));
}
-
+
static T Read(T* ptr) {
return std::move(*ptr);
}
-
+
static void Destroy(T* ptr) {
(void)ptr; /* Make MSVC happy. */
ptr->~T();
}
};
-
+
template <typename T>
using TTypeHelper = std::conditional_t<
TTypeTraits<T>::IsPod,
TPodTypeHelper<T>,
TNonPodTypeHelper<T>>;
-
+
}
-
+
////////////////////////////////////////////////////////////////////////////////
// One producer/one consumer chunked queue.
-
+
template <typename T, size_t ChunkSize = PLATFORM_PAGE_SIZE>
class TOneOneQueue: private TNonCopyable {
using TTypeHelper = NImpl::TTypeHelper<T>;
-
+
struct TChunk;
-
+
struct TChunkHeader {
size_t Count = 0;
TChunk* Next = nullptr;
};
-
+
struct TChunk: public TChunkHeader {
static constexpr size_t MaxCount = (ChunkSize - sizeof(TChunkHeader)) / sizeof(T);
-
+
char Entries[MaxCount * sizeof(T)];
-
+
TChunk() {
Y_UNUSED(Entries); // uninitialized
}
-
+
~TChunk() {
for (size_t i = 0; i < this->Count; ++i) {
TTypeHelper::Destroy(GetPtr(i));
}
- }
-
+ }
+
T* GetPtr(size_t i) {
return (T*)Entries + i;
}
};
-
+
struct TWriterState {
TChunk* Chunk = nullptr;
};
-
+
struct TReaderState {
TChunk* Chunk = nullptr;
size_t Count = 0;
};
-
+
private:
TPadded<TWriterState> Writer;
TPadded<TReaderState> Reader;
-
+
public:
using TItem = T;
-
+
TOneOneQueue() {
Writer.Chunk = Reader.Chunk = new TChunk();
}
-
+
~TOneOneQueue() {
DeleteChunks(Reader.Chunk);
}
-
+
template <typename TT>
void Enqueue(TT&& value) {
T* ptr = PrepareWrite();
@@ -154,7 +154,7 @@ namespace NThreading {
TTypeHelper::Write(ptr, std::forward<TT>(value));
CompleteWrite();
}
-
+
bool Dequeue(T& value) {
if (T* ptr = PrepareRead()) {
value = TTypeHelper::Read(ptr);
@@ -162,17 +162,17 @@ namespace NThreading {
return true;
}
return false;
- }
-
+ }
+
bool IsEmpty() {
return !PrepareRead();
}
-
+
protected:
T* PrepareWrite() {
TChunk* chunk = Writer.Chunk;
Y_ASSERT(chunk && !chunk->Next);
-
+
if (chunk->Count != TChunk::MaxCount) {
return chunk->GetPtr(chunk->Count);
}
@@ -181,41 +181,41 @@ namespace NThreading {
AtomicSet(Writer.Chunk->Next, chunk);
Writer.Chunk = chunk;
return chunk->GetPtr(0);
- }
-
+ }
+
void CompleteWrite() {
AtomicSet(Writer.Chunk->Count, Writer.Chunk->Count + 1);
}
-
+
T* PrepareRead() {
TChunk* chunk = Reader.Chunk;
Y_ASSERT(chunk);
-
+
for (;;) {
size_t writerCount = AtomicGet(chunk->Count);
if (Reader.Count != writerCount) {
return chunk->GetPtr(Reader.Count);
}
-
+
if (writerCount != TChunk::MaxCount) {
return nullptr;
}
-
+
chunk = AtomicGet(chunk->Next);
if (!chunk) {
return nullptr;
}
-
+
delete Reader.Chunk;
Reader.Chunk = chunk;
Reader.Count = 0;
- }
+ }
}
-
+
void CompleteRead() {
++Reader.Count;
- }
-
+ }
+
private:
static void DeleteChunks(TChunk* chunk) {
while (chunk) {
@@ -223,51 +223,51 @@ namespace NThreading {
delete chunk;
chunk = next;
}
- }
+ }
};
-
+
////////////////////////////////////////////////////////////////////////////////
// Multiple producers/single consumer partitioned queue.
// Provides FIFO guaranties for each producer.
-
+
template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE>
class TManyOneQueue: private TNonCopyable {
using TTypeHelper = NImpl::TTypeHelper<T>;
-
+
struct TEntry {
T Value;
ui64 Tag;
};
-
+
struct TQueueType: public TOneOneQueue<TEntry, ChunkSize> {
TAtomic WriteLock = 0;
-
+
using TOneOneQueue<TEntry, ChunkSize>::PrepareWrite;
using TOneOneQueue<TEntry, ChunkSize>::CompleteWrite;
-
+
using TOneOneQueue<TEntry, ChunkSize>::PrepareRead;
using TOneOneQueue<TEntry, ChunkSize>::CompleteRead;
};
-
+
private:
union {
TAtomic WriteTag = 0;
char Pad[PLATFORM_CACHE_LINE];
};
-
+
TQueueType Queues[Concurrency];
-
+
public:
using TItem = T;
-
+
template <typename TT>
void Enqueue(TT&& value) {
ui64 tag = NextTag();
while (!TryEnqueue(std::forward<TT>(value), tag)) {
SpinLockPause();
}
- }
-
+ }
+
bool Dequeue(T& value) {
size_t index = 0;
if (TEntry* entry = PrepareRead(index)) {
@@ -276,24 +276,24 @@ namespace NThreading {
return true;
}
return false;
- }
-
+ }
+
bool IsEmpty() {
for (size_t i = 0; i < Concurrency; ++i) {
if (!Queues[i].IsEmpty()) {
return false;
}
- }
+ }
return true;
- }
-
+ }
+
private:
ui64 NextTag() {
// TODO: can we avoid synchronization here? it costs 1.5x performance penalty
// return GetCycleCount();
return AtomicIncrement(WriteTag);
}
-
+
template <typename TT>
bool TryEnqueue(TT&& value, ui64 tag) {
for (size_t i = 0; i < Concurrency; ++i) {
@@ -307,22 +307,22 @@ namespace NThreading {
AtomicUnlock(&queue.WriteLock);
return true;
}
- }
+ }
return false;
- }
-
+ }
+
TEntry* PrepareRead(size_t& index) {
TEntry* entry = nullptr;
ui64 tag = Max();
-
+
for (size_t i = 0; i < Concurrency; ++i) {
- TEntry* e = Queues[i].PrepareRead();
- if (e && e->Tag < tag) {
- index = i;
- entry = e;
- tag = e->Tag;
- }
- }
+ TEntry* e = Queues[i].PrepareRead();
+ if (e && e->Tag < tag) {
+ index = i;
+ entry = e;
+ tag = e->Tag;
+ }
+ }
if (entry) {
// need second pass to catch updates within already scanned range
@@ -338,91 +338,91 @@ namespace NThreading {
}
return entry;
- }
+ }
};
-
+
////////////////////////////////////////////////////////////////////////////////
// Concurrent many-many queue with strong FIFO guaranties.
// Writers will not block readers (and vice versa), but will block each other.
-
+
template <typename T, size_t ChunkSize = PLATFORM_PAGE_SIZE, typename TLock = TAdaptiveLock>
class TManyManyQueue: private TNonCopyable {
private:
TPadded<TLock> WriteLock;
TPadded<TLock> ReadLock;
-
+
TOneOneQueue<T, ChunkSize> Queue;
-
+
public:
using TItem = T;
-
+
template <typename TT>
void Enqueue(TT&& value) {
with_lock (WriteLock) {
Queue.Enqueue(std::forward<TT>(value));
}
- }
-
+ }
+
bool Dequeue(T& value) {
with_lock (ReadLock) {
return Queue.Dequeue(value);
}
- }
-
+ }
+
bool IsEmpty() {
with_lock (ReadLock) {
return Queue.IsEmpty();
}
- }
+ }
};
-
+
////////////////////////////////////////////////////////////////////////////////
// Multiple producers/single consumer partitioned queue.
// Because of random partitioning reordering possible - FIFO not guaranteed!
-
+
template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE>
class TRelaxedManyOneQueue: private TNonCopyable {
struct TQueueType: public TOneOneQueue<T, ChunkSize> {
TAtomic WriteLock = 0;
};
-
+
private:
union {
size_t ReadPos = 0;
char Pad[PLATFORM_CACHE_LINE];
};
-
+
TQueueType Queues[Concurrency];
-
+
public:
using TItem = T;
-
+
template <typename TT>
void Enqueue(TT&& value) {
while (!TryEnqueue(std::forward<TT>(value))) {
SpinLockPause();
}
- }
-
+ }
+
bool Dequeue(T& value) {
for (size_t i = 0; i < Concurrency; ++i) {
TQueueType& queue = Queues[ReadPos++ % Concurrency];
if (queue.Dequeue(value)) {
return true;
}
- }
+ }
return false;
- }
-
+ }
+
bool IsEmpty() {
for (size_t i = 0; i < Concurrency; ++i) {
if (!Queues[i].IsEmpty()) {
return false;
}
- }
+ }
return true;
- }
-
+ }
+
private:
template <typename TT>
bool TryEnqueue(TT&& value) {
@@ -434,15 +434,15 @@ namespace NThreading {
AtomicUnlock(&queue.WriteLock);
return true;
}
- }
+ }
return false;
- }
+ }
};
-
+
////////////////////////////////////////////////////////////////////////////////
// Concurrent many-many partitioned queue.
// Because of random partitioning reordering possible - FIFO not guaranteed!
-
+
template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE>
class TRelaxedManyManyQueue: private TNonCopyable {
struct TQueueType: public TOneOneQueue<T, ChunkSize> {
@@ -454,21 +454,21 @@ namespace NThreading {
TAtomic ReadLock = 0;
char Pad2[PLATFORM_CACHE_LINE];
};
- };
-
+ };
+
private:
TQueueType Queues[Concurrency];
-
+
public:
using TItem = T;
-
+
template <typename TT>
void Enqueue(TT&& value) {
while (!TryEnqueue(std::forward<TT>(value))) {
SpinLockPause();
}
- }
-
+ }
+
bool Dequeue(T& value) {
size_t readPos = GetCycleCount();
for (size_t i = 0; i < Concurrency; ++i) {
@@ -479,11 +479,11 @@ namespace NThreading {
if (dequeued) {
return true;
}
- }
- }
+ }
+ }
return false;
- }
-
+ }
+
bool IsEmpty() {
for (size_t i = 0; i < Concurrency; ++i) {
TQueueType& queue = Queues[i];
@@ -493,11 +493,11 @@ namespace NThreading {
if (!empty) {
return false;
}
- }
- }
+ }
+ }
return true;
- }
-
+ }
+
private:
template <typename TT>
bool TryEnqueue(TT&& value) {
@@ -509,34 +509,34 @@ namespace NThreading {
AtomicUnlock(&queue.WriteLock);
return true;
}
- }
+ }
return false;
- }
+ }
};
-
+
////////////////////////////////////////////////////////////////////////////////
// Simple wrapper to deal with AutoPtrs
-
+
template <typename T, typename TImpl>
class TAutoQueueBase: private TNonCopyable {
private:
TImpl Impl;
-
+
public:
using TItem = TAutoPtr<T>;
-
+
~TAutoQueueBase() {
TAutoPtr<T> value;
while (Dequeue(value)) {
// do nothing
}
- }
-
+ }
+
void Enqueue(TAutoPtr<T> value) {
Impl.Enqueue(value.Get());
Y_UNUSED(value.Release());
}
-
+
bool Dequeue(TAutoPtr<T>& value) {
T* ptr = nullptr;
if (Impl.Dequeue(ptr)) {
@@ -544,25 +544,25 @@ namespace NThreading {
return true;
}
return false;
- }
-
+ }
+
bool IsEmpty() {
return Impl.IsEmpty();
}
};
-
+
template <typename T, size_t ChunkSize = PLATFORM_PAGE_SIZE>
using TAutoOneOneQueue = TAutoQueueBase<T, TOneOneQueue<T*, ChunkSize>>;
-
+
template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE>
using TAutoManyOneQueue = TAutoQueueBase<T, TManyOneQueue<T*, Concurrency, ChunkSize>>;
-
+
template <typename T, size_t ChunkSize = PLATFORM_PAGE_SIZE, typename TLock = TAdaptiveLock>
using TAutoManyManyQueue = TAutoQueueBase<T, TManyManyQueue<T*, ChunkSize, TLock>>;
-
+
template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE>
using TAutoRelaxedManyOneQueue = TAutoQueueBase<T, TRelaxedManyOneQueue<T*, Concurrency, ChunkSize>>;
-
+
template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE>
using TAutoRelaxedManyManyQueue = TAutoQueueBase<T, TRelaxedManyManyQueue<T*, Concurrency, ChunkSize>>;
}
diff --git a/library/cpp/threading/chunk_queue/queue_ut.cpp b/library/cpp/threading/chunk_queue/queue_ut.cpp
index dc103202e8..8cb36d8dd1 100644
--- a/library/cpp/threading/chunk_queue/queue_ut.cpp
+++ b/library/cpp/threading/chunk_queue/queue_ut.cpp
@@ -1,202 +1,202 @@
-#include "queue.h"
-
+#include "queue.h"
+
#include <library/cpp/testing/unittest/registar.h>
-
-#include <util/generic/set.h>
-
-namespace NThreading {
+
+#include <util/generic/set.h>
+
+namespace NThreading {
////////////////////////////////////////////////////////////////////////////////
-
+
Y_UNIT_TEST_SUITE(TOneOneQueueTest){
Y_UNIT_TEST(ShouldBeEmptyAtStart){
TOneOneQueue<int> queue;
-
+
int result = 0;
UNIT_ASSERT(queue.IsEmpty());
UNIT_ASSERT(!queue.Dequeue(result));
}
-
+
Y_UNIT_TEST(ShouldReturnEntries) {
TOneOneQueue<int> queue;
queue.Enqueue(1);
queue.Enqueue(2);
queue.Enqueue(3);
-
+
int result = 0;
UNIT_ASSERT(!queue.IsEmpty());
UNIT_ASSERT(queue.Dequeue(result));
UNIT_ASSERT_EQUAL(result, 1);
-
+
UNIT_ASSERT(!queue.IsEmpty());
UNIT_ASSERT(queue.Dequeue(result));
UNIT_ASSERT_EQUAL(result, 2);
-
+
UNIT_ASSERT(!queue.IsEmpty());
UNIT_ASSERT(queue.Dequeue(result));
UNIT_ASSERT_EQUAL(result, 3);
-
+
UNIT_ASSERT(queue.IsEmpty());
UNIT_ASSERT(!queue.Dequeue(result));
}
-
+
Y_UNIT_TEST(ShouldStoreMultipleChunks) {
TOneOneQueue<int, 100> queue;
for (int i = 0; i < 1000; ++i) {
queue.Enqueue(i);
- }
-
+ }
+
for (int i = 0; i < 1000; ++i) {
int result = 0;
UNIT_ASSERT(!queue.IsEmpty());
UNIT_ASSERT(queue.Dequeue(result));
UNIT_ASSERT_EQUAL(result, i);
- }
+ }
}
}
;
-
-////////////////////////////////////////////////////////////////////////////////
-
+
+////////////////////////////////////////////////////////////////////////////////
+
Y_UNIT_TEST_SUITE(TManyOneQueueTest){
Y_UNIT_TEST(ShouldBeEmptyAtStart){
- TManyOneQueue<int> queue;
-
+ TManyOneQueue<int> queue;
+
int result;
UNIT_ASSERT(queue.IsEmpty());
UNIT_ASSERT(!queue.Dequeue(result));
}
-
+
Y_UNIT_TEST(ShouldReturnEntries) {
TManyOneQueue<int> queue;
queue.Enqueue(1);
queue.Enqueue(2);
queue.Enqueue(3);
-
+
int result = 0;
UNIT_ASSERT(!queue.IsEmpty());
UNIT_ASSERT(queue.Dequeue(result));
UNIT_ASSERT_EQUAL(result, 1);
-
+
UNIT_ASSERT(!queue.IsEmpty());
UNIT_ASSERT(queue.Dequeue(result));
UNIT_ASSERT_EQUAL(result, 2);
-
+
UNIT_ASSERT(!queue.IsEmpty());
UNIT_ASSERT(queue.Dequeue(result));
UNIT_ASSERT_EQUAL(result, 3);
-
+
UNIT_ASSERT(queue.IsEmpty());
UNIT_ASSERT(!queue.Dequeue(result));
}
}
;
-
-////////////////////////////////////////////////////////////////////////////////
-
+
+////////////////////////////////////////////////////////////////////////////////
+
Y_UNIT_TEST_SUITE(TManyManyQueueTest){
Y_UNIT_TEST(ShouldBeEmptyAtStart){
- TManyManyQueue<int> queue;
-
+ TManyManyQueue<int> queue;
+
int result = 0;
UNIT_ASSERT(queue.IsEmpty());
UNIT_ASSERT(!queue.Dequeue(result));
}
-
+
Y_UNIT_TEST(ShouldReturnEntries) {
TManyManyQueue<int> queue;
queue.Enqueue(1);
queue.Enqueue(2);
queue.Enqueue(3);
-
+
int result = 0;
UNIT_ASSERT(!queue.IsEmpty());
UNIT_ASSERT(queue.Dequeue(result));
UNIT_ASSERT_EQUAL(result, 1);
-
+
UNIT_ASSERT(!queue.IsEmpty());
UNIT_ASSERT(queue.Dequeue(result));
UNIT_ASSERT_EQUAL(result, 2);
-
+
UNIT_ASSERT(!queue.IsEmpty());
UNIT_ASSERT(queue.Dequeue(result));
UNIT_ASSERT_EQUAL(result, 3);
-
+
UNIT_ASSERT(queue.IsEmpty());
UNIT_ASSERT(!queue.Dequeue(result));
}
}
;
-
-////////////////////////////////////////////////////////////////////////////////
-
+
+////////////////////////////////////////////////////////////////////////////////
+
Y_UNIT_TEST_SUITE(TRelaxedManyOneQueueTest){
Y_UNIT_TEST(ShouldBeEmptyAtStart){
- TRelaxedManyOneQueue<int> queue;
-
+ TRelaxedManyOneQueue<int> queue;
+
int result;
UNIT_ASSERT(queue.IsEmpty());
UNIT_ASSERT(!queue.Dequeue(result));
}
-
+
Y_UNIT_TEST(ShouldReturnEntries) {
TSet<int> items = {1, 2, 3};
-
+
TRelaxedManyOneQueue<int> queue;
for (int item : items) {
queue.Enqueue(item);
}
-
+
int result = 0;
UNIT_ASSERT(!queue.IsEmpty());
UNIT_ASSERT(queue.Dequeue(result));
UNIT_ASSERT(items.erase(result));
-
+
UNIT_ASSERT(!queue.IsEmpty());
UNIT_ASSERT(queue.Dequeue(result));
UNIT_ASSERT(items.erase(result));
-
+
UNIT_ASSERT(!queue.IsEmpty());
UNIT_ASSERT(queue.Dequeue(result));
UNIT_ASSERT(items.erase(result));
-
+
UNIT_ASSERT(queue.IsEmpty());
UNIT_ASSERT(!queue.Dequeue(result));
}
}
;
-
-////////////////////////////////////////////////////////////////////////////////
-
+
+////////////////////////////////////////////////////////////////////////////////
+
Y_UNIT_TEST_SUITE(TRelaxedManyManyQueueTest){
Y_UNIT_TEST(ShouldBeEmptyAtStart){
- TRelaxedManyManyQueue<int> queue;
-
+ TRelaxedManyManyQueue<int> queue;
+
int result = 0;
UNIT_ASSERT(queue.IsEmpty());
UNIT_ASSERT(!queue.Dequeue(result));
}
-
+
Y_UNIT_TEST(ShouldReturnEntries) {
TSet<int> items = {1, 2, 3};
-
+
TRelaxedManyManyQueue<int> queue;
for (int item : items) {
queue.Enqueue(item);
}
-
+
int result = 0;
UNIT_ASSERT(!queue.IsEmpty());
UNIT_ASSERT(queue.Dequeue(result));
UNIT_ASSERT(items.erase(result));
-
+
UNIT_ASSERT(!queue.IsEmpty());
UNIT_ASSERT(queue.Dequeue(result));
UNIT_ASSERT(items.erase(result));
-
+
UNIT_ASSERT(!queue.IsEmpty());
UNIT_ASSERT(queue.Dequeue(result));
UNIT_ASSERT(items.erase(result));
-
+
UNIT_ASSERT(queue.IsEmpty());
UNIT_ASSERT(!queue.Dequeue(result));
}
diff --git a/library/cpp/threading/chunk_queue/readme.txt b/library/cpp/threading/chunk_queue/readme.txt
index 104a8ec744..7c9f046a86 100644
--- a/library/cpp/threading/chunk_queue/readme.txt
+++ b/library/cpp/threading/chunk_queue/readme.txt
@@ -1,60 +1,60 @@
-vskipin@dev-kiwi09:~$ ./rtmr-queue-perf -w 4 -r 4 AdaptiveLock64 Mutex64 LFManyMany64 FastLFManyMany64 LFManyOne64 FastLFManyOne64 ManyMany64 ManyOne64
-2016-05-08T11:49:56.729254Z INFO: [-i] Iterations: 10000000
-2016-05-08T11:49:56.729319Z INFO: [-r] NumReaders: 4
-2016-05-08T11:49:56.729355Z INFO: [-w] NumWriters: 4
-2016-05-08T11:49:56.729502Z INFO: starting consumers...
-2016-05-08T11:49:56.729621Z INFO: starting producers...
-2016-05-08T11:49:56.729711Z INFO: wait for producers...
-2016-05-08T11:50:14.650803Z INFO: wait for consumers...
-2016-05-08T11:50:14.650859Z INFO: average producer time: 15.96846675 seconds
-2016-05-08T11:50:14.650885Z INFO: average consumer time: 17.9209995 seconds
-2016-05-08T11:50:14.650897Z INFO: test AdaptiveLock64 duration: 17.921395s (0.448034875us per iteration)
-2016-05-08T11:50:14.650913Z INFO: starting consumers...
-2016-05-08T11:50:14.651028Z INFO: starting producers...
-2016-05-08T11:50:14.651122Z INFO: wait for producers...
-2016-05-08T11:50:31.426378Z INFO: wait for consumers...
-2016-05-08T11:50:31.426447Z INFO: average producer time: 15.58770475 seconds
-2016-05-08T11:50:31.426491Z INFO: average consumer time: 16.775301 seconds
-2016-05-08T11:50:31.426527Z INFO: test Mutex64 duration: 16.775614s (0.41939035us per iteration)
-2016-05-08T11:50:31.426584Z INFO: starting consumers...
-2016-05-08T11:50:31.426655Z INFO: starting producers...
-2016-05-08T11:50:31.426749Z INFO: wait for producers...
-2016-05-08T11:50:40.578425Z INFO: wait for consumers...
-2016-05-08T11:50:40.578523Z INFO: average producer time: 8.69236075 seconds
-2016-05-08T11:50:40.578577Z INFO: average consumer time: 9.15165125 seconds
-2016-05-08T11:50:40.578617Z INFO: test LFManyMany64 duration: 9.152033s (0.228800825us per iteration)
-2016-05-08T11:50:40.578670Z INFO: starting consumers...
-2016-05-08T11:50:40.578742Z INFO: starting producers...
-2016-05-08T11:50:40.578893Z INFO: wait for producers...
-2016-05-08T11:50:47.447686Z INFO: wait for consumers...
-2016-05-08T11:50:47.447758Z INFO: average producer time: 6.81136025 seconds
-2016-05-08T11:50:47.447793Z INFO: average consumer time: 6.86875825 seconds
-2016-05-08T11:50:47.447834Z INFO: test FastLFManyMany64 duration: 6.869165s (0.171729125us per iteration)
-2016-05-08T11:50:47.447901Z INFO: starting consumers...
-2016-05-08T11:50:47.447967Z INFO: starting producers...
-2016-05-08T11:50:47.448058Z INFO: wait for producers...
-2016-05-08T11:50:50.469710Z INFO: wait for consumers...
-2016-05-08T11:50:50.469798Z INFO: average producer time: 2.9915505 seconds
-2016-05-08T11:50:50.469848Z INFO: average consumer time: 3.02161675 seconds
-2016-05-08T11:50:50.469883Z INFO: test LFManyOne64 duration: 3.021983s (0.075549575us per iteration)
-2016-05-08T11:50:50.469947Z INFO: starting consumers...
-2016-05-08T11:50:50.470012Z INFO: starting producers...
-2016-05-08T11:50:50.470104Z INFO: wait for producers...
-2016-05-08T11:50:53.139964Z INFO: wait for consumers...
-2016-05-08T11:50:53.140050Z INFO: average producer time: 2.5656465 seconds
-2016-05-08T11:50:53.140102Z INFO: average consumer time: 2.6697755 seconds
-2016-05-08T11:50:53.140149Z INFO: test FastLFManyOne64 duration: 2.670202s (0.06675505us per iteration)
-2016-05-08T11:50:53.140206Z INFO: starting consumers...
-2016-05-08T11:50:53.140281Z INFO: starting producers...
-2016-05-08T11:50:53.140371Z INFO: wait for producers...
-2016-05-08T11:50:59.067812Z INFO: wait for consumers...
-2016-05-08T11:50:59.067895Z INFO: average producer time: 5.8925505 seconds
-2016-05-08T11:50:59.067946Z INFO: average consumer time: 5.9273365 seconds
-2016-05-08T11:50:59.067978Z INFO: test ManyMany64 duration: 5.927773s (0.148194325us per iteration)
-2016-05-08T11:50:59.068068Z INFO: starting consumers...
-2016-05-08T11:50:59.068179Z INFO: starting producers...
-2016-05-08T11:50:59.068288Z INFO: wait for producers...
-2016-05-08T11:51:03.427416Z INFO: wait for consumers...
-2016-05-08T11:51:03.427514Z INFO: average producer time: 4.1055505 seconds
-2016-05-08T11:51:03.427560Z INFO: average consumer time: 4.35914975 seconds
-2016-05-08T11:51:03.427596Z INFO: test ManyOne64 duration: 4.359529s (0.108988225us per iteration)
+vskipin@dev-kiwi09:~$ ./rtmr-queue-perf -w 4 -r 4 AdaptiveLock64 Mutex64 LFManyMany64 FastLFManyMany64 LFManyOne64 FastLFManyOne64 ManyMany64 ManyOne64
+2016-05-08T11:49:56.729254Z INFO: [-i] Iterations: 10000000
+2016-05-08T11:49:56.729319Z INFO: [-r] NumReaders: 4
+2016-05-08T11:49:56.729355Z INFO: [-w] NumWriters: 4
+2016-05-08T11:49:56.729502Z INFO: starting consumers...
+2016-05-08T11:49:56.729621Z INFO: starting producers...
+2016-05-08T11:49:56.729711Z INFO: wait for producers...
+2016-05-08T11:50:14.650803Z INFO: wait for consumers...
+2016-05-08T11:50:14.650859Z INFO: average producer time: 15.96846675 seconds
+2016-05-08T11:50:14.650885Z INFO: average consumer time: 17.9209995 seconds
+2016-05-08T11:50:14.650897Z INFO: test AdaptiveLock64 duration: 17.921395s (0.448034875us per iteration)
+2016-05-08T11:50:14.650913Z INFO: starting consumers...
+2016-05-08T11:50:14.651028Z INFO: starting producers...
+2016-05-08T11:50:14.651122Z INFO: wait for producers...
+2016-05-08T11:50:31.426378Z INFO: wait for consumers...
+2016-05-08T11:50:31.426447Z INFO: average producer time: 15.58770475 seconds
+2016-05-08T11:50:31.426491Z INFO: average consumer time: 16.775301 seconds
+2016-05-08T11:50:31.426527Z INFO: test Mutex64 duration: 16.775614s (0.41939035us per iteration)
+2016-05-08T11:50:31.426584Z INFO: starting consumers...
+2016-05-08T11:50:31.426655Z INFO: starting producers...
+2016-05-08T11:50:31.426749Z INFO: wait for producers...
+2016-05-08T11:50:40.578425Z INFO: wait for consumers...
+2016-05-08T11:50:40.578523Z INFO: average producer time: 8.69236075 seconds
+2016-05-08T11:50:40.578577Z INFO: average consumer time: 9.15165125 seconds
+2016-05-08T11:50:40.578617Z INFO: test LFManyMany64 duration: 9.152033s (0.228800825us per iteration)
+2016-05-08T11:50:40.578670Z INFO: starting consumers...
+2016-05-08T11:50:40.578742Z INFO: starting producers...
+2016-05-08T11:50:40.578893Z INFO: wait for producers...
+2016-05-08T11:50:47.447686Z INFO: wait for consumers...
+2016-05-08T11:50:47.447758Z INFO: average producer time: 6.81136025 seconds
+2016-05-08T11:50:47.447793Z INFO: average consumer time: 6.86875825 seconds
+2016-05-08T11:50:47.447834Z INFO: test FastLFManyMany64 duration: 6.869165s (0.171729125us per iteration)
+2016-05-08T11:50:47.447901Z INFO: starting consumers...
+2016-05-08T11:50:47.447967Z INFO: starting producers...
+2016-05-08T11:50:47.448058Z INFO: wait for producers...
+2016-05-08T11:50:50.469710Z INFO: wait for consumers...
+2016-05-08T11:50:50.469798Z INFO: average producer time: 2.9915505 seconds
+2016-05-08T11:50:50.469848Z INFO: average consumer time: 3.02161675 seconds
+2016-05-08T11:50:50.469883Z INFO: test LFManyOne64 duration: 3.021983s (0.075549575us per iteration)
+2016-05-08T11:50:50.469947Z INFO: starting consumers...
+2016-05-08T11:50:50.470012Z INFO: starting producers...
+2016-05-08T11:50:50.470104Z INFO: wait for producers...
+2016-05-08T11:50:53.139964Z INFO: wait for consumers...
+2016-05-08T11:50:53.140050Z INFO: average producer time: 2.5656465 seconds
+2016-05-08T11:50:53.140102Z INFO: average consumer time: 2.6697755 seconds
+2016-05-08T11:50:53.140149Z INFO: test FastLFManyOne64 duration: 2.670202s (0.06675505us per iteration)
+2016-05-08T11:50:53.140206Z INFO: starting consumers...
+2016-05-08T11:50:53.140281Z INFO: starting producers...
+2016-05-08T11:50:53.140371Z INFO: wait for producers...
+2016-05-08T11:50:59.067812Z INFO: wait for consumers...
+2016-05-08T11:50:59.067895Z INFO: average producer time: 5.8925505 seconds
+2016-05-08T11:50:59.067946Z INFO: average consumer time: 5.9273365 seconds
+2016-05-08T11:50:59.067978Z INFO: test ManyMany64 duration: 5.927773s (0.148194325us per iteration)
+2016-05-08T11:50:59.068068Z INFO: starting consumers...
+2016-05-08T11:50:59.068179Z INFO: starting producers...
+2016-05-08T11:50:59.068288Z INFO: wait for producers...
+2016-05-08T11:51:03.427416Z INFO: wait for consumers...
+2016-05-08T11:51:03.427514Z INFO: average producer time: 4.1055505 seconds
+2016-05-08T11:51:03.427560Z INFO: average consumer time: 4.35914975 seconds
+2016-05-08T11:51:03.427596Z INFO: test ManyOne64 duration: 4.359529s (0.108988225us per iteration)
diff --git a/library/cpp/threading/chunk_queue/ut/ya.make b/library/cpp/threading/chunk_queue/ut/ya.make
index d69e219f66..a35ed6bc4b 100644
--- a/library/cpp/threading/chunk_queue/ut/ya.make
+++ b/library/cpp/threading/chunk_queue/ut/ya.make
@@ -1,9 +1,9 @@
UNITTEST_FOR(library/cpp/threading/chunk_queue)
-
+
OWNER(g:rtmr)
-
-SRCS(
- queue_ut.cpp
-)
-
-END()
+
+SRCS(
+ queue_ut.cpp
+)
+
+END()
diff --git a/library/cpp/threading/chunk_queue/ya.make b/library/cpp/threading/chunk_queue/ya.make
index 7e6ead7b36..2f883140ba 100644
--- a/library/cpp/threading/chunk_queue/ya.make
+++ b/library/cpp/threading/chunk_queue/ya.make
@@ -1,9 +1,9 @@
-LIBRARY()
-
+LIBRARY()
+
OWNER(g:rtmr)
-
-SRCS(
- queue.cpp
-)
-
-END()
+
+SRCS(
+ queue.cpp
+)
+
+END()