aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/threading/chunk_queue/queue.h
diff options
context:
space:
mode:
authorAnton Samokhvalov <pg83@yandex.ru>2022-02-10 16:45:15 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:45:15 +0300
commit72cb13b4aff9bc9cf22e49251bc8fd143f82538f (patch)
treeda2c34829458c7d4e74bdfbdf85dff449e9e7fb8 /library/cpp/threading/chunk_queue/queue.h
parent778e51ba091dc39e7b7fcab2b9cf4dbedfb6f2b5 (diff)
downloadydb-72cb13b4aff9bc9cf22e49251bc8fd143f82538f.tar.gz
Restoring authorship annotation for Anton Samokhvalov <pg83@yandex.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/threading/chunk_queue/queue.h')
-rw-r--r--library/cpp/threading/chunk_queue/queue.h958
1 files changed, 479 insertions, 479 deletions
diff --git a/library/cpp/threading/chunk_queue/queue.h b/library/cpp/threading/chunk_queue/queue.h
index 55859601a1..1959b0258c 100644
--- a/library/cpp/threading/chunk_queue/queue.h
+++ b/library/cpp/threading/chunk_queue/queue.h
@@ -19,21 +19,21 @@ namespace NThreading {
// Platform helpers
#if !defined(PLATFORM_CACHE_LINE)
-#define PLATFORM_CACHE_LINE 64
+#define PLATFORM_CACHE_LINE 64
#endif
#if !defined(PLATFORM_PAGE_SIZE)
-#define PLATFORM_PAGE_SIZE 4 * 1024
+#define PLATFORM_PAGE_SIZE 4 * 1024
#endif
- template <typename T, size_t PadSize = PLATFORM_CACHE_LINE>
- struct TPadded: public T {
- char Pad[PadSize - sizeof(T) % PadSize];
+ template <typename T, size_t PadSize = PLATFORM_CACHE_LINE>
+ struct TPadded: public T {
+ char Pad[PadSize - sizeof(T) % PadSize];
- TPadded() {
- static_assert(sizeof(*this) % PadSize == 0, "padding does not work");
- Y_UNUSED(Pad);
- }
+ TPadded() {
+ static_assert(sizeof(*this) % PadSize == 0, "padding does not work");
+ Y_UNUSED(Pad);
+ }
template<typename... Args>
TPadded(Args&&... args)
@@ -42,280 +42,280 @@ namespace NThreading {
static_assert(sizeof(*this) % PadSize == 0, "padding does not work");
Y_UNUSED(Pad);
}
- };
-
- ////////////////////////////////////////////////////////////////////////////////
- // Type helpers
-
- namespace NImpl {
- template <typename T>
- struct TPodTypeHelper {
- template <typename TT>
- static void Write(T* ptr, TT&& value) {
- *ptr = value;
- }
-
- static T Read(T* ptr) {
- return *ptr;
- }
-
- static void Destroy(T* ptr) {
- Y_UNUSED(ptr);
- }
- };
-
- template <typename T>
- struct TNonPodTypeHelper {
- template <typename TT>
- static void Write(T* ptr, TT&& value) {
- new (ptr) T(std::forward<TT>(value));
- }
-
- static T Read(T* ptr) {
- return std::move(*ptr);
- }
-
- static void Destroy(T* ptr) {
- (void)ptr; /* Make MSVC happy. */
- ptr->~T();
- }
- };
-
- template <typename T>
- using TTypeHelper = std::conditional_t<
- TTypeTraits<T>::IsPod,
- TPodTypeHelper<T>,
- TNonPodTypeHelper<T>>;
-
- }
-
- ////////////////////////////////////////////////////////////////////////////////
- // One producer/one consumer chunked queue.
-
- template <typename T, size_t ChunkSize = PLATFORM_PAGE_SIZE>
- class TOneOneQueue: private TNonCopyable {
- using TTypeHelper = NImpl::TTypeHelper<T>;
-
- struct TChunk;
-
- struct TChunkHeader {
- size_t Count = 0;
- TChunk* Next = nullptr;
- };
-
- struct TChunk: public TChunkHeader {
- static constexpr size_t MaxCount = (ChunkSize - sizeof(TChunkHeader)) / sizeof(T);
-
- char Entries[MaxCount * sizeof(T)];
-
- TChunk() {
- Y_UNUSED(Entries); // uninitialized
- }
-
- ~TChunk() {
- for (size_t i = 0; i < this->Count; ++i) {
- TTypeHelper::Destroy(GetPtr(i));
- }
- }
-
- T* GetPtr(size_t i) {
- return (T*)Entries + i;
- }
- };
-
- struct TWriterState {
- TChunk* Chunk = nullptr;
- };
-
- struct TReaderState {
- TChunk* Chunk = nullptr;
- size_t Count = 0;
- };
-
- private:
- TPadded<TWriterState> Writer;
- TPadded<TReaderState> Reader;
-
- public:
- using TItem = T;
-
- TOneOneQueue() {
- Writer.Chunk = Reader.Chunk = new TChunk();
- }
-
- ~TOneOneQueue() {
- DeleteChunks(Reader.Chunk);
- }
-
- template <typename TT>
- void Enqueue(TT&& value) {
- T* ptr = PrepareWrite();
- Y_ASSERT(ptr);
- TTypeHelper::Write(ptr, std::forward<TT>(value));
- CompleteWrite();
- }
-
- bool Dequeue(T& value) {
- if (T* ptr = PrepareRead()) {
- value = TTypeHelper::Read(ptr);
- CompleteRead();
- return true;
- }
- return false;
- }
-
- bool IsEmpty() {
- return !PrepareRead();
- }
-
- protected:
- T* PrepareWrite() {
- TChunk* chunk = Writer.Chunk;
- Y_ASSERT(chunk && !chunk->Next);
-
- if (chunk->Count != TChunk::MaxCount) {
- return chunk->GetPtr(chunk->Count);
- }
-
- chunk = new TChunk();
- AtomicSet(Writer.Chunk->Next, chunk);
- Writer.Chunk = chunk;
- return chunk->GetPtr(0);
- }
-
- void CompleteWrite() {
- AtomicSet(Writer.Chunk->Count, Writer.Chunk->Count + 1);
- }
-
- T* PrepareRead() {
- TChunk* chunk = Reader.Chunk;
- Y_ASSERT(chunk);
-
- for (;;) {
- size_t writerCount = AtomicGet(chunk->Count);
- if (Reader.Count != writerCount) {
- return chunk->GetPtr(Reader.Count);
- }
-
- if (writerCount != TChunk::MaxCount) {
- return nullptr;
- }
-
- chunk = AtomicGet(chunk->Next);
- if (!chunk) {
- return nullptr;
- }
-
- delete Reader.Chunk;
- Reader.Chunk = chunk;
- Reader.Count = 0;
- }
- }
+ };
- void CompleteRead() {
- ++Reader.Count;
- }
+ ////////////////////////////////////////////////////////////////////////////////
+ // Type helpers
- private:
- static void DeleteChunks(TChunk* chunk) {
- while (chunk) {
- TChunk* next = chunk->Next;
- delete chunk;
- chunk = next;
- }
- }
- };
+ namespace NImpl {
+ template <typename T>
+ struct TPodTypeHelper {
+ template <typename TT>
+ static void Write(T* ptr, TT&& value) {
+ *ptr = value;
+ }
- ////////////////////////////////////////////////////////////////////////////////
- // Multiple producers/single consumer partitioned queue.
- // Provides FIFO guaranties for each producer.
+ static T Read(T* ptr) {
+ return *ptr;
+ }
+
+ static void Destroy(T* ptr) {
+ Y_UNUSED(ptr);
+ }
+ };
- template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE>
- class TManyOneQueue: private TNonCopyable {
- using TTypeHelper = NImpl::TTypeHelper<T>;
+ template <typename T>
+ struct TNonPodTypeHelper {
+ template <typename TT>
+ static void Write(T* ptr, TT&& value) {
+ new (ptr) T(std::forward<TT>(value));
+ }
+
+ static T Read(T* ptr) {
+ return std::move(*ptr);
+ }
+
+ static void Destroy(T* ptr) {
+ (void)ptr; /* Make MSVC happy. */
+ ptr->~T();
+ }
+ };
+
+ template <typename T>
+ using TTypeHelper = std::conditional_t<
+ TTypeTraits<T>::IsPod,
+ TPodTypeHelper<T>,
+ TNonPodTypeHelper<T>>;
+
+ }
+
+ ////////////////////////////////////////////////////////////////////////////////
+ // One producer/one consumer chunked queue.
+
+ template <typename T, size_t ChunkSize = PLATFORM_PAGE_SIZE>
+ class TOneOneQueue: private TNonCopyable {
+ using TTypeHelper = NImpl::TTypeHelper<T>;
+
+ struct TChunk;
+
+ struct TChunkHeader {
+ size_t Count = 0;
+ TChunk* Next = nullptr;
+ };
+
+ struct TChunk: public TChunkHeader {
+ static constexpr size_t MaxCount = (ChunkSize - sizeof(TChunkHeader)) / sizeof(T);
+
+ char Entries[MaxCount * sizeof(T)];
+
+ TChunk() {
+ Y_UNUSED(Entries); // uninitialized
+ }
+
+ ~TChunk() {
+ for (size_t i = 0; i < this->Count; ++i) {
+ TTypeHelper::Destroy(GetPtr(i));
+ }
+ }
+
+ T* GetPtr(size_t i) {
+ return (T*)Entries + i;
+ }
+ };
+
+ struct TWriterState {
+ TChunk* Chunk = nullptr;
+ };
+
+ struct TReaderState {
+ TChunk* Chunk = nullptr;
+ size_t Count = 0;
+ };
+
+ private:
+ TPadded<TWriterState> Writer;
+ TPadded<TReaderState> Reader;
+
+ public:
+ using TItem = T;
+
+ TOneOneQueue() {
+ Writer.Chunk = Reader.Chunk = new TChunk();
+ }
+
+ ~TOneOneQueue() {
+ DeleteChunks(Reader.Chunk);
+ }
+
+ template <typename TT>
+ void Enqueue(TT&& value) {
+ T* ptr = PrepareWrite();
+ Y_ASSERT(ptr);
+ TTypeHelper::Write(ptr, std::forward<TT>(value));
+ CompleteWrite();
+ }
+
+ bool Dequeue(T& value) {
+ if (T* ptr = PrepareRead()) {
+ value = TTypeHelper::Read(ptr);
+ CompleteRead();
+ return true;
+ }
+ return false;
+ }
+
+ bool IsEmpty() {
+ return !PrepareRead();
+ }
- struct TEntry {
- T Value;
- ui64 Tag;
- };
-
- struct TQueueType: public TOneOneQueue<TEntry, ChunkSize> {
- TAtomic WriteLock = 0;
+ protected:
+ T* PrepareWrite() {
+ TChunk* chunk = Writer.Chunk;
+ Y_ASSERT(chunk && !chunk->Next);
+
+ if (chunk->Count != TChunk::MaxCount) {
+ return chunk->GetPtr(chunk->Count);
+ }
+
+ chunk = new TChunk();
+ AtomicSet(Writer.Chunk->Next, chunk);
+ Writer.Chunk = chunk;
+ return chunk->GetPtr(0);
+ }
+
+ void CompleteWrite() {
+ AtomicSet(Writer.Chunk->Count, Writer.Chunk->Count + 1);
+ }
- using TOneOneQueue<TEntry, ChunkSize>::PrepareWrite;
- using TOneOneQueue<TEntry, ChunkSize>::CompleteWrite;
-
- using TOneOneQueue<TEntry, ChunkSize>::PrepareRead;
- using TOneOneQueue<TEntry, ChunkSize>::CompleteRead;
- };
+ T* PrepareRead() {
+ TChunk* chunk = Reader.Chunk;
+ Y_ASSERT(chunk);
- private:
- union {
- TAtomic WriteTag = 0;
- char Pad[PLATFORM_CACHE_LINE];
- };
-
- TQueueType Queues[Concurrency];
-
- public:
- using TItem = T;
-
- template <typename TT>
- void Enqueue(TT&& value) {
- ui64 tag = NextTag();
- while (!TryEnqueue(std::forward<TT>(value), tag)) {
- SpinLockPause();
- }
- }
-
- bool Dequeue(T& value) {
- size_t index = 0;
- if (TEntry* entry = PrepareRead(index)) {
- value = TTypeHelper::Read(&entry->Value);
- Queues[index].CompleteRead();
- return true;
- }
- return false;
- }
+ for (;;) {
+ size_t writerCount = AtomicGet(chunk->Count);
+ if (Reader.Count != writerCount) {
+ return chunk->GetPtr(Reader.Count);
+ }
+
+ if (writerCount != TChunk::MaxCount) {
+ return nullptr;
+ }
+
+ chunk = AtomicGet(chunk->Next);
+ if (!chunk) {
+ return nullptr;
+ }
+
+ delete Reader.Chunk;
+ Reader.Chunk = chunk;
+ Reader.Count = 0;
+ }
+ }
+
+ void CompleteRead() {
+ ++Reader.Count;
+ }
+
+ private:
+ static void DeleteChunks(TChunk* chunk) {
+ while (chunk) {
+ TChunk* next = chunk->Next;
+ delete chunk;
+ chunk = next;
+ }
+ }
+ };
+
+ ////////////////////////////////////////////////////////////////////////////////
+ // Multiple producers/single consumer partitioned queue.
+ // Provides FIFO guaranties for each producer.
+
+ template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE>
+ class TManyOneQueue: private TNonCopyable {
+ using TTypeHelper = NImpl::TTypeHelper<T>;
+
+ struct TEntry {
+ T Value;
+ ui64 Tag;
+ };
+
+ struct TQueueType: public TOneOneQueue<TEntry, ChunkSize> {
+ TAtomic WriteLock = 0;
+
+ using TOneOneQueue<TEntry, ChunkSize>::PrepareWrite;
+ using TOneOneQueue<TEntry, ChunkSize>::CompleteWrite;
+
+ using TOneOneQueue<TEntry, ChunkSize>::PrepareRead;
+ using TOneOneQueue<TEntry, ChunkSize>::CompleteRead;
+ };
+
+ private:
+ union {
+ TAtomic WriteTag = 0;
+ char Pad[PLATFORM_CACHE_LINE];
+ };
+
+ TQueueType Queues[Concurrency];
+
+ public:
+ using TItem = T;
+
+ template <typename TT>
+ void Enqueue(TT&& value) {
+ ui64 tag = NextTag();
+ while (!TryEnqueue(std::forward<TT>(value), tag)) {
+ SpinLockPause();
+ }
+ }
+
+ bool Dequeue(T& value) {
+ size_t index = 0;
+ if (TEntry* entry = PrepareRead(index)) {
+ value = TTypeHelper::Read(&entry->Value);
+ Queues[index].CompleteRead();
+ return true;
+ }
+ return false;
+ }
- bool IsEmpty() {
- for (size_t i = 0; i < Concurrency; ++i) {
- if (!Queues[i].IsEmpty()) {
- return false;
- }
- }
- return true;
+ bool IsEmpty() {
+ for (size_t i = 0; i < Concurrency; ++i) {
+ if (!Queues[i].IsEmpty()) {
+ return false;
+ }
+ }
+ return true;
}
- private:
- ui64 NextTag() {
- // TODO: can we avoid synchronization here? it costs 1.5x performance penalty
- // return GetCycleCount();
- return AtomicIncrement(WriteTag);
- }
+ private:
+ ui64 NextTag() {
+ // TODO: can we avoid synchronization here? it costs 1.5x performance penalty
+ // return GetCycleCount();
+ return AtomicIncrement(WriteTag);
+ }
- template <typename TT>
- bool TryEnqueue(TT&& value, ui64 tag) {
- for (size_t i = 0; i < Concurrency; ++i) {
- TQueueType& queue = Queues[i];
- if (AtomicTryAndTryLock(&queue.WriteLock)) {
- TEntry* entry = queue.PrepareWrite();
- Y_ASSERT(entry);
- TTypeHelper::Write(&entry->Value, std::forward<TT>(value));
- entry->Tag = tag;
- queue.CompleteWrite();
- AtomicUnlock(&queue.WriteLock);
- return true;
- }
+ template <typename TT>
+ bool TryEnqueue(TT&& value, ui64 tag) {
+ for (size_t i = 0; i < Concurrency; ++i) {
+ TQueueType& queue = Queues[i];
+ if (AtomicTryAndTryLock(&queue.WriteLock)) {
+ TEntry* entry = queue.PrepareWrite();
+ Y_ASSERT(entry);
+ TTypeHelper::Write(&entry->Value, std::forward<TT>(value));
+ entry->Tag = tag;
+ queue.CompleteWrite();
+ AtomicUnlock(&queue.WriteLock);
+ return true;
+ }
}
- return false;
+ return false;
}
- TEntry* PrepareRead(size_t& index) {
- TEntry* entry = nullptr;
- ui64 tag = Max();
+ TEntry* PrepareRead(size_t& index) {
+ TEntry* entry = nullptr;
+ ui64 tag = Max();
- for (size_t i = 0; i < Concurrency; ++i) {
+ for (size_t i = 0; i < Concurrency; ++i) {
TEntry* e = Queues[i].PrepareRead();
if (e && e->Tag < tag) {
index = i;
@@ -323,246 +323,246 @@ namespace NThreading {
tag = e->Tag;
}
}
-
- if (entry) {
- // need second pass to catch updates within already scanned range
- size_t candidate = index;
- for (size_t i = 0; i < candidate; ++i) {
- TEntry* e = Queues[i].PrepareRead();
- if (e && e->Tag < tag) {
- index = i;
- entry = e;
- tag = e->Tag;
- }
- }
- }
-
- return entry;
+
+ if (entry) {
+ // need second pass to catch updates within already scanned range
+ size_t candidate = index;
+ for (size_t i = 0; i < candidate; ++i) {
+ TEntry* e = Queues[i].PrepareRead();
+ if (e && e->Tag < tag) {
+ index = i;
+ entry = e;
+ tag = e->Tag;
+ }
+ }
+ }
+
+ return entry;
}
- };
+ };
- ////////////////////////////////////////////////////////////////////////////////
- // Concurrent many-many queue with strong FIFO guaranties.
- // Writers will not block readers (and vice versa), but will block each other.
+ ////////////////////////////////////////////////////////////////////////////////
+ // Concurrent many-many queue with strong FIFO guaranties.
+ // Writers will not block readers (and vice versa), but will block each other.
- template <typename T, size_t ChunkSize = PLATFORM_PAGE_SIZE, typename TLock = TAdaptiveLock>
- class TManyManyQueue: private TNonCopyable {
- private:
- TPadded<TLock> WriteLock;
- TPadded<TLock> ReadLock;
-
- TOneOneQueue<T, ChunkSize> Queue;
-
- public:
- using TItem = T;
-
- template <typename TT>
- void Enqueue(TT&& value) {
- with_lock (WriteLock) {
- Queue.Enqueue(std::forward<TT>(value));
- }
+ template <typename T, size_t ChunkSize = PLATFORM_PAGE_SIZE, typename TLock = TAdaptiveLock>
+ class TManyManyQueue: private TNonCopyable {
+ private:
+ TPadded<TLock> WriteLock;
+ TPadded<TLock> ReadLock;
+
+ TOneOneQueue<T, ChunkSize> Queue;
+
+ public:
+ using TItem = T;
+
+ template <typename TT>
+ void Enqueue(TT&& value) {
+ with_lock (WriteLock) {
+ Queue.Enqueue(std::forward<TT>(value));
+ }
+ }
+
+ bool Dequeue(T& value) {
+ with_lock (ReadLock) {
+ return Queue.Dequeue(value);
+ }
+ }
+
+ bool IsEmpty() {
+ with_lock (ReadLock) {
+ return Queue.IsEmpty();
+ }
+ }
+ };
+
+ ////////////////////////////////////////////////////////////////////////////////
+ // Multiple producers/single consumer partitioned queue.
+ // Because of random partitioning reordering possible - FIFO not guaranteed!
+
+ template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE>
+ class TRelaxedManyOneQueue: private TNonCopyable {
+ struct TQueueType: public TOneOneQueue<T, ChunkSize> {
+ TAtomic WriteLock = 0;
+ };
+
+ private:
+ union {
+ size_t ReadPos = 0;
+ char Pad[PLATFORM_CACHE_LINE];
+ };
+
+ TQueueType Queues[Concurrency];
+
+ public:
+ using TItem = T;
+
+ template <typename TT>
+ void Enqueue(TT&& value) {
+ while (!TryEnqueue(std::forward<TT>(value))) {
+ SpinLockPause();
+ }
+ }
+
+ bool Dequeue(T& value) {
+ for (size_t i = 0; i < Concurrency; ++i) {
+ TQueueType& queue = Queues[ReadPos++ % Concurrency];
+ if (queue.Dequeue(value)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ bool IsEmpty() {
+ for (size_t i = 0; i < Concurrency; ++i) {
+ if (!Queues[i].IsEmpty()) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private:
+ template <typename TT>
+ bool TryEnqueue(TT&& value) {
+ size_t writePos = GetCycleCount();
+ for (size_t i = 0; i < Concurrency; ++i) {
+ TQueueType& queue = Queues[writePos++ % Concurrency];
+ if (AtomicTryAndTryLock(&queue.WriteLock)) {
+ queue.Enqueue(std::forward<TT>(value));
+ AtomicUnlock(&queue.WriteLock);
+ return true;
+ }
+ }
+ return false;
}
+ };
- bool Dequeue(T& value) {
- with_lock (ReadLock) {
- return Queue.Dequeue(value);
- }
- }
-
- bool IsEmpty() {
- with_lock (ReadLock) {
- return Queue.IsEmpty();
- }
- }
- };
-
- ////////////////////////////////////////////////////////////////////////////////
- // Multiple producers/single consumer partitioned queue.
- // Because of random partitioning reordering possible - FIFO not guaranteed!
-
- template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE>
- class TRelaxedManyOneQueue: private TNonCopyable {
- struct TQueueType: public TOneOneQueue<T, ChunkSize> {
- TAtomic WriteLock = 0;
- };
-
- private:
- union {
- size_t ReadPos = 0;
- char Pad[PLATFORM_CACHE_LINE];
+ ////////////////////////////////////////////////////////////////////////////////
+ // Concurrent many-many partitioned queue.
+ // Because of random partitioning reordering possible - FIFO not guaranteed!
+
+ template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE>
+ class TRelaxedManyManyQueue: private TNonCopyable {
+ struct TQueueType: public TOneOneQueue<T, ChunkSize> {
+ union {
+ TAtomic WriteLock = 0;
+ char Pad1[PLATFORM_CACHE_LINE];
+ };
+ union {
+ TAtomic ReadLock = 0;
+ char Pad2[PLATFORM_CACHE_LINE];
+ };
};
- TQueueType Queues[Concurrency];
-
- public:
- using TItem = T;
-
- template <typename TT>
- void Enqueue(TT&& value) {
- while (!TryEnqueue(std::forward<TT>(value))) {
- SpinLockPause();
- }
- }
-
- bool Dequeue(T& value) {
- for (size_t i = 0; i < Concurrency; ++i) {
- TQueueType& queue = Queues[ReadPos++ % Concurrency];
- if (queue.Dequeue(value)) {
- return true;
+ private:
+ TQueueType Queues[Concurrency];
+
+ public:
+ using TItem = T;
+
+ template <typename TT>
+ void Enqueue(TT&& value) {
+ while (!TryEnqueue(std::forward<TT>(value))) {
+ SpinLockPause();
+ }
+ }
+
+ bool Dequeue(T& value) {
+ size_t readPos = GetCycleCount();
+ for (size_t i = 0; i < Concurrency; ++i) {
+ TQueueType& queue = Queues[readPos++ % Concurrency];
+ if (AtomicTryAndTryLock(&queue.ReadLock)) {
+ bool dequeued = queue.Dequeue(value);
+ AtomicUnlock(&queue.ReadLock);
+ if (dequeued) {
+ return true;
+ }
}
}
- return false;
+ return false;
}
- bool IsEmpty() {
- for (size_t i = 0; i < Concurrency; ++i) {
- if (!Queues[i].IsEmpty()) {
- return false;
+ bool IsEmpty() {
+ for (size_t i = 0; i < Concurrency; ++i) {
+ TQueueType& queue = Queues[i];
+ if (AtomicTryAndTryLock(&queue.ReadLock)) {
+ bool empty = queue.IsEmpty();
+ AtomicUnlock(&queue.ReadLock);
+ if (!empty) {
+ return false;
+ }
}
}
- return true;
+ return true;
}
- private:
- template <typename TT>
- bool TryEnqueue(TT&& value) {
- size_t writePos = GetCycleCount();
- for (size_t i = 0; i < Concurrency; ++i) {
- TQueueType& queue = Queues[writePos++ % Concurrency];
- if (AtomicTryAndTryLock(&queue.WriteLock)) {
- queue.Enqueue(std::forward<TT>(value));
- AtomicUnlock(&queue.WriteLock);
- return true;
- }
+ private:
+ template <typename TT>
+ bool TryEnqueue(TT&& value) {
+ size_t writePos = GetCycleCount();
+ for (size_t i = 0; i < Concurrency; ++i) {
+ TQueueType& queue = Queues[writePos++ % Concurrency];
+ if (AtomicTryAndTryLock(&queue.WriteLock)) {
+ queue.Enqueue(std::forward<TT>(value));
+ AtomicUnlock(&queue.WriteLock);
+ return true;
+ }
}
- return false;
+ return false;
}
- };
-
- ////////////////////////////////////////////////////////////////////////////////
- // Concurrent many-many partitioned queue.
- // Because of random partitioning reordering possible - FIFO not guaranteed!
-
- template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE>
- class TRelaxedManyManyQueue: private TNonCopyable {
- struct TQueueType: public TOneOneQueue<T, ChunkSize> {
- union {
- TAtomic WriteLock = 0;
- char Pad1[PLATFORM_CACHE_LINE];
- };
- union {
- TAtomic ReadLock = 0;
- char Pad2[PLATFORM_CACHE_LINE];
- };
- };
+ };
- private:
- TQueueType Queues[Concurrency];
+ ////////////////////////////////////////////////////////////////////////////////
+ // Simple wrapper to deal with AutoPtrs
- public:
- using TItem = T;
+ template <typename T, typename TImpl>
+ class TAutoQueueBase: private TNonCopyable {
+ private:
+ TImpl Impl;
- template <typename TT>
- void Enqueue(TT&& value) {
- while (!TryEnqueue(std::forward<TT>(value))) {
- SpinLockPause();
- }
- }
+ public:
+ using TItem = TAutoPtr<T>;
- bool Dequeue(T& value) {
- size_t readPos = GetCycleCount();
- for (size_t i = 0; i < Concurrency; ++i) {
- TQueueType& queue = Queues[readPos++ % Concurrency];
- if (AtomicTryAndTryLock(&queue.ReadLock)) {
- bool dequeued = queue.Dequeue(value);
- AtomicUnlock(&queue.ReadLock);
- if (dequeued) {
- return true;
- }
- }
- }
- return false;
+ ~TAutoQueueBase() {
+ TAutoPtr<T> value;
+ while (Dequeue(value)) {
+ // do nothing
+ }
}
- bool IsEmpty() {
- for (size_t i = 0; i < Concurrency; ++i) {
- TQueueType& queue = Queues[i];
- if (AtomicTryAndTryLock(&queue.ReadLock)) {
- bool empty = queue.IsEmpty();
- AtomicUnlock(&queue.ReadLock);
- if (!empty) {
- return false;
- }
- }
- }
- return true;
- }
-
- private:
- template <typename TT>
- bool TryEnqueue(TT&& value) {
- size_t writePos = GetCycleCount();
- for (size_t i = 0; i < Concurrency; ++i) {
- TQueueType& queue = Queues[writePos++ % Concurrency];
- if (AtomicTryAndTryLock(&queue.WriteLock)) {
- queue.Enqueue(std::forward<TT>(value));
- AtomicUnlock(&queue.WriteLock);
- return true;
- }
- }
- return false;
- }
- };
-
- ////////////////////////////////////////////////////////////////////////////////
- // Simple wrapper to deal with AutoPtrs
-
- template <typename T, typename TImpl>
- class TAutoQueueBase: private TNonCopyable {
- private:
- TImpl Impl;
-
- public:
- using TItem = TAutoPtr<T>;
-
- ~TAutoQueueBase() {
- TAutoPtr<T> value;
- while (Dequeue(value)) {
- // do nothing
- }
- }
-
- void Enqueue(TAutoPtr<T> value) {
- Impl.Enqueue(value.Get());
+ void Enqueue(TAutoPtr<T> value) {
+ Impl.Enqueue(value.Get());
Y_UNUSED(value.Release());
- }
+ }
- bool Dequeue(TAutoPtr<T>& value) {
- T* ptr = nullptr;
- if (Impl.Dequeue(ptr)) {
- value.Reset(ptr);
- return true;
- }
- return false;
+ bool Dequeue(TAutoPtr<T>& value) {
+ T* ptr = nullptr;
+ if (Impl.Dequeue(ptr)) {
+ value.Reset(ptr);
+ return true;
+ }
+ return false;
}
- bool IsEmpty() {
- return Impl.IsEmpty();
- }
- };
+ bool IsEmpty() {
+ return Impl.IsEmpty();
+ }
+ };
- template <typename T, size_t ChunkSize = PLATFORM_PAGE_SIZE>
- using TAutoOneOneQueue = TAutoQueueBase<T, TOneOneQueue<T*, ChunkSize>>;
+ template <typename T, size_t ChunkSize = PLATFORM_PAGE_SIZE>
+ using TAutoOneOneQueue = TAutoQueueBase<T, TOneOneQueue<T*, ChunkSize>>;
- template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE>
- using TAutoManyOneQueue = TAutoQueueBase<T, TManyOneQueue<T*, Concurrency, ChunkSize>>;
+ template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE>
+ using TAutoManyOneQueue = TAutoQueueBase<T, TManyOneQueue<T*, Concurrency, ChunkSize>>;
- template <typename T, size_t ChunkSize = PLATFORM_PAGE_SIZE, typename TLock = TAdaptiveLock>
- using TAutoManyManyQueue = TAutoQueueBase<T, TManyManyQueue<T*, ChunkSize, TLock>>;
+ template <typename T, size_t ChunkSize = PLATFORM_PAGE_SIZE, typename TLock = TAdaptiveLock>
+ using TAutoManyManyQueue = TAutoQueueBase<T, TManyManyQueue<T*, ChunkSize, TLock>>;
- template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE>
- using TAutoRelaxedManyOneQueue = TAutoQueueBase<T, TRelaxedManyOneQueue<T*, Concurrency, ChunkSize>>;
+ template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE>
+ using TAutoRelaxedManyOneQueue = TAutoQueueBase<T, TRelaxedManyOneQueue<T*, Concurrency, ChunkSize>>;
- template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE>
- using TAutoRelaxedManyManyQueue = TAutoQueueBase<T, TRelaxedManyManyQueue<T*, Concurrency, ChunkSize>>;
-}
+ template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE>
+ using TAutoRelaxedManyManyQueue = TAutoQueueBase<T, TRelaxedManyManyQueue<T*, Concurrency, ChunkSize>>;
+}