path: root/library/cpp/threading/chunk_queue/queue.h
author     Devtools Arcadia <arcadia-devtools@yandex-team.ru>  2022-02-07 18:08:42 +0300
committer  Devtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net>  2022-02-07 18:08:42 +0300
commit     1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch)
tree       e26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/threading/chunk_queue/queue.h
download   ydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/threading/chunk_queue/queue.h')
-rw-r--r--  library/cpp/threading/chunk_queue/queue.h  568
1 file changed, 568 insertions, 0 deletions
diff --git a/library/cpp/threading/chunk_queue/queue.h b/library/cpp/threading/chunk_queue/queue.h
new file mode 100644
index 0000000000..55859601a1
--- /dev/null
+++ b/library/cpp/threading/chunk_queue/queue.h
@@ -0,0 +1,568 @@
+#pragma once
+
+#include <util/datetime/base.h>
+#include <util/generic/noncopyable.h>
+#include <util/generic/ptr.h>
+#include <util/generic/typetraits.h>
+#include <util/generic/vector.h>
+#include <util/generic/ylimits.h>
+#include <util/system/atomic.h>
+#include <util/system/guard.h>
+#include <util/system/spinlock.h>
+#include <util/system/yassert.h>
+
+#include <type_traits>
+#include <utility>
+
+namespace NThreading {
+////////////////////////////////////////////////////////////////////////////////
+// Platform helpers
+
+#if !defined(PLATFORM_CACHE_LINE)
+#define PLATFORM_CACHE_LINE 64
+#endif
+
+#if !defined(PLATFORM_PAGE_SIZE)
+#define PLATFORM_PAGE_SIZE (4 * 1024)
+#endif
+
+ template <typename T, size_t PadSize = PLATFORM_CACHE_LINE>
+ struct TPadded: public T {
+ char Pad[PadSize - sizeof(T) % PadSize];
+
+ TPadded() {
+ static_assert(sizeof(*this) % PadSize == 0, "padding does not work");
+ Y_UNUSED(Pad);
+ }
+
+ template<typename... Args>
+ TPadded(Args&&... args)
+ : T(std::forward<Args>(args)...)
+ {
+ static_assert(sizeof(*this) % PadSize == 0, "padding does not work");
+ Y_UNUSED(Pad);
+ }
+ };
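+
+ // Illustrative sketch (editor's addition, not part of the original header): TPadded
+ // rounds a type up to a multiple of PadSize so that adjacent instances do not share
+ // a cache line, e.g.:
+ //
+ //     struct TCounter { ui64 Value = 0; };    // hypothetical payload type
+ //     static_assert(sizeof(TPadded<TCounter>) % PLATFORM_CACHE_LINE == 0, "");
+ //     TPadded<TCounter> Slots[8];             // no false sharing between slots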
+
+ ////////////////////////////////////////////////////////////////////////////////
+ // Type helpers
+
+ namespace NImpl {
+ template <typename T>
+ struct TPodTypeHelper {
+ template <typename TT>
+ static void Write(T* ptr, TT&& value) {
+ *ptr = value;
+ }
+
+ static T Read(T* ptr) {
+ return *ptr;
+ }
+
+ static void Destroy(T* ptr) {
+ Y_UNUSED(ptr);
+ }
+ };
+
+ template <typename T>
+ struct TNonPodTypeHelper {
+ template <typename TT>
+ static void Write(T* ptr, TT&& value) {
+ new (ptr) T(std::forward<TT>(value));
+ }
+
+ static T Read(T* ptr) {
+ return std::move(*ptr);
+ }
+
+ static void Destroy(T* ptr) {
+ (void)ptr; /* Make MSVC happy. */
+ ptr->~T();
+ }
+ };
+
+ template <typename T>
+ using TTypeHelper = std::conditional_t<
+ TTypeTraits<T>::IsPod,
+ TPodTypeHelper<T>,
+ TNonPodTypeHelper<T>>;
+
+ }
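+
+ // Editor's note (illustrative, not part of the original header): the alias above
+ // dispatches on POD-ness, e.g.:
+ //
+ //     static_assert(std::is_same<NImpl::TTypeHelper<int>, NImpl::TPodTypeHelper<int>>::value, "");
+ //
+ // POD entries are copied in place; non-POD entries are placement-constructed,
+ // moved out on read, and destroyed explicitly.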
+
+ ////////////////////////////////////////////////////////////////////////////////
+ // One producer/one consumer chunked queue.
+
+ template <typename T, size_t ChunkSize = PLATFORM_PAGE_SIZE>
+ class TOneOneQueue: private TNonCopyable {
+ using TTypeHelper = NImpl::TTypeHelper<T>;
+
+ struct TChunk;
+
+ struct TChunkHeader {
+ size_t Count = 0;
+ TChunk* Next = nullptr;
+ };
+
+ struct TChunk: public TChunkHeader {
+ static constexpr size_t MaxCount = (ChunkSize - sizeof(TChunkHeader)) / sizeof(T);
+
+ char Entries[MaxCount * sizeof(T)];
+
+ TChunk() {
+ Y_UNUSED(Entries); // uninitialized
+ }
+
+ ~TChunk() {
+ for (size_t i = 0; i < this->Count; ++i) {
+ TTypeHelper::Destroy(GetPtr(i));
+ }
+ }
+
+ T* GetPtr(size_t i) {
+ return (T*)Entries + i;
+ }
+ };
+
+ struct TWriterState {
+ TChunk* Chunk = nullptr;
+ };
+
+ struct TReaderState {
+ TChunk* Chunk = nullptr;
+ size_t Count = 0;
+ };
+
+ private:
+ TPadded<TWriterState> Writer;
+ TPadded<TReaderState> Reader;
+
+ public:
+ using TItem = T;
+
+ TOneOneQueue() {
+ Writer.Chunk = Reader.Chunk = new TChunk();
+ }
+
+ ~TOneOneQueue() {
+ DeleteChunks(Reader.Chunk);
+ }
+
+ template <typename TT>
+ void Enqueue(TT&& value) {
+ T* ptr = PrepareWrite();
+ Y_ASSERT(ptr);
+ TTypeHelper::Write(ptr, std::forward<TT>(value));
+ CompleteWrite();
+ }
+
+ bool Dequeue(T& value) {
+ if (T* ptr = PrepareRead()) {
+ value = TTypeHelper::Read(ptr);
+ CompleteRead();
+ return true;
+ }
+ return false;
+ }
+
+ bool IsEmpty() {
+ return !PrepareRead();
+ }
+
+ protected:
+ T* PrepareWrite() {
+ TChunk* chunk = Writer.Chunk;
+ Y_ASSERT(chunk && !chunk->Next);
+
+ if (chunk->Count != TChunk::MaxCount) {
+ return chunk->GetPtr(chunk->Count);
+ }
+
+ chunk = new TChunk();
+ AtomicSet(Writer.Chunk->Next, chunk);
+ Writer.Chunk = chunk;
+ return chunk->GetPtr(0);
+ }
+
+ void CompleteWrite() {
+ AtomicSet(Writer.Chunk->Count, Writer.Chunk->Count + 1);
+ }
+
+ T* PrepareRead() {
+ TChunk* chunk = Reader.Chunk;
+ Y_ASSERT(chunk);
+
+ for (;;) {
+ size_t writerCount = AtomicGet(chunk->Count);
+ if (Reader.Count != writerCount) {
+ return chunk->GetPtr(Reader.Count);
+ }
+
+ if (writerCount != TChunk::MaxCount) {
+ return nullptr;
+ }
+
+ chunk = AtomicGet(chunk->Next);
+ if (!chunk) {
+ return nullptr;
+ }
+
+ delete Reader.Chunk;
+ Reader.Chunk = chunk;
+ Reader.Count = 0;
+ }
+ }
+
+ void CompleteRead() {
+ ++Reader.Count;
+ }
+
+ private:
+ static void DeleteChunks(TChunk* chunk) {
+ while (chunk) {
+ TChunk* next = chunk->Next;
+ delete chunk;
+ chunk = next;
+ }
+ }
+ };
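+
+ // Illustrative usage sketch (editor's addition, not part of the original header):
+ // exactly one producer thread and one consumer thread may operate on the queue
+ // concurrently without extra locking, e.g.:
+ //
+ //     TOneOneQueue<int> queue;
+ //
+ //     // producer thread
+ //     queue.Enqueue(42);
+ //
+ //     // consumer thread
+ //     int value = 0;
+ //     while (!queue.Dequeue(value)) {
+ //         SpinLockPause();
+ //     }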
+
+ ////////////////////////////////////////////////////////////////////////////////
+ // Multiple producers/single consumer partitioned queue.
+ // Provides FIFO guarantees for each producer.
+
+ template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE>
+ class TManyOneQueue: private TNonCopyable {
+ using TTypeHelper = NImpl::TTypeHelper<T>;
+
+ struct TEntry {
+ T Value;
+ ui64 Tag;
+ };
+
+ struct TQueueType: public TOneOneQueue<TEntry, ChunkSize> {
+ TAtomic WriteLock = 0;
+
+ using TOneOneQueue<TEntry, ChunkSize>::PrepareWrite;
+ using TOneOneQueue<TEntry, ChunkSize>::CompleteWrite;
+
+ using TOneOneQueue<TEntry, ChunkSize>::PrepareRead;
+ using TOneOneQueue<TEntry, ChunkSize>::CompleteRead;
+ };
+
+ private:
+ union {
+ TAtomic WriteTag = 0;
+ char Pad[PLATFORM_CACHE_LINE];
+ };
+
+ TQueueType Queues[Concurrency];
+
+ public:
+ using TItem = T;
+
+ template <typename TT>
+ void Enqueue(TT&& value) {
+ ui64 tag = NextTag();
+ while (!TryEnqueue(std::forward<TT>(value), tag)) {
+ SpinLockPause();
+ }
+ }
+
+ bool Dequeue(T& value) {
+ size_t index = 0;
+ if (TEntry* entry = PrepareRead(index)) {
+ value = TTypeHelper::Read(&entry->Value);
+ Queues[index].CompleteRead();
+ return true;
+ }
+ return false;
+ }
+
+ bool IsEmpty() {
+ for (size_t i = 0; i < Concurrency; ++i) {
+ if (!Queues[i].IsEmpty()) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private:
+ ui64 NextTag() {
+ // TODO: can we avoid synchronization here? It costs a 1.5x performance penalty.
+ // return GetCycleCount();
+ return AtomicIncrement(WriteTag);
+ }
+
+ template <typename TT>
+ bool TryEnqueue(TT&& value, ui64 tag) {
+ for (size_t i = 0; i < Concurrency; ++i) {
+ TQueueType& queue = Queues[i];
+ if (AtomicTryAndTryLock(&queue.WriteLock)) {
+ TEntry* entry = queue.PrepareWrite();
+ Y_ASSERT(entry);
+ TTypeHelper::Write(&entry->Value, std::forward<TT>(value));
+ entry->Tag = tag;
+ queue.CompleteWrite();
+ AtomicUnlock(&queue.WriteLock);
+ return true;
+ }
+ }
+ return false;
+ }
+
+ TEntry* PrepareRead(size_t& index) {
+ TEntry* entry = nullptr;
+ ui64 tag = Max();
+
+ for (size_t i = 0; i < Concurrency; ++i) {
+ TEntry* e = Queues[i].PrepareRead();
+ if (e && e->Tag < tag) {
+ index = i;
+ entry = e;
+ tag = e->Tag;
+ }
+ }
+
+ if (entry) {
+ // need second pass to catch updates within already scanned range
+ size_t candidate = index;
+ for (size_t i = 0; i < candidate; ++i) {
+ TEntry* e = Queues[i].PrepareRead();
+ if (e && e->Tag < tag) {
+ index = i;
+ entry = e;
+ tag = e->Tag;
+ }
+ }
+ }
+
+ return entry;
+ }
+ };
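+
+ // Illustrative usage sketch (editor's addition, not part of the original header):
+ // any number of producer threads may call Enqueue, while Dequeue must be called
+ // from a single consumer thread; items from the same producer come out in the
+ // order they were enqueued, e.g.:
+ //
+ //     TManyOneQueue<TString> queue;
+ //     queue.Enqueue("task");        // producer threads
+ //
+ //     TString task;
+ //     if (queue.Dequeue(task)) {    // single consumer thread
+ //         Process(task);            // Process() is a hypothetical handler
+ //     }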
+
+ ////////////////////////////////////////////////////////////////////////////////
+ // Concurrent many-many queue with strong FIFO guarantees.
+ // Writers will not block readers (and vice versa), but will block each other.
+
+ template <typename T, size_t ChunkSize = PLATFORM_PAGE_SIZE, typename TLock = TAdaptiveLock>
+ class TManyManyQueue: private TNonCopyable {
+ private:
+ TPadded<TLock> WriteLock;
+ TPadded<TLock> ReadLock;
+
+ TOneOneQueue<T, ChunkSize> Queue;
+
+ public:
+ using TItem = T;
+
+ template <typename TT>
+ void Enqueue(TT&& value) {
+ with_lock (WriteLock) {
+ Queue.Enqueue(std::forward<TT>(value));
+ }
+ }
+
+ bool Dequeue(T& value) {
+ with_lock (ReadLock) {
+ return Queue.Dequeue(value);
+ }
+ }
+
+ bool IsEmpty() {
+ with_lock (ReadLock) {
+ return Queue.IsEmpty();
+ }
+ }
+ };
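+
+ // Illustrative usage sketch (editor's addition, not part of the original header):
+ // any number of producers and consumers may use the queue; writers serialize on
+ // WriteLock and readers on ReadLock, so items come out in global FIFO order, e.g.:
+ //
+ //     TManyManyQueue<int> queue;
+ //     queue.Enqueue(1);               // any producer thread
+ //
+ //     int value = 0;
+ //     bool ok = queue.Dequeue(value); // any consumer thread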
+
+ ////////////////////////////////////////////////////////////////////////////////
+ // Multiple producers/single consumer partitioned queue.
+ // Because of random partitioning, reordering is possible - FIFO is not guaranteed!
+
+ template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE>
+ class TRelaxedManyOneQueue: private TNonCopyable {
+ struct TQueueType: public TOneOneQueue<T, ChunkSize> {
+ TAtomic WriteLock = 0;
+ };
+
+ private:
+ union {
+ size_t ReadPos = 0;
+ char Pad[PLATFORM_CACHE_LINE];
+ };
+
+ TQueueType Queues[Concurrency];
+
+ public:
+ using TItem = T;
+
+ template <typename TT>
+ void Enqueue(TT&& value) {
+ while (!TryEnqueue(std::forward<TT>(value))) {
+ SpinLockPause();
+ }
+ }
+
+ bool Dequeue(T& value) {
+ for (size_t i = 0; i < Concurrency; ++i) {
+ TQueueType& queue = Queues[ReadPos++ % Concurrency];
+ if (queue.Dequeue(value)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ bool IsEmpty() {
+ for (size_t i = 0; i < Concurrency; ++i) {
+ if (!Queues[i].IsEmpty()) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private:
+ template <typename TT>
+ bool TryEnqueue(TT&& value) {
+ size_t writePos = GetCycleCount();
+ for (size_t i = 0; i < Concurrency; ++i) {
+ TQueueType& queue = Queues[writePos++ % Concurrency];
+ if (AtomicTryAndTryLock(&queue.WriteLock)) {
+ queue.Enqueue(std::forward<TT>(value));
+ AtomicUnlock(&queue.WriteLock);
+ return true;
+ }
+ }
+ return false;
+ }
+ };
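+
+ // Illustrative usage sketch (editor's addition, not part of the original header):
+ // producers pick a partition pseudo-randomly (via GetCycleCount), so items may be
+ // dequeued out of order; Dequeue must still be called from a single consumer
+ // thread, which scans the partitions round-robin, e.g.:
+ //
+ //     TRelaxedManyOneQueue<int> queue;
+ //     queue.Enqueue(1);             // producer threads
+ //     queue.Enqueue(2);
+ //
+ //     int value = 0;
+ //     queue.Dequeue(value);         // single consumer; may observe 2 before 1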
+
+ ////////////////////////////////////////////////////////////////////////////////
+ // Concurrent many-many partitioned queue.
+ // Because of random partitioning, reordering is possible - FIFO is not guaranteed!
+
+ template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE>
+ class TRelaxedManyManyQueue: private TNonCopyable {
+ struct TQueueType: public TOneOneQueue<T, ChunkSize> {
+ union {
+ TAtomic WriteLock = 0;
+ char Pad1[PLATFORM_CACHE_LINE];
+ };
+ union {
+ TAtomic ReadLock = 0;
+ char Pad2[PLATFORM_CACHE_LINE];
+ };
+ };
+
+ private:
+ TQueueType Queues[Concurrency];
+
+ public:
+ using TItem = T;
+
+ template <typename TT>
+ void Enqueue(TT&& value) {
+ while (!TryEnqueue(std::forward<TT>(value))) {
+ SpinLockPause();
+ }
+ }
+
+ bool Dequeue(T& value) {
+ size_t readPos = GetCycleCount();
+ for (size_t i = 0; i < Concurrency; ++i) {
+ TQueueType& queue = Queues[readPos++ % Concurrency];
+ if (AtomicTryAndTryLock(&queue.ReadLock)) {
+ bool dequeued = queue.Dequeue(value);
+ AtomicUnlock(&queue.ReadLock);
+ if (dequeued) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
+ bool IsEmpty() {
+ for (size_t i = 0; i < Concurrency; ++i) {
+ TQueueType& queue = Queues[i];
+ if (AtomicTryAndTryLock(&queue.ReadLock)) {
+ bool empty = queue.IsEmpty();
+ AtomicUnlock(&queue.ReadLock);
+ if (!empty) {
+ return false;
+ }
+ }
+ }
+ return true;
+ }
+
+ private:
+ template <typename TT>
+ bool TryEnqueue(TT&& value) {
+ size_t writePos = GetCycleCount();
+ for (size_t i = 0; i < Concurrency; ++i) {
+ TQueueType& queue = Queues[writePos++ % Concurrency];
+ if (AtomicTryAndTryLock(&queue.WriteLock)) {
+ queue.Enqueue(std::forward<TT>(value));
+ AtomicUnlock(&queue.WriteLock);
+ return true;
+ }
+ }
+ return false;
+ }
+ };
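+
+ // Illustrative usage sketch (editor's addition, not part of the original header):
+ // like TRelaxedManyOneQueue, but Dequeue may also be called from many threads;
+ // each partition has its own read and write spin locks, and FIFO order is not
+ // guaranteed, e.g.:
+ //
+ //     TRelaxedManyManyQueue<int> queue;
+ //     queue.Enqueue(1);             // any producer thread
+ //
+ //     int value = 0;
+ //     queue.Dequeue(value);         // any consumer thread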
+
+ ////////////////////////////////////////////////////////////////////////////////
+ // Simple wrapper to deal with AutoPtrs
+
+ template <typename T, typename TImpl>
+ class TAutoQueueBase: private TNonCopyable {
+ private:
+ TImpl Impl;
+
+ public:
+ using TItem = TAutoPtr<T>;
+
+ ~TAutoQueueBase() {
+ TAutoPtr<T> value;
+ while (Dequeue(value)) {
+ // do nothing
+ }
+ }
+
+ void Enqueue(TAutoPtr<T> value) {
+ Impl.Enqueue(value.Get());
+ Y_UNUSED(value.Release());
+ }
+
+ bool Dequeue(TAutoPtr<T>& value) {
+ T* ptr = nullptr;
+ if (Impl.Dequeue(ptr)) {
+ value.Reset(ptr);
+ return true;
+ }
+ return false;
+ }
+
+ bool IsEmpty() {
+ return Impl.IsEmpty();
+ }
+ };
+
+ template <typename T, size_t ChunkSize = PLATFORM_PAGE_SIZE>
+ using TAutoOneOneQueue = TAutoQueueBase<T, TOneOneQueue<T*, ChunkSize>>;
+
+ template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE>
+ using TAutoManyOneQueue = TAutoQueueBase<T, TManyOneQueue<T*, Concurrency, ChunkSize>>;
+
+ template <typename T, size_t ChunkSize = PLATFORM_PAGE_SIZE, typename TLock = TAdaptiveLock>
+ using TAutoManyManyQueue = TAutoQueueBase<T, TManyManyQueue<T*, ChunkSize, TLock>>;
+
+ template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE>
+ using TAutoRelaxedManyOneQueue = TAutoQueueBase<T, TRelaxedManyOneQueue<T*, Concurrency, ChunkSize>>;
+
+ template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE>
+ using TAutoRelaxedManyManyQueue = TAutoQueueBase<T, TRelaxedManyManyQueue<T*, Concurrency, ChunkSize>>;
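+
+ // Illustrative usage sketch (editor's addition, not part of the original header):
+ // the TAuto* aliases store raw pointers internally and pass ownership through
+ // TAutoPtr, e.g.:
+ //
+ //     TAutoOneOneQueue<TJob> queue;      // TJob is a hypothetical payload type
+ //     queue.Enqueue(new TJob());         // queue takes ownership
+ //
+ //     TAutoPtr<TJob> job;
+ //     if (queue.Dequeue(job)) {
+ //         job->Run();                    // Run() is hypothetical; ownership is back with the caller
+ //     }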
+}